-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathstatus.py
More file actions
81 lines (64 loc) · 3.27 KB
/
status.py
File metadata and controls
81 lines (64 loc) · 3.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import asyncio
import logging
import aiohttp
from eyepop.compute.context import ComputeContext, PipelineStatus
from eyepop.compute.responses import ComputeApiSessionResponse
from eyepop.exceptions import ComputeHealthCheckException
# Logger for this module, registered under the name "eyepop.compute".
log = logging.getLogger("eyepop.compute")
# Pipeline statuses on which wait_for_session stops polling and raises
# ComputeHealthCheckException instead of retrying.
_TERMINAL_STATES = {PipelineStatus.FAILED, PipelineStatus.ERROR, PipelineStatus.STOPPED}
async def wait_for_session(
    compute_config: ComputeContext, client_session: aiohttp.ClientSession
) -> bool:
    """Poll the session's ``/health`` endpoint until the pipeline is running.

    Issues ``GET {session_endpoint}/health`` with the M2M bearer token,
    waiting ``wait_for_session_interval`` seconds between attempts, until the
    reported pipeline status is ``RUNNING`` or ``wait_for_session_timeout``
    seconds have elapsed.

    Args:
        compute_config: Supplies ``session_endpoint``, ``m2m_access_token``,
            ``wait_for_session_timeout`` and ``wait_for_session_interval``.
        client_session: aiohttp session used to perform the health requests.

    Returns:
        ``True`` once the health response reports ``PipelineStatus.RUNNING``.

    Raises:
        ComputeHealthCheckException: If no access token is configured, or the
            session reports one of ``_TERMINAL_STATES``.
        TimeoutError: If the session is not running before the timeout
            elapses; the message carries the last observed status or error.
    """
    timeout = compute_config.wait_for_session_timeout
    interval = compute_config.wait_for_session_interval

    token = compute_config.m2m_access_token
    if not token or not token.strip():
        raise ComputeHealthCheckException(
            "No access_token in compute_config. "
            "Cannot perform session health check. "
            "This should never happen - fetch_new_compute_session should have set it.",
            session_endpoint=compute_config.session_endpoint,
        )

    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json",
    }
    health_url = f"{compute_config.session_endpoint}/health"
    log.debug(
        "Waiting for session ready: %s (timeout=%ss, interval=%ss)",
        health_url,
        timeout,
        interval,
    )

    # We are inside a coroutine, so a running loop is guaranteed to exist.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    last_message = "No message received"
    attempt = 0

    while loop.time() < deadline:
        attempt += 1
        try:
            async with client_session.get(health_url, headers=headers) as response:
                if response.status != 200:
                    last_message = f"HTTP {response.status}"
                    log.debug(
                        "GET /health - status: %s (attempt %s)", response.status, attempt
                    )
                else:
                    session_response = ComputeApiSessionResponse(**(await response.json()))
                    status = session_response.session_status
                    log.debug(
                        "GET /health - status: 200, pipeline: %s (attempt %s)",
                        status.value,
                        attempt,
                    )
                    if status == PipelineStatus.RUNNING:
                        return True
                    if status in _TERMINAL_STATES:
                        raise ComputeHealthCheckException(
                            f"Session in terminal state: {status.value}. "
                            f"Message: {session_response.session_message}",
                            session_endpoint=compute_config.session_endpoint,
                            last_status=status.value,
                        )
                    last_message = f"Pipeline status: {status.value}"
        except ComputeHealthCheckException:
            # Terminal state: retrying cannot help, surface it to the caller.
            raise
        except aiohttp.ClientResponseError as e:
            last_message = f"HTTP {e.status}: {e.message}"
            log.debug("GET /health - error: %s (attempt %s)", last_message, attempt)
        except Exception as e:
            # Best-effort polling: any other failure (connection error, bad
            # payload, ...) is recorded and retried until the deadline.
            # Some exceptions stringify to "", so fall back to the type name
            # to keep the final timeout message informative.
            last_message = str(e) or type(e).__name__
            log.debug("GET /health - error: %s (attempt %s)", last_message, attempt)

        # Back off only after the response has been released, and never sleep
        # past the deadline: no further attempt would follow such a sleep.
        remaining = deadline - loop.time()
        if remaining > 0:
            await asyncio.sleep(min(interval, remaining))

    message = f"Session timed out after {timeout}s. Last message: {last_message}"
    log.error(message)
    raise TimeoutError(message)