forked from Azure/azure-sdk-for-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathazd_cli.py
More file actions
245 lines (199 loc) · 9.95 KB
/
azd_cli.py
File metadata and controls
245 lines (199 loc) · 9.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
from datetime import datetime
import json
import os
import re
import shutil
import subprocess
import sys
from typing import Any, Dict, List, Optional
from azure.core.credentials import AccessToken
from azure.core.exceptions import ClientAuthenticationError
from .. import CredentialUnavailableError
from .._internal import resolve_tenant, within_dac
from .._internal.decorators import log_get_token
# Error text used when the ``azd`` executable cannot be found on PATH or the shell reports it as missing.
CLI_NOT_FOUND = (
    "Azure Developer CLI could not be found. "
    "Please visit https://aka.ms/azure-dev for installation instructions and then, "
    "once installed, authenticate to your Azure account using 'azd auth login'."
)
# Command line template; the placeholder receives the requested scope(s).
COMMAND_LINE = "azd auth token --output json --scope {}"
# Name of the executable looked up on PATH before a shell is spawned.
EXECUTABLE_NAME = "azd"
# Error text used when the CLI reports that nobody is logged in.
NOT_LOGGED_IN = "Please run 'azd auth login' from a command prompt to authenticate before using this credential."
class AzureDeveloperCliCredential:
    """Authenticates by requesting a token from the Azure Developer CLI.

    Azure Developer CLI is a command-line interface tool that allows developers to create, manage, and deploy
    resources in Azure. It's built on top of the Azure CLI and provides additional functionality specific
    to Azure developers. It allows users to authenticate as a user and/or a service principal against
    `Azure Active Directory (Azure AD) <https://learn.microsoft.com/azure/active-directory/fundamentals/>`__.

    The AzureDeveloperCliCredential authenticates in a development environment and acquires a token on behalf of
    the logged-in user or service principal in Azure Developer CLI. It acts as the Azure Developer CLI logged-in user
    or service principal and executes an Azure Developer CLI command underneath to authenticate the application
    against Azure Active Directory.

    To use this credential, the developer needs to authenticate locally in Azure Developer CLI using one of the
    commands below:

      * Run "azd auth login" in Azure Developer CLI to authenticate interactively as a user.
      * Run "azd auth login --client-id 'client_id' --client-secret 'client_secret' --tenant-id 'tenant_id'"
        to authenticate as a service principal.

    You may need to repeat this process after a certain time period, depending on the refresh token validity in your
    organization. Generally, the refresh token validity period is a few weeks to a few months.
    AzureDeveloperCliCredential will prompt you to sign in again.

    :keyword str tenant_id: Optional tenant to include in the token request.
    :keyword List[str] additionally_allowed_tenants: Specifies tenants in addition to the specified "tenant_id"
        for which the credential may acquire tokens. Add the wildcard value "*" to allow the credential to
        acquire tokens for any tenant the application can access.
    :keyword int process_timeout: Seconds to wait for the Azure Developer CLI process to respond. Defaults
        to 10 seconds.

    .. admonition:: Example:

        .. literalinclude:: ../samples/credential_creation_code_snippets.py
            :start-after: [START azure_developer_cli_credential]
            :end-before: [END azure_developer_cli_credential]
            :language: python
            :dedent: 4
            :caption: Create an AzureDeveloperCliCredential.
    """

    # Scopes and tenant IDs are interpolated into a command line that is executed by a shell, so they are
    # restricted to characters that have no special meaning to 'cmd' or '/bin/sh'.
    _VALID_SCOPE = re.compile(r"[A-Za-z0-9_.:/-]+")
    _VALID_TENANT_ID = re.compile(r"[A-Za-z0-9.-]+")

    def __init__(
        self,
        *,
        tenant_id: str = "",
        additionally_allowed_tenants: Optional[List[str]] = None,
        process_timeout: int = 10,
    ) -> None:
        self.tenant_id = tenant_id
        self._additionally_allowed_tenants = additionally_allowed_tenants or []
        self._process_timeout = process_timeout

    def __enter__(self) -> "AzureDeveloperCliCredential":
        return self

    def __exit__(self, *args: Any) -> None:
        """Nothing to release; present so the credential can be used as a context manager."""

    def close(self) -> None:
        """Calling this method is unnecessary."""

    @log_get_token("AzureDeveloperCliCredential")
    def get_token(
        self,
        *scopes: str,
        claims: Optional[str] = None,  # pylint:disable=unused-argument
        tenant_id: Optional[str] = None,
        **kwargs: Any,
    ) -> AccessToken:
        """Request an access token for `scopes`.

        This method is called automatically by Azure SDK clients. Applications calling this method directly must
        also handle token caching because this credential doesn't cache the tokens it acquires.

        :param str scopes: desired scope for the access token. This credential allows only one scope per request.
            For more information about scopes, see
            https://learn.microsoft.com/azure/active-directory/develop/scopes-oidc.
        :keyword str claims: not used by this credential; any value provided will be ignored.
        :keyword str tenant_id: optional tenant to include in the token request.

        :return: An access token with the desired scopes.
        :rtype: ~azure.core.credentials.AccessToken

        :raises ValueError: no scope was given, or a scope or the tenant ID contains a character that is not
            allowed on the command line.
        :raises ~azure.identity.CredentialUnavailableError: the credential was unable to invoke
            the Azure Developer CLI.
        :raises ~azure.core.exceptions.ClientAuthenticationError: the credential invoked
            the Azure Developer CLI but didn't receive an access token.
        """
        if not scopes:
            raise ValueError("Missing scope in request. \n")

        # Reject anything that could alter the shell command before it is assembled.
        for scope in scopes:
            if not self._VALID_SCOPE.fullmatch(scope):
                raise ValueError(
                    "An invalid scope was specified. "
                    "Only alphanumeric characters, '.', '-', '_', ':', and '/' are allowed."
                )

        scope_arguments = " --scope ".join(scopes)
        command = COMMAND_LINE.format(scope_arguments)

        tenant = resolve_tenant(
            default_tenant=self.tenant_id,
            tenant_id=tenant_id,
            additionally_allowed_tenants=self._additionally_allowed_tenants,
            **kwargs,
        )
        if tenant:
            if not self._VALID_TENANT_ID.fullmatch(tenant):
                raise ValueError(
                    "Invalid tenant ID provided. Only alphanumeric characters, '.', and '-' are allowed."
                )
            command += " --tenant-id " + tenant

        output = _run_command(command, self._process_timeout)

        token = parse_token(output)
        if not token:
            # Never echo a raw token back in an error message.
            sanitized_output = sanitize_output(output)
            message = (
                f"Unexpected output from Azure Developer CLI: '{sanitized_output}'. \n"
                f"To mitigate this issue, please refer to the troubleshooting guidelines here at "
                f"https://aka.ms/azsdk/python/identity/azdevclicredential/troubleshoot."
            )
            if within_dac.get():
                raise CredentialUnavailableError(message=message)
            raise ClientAuthenticationError(message=message)

        return token
