|
3 | 3 |
|
4 | 4 | import aiohttp |
5 | 5 |
|
6 | | -from eyepop.compute.context import ComputeContext |
| 6 | +from eyepop.compute.context import ComputeContext, PipelineStatus |
| 7 | +from eyepop.compute.responses import ComputeApiSessionResponse |
7 | 8 | from eyepop.exceptions import ComputeHealthCheckException |
8 | 9 |
|
9 | 10 | log = logging.getLogger("eyepop.compute") |
10 | 11 |
|
| 12 | +_TERMINAL_STATES = {PipelineStatus.FAILED, PipelineStatus.ERROR, PipelineStatus.STOPPED} |
| 13 | + |
11 | 14 |
|
12 | 15 | async def wait_for_session( |
13 | 16 | compute_config: ComputeContext, client_session: aiohttp.ClientSession |
@@ -39,25 +42,40 @@ async def wait_for_session( |
39 | 42 | attempt += 1 |
40 | 43 | try: |
41 | 44 | async with client_session.get(health_url, headers=headers) as response: |
42 | | - log.debug(f"GET /health - status: {response.status} (attempt {attempt})") |
| 45 | + if response.status != 200: |
| 46 | + last_message = f"HTTP {response.status}" |
| 47 | + log.debug(f"GET /health - status: {response.status} (attempt {attempt})") |
| 48 | + await asyncio.sleep(interval) |
| 49 | + continue |
| 50 | + |
| 51 | + session_response = ComputeApiSessionResponse(**(await response.json())) |
| 52 | + status = session_response.session_status |
| 53 | + log.debug(f"GET /health - status: 200, pipeline: {status.value} (attempt {attempt})") |
43 | 54 |
|
44 | | - if response.status == 200: |
| 55 | + if status == PipelineStatus.RUNNING: |
45 | 56 | return True |
46 | 57 |
|
47 | | - last_message = f"Health check returned status {response.status}" |
| 58 | + if status in _TERMINAL_STATES: |
| 59 | + raise ComputeHealthCheckException( |
| 60 | + f"Session in terminal state: {status.value}. " |
| 61 | + f"Message: {session_response.session_message}", |
| 62 | + session_endpoint=compute_config.session_endpoint, |
| 63 | + last_status=status.value, |
| 64 | + ) |
| 65 | + |
| 66 | + last_message = f"Pipeline status: {status.value}" |
48 | 67 | await asyncio.sleep(interval) |
49 | | - continue |
50 | 68 |
|
51 | 69 | except ComputeHealthCheckException: |
52 | 70 | raise |
53 | 71 | except aiohttp.ClientResponseError as e: |
54 | 72 | last_message = f"HTTP {e.status}: {e.message}" |
55 | 73 | log.debug(f"GET /health - error: {last_message} (attempt {attempt})") |
| 74 | + await asyncio.sleep(interval) |
56 | 75 | except Exception as e: |
57 | 76 | last_message = str(e) |
58 | 77 | log.debug(f"GET /health - error: {last_message} (attempt {attempt})") |
59 | | - |
60 | | - await asyncio.sleep(interval) |
| 78 | + await asyncio.sleep(interval) |
61 | 79 |
|
62 | 80 | log.error(f"Session timed out after {timeout}s. Last message: {last_message}") |
63 | 81 | raise TimeoutError(f"Session timed out after {timeout}s. Last message: {last_message}") |
0 commit comments