Skip to content

Commit ddbeac8

Browse files
committed
assign users
1 parent df24aaa commit ddbeac8

13 files changed

Lines changed: 1000 additions & 56 deletions

File tree

src/agents/metadata_collector.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,9 @@ async def on_chat_message_received(
156156
):
157157
"""Read a complete chat message and record it as an event."""
158158
full_text = await reader.read_all()
159-
logger.info("Received chat message from %s", participant_identity)
159+
logger.info(
160+
"Received chat message from %s: '%s'", participant_identity, full_text
161+
)
160162

161163
self.events.append(
162164
MetadataEvent(
@@ -177,7 +179,7 @@ def handle_chat_stream(self, reader, participant_identity):
177179

178180
def save(self):
179181
"""Serialize collected events and upload as JSON to S3."""
180-
logger.info("Persisting metadata")
182+
logger.info("Persisting metadata...")
181183

182184
participants = []
183185
for k, v in self.participants.items():
@@ -372,7 +374,7 @@ async def entrypoint(ctx: JobContext):
372374
await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
373375

374376
async def cleanup():
375-
logger.info("Shutting down metadata collector")
377+
logger.info("Shutting down metadata collector...")
376378
await metadata_collector.aclose()
377379

378380
ctx.add_shutdown_callback(cleanup)

src/backend/core/api/viewsets.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ def start_room_recording(self, request, pk=None): # pylint: disable=unused-argu
347347
):
348348
try:
349349
MetadataCollectorService().start(recording)
350+
logger.info("Started MetadataCollectorService")
350351
except MetadataCollectorException:
351352
logger.warning("Failed to start MetadataCollectorService")
352353

src/backend/core/models.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,6 @@ class Recording(BaseModel):
590590
verbose_name=_("Recording options"),
591591
help_text=_("Recording options"),
592592
)
593-
594593
class Meta:
595594
db_table = "meet_recording"
596595
ordering = ("-created_at",)

src/backend/core/recording/event/notification.py

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
"""Service to notify external services when a new recording is ready."""
22

33
import logging
4+
import os
45
import smtplib
6+
from datetime import datetime, timezone
57

68
from django.conf import settings
79
from django.core.mail import send_mail
@@ -10,8 +12,10 @@
1012
from django.utils.translation import gettext_lazy as _
1113

1214
import requests
15+
from asgiref.sync import async_to_sync
16+
from livekit import api as livekit_api
1317

14-
from core import models
18+
from core import models, utils
1519

1620
logger = logging.getLogger(__name__)
1721

@@ -130,6 +134,49 @@ def _notify_user_by_email(recording) -> bool:
130134

131135
return not has_failures
132136

137+
@staticmethod
138+
def _get_recording_timestamps(worker_id):
139+
"""Fetch FileInfo.started_at and ended_at from LiveKit's egress API.
140+
141+
FileInfo.started_at is more accurate than EgressInfo.started_at because it
142+
reflects when file recording actually began, not when the egress
143+
process was initialized.
144+
145+
Returns:
146+
Tuple of (started_at, ended_at) datetimes; either may be None.
147+
"""
148+
if not worker_id:
149+
return None, None
150+
151+
@async_to_sync
152+
async def _fetch():
153+
lkapi = utils.create_livekit_client()
154+
try:
155+
egress_list = await lkapi.egress.list_egress(
156+
livekit_api.ListEgressRequest(egress_id=worker_id)
157+
)
158+
if egress_list.items:
159+
file_results = egress_list.items[0].file_results
160+
if file_results:
161+
started_at = None
162+
ended_at = None
163+
if file_results[0].started_at:
164+
started_at = datetime.fromtimestamp(
165+
file_results[0].started_at / 1e9, tz=timezone.utc
166+
)
167+
if file_results[0].ended_at:
168+
ended_at = datetime.fromtimestamp(
169+
file_results[0].ended_at / 1e9, tz=timezone.utc
170+
)
171+
return started_at, ended_at
172+
except Exception:
173+
logger.exception("Could not fetch egress info for worker %s", worker_id)
174+
finally:
175+
await lkapi.aclose()
176+
return None, None
177+
178+
return _fetch()
179+
133180
@staticmethod
134181
def _notify_summary_service(recording):
135182
"""Notify summary service about a new recording."""
@@ -150,24 +197,32 @@ def _notify_summary_service(recording):
150197
.first()
151198
)
152199

