-
-
Notifications
You must be signed in to change notification settings - Fork 251
Expand file tree
/
Copy pathclient.py
More file actions
136 lines (113 loc) · 4.7 KB
/
client.py
File metadata and controls
136 lines (113 loc) · 4.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import json
from datetime import datetime
import logging
import boto3
from cachetools import TTLCache
from requests import request
from sp_api.auth import AccessTokenClient, AccessTokenResponse
from .ApiResponse import ApiResponse
from .base_client import BaseClient
from .exceptions import get_exception_for_code, SellingApiBadRequestException
from .marketplaces import Marketplaces
from sp_api.base import AWSSigV4
log = logging.getLogger(__name__)
role_cache = TTLCache(maxsize=10, ttl=3600)
class Client(BaseClient):
boto3_client = None
grantless_scope = ''
def __init__(
self,
marketplace: Marketplaces = Marketplaces.US,
*,
refresh_token=None,
account='default',
credentials=None
):
super().__init__(account, credentials)
self.boto3_client = boto3.client(
'sts',
aws_access_key_id=self.credentials.aws_access_key,
aws_secret_access_key=self.credentials.aws_secret_key
)
self.endpoint = marketplace.endpoint
self.marketplace_id = marketplace.marketplace_id
self.region = marketplace.region
self._auth = AccessTokenClient(refresh_token=refresh_token, account=account, credentials=credentials)
def set_role(self):
role = self.boto3_client.assume_role(
RoleArn=self.credentials.role_arn,
RoleSessionName='guid'
)
role_cache['role'] = role
return role
@property
def headers(self):
return {
'host': self.endpoint[8:],
'user-agent': self.user_agent,
'x-amz-access-token': self.auth.access_token,
'x-amz-date': datetime.utcnow().strftime('%Y%m%dT%H%M%SZ'),
'content-type': 'application/json'
}
@property
def auth(self) -> AccessTokenResponse:
return self._auth.get_auth()
@property
def grantless_auth(self) -> AccessTokenResponse:
if not self.grantless_scope:
raise Exception("Grantless operations require scope")
return self._auth.get_grantless_auth(self.grantless_scope)
@property
def role(self):
try:
role = role_cache['role']
except KeyError:
role = self.set_role()
return role.get('Credentials')
def _sign_request(self):
role = self.role
return AWSSigV4('execute-api',
aws_access_key_id=role.get('AccessKeyId'),
aws_secret_access_key=role.get('SecretAccessKey'),
region=self.region,
aws_session_token=role.get('SessionToken')
)
def _request(self, path: str, *, data: dict = None, params: dict = None, headers=None,
add_marketplace=True) -> ApiResponse:
if params is None:
params = {}
if data is None:
data = {}
self.method = params.pop('method', data.pop('method', 'GET'))
if add_marketplace:
self._add_marketplaces(data if self.method in ('POST', 'PUT') else params)
res = request(self.method, self.endpoint + path, params=params,
data=json.dumps(data) if data and self.method in ('POST', 'PUT', 'PATCH') else None, headers=headers or self.headers,
auth=self._sign_request())
return self._check_response(res)
@staticmethod
def _check_response(res) -> ApiResponse:
error = res.json().get('errors', None)
if error:
exception = get_exception_for_code(res.status_code)
raise exception(error)
return ApiResponse(**res.json(), headers=res.headers)
def _add_marketplaces(self, data):
POST = ['marketplaceIds', 'MarketplaceIds']
GET = ['MarketplaceId', 'MarketplaceIds', 'marketplace_ids', 'marketplaceIds']
if self.method == 'POST':
if any(x in data.keys() for x in POST):
return
return data.update({k: self.marketplace_id if not k.endswith('s') else [self.marketplace_id] for k in POST})
if any(x in data.keys() for x in GET):
return
return data.update({k: self.marketplace_id if not k.endswith('s') else [self.marketplace_id] for k in GET})
def _request_grantless_operation(self, path: str, *, data: dict = None, params: dict = None):
headers = {
'host': self.endpoint[8:],
'user-agent': self.user_agent,
'x-amz-access-token': self.grantless_auth.access_token,
'x-amz-date': datetime.utcnow().strftime('%Y%m%dT%H%M%SZ'),
'content-type': 'application/json'
}
return self._request(path, data=data, params=params, headers=headers)