Skip to content

Commit b438c4b

Browse files
committed
plug in user_assign
1 parent 71cd9e6 commit b438c4b

9 files changed

Lines changed: 193 additions & 52 deletions

File tree

src/backend/core/api/viewsets.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,7 @@ def start_room_recording(self, request, pk=None): # pylint: disable=unused-argu
346346
):
347347
try:
348348
MetadataCollectorService().start(recording)
349+
logger.info("Started MetadataCollectorService")
349350
except MetadataCollectorException:
350351
logger.warning("Failed to start MetadataCollectorService")
351352

src/backend/core/models.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,6 @@ class Recording(BaseModel):
590590
verbose_name=_("Recording options"),
591591
help_text=_("Recording options"),
592592
)
593-
594593
class Meta:
595594
db_table = "meet_recording"
596595
ordering = ("-created_at",)

src/backend/core/recording/event/notification.py

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
"""Service to notify external services when a new recording is ready."""
22

33
import logging
4+
import os
45
import smtplib
6+
from datetime import datetime, timezone
57

8+
import requests
9+
from asgiref.sync import async_to_sync
610
from django.conf import settings
711
from django.core.mail import send_mail
812
from django.template.loader import render_to_string
913
from django.utils.translation import get_language, override
1014
from django.utils.translation import gettext_lazy as _
15+
from livekit import api as livekit_api
1116

12-
import requests
13-
14-
from core import models
17+
from core import models, utils
1518

1619
logger = logging.getLogger(__name__)
1720

@@ -130,6 +133,51 @@ def _notify_user_by_email(recording) -> bool:
130133

131134
return not has_failures
132135

136+
@staticmethod
137+
def _get_recording_timestamps(worker_id):
138+
"""Fetch FileInfo.started_at and ended_at from LiveKit's egress API.
139+
140+
started_at is more accurate than EgressInfo.started_at because it
141+
reflects when file recording actually began, not when the egress
142+
process was initialized.
143+
144+
Returns:
145+
Tuple of (started_at, ended_at) datetimes, either may be None.
146+
"""
147+
if not worker_id:
148+
return None, None
149+
150+
@async_to_sync
151+
async def _fetch():
152+
lkapi = utils.create_livekit_client()
153+
try:
154+
egress_list = await lkapi.egress.list_egress(
155+
livekit_api.ListEgressRequest(egress_id=worker_id)
156+
)
157+
if egress_list.items:
158+
file_results = egress_list.items[0].file_results
159+
if file_results:
160+
started = None
161+
ended = None
162+
if file_results[0].started_at:
163+
started = datetime.fromtimestamp(
164+
file_results[0].started_at / 1e9, tz=timezone.utc
165+
)
166+
if file_results[0].ended_at:
167+
ended = datetime.fromtimestamp(
168+
file_results[0].ended_at / 1e9, tz=timezone.utc
169+
)
170+
return started, ended
171+
except Exception:
172+
logger.exception(
173+
"Could not fetch egress info for worker %s", worker_id
174+
)
175+
finally:
176+
await lkapi.aclose()
177+
return None, None
178+
179+
return _fetch()
180+
133181
@staticmethod
134182
def _notify_summary_service(recording):
135183
"""Notify summary service about a new recording."""
@@ -150,24 +198,36 @@ def _notify_summary_service(recording):
150198
.first()
151199
)
152200

201+
# TODO: change how we get metadata_filename
202+
output_folder = os.getenv("AWS_S3_OUTPUT_FOLDER", "metadata")
203+
metadata_filename = f"{output_folder}/{recording.id}-metadata.json"
204+
153205
if not owner_access:
154206
logger.error("No owner found for recording %s", recording.id)
155207
return False
208+
209+
started_at, ended_at = NotificationService._get_recording_timestamps(
210+
recording.worker_id
211+
)
212+
156213
payload = {
157214
"owner_id": str(owner_access.user.id),
158-
"filename": recording.key,
215+
"recording_filename": recording.key,
216+
"metadata_filename": metadata_filename,
159217
"email": owner_access.user.email,
160218
"sub": owner_access.user.sub,
161219
"room": recording.room.name,
162220
"language": recording.options.get("language"),
163-
"recording_date": recording.created_at.astimezone(
164-
owner_access.user.timezone
165-
).strftime("%Y-%m-%d"),
166-
"recording_time": recording.created_at.astimezone(
167-
owner_access.user.timezone
168-
).strftime("%H:%M"),
221+
"worker_id": recording.worker_id,
222+
"owner_timezone": str(owner_access.user.timezone),
169223
"download_link": f"{get_recording_download_base_url()}/{recording.id}",
170224
"context_language": owner_access.user.language,
225+
"recording_started_at": (
226+
started_at.isoformat() if started_at else None
227+
),
228+
"recording_ended_at": (
229+
ended_at.isoformat() if ended_at else None
230+
),
171231
}
172232