def parse_token(output: str) -> Optional[AccessToken]:
    """Parse the Azure Developer CLI's JSON output into an AccessToken.

    The "expiresOn" value is expected in the form ``YYYY-MM-DDTHH:MM:SSZ``. The trailing "Z" designates UTC, so
    the parsed datetime is interpreted as UTC (not as local time) when it is converted to epoch seconds.

    :param str output: The output of the Azure Developer CLI command.
    :return: An AccessToken or None if the output isn't valid.
    :rtype: azure.core.credentials.AccessToken or None
    """
    # Imported locally because the module-level import only brings in ``datetime``.
    from datetime import timezone  # pylint:disable=import-outside-toplevel

    try:
        token = json.loads(output)
        dt = datetime.strptime(token["expiresOn"], "%Y-%m-%dT%H:%M:%SZ")
        # strptime yields a naive datetime; attach UTC so timestamp() doesn't apply the local offset.
        expires_on = dt.replace(tzinfo=timezone.utc).timestamp()
        return AccessToken(token["token"], int(expires_on))
    except (KeyError, TypeError, ValueError):
        # KeyError: a field is missing. ValueError: invalid JSON or an unexpected date format.
        # TypeError: the JSON document isn't an object, or "expiresOn" isn't a string.
        return None
def get_safe_working_dir() -> str:
    """Pick a working directory owned by the OS for running 'azd', rather than the program's own directory.

    On Windows this is the directory named by the SYSTEMROOT environment variable; on every other platform
    it is "/bin".

    :return: The path to the directory.
    :rtype: str
    :raises ~azure.identity.CredentialUnavailableError: running on Windows and the SYSTEMROOT environment
        variable is unset or empty.
    """
    on_windows = sys.platform.startswith("win")
    if not on_windows:
        return "/bin"

    system_root = os.environ.get("SYSTEMROOT")
    if system_root:
        return system_root

    raise CredentialUnavailableError(
        message="Azure Developer CLI credential" + " expects a 'SystemRoot' environment variable"
    )
