forked from Azure/azure-sdk-for-python
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_client_secret_credential.py
More file actions
92 lines (73 loc) · 3.02 KB
/
test_client_secret_credential.py
File metadata and controls
92 lines (73 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# ------------------------------------
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# ------------------------------------
import time
from azure.core.credentials import AccessToken
from azure.core.pipeline.policies import ContentDecodePolicy, SansIOHTTPPolicy
from azure.identity import ClientSecretCredential
from helpers import build_aad_response, mock_response, Request, validating_transport
try:
from unittest.mock import Mock
except ImportError: # python < 3.3
from mock import Mock # type: ignore
def test_policies_configurable():
policy = Mock(spec_set=SansIOHTTPPolicy, on_request=Mock())
def send(*_, **__):
return mock_response(json_payload=build_aad_response(access_token="**"))
credential = ClientSecretCredential(
"tenant-id", "client-id", "client-secret", policies=[ContentDecodePolicy(), policy], transport=Mock(send=send)
)
credential.get_token("scope")
assert policy.on_request.called
def test_client_secret_credential():
client_id = "fake-client-id"
secret = "fake-client-secret"
tenant_id = "fake-tenant-id"
access_token = "***"
transport = validating_transport(
requests=[Request(url_substring=tenant_id, required_data={"client_id": client_id, "client_secret": secret})],
responses=[
mock_response(
json_payload={
"token_type": "Bearer",
"expires_in": 42,
"ext_expires_in": 42,
"access_token": access_token,
}
)
],
)
token = ClientSecretCredential(tenant_id, client_id, secret, transport=transport).get_token("scope")
# not validating expires_on because doing so requires monkeypatching time, and this is tested elsewhere
assert token.token == access_token
def test_cache():
expired = "this token's expired"
now = int(time.time())
expired_on = now - 3600
expired_token = AccessToken(expired, expired_on)
token_payload = {
"access_token": expired,
"expires_in": 0,
"ext_expires_in": 0,
"expires_on": expired_on,
"not_before": now,
"token_type": "Bearer",
}
mock_send = Mock(return_value=mock_response(json_payload=token_payload))
scope = "scope"
credential = ClientSecretCredential(
tenant_id="some-guid", client_id="client_id", client_secret="secret", transport=Mock(send=mock_send)
)
# get_token initially returns the expired token because the credential
# doesn't check whether tokens it receives from the service have expired
token = credential.get_token(scope)
assert token == expired_token
access_token = "new token"
token_payload["access_token"] = access_token
token_payload["expires_on"] = now + 3600
valid_token = AccessToken(access_token, now + 3600)
# second call should observe the cached token has expired, and request another
token = credential.get_token(scope)
assert token == valid_token
assert mock_send.call_count == 2