|
1 | 1 | """Service to notify external services when a new recording is ready.""" |
2 | 2 |
|
| 3 | +import asyncio |
3 | 4 | import logging |
4 | 5 | import os |
5 | 6 | import smtplib |
@@ -148,32 +149,38 @@ def _get_recording_timestamps(worker_id): |
148 | 149 | if not worker_id: |
149 | 150 | return None, None |
150 | 151 |
|
| 152 | + def _ns_to_utc(ns): |
| 153 | + return datetime.fromtimestamp(ns / 1e9, tz=timezone.utc) if ns else None |
| 154 | + |
151 | 155 | @async_to_sync |
152 | 156 | async def _fetch(): |
153 | 157 | lkapi = utils.create_livekit_client() |
154 | 158 | try: |
155 | | - egress_list = await lkapi.egress.list_egress( |
156 | | - livekit_api.ListEgressRequest(egress_id=worker_id) |
| 159 | + egress_list = await asyncio.wait_for( |
| 160 | + lkapi.egress.list_egress( |
| 161 | + livekit_api.ListEgressRequest(egress_id=worker_id) |
| 162 | + ), |
| 163 | + timeout=10, |
157 | 164 | ) |
158 | | - if egress_list.items: |
159 | | - file_results = egress_list.items[0].file_results |
160 | | - if file_results: |
161 | | - started_at = None |
162 | | - ended_at = None |
163 | | - if file_results[0].started_at: |
164 | | - started_at = datetime.fromtimestamp( |
165 | | - file_results[0].started_at / 1e9, tz=timezone.utc |
166 | | - ) |
167 | | - if file_results[0].ended_at: |
168 | | - ended_at = datetime.fromtimestamp( |
169 | | - file_results[0].ended_at / 1e9, tz=timezone.utc |
170 | | - ) |
171 | | - return started_at, ended_at |
172 | | - except Exception: |
| 165 | + except (livekit_api.TwirpError, OSError, asyncio.TimeoutError): |
173 | 166 | logger.exception("Could not fetch egress info for worker %s", worker_id) |
174 | 167 | finally: |
175 | 168 | await lkapi.aclose() |
176 | | - return None, None |
| 169 | + |
| 170 | + if not egress_list.items or not egress_list.items[0].file_results: |
| 171 | + logger.debug("No file_results for worker %s", worker_id) |
| 172 | + return None, None |
| 173 | + |
| 174 | + # If information exists, extract started_at, ended_at |
| 175 | + file_results = egress_list.items[0].file_results |
| 176 | + started_at = None |
| 177 | + ended_at = None |
| 178 | + if file_results[0].started_at: |
| 179 | + started_at = _ns_to_utc(file_results[0].started_at) |
| 180 | + if file_results[0].ended_at: |
| 181 | + ended_at = _ns_to_utc(file_results[0].ended_at) |
| 182 | + |
| 183 | + return started_at, ended_at |
177 | 184 |
|
178 | 185 | return _fetch() |
179 | 186 |
|
@@ -209,8 +216,6 @@ def _notify_summary_service(recording): |
209 | 216 | recording.worker_id |
210 | 217 | ) |
211 | 218 |
|
212 | | - logger.debug("test test %s , %s", started_at, ended_at) |
213 | | - |
214 | 219 | payload = { |
215 | 220 | "owner_id": str(owner_access.user.id), |
216 | 221 | "recording_filename": recording.key, |
|
0 commit comments