200+
# TODO: change how we get metadata_filename
201+
output_folder = os.getenv("AWS_S3_OUTPUT_FOLDER", "metadata")
202+
metadata_filename = f"{output_folder}/{recording.id}-metadata.json"
203+
153204
if not owner_access:
154205
logger.error("No owner found for recording %s", recording.id)
155206
return False
207+
208+
started_at, ended_at = NotificationService._get_recording_timestamps(
209+
recording.worker_id
210+
)
211+
156212
payload = {
157213
"owner_id": str(owner_access.user.id),
158-
"filename": recording.key,
214+
"recording_filename": recording.key,
215+
"metadata_filename": metadata_filename,
159216
"email": owner_access.user.email,
160217
"sub": owner_access.user.sub,
161218
"room": recording.room.name,
162219
"language": recording.options.get("language"),
163-
"recording_date": recording.created_at.astimezone(
164-
owner_access.user.timezone
165-
).strftime("%Y-%m-%d"),
166-
"recording_time": recording.created_at.astimezone(
167-
owner_access.user.timezone
168-
).strftime("%H:%M"),
220+
"worker_id": recording.worker_id,
221+
"owner_timezone": str(owner_access.user.timezone),
169222
"download_link": f"{get_recording_download_base_url()}/{recording.id}",
170223
"context_language": owner_access.user.language,
224+
"recording_start_at": (started_at.isoformat() if started_at else None),
225+
"recording_end_at": (ended_at.isoformat() if ended_at else None),
171226
}
172227

173228
headers = {

src/helm/env.d/dev-keycloak/values.meet.yaml.gotmpl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,12 @@ agentSubtitles:
308308
{{- end }}
309309
{{- end }}
310310
ENABLE_SILERO_VAD: "false"
311+
AWS_S3_ENDPOINT_URL: minio.meet.svc.cluster.local:9000
312+
AWS_S3_ACCESS_KEY_ID: meet
313+
AWS_S3_SECRET_ACCESS_KEY: password
314+
AWS_S3_SECURE_ACCESS: False
315+
AWS_STORAGE_BUCKET_NAME: meet-media-storage
316+
AWS_S3_OUTPUT_FOLDER: recordings
311317

312318
image:
313319
repository: localhost:5001/meet-agents
@@ -317,6 +323,7 @@ agentSubtitles:
317323
command:
318324
- "python"
319325
- "multi_user_transcriber.py"
326+
# - "metadata_collector.py" # TODO: FIX THIS
320327
- "start"
321328

322329
# Extra volume mounts to manage our local custom CA and avoid disabling SSL

src/summary/summary/api/route/tasks.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,19 @@ class TranscribeSummarizeTaskCreation(BaseModel):
1919
"""Transcription and summarization parameters."""
2020

2121
owner_id: str
22-
filename: str
22+
recording_filename: str
23+
metadata_filename: str
2324
email: str
2425
sub: str
2526
version: Optional[int] = 2
2627
room: Optional[str]
27-
recording_date: Optional[str]
28-
recording_time: Optional[str]
28+
worker_id: Optional[str]
29+
owner_timezone: Optional[str]
2930
language: Optional[str]
3031
download_link: Optional[str]
3132
context_language: Optional[str] = None
33+
recording_start_at: Optional[str] = None
34+
recording_end_at: Optional[str] = None
3235

3336
@field_validator("language")
3437
@classmethod
@@ -51,16 +54,19 @@ async def create_transcribe_summarize_task(request: TranscribeSummarizeTaskCreat
5154
task = process_audio_transcribe_summarize_v2.apply_async(
5255
args=[
5356
request.owner_id,
54-
request.filename,
57+
request.recording_filename,
58+
request.metadata_filename,
5559
request.email,
5660
request.sub,
5761
time.time(),
5862
request.room,
59-
request.recording_date,
60-
request.recording_time,
63+
request.worker_id,
64+
request.owner_timezone,
6165
request.language,
6266
request.download_link,
6367
request.context_language,
68+
request.recording_start_at,
69+
request.recording_end_at,
6470
],
6571
queue=settings.transcribe_queue,
6672
)

src/summary/summary/core/analytics.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,9 @@ def create(self, task_id, task_args):
112112
if self._is_disabled or self.has_task_id(task_id):
113113
return
114114

115-
_, filename, email, _, received_at, *_ = task_args
115+
# Positional args mirror process_audio_transcribe_summarize_v2 signature:
116+
# owner_id, recording_filename, metadata_filename, email, sub, received_at, ...
117+
_, filename, _, email, _, received_at, *_ = task_args
116118

117119
start_time = time.time()
118120
initial_metadata = {

0 commit comments

Comments
 (0)