-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathendpoint.py
More file actions
344 lines (304 loc) · 13.8 KB
/
endpoint.py
File metadata and controls
344 lines (304 loc) · 13.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
import asyncio
import logging
import time
from types import TracebackType
from typing import Any, Awaitable, Callable, Optional, Type
import aiohttp
from eyepop.client_session import ClientSession
from eyepop.metrics import MetricCollector
from eyepop.periodic import Periodic
from eyepop.request_tracer import RequestTracer
from eyepop.settings import settings
# General-purpose logger of the package (connection life cycle, token handling).
log = logging.getLogger('eyepop')
# Logger for per-request messages: before/after each HTTP call and retry decisions.
log_requests = logging.getLogger('eyepop.requests')
# Logger for session metrics; Endpoint checks this logger's DEBUG level to decide
# whether to collect metrics at all.
log_metrics = logging.getLogger('eyepop.metrics')
async def response_check_with_error_body(response: aiohttp.ClientResponse):
    """Raise ``aiohttp.ClientResponseError`` for a non-ok response.

    The error message is the response body text; when the body is empty the
    response's reason phrase is used instead, and an empty string if that is
    missing as well. Returns ``None`` without raising for an ok response.
    """
    if response.ok:
        return

    body_text = await response.text()
    # Prefer the server-provided body; only look at the reason phrase when the
    # body carries no text at all.
    detail = body_text or response.reason or ""
    raise aiohttp.ClientResponseError(
        request_info=response.request_info,  # type: ignore
        history=response.history,  # type: ignore
        status=response.status,
        message=detail,
        headers=response.headers,
    )