173233
headers = {

src/summary/summary/api/route/tasks.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,19 @@ class TranscribeSummarizeTaskCreation(BaseModel):
1919
"""Transcription and summarization parameters."""
2020

2121
owner_id: str
22-
filename: str
22+
recording_filename: str
23+
metadata_filename: str
2324
email: str
2425
sub: str
2526
version: Optional[int] = 2
2627
room: Optional[str]
27-
recording_date: Optional[str]
28-
recording_time: Optional[str]
28+
worker_id: Optional[str]
29+
owner_timezone: Optional[str]
2930
language: Optional[str]
3031
download_link: Optional[str]
3132
context_language: Optional[str] = None
33+
recording_started_at: Optional[str] = None
34+
recording_ended_at: Optional[str] = None
3235

3336
@field_validator("language")
3437
@classmethod
@@ -51,16 +54,19 @@ async def create_transcribe_summarize_task(request: TranscribeSummarizeTaskCreat
5154
task = process_audio_transcribe_summarize_v2.apply_async(
5255
args=[
5356
request.owner_id,
54-
request.filename,
57+
request.recording_filename,
58+
request.metadata_filename,
5559
request.email,
5660
request.sub,
5761
time.time(),
5862
request.room,
59-
request.recording_date,
60-
request.recording_time,
63+
request.worker_id,
64+
request.owner_timezone,
6165
request.language,
6266
request.download_link,
6367
request.context_language,
68+
request.recording_started_at,
69+
request.recording_ended_at,
6470
],
6571
queue=settings.transcribe_queue,
6672
)

src/summary/summary/core/analytics.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,9 @@ def create(self, task_id, task_args):
112112
if self._is_disabled or self.has_task_id(task_id):
113113
return
114114

115-
_, filename, email, _, received_at, *_ = task_args
115+
# Positional args mirror process_audio_transcribe_summarize_v2 signature:
116+
# owner_id, recording_filename, metadata_filename, email, sub, received_at, ...
117+
_, filename, _, email, _, received_at, *_ = task_args
116118

117119
start_time = time.time()
118120
initial_metadata = {

src/summary/summary/core/celery_worker.py

Lines changed: 64 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import json
66
import time
7+
from datetime import datetime
78
from typing import Optional
89

910
import openai
@@ -28,6 +29,7 @@
2829
PROMPT_USER_PART,
2930
)
3031
from summary.core.transcript_formatter import TranscriptFormatter
32+
from summary.core.user_assign import assign_speakers
3133
from summary.core.webhook_service import submit_content
3234

3335
settings = get_settings()
@@ -58,7 +60,7 @@ def init_sentry(**_kwargs):
5860
file_service = FileService()
5961

6062

61-
def transcribe_audio(task_id, filename, language):
63+
def transcribe_audio(task_id, recording_filename, language):
6264
"""Transcribe an audio file using WhisperX.
6365
6466
Downloads the audio from MinIO, sends it to WhisperX for transcription,
@@ -75,9 +77,13 @@ def transcribe_audio(task_id, filename, language):
7577

7678
# Transcription
7779
try:
78-
with file_service.prepare_audio_file(filename) as (audio_file, metadata):
80+
with file_service.prepare_audio_file(recording_filename) as (
81+
audio_file,
82+
metadata,
83+
):
7984
metadata_manager.track(task_id, {"audio_length": metadata["duration"]})
8085

86+
# Compute language parameter
8187
if language is None:
8288
language = settings.whisperx_default_language
8389
logger.info(
@@ -90,22 +96,25 @@ def transcribe_audio(task_id, filename, language):
9096
language,
9197
)
9298

99+
# Call remote service for transcription
93100
transcription_start_time = time.time()
94-
95101
transcription = whisperx_client.audio.transcriptions.create(
96102
model=settings.whisperx_asr_model, file=audio_file, language=language
97103
)
98104

99-
transcription_time = round(time.time() - transcription_start_time, 2)
105+
# Logging
106+
transcription_duration = round(time.time() - transcription_start_time, 2)
100107
metadata_manager.track(
101108
task_id,
102-
{"transcription_time": transcription_time},
109+
{"transcription_time": transcription_duration},
110+
)
111+
logger.info(
112+
"Transcription received in %.2f seconds.", transcription_duration
103113
)
104-
logger.info("Transcription received in %.2f seconds.", transcription_time)
105114
logger.debug("Transcription: \n %s", transcription)
106115

107116
except FileServiceException:
108-
logger.exception("Unexpected error for filename: %s", filename)
117+
logger.exception("Unexpected error for recording: %s", recording_filename)
109118
return None
110119

111120
metadata_manager.track_transcription_metadata(task_id, transcription)
@@ -117,8 +126,8 @@ def format_transcript(
117126
context_language,
118127
language,
119128
room,
120-
recording_date,
121-
recording_time,
129+
recording_datetime,
130+
owner_timezone,
122131
download_link,
123132
):
124133
"""Format a transcription into readable content with a title.
@@ -134,8 +143,8 @@ def format_transcript(
134143
return formatter.format(
135144
transcription,
136145
room=room,
137-
recording_date=recording_date,
138-
recording_time=recording_time,
146+
recording_datetime=recording_datetime,
147+
owner_timezone=owner_timezone,
139148
download_link=download_link,
140149
)
141150

@@ -167,16 +176,19 @@ def format_actions(llm_output: dict) -> str:
167176
def process_audio_transcribe_summarize_v2(
168177
self,
169178
owner_id: str,
170-
filename: str,
179+
recording_filename: str,
180+
metadata_filename: str,
171181
email: str,
172182
sub: str,
173183
received_at: float,
174184
room: Optional[str],
175-
recording_date: Optional[str],
176-
recording_time: Optional[str],
185+
worker_id: Optional[str],
186+
owner_timezone: Optional[str],
177187
language: Optional[str],
178188
download_link: Optional[str],
179189
context_language: Optional[str] = None,
190+
recording_started_at: Optional[str] = None,
191+
recording_ended_at: Optional[str] = None,
180192
):
181193
"""Process an audio file by transcribing it and generating a summary.
182194
@@ -189,16 +201,21 @@ def process_audio_transcribe_summarize_v2(
189201
Args:
190202
self: Celery task instance (passed on with bind=True)
191203
owner_id: Unique identifier of the recording owner.
192-
filename: Name of the audio file in MinIO storage.
204+
recording_filename: Name of the audio file in MinIO storage.
205+
metadata_filename: Name of the audio file in MinIO storage.
193206
email: Email address of the recording owner.
194207
sub: OIDC subject identifier of the recording owner.
195208
received_at: Unix timestamp when the recording was received.
196209
room: room name where the recording took place.
197-
recording_date: Date of the recording (localized display string).
198-
recording_time: Time of the recording (localized display string).
210+
worker_id: LiveKit egress ID used to fetch the egress JSON from S3.
211+
owner_timezone: IANA timezone of the recording owner (e.g. "Europe/Paris").
199212
language: ISO 639-1 language code for transcription.
200213
download_link: URL to download the original recording.
201214
context_language: ISO 639-1 language code of the meeting summary context text.
215+
recording_started_at: ISO 8601 timestamp of when file recording actually started
216+
(from LiveKit FileInfo.started_at via the egress_ended webhook).
217+
recording_ended_at: ISO 8601 timestamp of when file recording ended
218+
(from LiveKit FileInfo.ended_at via the egress_ended webhook).
202219
"""
203220
logger.info(
204221
"Notification received | Owner: %s | Room: %s",
@@ -208,17 +225,44 @@ def process_audio_transcribe_summarize_v2(
208225

209226
task_id = self.request.id
210227

211-
transcription = transcribe_audio(task_id, filename, language)
228+
# Transcribe the audio
229+
transcription = transcribe_audio(task_id, recording_filename, language)
212230
if transcription is None:
213231
return
214232

233+
# Use recording_started_at / recording_ended_at from the backend
234+
# (sourced from LiveKit FileInfo via the egress_ended webhook).
235+
recording_start_dt = None
236+
recording_end_dt = None
237+
if recording_start_dt:
238+
recording_start_dt = datetime.fromisoformat(recording_started_at)
239+
if recording_ended_at:
240+
recording_end_dt = datetime.fromisoformat(recording_ended_at)
241+
242+
# Assign user names from metadata and transcription
243+
try:
244+
metadata = file_service.read_json(metadata_filename)
245+
except Exception:
246+
logger.exception("Failed to read metadata file: %s", metadata_filename)
247+
metadata = None
248+
if metadata and recording_start_dt is not None and recording_end_dt is not None:
249+
assignment_result = assign_speakers(
250+
metadata,
251+
transcription,
252+
recording_start_dt,
253+
recording_end_dt,
254+
overlap_threshold=0.5,
255+
)
256+
transcription = assignment_result.apply({"segments": transcription.segments})
257+
258+
# Format output
215259
content, title = format_transcript(
216260
transcription,
217261
context_language,
218262
language,
219263
room,
220-
recording_date,
221-
recording_time,
264+
recording_started_at,
265+
owner_timezone,
222266
download_link,
223267
)
224268

0 commit comments

Comments
 (0)