-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathapi.py
More file actions
208 lines (172 loc) · 7.87 KB
/
api.py
File metadata and controls
208 lines (172 loc) · 7.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
import logging
from typing import Any
import aiohttp
from pydantic import TypeAdapter
from eyepop.compute.context import ComputeContext
from eyepop.compute.responses import ComputeApiSessionResponse
from eyepop.compute.status import wait_for_session
from eyepop.exceptions import ComputeSessionException, ComputeTokenException
log = logging.getLogger("eyepop.compute")
async def fetch_session_endpoint(
compute_ctx: ComputeContext,
client_session: aiohttp.ClientSession,
permanent_session_uuid: str | None
) -> ComputeContext:
"""Fetch or create a compute API session, then poll until ready."""
if permanent_session_uuid is None:
compute_context = await fetch_new_compute_session(compute_ctx, client_session)
got_session = await wait_for_session(compute_context, client_session)
if got_session:
return compute_context
else:
return await fetch_permanent_compute_session(
compute_ctx=compute_ctx,
client_session=client_session,
permanent_session_uuid=permanent_session_uuid,
)
raise ComputeSessionException(
"Failed to fetch session endpoint", session_uuid=compute_context.session_uuid
)
async def fetch_new_compute_session(
compute_ctx: ComputeContext,
client_session: aiohttp.ClientSession
) -> ComputeContext:
headers = {
"Authorization": f"Bearer {compute_ctx.api_key}",
"Accept": "application/json",
}
sessions_url = f"{compute_ctx.compute_url}/v1/sessions"
res = None
create_reason = None
try:
async with client_session.get(sessions_url, headers=headers) as get_response:
log.debug(f"GET /v1/sessions - status: {get_response.status}")
if get_response.status == 404:
create_reason = "no sessions endpoint (404)"
else:
get_response.raise_for_status()
res = await get_response.json()
if not res:
create_reason = "empty response"
elif isinstance(res, list) and len(res) == 0:
create_reason = "no existing sessions"
elif isinstance(res, dict) and not res.get("session_uuid"):
create_reason = "response missing session_uuid"
except aiohttp.ClientResponseError as e:
if e.status == 404:
create_reason = "no sessions endpoint (404)"
else:
raise ComputeSessionException(
f"Failed to fetch existing sessions: {e.message}",
) from e
except Exception as e:
raise ComputeSessionException(f"Unexpected error fetching sessions: {str(e)}") from e
if create_reason:
log.debug(f"Creating new session: {create_reason}")
try:
body = {}
if compute_ctx.pipeline_image:
body["pipeline_image"] = compute_ctx.pipeline_image
if compute_ctx.pipeline_version:
body["pipeline_version"] = compute_ctx.pipeline_version
if compute_ctx.session_opts:
body.update(compute_ctx.session_opts)
post_headers = {**headers, **compute_ctx.session_headers} if compute_ctx.session_headers else headers
async with client_session.post(
f'{sessions_url}?wait=true',
headers=post_headers,
json=body if body else None,
) as post_response:
post_response.raise_for_status()
res = await post_response.json()
log.debug(f"POST /v1/sessions - status: {post_response.status}")
except aiohttp.ClientResponseError as e:
raise ComputeSessionException(
f"Failed to create new session: {e.message}",
) from e
except Exception as e:
raise ComputeSessionException(
f"No existing session and failed to create new one: {str(e)}"
) from e
if isinstance(res, list):
if len(res) > 1:
log.warning(f"Session response gave multiple {len(res)} items, using first one")
if len(res) > 0:
res = res[0]
else:
res = None
_compute_context_from_response(compute_ctx, res)
return compute_ctx
def _compute_context_from_response(compute_ctx: ComputeContext, res: dict | None | Any):
try:
session_response = TypeAdapter(ComputeApiSessionResponse).validate_python(res)
except Exception as e:
raise ComputeSessionException(f"Invalid session response format: {str(e)}") from e
compute_ctx.session_endpoint = session_response.session_endpoint
compute_ctx.session_uuid = session_response.session_uuid
compute_ctx.m2m_access_token = session_response.access_token
compute_ctx.access_token_expires_at = session_response.access_token_expires_at
compute_ctx.access_token_expires_in = session_response.access_token_expires_in
pipeline_id = ""
if len(session_response.pipelines) > 0:
pipeline_id = session_response.pipelines[0].get("id", None)
if not pipeline_id:
pipeline_id = session_response.pipelines[0].get("pipeline_id", "")
compute_ctx.pipeline_id = pipeline_id
if not session_response.access_token or len(session_response.access_token.strip()) == 0:
raise ComputeSessionException(
"No M2M access_token received from compute API session response. "
"M2M authentication is not configured properly.",
session_uuid=compute_ctx.session_uuid,
)
async def refresh_compute_token(
compute_ctx: ComputeContext, client_session: aiohttp.ClientSession
) -> ComputeContext:
if not compute_ctx.api_key:
raise ComputeTokenException(
"Cannot refresh token: no api_key in compute_ctx",
session_uuid=compute_ctx.session_uuid,
)
headers = {"Authorization": f"Bearer {compute_ctx.api_key}", "Accept": "application/json"}
refresh_url = f"{compute_ctx.compute_url}/v1/auth/authenticate"
try:
async with client_session.post(refresh_url, headers=headers) as response:
response.raise_for_status()
token_response = await response.json()
log.debug(f"POST /v1/auth/authenticate - status: {response.status}")
compute_ctx.m2m_access_token = token_response.get("access_token", "")
compute_ctx.access_token_expires_at = token_response.get("expires_at", "")
compute_ctx.access_token_expires_in = token_response.get("expires_in", 0)
return compute_ctx
except aiohttp.ClientResponseError as e:
raise ComputeTokenException(
f"Token refresh failed: HTTP {e.status} - {e.message}",
session_uuid=compute_ctx.session_uuid,
) from e
except Exception as e:
raise ComputeTokenException(
f"Token refresh failed: {str(e)}", session_uuid=compute_ctx.session_uuid
) from e
async def fetch_permanent_compute_session(
compute_ctx: ComputeContext,
client_session: aiohttp.ClientSession,
permanent_session_uuid: str
) -> ComputeContext:
headers = {
"Authorization": f"Bearer {compute_ctx.api_key}",
"Accept": "application/json",
}
session_url = f"{compute_ctx.compute_url}/v1/sessions/{permanent_session_uuid}"
try:
async with client_session.get(session_url, headers=headers) as get_response:
get_response.raise_for_status()
res = await get_response.json()
log.debug(f"GET /v1/sessions/{permanent_session_uuid} - status: {get_response.status}")
_compute_context_from_response(compute_ctx, res)
return compute_ctx
except aiohttp.ClientResponseError as e:
raise ComputeSessionException(
f"Failed to fetch existing sessions: {e.message}",
) from e
except Exception as e:
raise ComputeSessionException(f"Unexpected error fetching sessions: {str(e)}") from e