Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/identity/azure-identity/HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
### 1.1.0b1 Unreleased
- Constructing `DefaultAzureCredential` no longer raises `ImportError` on Python
3.8 on Windows ([8294](https://github.com/Azure/azure-sdk-for-python/pull/8294))
- `InteractiveBrowserCredential` raises when unable to open a web browser
([8465](https://github.com/Azure/azure-sdk-for-python/pull/8465))
- `InteractiveBrowserCredential` prompts for account selection
([8470](https://github.com/Azure/azure-sdk-for-python/pull/8470))
- The credentials composing `DefaultAzureCredential` are configurable by keyword
arguments ([8514](https://github.com/Azure/azure-sdk-for-python/pull/8514))


### 2019-11-05 1.0.1
Expand Down
38 changes: 30 additions & 8 deletions sdk/identity/azure-identity/azure/identity/_credentials/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

from azure.core.exceptions import ClientAuthenticationError

from .._constants import EnvironmentVariables
from .._constants import EnvironmentVariables, KnownAuthorities
from .browser import InteractiveBrowserCredential
from .chained import ChainedTokenCredential
from .environment import EnvironmentCredential
from .managed_identity import ManagedIdentityCredential
Expand All @@ -29,25 +30,46 @@ class DefaultAzureCredential(ChainedTokenCredential):
identities are in the cache, then the value of the environment variable ``AZURE_USERNAME`` is used to select
which identity to use. See :class:`~azure.identity.SharedTokenCacheCredential` for more details.

This default behavior is configurable with keyword arguments.

:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.KnownAuthorities`
defines authorities for other clouds. Managed identities ignore this because they reside in a single cloud.
:keyword bool exclude_environment_credential: Whether to exclude a service principal configured by environment
variables from the credential. Defaults to **False**.
:keyword bool exclude_managed_identity_credential: Whether to exclude managed identity from the credential.
Defaults to **False**.
:keyword bool exclude_shared_token_cache_credential: Whether to exclude the shared token cache. Defaults to
**False**.
:keyword bool exclude_interactive_browser_credential: Whether to exclude interactive browser authentication (see
:class:`~azure.identity.InteractiveBrowserCredential`). Defaults to **True**.
"""

def __init__(self, **kwargs):
authority = kwargs.pop("authority", None)
credentials = [EnvironmentCredential(authority=authority, **kwargs), ManagedIdentityCredential(**kwargs)]
authority = kwargs.pop("authority", KnownAuthorities.AZURE_PUBLIC_CLOUD)

username = kwargs.pop("username", os.environ.get(EnvironmentVariables.AZURE_USERNAME))

exclude_environment_credential = kwargs.pop("exclude_environment_credential", False)
exclude_managed_identity_credential = kwargs.pop("exclude_managed_identity_credential", False)
exclude_shared_token_cache_credential = kwargs.pop("exclude_shared_token_cache_credential", False)
exclude_interactive_browser_credential = kwargs.pop("exclude_interactive_browser_credential", True)

# SharedTokenCacheCredential is part of the default only on supported platforms.
if SharedTokenCacheCredential.supported():
credentials = []
if not exclude_environment_credential:
credentials.append(EnvironmentCredential(authority=authority, **kwargs))
if not exclude_managed_identity_credential:
credentials.append(ManagedIdentityCredential(**kwargs))
if not exclude_shared_token_cache_credential and SharedTokenCacheCredential.supported():
try:
# username is only required to disambiguate, when the cache contains tokens for multiple identities
username = os.environ.get(EnvironmentVariables.AZURE_USERNAME)
shared_cache = SharedTokenCacheCredential(username=username, authority=authority, **kwargs)
credentials.append(shared_cache)
except Exception as ex: # pylint:disable=broad-except
# transitive dependency pywin32 doesn't support 3.8 (https://github.com/mhammond/pywin32/issues/1431)
_LOGGER.info("Shared token cache is unavailable: '%s'", ex)
if not exclude_interactive_browser_credential:
credentials.append(InteractiveBrowserCredential())

super(DefaultAzureCredential, self).__init__(*credentials)

Expand All @@ -56,7 +78,7 @@ def get_token(self, *scopes, **kwargs):
return super(DefaultAzureCredential, self).get_token(*scopes, **kwargs)
except ClientAuthenticationError as e:
raise ClientAuthenticationError(message="""
{}\n\nPlease visit the Azure identity Python SDK docs at
https://aka.ms/python-sdk-identity#defaultazurecredential
{}\n\nPlease visit the Azure identity Python SDK docs at
https://aka.ms/python-sdk-identity#defaultazurecredential
to learn what options DefaultAzureCredential supports"""
.format(e.message))
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import os

from ..._constants import EnvironmentVariables
from ..._constants import EnvironmentVariables, KnownAuthorities
from .chained import ChainedTokenCredential
from .environment import EnvironmentCredential
from .managed_identity import ManagedIdentityCredential
Expand All @@ -27,22 +27,36 @@ class DefaultAzureCredential(ChainedTokenCredential):
identities are in the cache, then the value of the environment variable ``AZURE_USERNAME`` is used to select
which identity to use. See :class:`~azure.identity.aio.SharedTokenCacheCredential` for more details.

This default behavior is configurable with keyword arguments.

:keyword str authority: Authority of an Azure Active Directory endpoint, for example 'login.microsoftonline.com',
the authority for Azure Public Cloud (which is the default). :class:`~azure.identity.KnownAuthorities`
defines authorities for other clouds. Managed identities ignore this because they reside in a single cloud.
:keyword bool exclude_environment_credential: Whether to exclude a service principal configured by environment
variables from the credential. Defaults to **False**.
:keyword bool exclude_managed_identity_credential: Whether to exclude managed identity from the credential.
Defaults to **False**.
:keyword bool exclude_shared_token_cache_credential: Whether to exclude the shared token cache. Defaults to
**False**.
"""

def __init__(self, **kwargs):
authority = kwargs.pop("authority", None)
credentials = [EnvironmentCredential(authority=authority, **kwargs), ManagedIdentityCredential(**kwargs)]
authority = kwargs.pop("authority", KnownAuthorities.AZURE_PUBLIC_CLOUD)

username = kwargs.pop("username", os.environ.get(EnvironmentVariables.AZURE_USERNAME))

exclude_environment_credential = kwargs.pop("exclude_environment_credential", False)
exclude_managed_identity_credential = kwargs.pop("exclude_managed_identity_credential", False)
exclude_shared_token_cache_credential = kwargs.pop("exclude_shared_token_cache_credential", False)

# SharedTokenCacheCredential is part of the default only on supported platforms.
if SharedTokenCacheCredential.supported():
credentials = []
if not exclude_environment_credential:
credentials.append(EnvironmentCredential(authority=authority, **kwargs))
if not exclude_managed_identity_credential:
credentials.append(ManagedIdentityCredential(**kwargs))
if not exclude_shared_token_cache_credential and SharedTokenCacheCredential.supported():
try:
# username is only required to disambiguate, when the cache contains tokens for multiple identities
username = os.environ.get(EnvironmentVariables.AZURE_USERNAME)
shared_cache = SharedTokenCacheCredential(username=username, authority=authority, **kwargs)
credentials.append(shared_cache)
credentials.append(SharedTokenCacheCredential(username=username, authority=authority, **kwargs))
except Exception as ex: # pylint:disable=broad-except
# transitive dependency pywin32 doesn't support 3.8 (https://github.com/mhammond/pywin32/issues/1431)
_LOGGER.info("Shared token cache is unavailable: '%s'", ex)
Expand Down
54 changes: 48 additions & 6 deletions sdk/identity/azure-identity/tests/test_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,14 @@
# Licensed under the MIT License.
# ------------------------------------
from azure.core.credentials import AccessToken
from azure.identity import DefaultAzureCredential, KnownAuthorities, SharedTokenCacheCredential
from azure.identity import (
DefaultAzureCredential,
InteractiveBrowserCredential,
KnownAuthorities,
SharedTokenCacheCredential,
)
from azure.identity._constants import EnvironmentVariables
from azure.identity._credentials.managed_identity import ImdsCredential, MsiCredential
from six.moves.urllib_parse import urlparse

from helpers import mock_response
Expand Down Expand Up @@ -32,11 +38,12 @@ def test_default_credential_authority():

def exercise_credentials(authority_kwarg, expected_authority=None):
expected_authority = expected_authority or authority_kwarg

def send(request, **_):
scheme, netloc, path, _, _, _ = urlparse(request.url)
assert scheme == "https"
assert netloc == expected_authority
assert path.startswith("/" + tenant_id)
url = urlparse(request.url)
assert url.scheme == "https"
assert url.netloc == expected_authority
assert url.path.startswith("/" + tenant_id)
return response

# environment credential configured with client secret should respect authority
Expand All @@ -46,7 +53,7 @@ def send(request, **_):
EnvironmentVariables.AZURE_TENANT_ID: tenant_id,
}
with patch("os.environ", environment):
transport=Mock(send=send)
transport = Mock(send=send)
access_token, _ = DefaultAzureCredential(authority=authority_kwarg, transport=transport).get_token("scope")
assert access_token == expected_access_token

Expand All @@ -59,3 +66,38 @@ def send(request, **_):
# all credentials not representing managed identities should use a specified authority or default to public cloud
exercise_credentials("authority.com")
exercise_credentials(None, KnownAuthorities.AZURE_PUBLIC_CLOUD)


def test_exclude_options():
def assert_credentials_not_present(chain, *excluded_credential_classes):
actual = {c.__class__ for c in chain.credentials}
assert len(actual)

# no unexpected credential is in the chain
excluded = set(excluded_credential_classes)
assert len(actual & excluded) == 0

# only excluded credentials have been excluded from the default
default = {c.__class__ for c in DefaultAzureCredential().credentials}
assert actual <= default # n.b. we know actual is non-empty
assert default - actual <= excluded

# with no environment variables set, ManagedIdentityCredential = ImdsCredential
with patch("os.environ", {}):
credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
assert_credentials_not_present(credential, ImdsCredential, MsiCredential)

# with $MSI_ENDPOINT set, ManagedIdentityCredential = MsiCredential
with patch("os.environ", {"MSI_ENDPOINT": "spam"}):
credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
assert_credentials_not_present(credential, ImdsCredential, MsiCredential)

if SharedTokenCacheCredential.supported():
credential = DefaultAzureCredential(exclude_shared_token_cache_credential=True)
assert_credentials_not_present(credential, SharedTokenCacheCredential)

# interactive auth is excluded by default
credential = DefaultAzureCredential(exclude_interactive_browser_credential=False)
actual = {c.__class__ for c in credential.credentials}
default = {c.__class__ for c in DefaultAzureCredential().credentials}
assert actual - default == {InteractiveBrowserCredential}
39 changes: 35 additions & 4 deletions sdk/identity/azure-identity/tests/test_default_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from azure.core.credentials import AccessToken
from azure.identity import KnownAuthorities
from azure.identity.aio import DefaultAzureCredential, SharedTokenCacheCredential
from azure.identity.aio._credentials.managed_identity import ImdsCredential, MsiCredential
from azure.identity._constants import EnvironmentVariables
import pytest

Expand All @@ -35,11 +36,12 @@ async def test_default_credential_authority():

async def exercise_credentials(authority_kwarg, expected_authority=None):
expected_authority = expected_authority or authority_kwarg

async def send(request, **_):
scheme, netloc, path, _, _, _ = urlparse(request.url)
assert scheme == "https"
assert netloc == expected_authority
assert path.startswith("/" + tenant_id)
url = urlparse(request.url)
assert url.scheme == "https"
assert url.netloc == expected_authority
assert url.path.startswith("/" + tenant_id)
return response

# environment credential configured with client secret should respect authority
Expand Down Expand Up @@ -70,3 +72,32 @@ async def send(request, **_):
# all credentials not representing managed identities should use a specified authority or default to public cloud
await exercise_credentials("authority.com")
await exercise_credentials(None, KnownAuthorities.AZURE_PUBLIC_CLOUD)


def test_exclude_options():
def assert_credentials_not_present(chain, *credential_classes):
actual = {c.__class__ for c in chain.credentials}
assert len(actual)

# no unexpected credential is in the chain
excluded = set(credential_classes)
Comment thread
KieranBrantnerMagee marked this conversation as resolved.
Outdated
assert len(actual & excluded) == 0

# only excluded credentials have been excluded from the default
default = {c.__class__ for c in DefaultAzureCredential().credentials}
assert actual <= default # n.b. we know actual is non-empty
assert default - actual <= excluded

# with no environment variables set, ManagedIdentityCredential = ImdsCredential
with patch("os.environ", {}):
credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
assert_credentials_not_present(credential, ImdsCredential, MsiCredential)

# with $MSI_ENDPOINT set, ManagedIdentityCredential = MsiCredential
with patch("os.environ", {"MSI_ENDPOINT": "spam"}):
credential = DefaultAzureCredential(exclude_managed_identity_credential=True)
assert_credentials_not_present(credential, ImdsCredential, MsiCredential)

if SharedTokenCacheCredential.supported():
credential = DefaultAzureCredential(exclude_shared_token_cache_credential=True)
assert_credentials_not_present(credential, SharedTokenCacheCredential)