def sanitize_output(output: str) -> str:
    """Redact tokens from CLI output to prevent error messages revealing them.

    Each ``"token": "<value>"`` pair is replaced, key included, by ``****``. Any amount of whitespace (including
    none) is accepted around the colon so that compact JSON is redacted as well. A value whose closing quote is
    missing is redacted up to the end of the output.

    :param str output: The output of the Azure Developer CLI command.
    :return: The output with tokens redacted.
    :rtype: str
    """
    return re.sub(r'"token"\s*:\s*"(.*?)("|$)', "****", output)
def _run_command(command: str, timeout: int) -> str:
    """Run an Azure Developer CLI command line through the platform shell and return its standard output.

    :param str command: The complete command line to execute.
    :param int timeout: Seconds to wait for the process to finish.
    :return: The text the command wrote to standard output.
    :rtype: str
    :raises ~azure.identity.CredentialUnavailableError: the CLI isn't installed, nobody is logged in, the shell
        couldn't be started, the process timed out, or (inside DefaultAzureCredential) the CLI returned an error.
    :raises ~azure.core.exceptions.ClientAuthenticationError: the CLI returned an error outside
        DefaultAzureCredential.
    """
    # Ensure executable exists in PATH first. This avoids a subprocess call that would fail anyway.
    if shutil.which(EXECUTABLE_NAME) is None:
        raise CredentialUnavailableError(message=CLI_NOT_FOUND)

    if sys.platform.startswith("win"):
        args = ["cmd", "/c", command]
    else:
        args = ["/bin/sh", "-c", command]

    # Resolved outside the try block so that its own CredentialUnavailableError (missing SYSTEMROOT) reaches the
    # caller with its specific message instead of being re-wrapped by the catch-all handler below.
    working_directory = get_safe_working_dir()

    try:
        kwargs: Dict[str, Any] = {
            "stderr": subprocess.PIPE,
            "stdin": subprocess.DEVNULL,
            "cwd": working_directory,
            "universal_newlines": True,
            "env": dict(os.environ, NO_COLOR="true"),
            "timeout": timeout,
        }
        return subprocess.check_output(args, **kwargs)
    except subprocess.CalledProcessError as ex:
        # non-zero return from shell
        stderr = ex.stderr or ""
        # Fallback check in case the executable is not found while executing subprocess.
        if ex.returncode == 127 or stderr.startswith("'azd' is not recognized"):
            raise CredentialUnavailableError(message=CLI_NOT_FOUND) from ex
        if "not logged in, run `azd auth login` to login" in stderr:
            raise CredentialUnavailableError(message=NOT_LOGGED_IN) from ex

        # return code is from the CLI -> propagate its output, with any token redacted
        if stderr:
            message = sanitize_output(stderr)
        else:
            message = "Failed to invoke Azure Developer CLI"
        if within_dac.get():
            raise CredentialUnavailableError(message=message) from ex
        raise ClientAuthenticationError(message=message) from ex
    except subprocess.TimeoutExpired as ex:
        # the process didn't finish within 'timeout' seconds
        raise CredentialUnavailableError(message="Timed out waiting for Azure Developer CLI") from ex
    except OSError as ex:
        # failed to execute 'cmd' or '/bin/sh'
        error = CredentialUnavailableError(message="Failed to execute '{}'".format(args[0]))
        raise error from ex
    except Exception as ex:  # pylint:disable=broad-except
        # any other unexpected failure while invoking the process
        error = CredentialUnavailableError(message="Failed to invoke the Azure Developer CLI")
        raise error from ex