class Endpoint(ClientSession):
    """Abstract EyePop Endpoint.

    Owns the credentials, the shared ``aiohttp.ClientSession``, the per-status
    retry handlers and the bookkeeping of background tasks. Subclasses must
    implement ``_reconnect()`` and ``_disconnect()``.

    Use it as an async context manager (``async with``); the synchronous
    context manager protocol is rejected with ``TypeError``.
    """

    # Credentials; which one is set decides how access tokens are obtained.
    secret_key: str | None
    api_key: str | None
    # Caller-supplied access token with any "Bearer " prefix removed.
    provided_access_token: str | None
    # Value of the ``session_uuid`` constructor argument, stored unchanged.
    permanent_session_uuid: str | None
    eyepop_url: str
    # Last token response fetched with ``secret_key`` and the time (epoch
    # seconds) after which it is considered stale.
    token: dict[str, Any] | None
    expire_token_time: float | None
    # ComputeContext instance, only created when an ``api_key`` is given.
    compute_ctx: Any | None
    request_tracer: RequestTracer | None
    event_sender: Periodic | None
    # Maps an HTTP status code to ``handler(status_code, failed_attempts)``;
    # the handler returns True when the request should be retried.
    retry_handlers: dict[int, Callable[[int, int], Awaitable[bool]]]
    client_session: aiohttp.ClientSession | None
    # Tasks started through ``_task_start`` that have not finished yet.
    tasks: set[asyncio.Task]
    # Bounds the number of concurrently running tasks.
    sem: asyncio.Semaphore
    metrics_collector: MetricCollector | None

    def __init__(
            self,
            secret_key: str | None,
            access_token: str | None,
            eyepop_url: str,
            job_queue_length: int,
            request_tracer_max_buffer: int,
            api_key: str | None = None,
            session_uuid: str | None = None,
    ):
        """Store the configuration; no network activity happens here.

        Args:
            secret_key: Secret used to fetch tokens from
                ``{eyepop_url}/authentication/token``; also enables the 401
                retry handler that discards the cached token.
            access_token: Ready-to-use access token, optionally prefixed with
                ``Bearer `` (matched case-insensitively). Takes precedence
                over every other credential.
            eyepop_url: Base URL of the service.
            job_queue_length: Initial value of the semaphore that limits the
                number of concurrently started tasks.
            request_tracer_max_buffer: Buffer size of the request tracer;
                a value <= 0 disables request tracing.
            api_key: When given, a ``ComputeContext`` is created and the
                compute API authentication flow is used.
            session_uuid: Stored as ``permanent_session_uuid``.
        """
        self.secret_key = secret_key
        self.api_key = api_key
        self.permanent_session_uuid = session_uuid

        # The scheme is matched case-insensitively, hence the lower-case
        # literal: a lower-cased string can never start with "Bearer ".
        bearer_prefix = "bearer "
        if access_token is not None and access_token.lower().startswith(bearer_prefix):
            self.provided_access_token = access_token[len(bearer_prefix):]
        else:
            self.provided_access_token = access_token

        self.eyepop_url = eyepop_url
        self.token = None
        self.expire_token_time = None

        self.compute_ctx = None
        if api_key is not None:
            # Imported lazily, only when the compute API is actually used.
            from eyepop.compute import ComputeContext
            self.compute_ctx = ComputeContext(
                compute_url=eyepop_url,
                api_key=api_key
            )
            log.debug("Compute API will be used, session will be fetched in _reconnect()")

        if request_tracer_max_buffer > 0:
            self.request_tracer = RequestTracer(max_events=request_tracer_max_buffer)
            # Flush twice per threshold interval.
            self.event_sender = Periodic(self.send_trace_recordings, settings.send_trace_threshold_secs / 2)
        else:
            self.request_tracer = None
            self.event_sender = None

        self.retry_handlers = {}
        if self.secret_key is not None:
            self.retry_handlers[401] = self._retry_401
        elif self.compute_ctx is not None:
            self.retry_handlers[401] = self._retry_401_compute
        for status_code in (500, 502, 503, 504):
            self.retry_handlers[status_code] = self._retry_50x

        self.client_session = None
        self.tasks = set()
        self.sem = asyncio.Semaphore(job_queue_length)

        # isEnabledFor() is also true for levels more verbose than DEBUG, for
        # which the debug() calls in _cleanup() would emit as well.
        if log_metrics.isEnabledFor(logging.DEBUG):
            self.metrics_collector = MetricCollector()
        else:
            self.metrics_collector = None

    def add_retry_handler(self, status_code: int, handler: Callable[[int, int], Awaitable[bool]]):
        """Register ``handler`` for ``status_code``, replacing any existing one.

        The handler is awaited with ``(status_code, failed_attempts)`` and
        returns True to retry the request or False to give up.
        """
        self.retry_handlers[status_code] = handler

    def __enter__(self) -> None:
        """Not implemented, use async with instead."""
        raise TypeError("Use async with instead")

    def __exit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType], ) -> None:
        """Not implemented, use async with instead."""
        pass  # pragma: no cover

    async def __aenter__(self) -> "Endpoint":
        """Connect."""
        try:
            await self.connect()
        except aiohttp.ClientError:
            # Release whatever was set up before the failure, then re-raise.
            await self.disconnect()
            raise
        return self

    async def __aexit__(self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException],
                        exc_tb: Optional[TracebackType], ) -> None:
        """Disconnect."""
        await self.disconnect()

    async def _authorization_header(self) -> str | None:
        """Return the ``Authorization`` header value, or None without a token."""
        access_token = await self.__get_access_token()
        if access_token is None:
            return None
        return f'Bearer {access_token}'

    async def disconnect(self, timeout: float | None = None) -> None:
        """Disconnect the endpoint and clean up local resources.

        Args:
            timeout: Optional limit in seconds for disconnect plus cleanup.
                When the limit is hit, the timeout is logged and ignored;
                the steps that had not run yet are skipped.
        """
        if timeout is None:
            await self._disconnect()
            await self._cleanup()
        else:
            try:
                async with asyncio.timeout(timeout):
                    await self._disconnect(timeout)
                    await self._cleanup()
            except asyncio.TimeoutError:
                log_requests.info(f"timeout after {timeout} seconds in disconnect, ignored")

    async def _cleanup(self) -> None:
        """Wait for pending tasks, flush traces, close the HTTP session."""
        tasks = list(self.tasks)
        if tasks:
            await asyncio.gather(*tasks)

        if self.request_tracer and self.client_session and self.event_sender:
            await self.event_sender.stop()
            # Final flush of the recorded events; skipped in compute mode.
            if self.compute_ctx is None:
                await self.request_tracer.send_and_reset(f'{self.eyepop_url}/events',
                                                         await self._authorization_header(),
                                                         None)

        if self.client_session:
            try:
                await self.client_session.close()
            except Exception:
                # log.exception() attaches the active exception by itself;
                # passing it as a %-argument without a placeholder would make
                # the logging call itself fail to format.
                log.exception("error at disconnect")
            finally:
                self.client_session = None

        if self.metrics_collector:
            log_metrics.debug('endpoint disconnected, collected session metrics:')
            log_metrics.debug('total number of jobs: %d', self.metrics_collector.total_number_of_jobs)
            log_metrics.debug(f'max concurrent number of jobs: {self.metrics_collector.max_number_of_jobs_by_state}')
            log_metrics.debug(f'average wait time until state: {self.metrics_collector.get_average_times()}')

    async def connect(self):
        """Create the HTTP session and run the subclass' ``_reconnect()``.

        Non-ok responses of the session raise ``aiohttp.ClientResponseError``
        carrying the response body as message. If ``_reconnect()`` fails the
        session is closed again and the exception is re-raised.
        """
        trace_configs = [self.request_tracer.get_trace_config()] if self.request_tracer else None
        self.client_session = aiohttp.ClientSession(
            raise_for_status=response_check_with_error_body,
            trace_configs=trace_configs
        )
        assert self.client_session is not None
        try:
            await self._reconnect()
        except Exception:
            await self.client_session.close()
            self.client_session = None
            raise

        if self.event_sender is not None:
            await self.event_sender.start()

    async def session(self) -> dict | None:
        """Return URL, access token and token expiry, or None without a token.

        ``validUntil`` is ``expire_token_time * 1000`` (presumably epoch
        milliseconds -- confirm against consumers) or None when no expiry is
        tracked.
        """
        token = await self.__get_access_token()
        if token is None:
            return None
        return {
            'eyepopUrl': self.eyepop_url, 'accessToken': token,
            'validUntil': None if self.expire_token_time is None else self.expire_token_time * 1000
        }

    async def _reconnect(self):
        """Establish the connection; must be implemented by subclasses."""
        raise NotImplementedError

    async def _disconnect(self, timeout: float | None = None):
        """Tear down the connection; must be implemented by subclasses."""
        raise NotImplementedError

    async def __get_access_token(self) -> str | None:
        """Return a usable access token, fetching one if necessary.

        Order of precedence: the caller-provided token, the compute API
        token (authenticating with the API key when none is cached), then a
        token fetched with the secret key. Returns None when no credential
        is configured.
        """
        if self.provided_access_token is not None:
            return self.provided_access_token

        if self.compute_ctx is not None:
            if self.compute_ctx.m2m_access_token:
                return self.compute_ctx.m2m_access_token
            else:
                assert self.client_session is not None
                authenticate_url = f'{self.compute_ctx.compute_url}/v1/auth/authenticate'
                api_auth_header = {
                    'Authorization': f'Bearer {self.compute_ctx.api_key}',
                    'Content-Type': 'application/json',
                    'Accept': 'application/json'
                }
                async with self.client_session.post(authenticate_url, headers=api_auth_header) as response:
                    log.debug(f"POST /v1/auth/authenticate - status: {response.status}")
                    response_json = await response.json()
                    self.compute_ctx.m2m_access_token = response_json['access_token']
                    return self.compute_ctx.m2m_access_token

        if self.secret_key is None:
            return None

        assert self.client_session is not None
        now = time.time()
        if self.token is None or self.expire_token_time is None or self.expire_token_time < now:
            body = {'secret_key': self.secret_key}
            post_url = f'{self.eyepop_url}/authentication/token'
            log_requests.debug('before POST %s', post_url)
            async with self.client_session.post(post_url, json=body) as response:
                token = await response.json()
                assert token is not None
                self.token = token
                # Treat the token as stale 60 seconds before it expires.
                self.expire_token_time = time.time() + token['expires_in'] - 60
            assert self.token is not None
            log_requests.debug('after POST %s expires_in=%d token_type=%s', post_url, self.token['expires_in'],
                               self.token['token_type'])

        assert self.token is not None and self.expire_token_time is not None
        log.debug('using access token, valid for at least %d seconds', self.expire_token_time - now)
        return self.token['access_token']

    def _task_done(self, task):
        """Done-callback: forget ``task`` and free its semaphore slot."""
        self.tasks.discard(task)
        self.sem.release()

    async def _task_start(self, coro):
        """Run ``coro`` as a tracked task once a semaphore slot is free."""
        await self.sem.acquire()
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self._task_done)

    async def _retry_401(self, status_code: int, failed_attempts: int) -> bool:
        """Retry once after a 401, dropping the cached secret-key token."""
        if failed_attempts > 1:
            return False
        else:
            log_requests.debug('retry handler: after 401, about to retry with fresh access token')
            # Forces __get_access_token() to fetch a new token.
            self.token = None
            self.expire_token_time = None
            return True

    async def _retry_401_compute(self, status_code: int, failed_attempts: int) -> bool:
        """Retry once after a 401 if the compute API token can be refreshed."""
        if failed_attempts > 1:
            return False
        else:
            log_requests.debug('retry handler: after 401, about to refresh compute API token')
            if self.compute_ctx is None:
                log_requests.error('retry handler: compute_ctx is None, cannot refresh token')
                return False

            try:
                from eyepop.compute.api import refresh_compute_token
                assert self.client_session is not None
                self.compute_ctx = await refresh_compute_token(self.compute_ctx, self.client_session)
                log_requests.debug('retry handler: compute token refreshed successfully')
                return True
            except Exception as e:
                # Best effort: a failed refresh means the request is not retried.
                log_requests.error(f'retry handler: failed to refresh compute token: {e}')
                return False

    async def _retry_50x(self, status_code: int, failed_attempts: int) -> bool:
        """Retry up to three times, sleeping 1, 2 and 4 seconds in between."""
        if failed_attempts > 3:
            return False
        else:
            wait_time = 2 ** (failed_attempts - 1)
            log_requests.info('retry handler: after %d, about to retry after %f seconds',
                              status_code, wait_time)
            await asyncio.sleep(wait_time)
            return True

    async def request_with_retry(
            self,
            method: str,
            url: str,
            accept: str | None = None,
            data: Any = None,
            content_type: str | None = None,
            timeout: aiohttp.ClientTimeout | None = None,
            extra_headers: dict[str, str] | None = None,
    ) -> aiohttp.ClientResponse:
        """Send a request, consulting the retry handlers on failure.

        Args:
            method: HTTP method.
            url: Request URL.
            accept: Optional ``Accept`` header value.
            data: Request payload, or a zero-argument callable producing it.
                A callable is invoked once per attempt so that every retry
                sends a fresh payload.
            content_type: Optional ``Content-Type`` header value.
            timeout: Optional timeout passed to the HTTP session.
            extra_headers: Additional headers; applied last, so they override
                the headers built here.

        Returns:
            The response as returned by the HTTP session; it is neither read
            nor released here.

        Raises:
            aiohttp.ClientResponseError: No handler is registered for the
                status, or the handler declined to retry.
            aiohttp.ClientConnectionError: No handler is registered for 404,
                or that handler declined to retry.
        """
        assert self.client_session is not None
        failed_attempts = 0
        while True:
            # Rebuilt on every attempt: a retry handler may have invalidated
            # the token used for the previous attempt.
            headers = {}
            authorization_header = await self._authorization_header()
            if authorization_header is not None:
                headers['Authorization'] = authorization_header
            if accept is not None:
                headers['Accept'] = accept
            if content_type is not None:
                headers['Content-Type'] = content_type
            if extra_headers is not None:
                headers.update(extra_headers)
            try:
                log_requests.debug('before %s %s', method, url)
                # Keep ``data`` itself untouched so that a payload factory is
                # called again on the next attempt instead of re-sending the
                # payload object of a previous attempt.
                payload = data() if callable(data) else data
                response = await self.client_session.request(method, url, headers=headers, data=payload,
                                                             timeout=timeout)
                log_requests.debug('after %s %s', method, url)
                return response
            except aiohttp.ClientResponseError as e:
                failed_attempts += 1
                if e.status not in self.retry_handlers:
                    raise
                if not await self.retry_handlers[e.status](e.status, failed_attempts):
                    raise
            except aiohttp.ClientConnectionError:
                failed_attempts += 1
                # Connection failures are routed to the handler registered
                # for 404, if any.
                if 404 not in self.retry_handlers:
                    raise
                if not await self.retry_handlers[404](404, failed_attempts):
                    raise

    async def send_trace_recordings(self):
        """Send the recorded request events to ``{eyepop_url}/events``."""
        if self.request_tracer is not None:
            await self.request_tracer.send_and_reset(f'{self.eyepop_url}/events',
                                                     await self._authorization_header(),
                                                     settings.send_trace_threshold_secs)