Conversation
7856de6 to
efa0d48
Compare
ce51ac6 to
b438c4b
Compare
| # TODO: change how we get metadata_filename | ||
| output_folder = os.getenv("AWS_S3_OUTPUT_FOLDER", "metadata") | ||
| metadata_filename = f"{output_folder}/{recording.id}-metadata.json" |
There was a problem hiding this comment.
What should we do with this?
bf54fea to
80cca39
Compare
b90bee2 to
8e6e17d
Compare
|
| metadata_filename: str, | ||
| email: str, | ||
| sub: str, | ||
| received_at: float, |
There was a problem hiding this comment.
received_at was previously unused and is still unused. Remove?
|
NB: Changes in Payload make this a breaking change. |
lebaudantoine
left a comment
There was a problem hiding this comment.
Unfinished review
| ): | ||
| try: | ||
| MetadataCollectorService().start(recording) | ||
| logger.info("Started MetadataCollectorService") |
There was a problem hiding this comment.
Hum, I’m not sure we should introduce a logger at the info level just yet. It might be better to wait until we have a more consistent logging policy across the application. I assume you mainly added it for debugging purposes, to confirm that the backend was being triggered, right?
There was a problem hiding this comment.
my bad. setting it to debug
| def save(self): | ||
| """Serialize collected events and upload as JSON to S3.""" | ||
| logger.info("Persisting metadata…") | ||
| logger.info("Persisting metadata...") |
There was a problem hiding this comment.
probably unnecessary changes
| environ_name="METADATA_COLLECTOR_AGENT_NAME", | ||
| environ_prefix=None, | ||
| ) | ||
| METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER = values.Value( |
There was a problem hiding this comment.
| METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER = values.Value( | |
| METADATA_COLLECTOR_OUTPUT_FOLDER = values.Value( |
to match the naming pattern RECORDING_OUTPUT_FOLDER
| AWS_S3_ENDPOINT_URL: minio.meet.svc.cluster.local:9000 | ||
| AWS_S3_ACCESS_KEY_ID: meet | ||
| AWS_S3_SECRET_ACCESS_KEY: password | ||
| AWS_S3_SECURE_ACCESS: False | ||
| AWS_STORAGE_BUCKET_NAME: meet-media-storage | ||
| AWS_S3_OUTPUT_FOLDER: recordings |
There was a problem hiding this comment.
These env variables are not relevant in the context of this PR.
There was a problem hiding this comment.
not sure why this is here. removing
| filename: str | ||
| recording_filename: str | ||
| metadata_filename: str | ||
| email: str | ||
| sub: str | ||
| version: Optional[int] = 2 | ||
| room: Optional[str] | ||
| recording_date: Optional[str] | ||
| recording_time: Optional[str] | ||
| owner_timezone: Optional[str] | ||
| language: Optional[str] | ||
| download_link: Optional[str] | ||
| context_language: Optional[str] = None | ||
| recording_start_at: Optional[str] = None | ||
| recording_end_at: Optional[str] = None |
There was a problem hiding this comment.
I wonder whether this might have any impact on @FloChehab’s Task V2 endpoints.
| def _get_recording_timestamps(worker_id): | ||
| """Fetch FileInfo.started_at and ended_at from LiveKit's egress API. | ||
|
|
||
| FileInfo.started_at is more accurate than EgressInfo.started_at because it | ||
| reflects when file recording actually began, not when the egress | ||
| process was initialized. | ||
|
|
||
| Returns: | ||
| Tuple of (started_at, ended_at) datetimes, either may be None. | ||
| """ |
There was a problem hiding this comment.
Smart approach. I initially assumed you’d be reading it directly from the manifest file that LiveKit Egress saves alongside the recording output.
If you check the LiveKit Egress API documentation (https://docs.livekit.io/reference/other/egress/api/) and search for manifest, you’ll find details on how to enable or disable it, as well as how to configure its output location.
It’s also worth exploring how LiveKit Egress behaves in practice using DeepWiki (https://deepwiki.com/search/is-a-manifest-file-still-saved_2f341382-4aee-4f0c-a6d1-d655a52d3902?mode=fast). It’s a useful tool that parses open-source repositories and helps answer implementation-level questions, which can be handy for understanding how the manifest is actually generated and used.
There was a problem hiding this comment.
For future reference:
"Timestamps
There seems to be multiple timestamps we can use when trying to anchor and synchronize each track. The two that I've tried are:
- EgressInfo.started_at - when the egress started
- FileInfo.started_at - when the file recording started
EgressInfo isn't correct as an egress may have started, but a noticeable delay can happen until file writes. This is especially apparent when writing videos.
FileInfo.started_at is passable under good network conditions -- there are small synchronization deviations (100s of milliseconds or lower) but generally hard to detect. However, under bad network conditions, the composite file becomes out of sync.
"
|
|
||
| # Minimum fraction of a speaker's total duration that must overlap with a | ||
| # participant's VAD to accept the assignment. | ||
| DEFAULT_OVERLAP_THRESHOLD = 0.5 |
There was a problem hiding this comment.
we might need to be more confident, 0.5 is quite low, isn't it?
There was a problem hiding this comment.
It may be useful to keep it low in the event of short speech patterns (VAD and transcription timestamps are not well aligned)
| for key, value in diarization.items(): | ||
| if key in ("segments", "word_segments"): | ||
| new_items = [] | ||
| for item in value: | ||
| new_item = _replace_speaker(item) | ||
| if key == "segments" and "words" in item: | ||
| new_item["words"] = [_replace_speaker(w) for w in item["words"]] | ||
| new_items.append(new_item) | ||
| result[key] = new_items | ||
| else: | ||
| result[key] = value |
| @@ -0,0 +1,292 @@ | |||
| """Assign WhisperX diarization speakers to participant identities. | |||
There was a problem hiding this comment.
The code organization in this file could be clearer and easier to follow.
Consider restructuring the class so that its responsibilities are more explicit, and grouping related public and private methods into a well-defined service.
| speaker_duration = _total_duration(speaker_intervals) | ||
| if speaker_duration == 0: | ||
| result.unassigned_speakers.append(speaker) | ||
| continue |
There was a problem hiding this comment.
could this situation really happen?
There was a problem hiding this comment.
With the previous version of whisperx (3.3.1) in meet-whisperx, numbers didn't have a start/stop, so this could have happened. We can leave this check in to stay decoupled from the whisperx version.
| for key, value in diarization.items(): | ||
| if key in ("segments", "word_segments"): | ||
| new_items = [] | ||
| for item in value: | ||
| new_item = _replace_speaker(item) | ||
| if key == "segments" and "words" in item: | ||
| new_item["words"] = [_replace_speaker(w) for w in item["words"]] | ||
| new_items.append(new_item) | ||
| result[key] = new_items | ||
| else: | ||
| result[key] = value | ||
| return result |
There was a problem hiding this comment.
pseudo code suggestion
| for key, value in diarization.items(): | |
| if key in ("segments", "word_segments"): | |
| new_items = [] | |
| for item in value: | |
| new_item = _replace_speaker(item) | |
| if key == "segments" and "words" in item: | |
| new_item["words"] = [_replace_speaker(w) for w in item["words"]] | |
| new_items.append(new_item) | |
| result[key] = new_items | |
| else: | |
| result[key] = value | |
| return result | |
| def _process_segment(item, include_words=False): | |
| new_item = _replace_speaker(item) | |
| if include_words and "words" in item: | |
| new_item["words"] = [_replace_speaker(w) for w in item["words"]] | |
| return new_item | |
| ... | |
| result = {} | |
| for key, value in diarization.items(): | |
| if key not in ("segments", "word_segments"): | |
| result[key] = value | |
| continue | |
| result[key] = [_process_segment(item, include_words=(key == "segments")) for item in value] | |
| return result |
|
@coderabbitai full review |
✅ Actions performedFull review triggered. |
WalkthroughThis pull request implements speaker-to-participant assignment for audio transcription workflows. It refactors the recording metadata pipeline to exchange simple date/time fields for precise ISO-formatted start/end timestamps retrieved from LiveKit's egress API, adds new S3 configuration settings for metadata file storage, introduces a speaker diarization-to-user mapping module with overlap-based heuristic scoring, and updates the transcription API, Celery task definitions, and transcript formatting to propagate these new parameters throughout the system. Supporting logging, configuration, and comprehensive unit tests are also included. Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches⚔️ Resolve merge conflicts
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 11
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@CHANGELOG.md`:
- Around line 11-14: Update the changelog entry for "✨(summary) add
speaker-to-participant assignment" to explicitly mark it as a breaking change
and add a short migration note; under the same "### Added" or a new "###
Breaking Changes" section, append "BREAKING: speaker-to-participant assignment
changes payload format" and a 1–2 line migration instruction describing what to
rename/reshape in the API consumer payload (e.g., map old speakerId field to
participantId or adjust nested object structure), so consumers know how to adapt
when using the new assignment format.
In `@src/backend/core/recording/event/notification.py`:
- Around line 137-185: _get_recording_timestamps synchronously blocks the
request thread by calling LiveKit via async_to_sync/wait_for inside the
notification path (_notify_summary_service); move this I/O off the request path
by removing the inline LiveKit call and instead obtain started_at/ended_at
asynchronously inside the recording processing task (e.g., the Celery task that
finalizes recordings) or read them from the existing egress
manifest/egress_ended payload; specifically, remove or stub out the synchronous
call in _get_recording_timestamps (and any use of
async_to_sync/create_livekit_client/wait_for there) and change callers (such as
_notify_summary_service) to accept optional started_at/ended_at parameters or to
enqueue a task that will call the existing LiveKit client asynchronously within
the worker context where blocking is acceptable.
- Around line 207-208: Guard against a missing
METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER by checking
settings.METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER before building
metadata_filename (where metadata_filename =
f"{output_folder}/{recording.id}-metadata.json"); if the setting is falsy,
either raise a configuration error (e.g., log and raise/abort) or omit the
metadata_filename field from the payload so the summary worker falls back to the
no-assignment path—update the code around metadata_filename and the surrounding
function/class handling the payload to implement this conditional behavior and
ensure recording.id is only used when output_folder is present.
In `@src/summary/summary/api/route/tasks.py`:
- Around line 22-33: The model/schema currently makes metadata_filename a
required field (metadata_filename: str) which forces all v2 callers to supply
it; change metadata_filename to Optional[str] = None and update the downstream
metadata-loading logic (the worker/code that reads
METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER and performs speaker assignment) to
simply skip loading/assigning speakers when metadata_filename is None or when
METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER is unset; ensure any code paths that
assumed metadata_filename exists (speaker assignment routines) handle the None
case gracefully and do not raise a 422 or attempt to read S3.
In `@src/summary/summary/core/celery_worker.py`:
- Around line 288-308: The current branch calls assignment_result.apply with
only {"segments","word_segments"}, which causes AssignmentResult.apply to return
an object missing other Transcription fields (language, text, etc.), so replace
that call to pass the full transcription data structure (the complete
transcription mapping/dump) to assignment_result.apply (use the existing
Transcription export/dump method or the full dict representation) and reassign
transcription to the apply result; ensure subsequent code accesses transcription
fields consistently (e.g., transcription.language, transcription.text) as before
so the object shape remains the same whether assign_speakers ran or was skipped.
- Around line 291-308: The current try/except only catches FileServiceException
so errors inside assign_speakers or while parsing metadata (e.g., KeyError,
ValueError, TypeError from datetime.fromisoformat or missing keys) will crash
the task; widen the exception handling around the block that calls
file_service.read_json and assign_speakers (identify the try block around
metadata = file_service.read_json(metadata_filename) and the call to
assign_speakers/assignment_result.apply) to also catch parsing/processing errors
— either add specific exceptions (KeyError, ValueError, TypeError) or catch
Exception — and in the except branch log the full exception via logger.exception
or include exc info in logger.error and continue by skipping speaker assignment
so the rest of the transcription/summarization proceeds.
In `@src/summary/summary/core/file_service.py`:
- Around line 232-250: In read_json, catch JSON decoding and invalid payload
types so malformed or non-object JSON doesn't leak: after calling
self._minio_client.get_object(self._bucket_name, object_name) and reading the
response, wrap json.loads in a try/except that catches json.JSONDecodeError (and
TypeError/ValueError if needed) and raise FileServiceException with a clear
message; also validate that the parsed value is a dict and raise
FileServiceException if not; keep the existing exception handling for
MinioException/S3Error and retain the finally block that closes/release_conn on
response.
In `@src/summary/summary/core/transcript_formatter.py`:
- Around line 100-118: The _generate_title method can raise
ZoneInfoNotFoundError when ZoneInfo(owner_timezone) is given an invalid IANA
name; wrap the ZoneInfo lookup/astimezone call in a try/except catching
ZoneInfoNotFoundError (import ZoneInfo and ZoneInfoNotFoundError from zoneinfo)
and on failure fall back to using the parsed datetime (or convert to UTC) rather
than aborting; specifically, keep the datetime.fromisoformat(...) and then
attempt dt = dt.astimezone(ZoneInfo(owner_timezone)) inside a try block and
leave dt unchanged (or dt = dt.astimezone(ZoneInfo("UTC"))) in the except
ZoneInfoNotFoundError block so the title generation uses a safe fallback.
In `@src/summary/summary/core/user_assign.py`:
- Around line 258-290: The current tie-breaking uses strict `if score >
best_score` which keeps the first iterated participant on ties; update the loop
that iterates `participant_timelines` (and calls `_overlap_duration`) to also
track the runner-up score and id (e.g., maintain `second_best_score: float =
0.0` and `second_best_pid: str | None = None`), updating them whenever you
replace or beat the best (on `score > best_score` set `second_best_* = best_*`
before assigning new best; on `elif score > second_best_score` update second
best), then after the loop but before creating `SpeakerAssignment` check a small
epsilon (e.g., eps = 1e-6) and if `best_score - second_best_score <= eps` emit a
`logger.warning` that includes `speaker`, `best_pid`, `second_best_pid`,
`best_score`, and `second_best_score`; keep the existing assignment logic and
logging otherwise.
- Around line 163-181: The timestamp math is fragile because naive vs tz-aware
datetimes yield different POSIX timestamps; normalize both
recording_start_datetime and event timestamps to UTC before calling
.timestamp(). Specifically, ensure recording_start_datetime is converted to a
tz-aware UTC datetime (use datetime.fromisoformat(...); if it's naive, set
tzinfo=timezone.utc or call .astimezone(timezone.utc)), and when computing ts
from datetime.fromisoformat(event["timestamp"]) do the same normalization (if
the parsed event dt is naive set tzinfo=timezone.utc; otherwise
.astimezone(timezone.utc)), then compute .timestamp() and subtract ref_epoch;
add from datetime import timezone to imports if missing and update the loop that
builds open_starts/intervals (the code around recording_start_datetime,
datetime.fromisoformat(event["timestamp"]) and Interval(...)) to use these
normalized datetimes.
In `@src/summary/tests/unit/test_user_assign.py`:
- Around line 25-26: Tests in test_user_assign.py use naive datetimes but
production uses UTC-aware datetimes, so add a unit test that mirrors production
by constructing RECORDING_START and RECORDING_END as tz-aware UTC datetimes
(e.g., using datetime.fromisoformat or datetime.fromtimestamp with
tz=timezone.utc) and include a mix of aware and naive event timestamps to
exercise _build_participant_timelines in user_assign.py; update the fixture
constants (RECORDING_START/RECORDING_END) or add a new test case that passes
these aware datetimes into the existing test helpers and asserts the same
expected timelines to ensure timezone handling is covered.
🪄 Autofix (Beta)
❌ Autofix failed (check again to retry)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: e3854390-f21e-457f-95b3-cb0aeeb8a84b
📒 Files selected for processing (14)
CHANGELOG.mdsrc/agents/metadata_collector.pysrc/backend/core/api/viewsets.pysrc/backend/core/recording/event/notification.pysrc/backend/meet/settings.pysrc/helm/env.d/dev-keycloak/values.meet.yaml.gotmplsrc/summary/summary/api/route/tasks.pysrc/summary/summary/core/analytics.pysrc/summary/summary/core/celery_worker.pysrc/summary/summary/core/file_service.pysrc/summary/summary/core/transcript_formatter.pysrc/summary/summary/core/user_assign.pysrc/summary/tests/api/test_api_tasks.pysrc/summary/tests/unit/test_user_assign.py
| ### Added | ||
|
|
||
| - ✨(summary) add speaker-to-participant assignment | ||
|
|
There was a problem hiding this comment.
Document the breaking payload change explicitly.
Line 13 adds the feature but does not flag that the payload format is breaking. Please mark this as a breaking change and include a short migration note so API consumers can adapt safely.
Proposed changelog adjustment
-### Added
-- ✨(summary) add speaker-to-participant assignment
+### Changed
+- 💥(summary) BREAKING: payload format changed for speaker-to-participant assignment.
+ Migration: update consumers parsing transcription speaker/participant fields.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| ### Added | |
| - ✨(summary) add speaker-to-participant assignment | |
| ### Changed | |
| - 💥(summary) BREAKING: payload format changed for speaker-to-participant assignment. | |
| Migration: update consumers parsing transcription speaker/participant fields. | |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@CHANGELOG.md` around lines 11 - 14, Update the changelog entry for
"✨(summary) add speaker-to-participant assignment" to explicitly mark it as a
breaking change and add a short migration note; under the same "### Added" or a
new "### Breaking Changes" section, append "BREAKING: speaker-to-participant
assignment changes payload format" and a 1–2 line migration instruction
describing what to rename/reshape in the API consumer payload (e.g., map old
speakerId field to participantId or adjust nested object structure), so
consumers know how to adapt when using the new assignment format.
| @staticmethod | ||
| def _get_recording_timestamps(worker_id): | ||
| """Fetch FileInfo.started_at and ended_at from LiveKit's egress API. | ||
|
|
||
| FileInfo.started_at is more accurate than EgressInfo.started_at because it | ||
| reflects when file recording actually began, not when the egress | ||
| process was initialized. | ||
|
|
||
| Returns: | ||
| Tuple of (started_at, ended_at) datetimes, either may be None. | ||
| """ | ||
| if not worker_id: | ||
| return None, None | ||
|
|
||
| def _ns_to_utc(ns): | ||
| return datetime.fromtimestamp(ns / 1e9, tz=timezone.utc) if ns else None | ||
|
|
||
| @async_to_sync | ||
| async def _fetch(): | ||
| lkapi = utils.create_livekit_client() | ||
| try: | ||
| egress_list = await asyncio.wait_for( | ||
| lkapi.egress.list_egress( | ||
| livekit_api.ListEgressRequest(egress_id=worker_id) # pylint: disable=no-member | ||
| ), | ||
| timeout=10, | ||
| ) | ||
| except (livekit_api.TwirpError, OSError, asyncio.TimeoutError): | ||
| logger.exception("Could not fetch egress info for worker %s", worker_id) | ||
| return None, None | ||
| finally: | ||
| await lkapi.aclose() | ||
|
|
||
| if not egress_list.items or not egress_list.items[0].file_results: | ||
| logger.debug("No file_results for worker %s", worker_id) | ||
| return None, None | ||
|
|
||
| # If information exists, extract started_at, ended_at | ||
| file_results = egress_list.items[0].file_results | ||
| started_at = None | ||
| ended_at = None | ||
| if file_results[0].started_at: | ||
| started_at = _ns_to_utc(file_results[0].started_at) | ||
| if file_results[0].ended_at: | ||
| ended_at = _ns_to_utc(file_results[0].ended_at) | ||
|
|
||
| return started_at, ended_at | ||
|
|
||
| return _fetch() |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 🏗️ Heavy lift
Blocking LiveKit RPC on the request thread.
_get_recording_timestamps performs a synchronous LiveKit egress call (async_to_sync on a 10 s asyncio.wait_for) inline with _notify_summary_service. If this notification path runs on a Django request worker (e.g. a webhook handler), every recording finalization now adds up to 10 s of blocking I/O before the summary service is even notified. Consider moving the timestamp fetch into the Celery task that processes the recording, or piggy-backing on the LiveKit manifest/egress_ended payload that already carries started_at/ended_at (matching the past reviewer's suggestion about reading from the egress manifest).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/backend/core/recording/event/notification.py` around lines 137 - 185,
_get_recording_timestamps synchronously blocks the request thread by calling
LiveKit via async_to_sync/wait_for inside the notification path
(_notify_summary_service); move this I/O off the request path by removing the
inline LiveKit call and instead obtain started_at/ended_at asynchronously inside
the recording processing task (e.g., the Celery task that finalizes recordings)
or read them from the existing egress manifest/egress_ended payload;
specifically, remove or stub out the synchronous call in
_get_recording_timestamps (and any use of
async_to_sync/create_livekit_client/wait_for there) and change callers (such as
_notify_summary_service) to accept optional started_at/ended_at parameters or to
enqueue a task that will call the existing LiveKit client asynchronously within
the worker context where blocking is acceptable.
| output_folder = settings.METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER | ||
| metadata_filename = f"{output_folder}/{recording.id}-metadata.json" |
There was a problem hiding this comment.
Guard against missing METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER.
If the setting is unset, metadata_filename becomes "None/<id>-metadata.json" and the summary worker will silently fail to read metadata (and skip speaker assignment) on every recording. Either treat this case as configuration error or fall back to skipping the field on the payload so the worker takes the no-assignment path explicitly.
🛡️ Suggested guard
- output_folder = settings.METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER
- metadata_filename = f"{output_folder}/{recording.id}-metadata.json"
+ output_folder = settings.METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER
+ metadata_filename = (
+ f"{output_folder}/{recording.id}-metadata.json" if output_folder else None
+ )📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| output_folder = settings.METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER | |
| metadata_filename = f"{output_folder}/{recording.id}-metadata.json" | |
| output_folder = settings.METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER | |
| metadata_filename = ( | |
| f"{output_folder}/{recording.id}-metadata.json" if output_folder else None | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/backend/core/recording/event/notification.py` around lines 207 - 208,
Guard against a missing METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER by checking
settings.METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER before building
metadata_filename (where metadata_filename =
f"{output_folder}/{recording.id}-metadata.json"); if the setting is falsy,
either raise a configuration error (e.g., log and raise/abort) or omit the
metadata_filename field from the payload so the summary worker falls back to the
no-assignment path—update the code around metadata_filename and the surrounding
function/class handling the payload to implement this conditional behavior and
ensure recording.id is only used when output_folder is present.
There was a problem hiding this comment.
(Now renamed METADATA_COLLECTOR_OUTPUT_FOLDER)
Has a default value. I believe this current implementation addresses the issue:
METADATA_COLLECTOR_OUTPUT_FOLDER = values.Value(
"metadata",
environ_name="METADATA_COLLECTOR_OUTPUT_FOLDER",
environ_prefix=None,
)
| recording_filename: str | ||
| metadata_filename: str | ||
| email: str | ||
| sub: str | ||
| version: Optional[int] = 2 | ||
| room: Optional[str] | ||
| recording_date: Optional[str] | ||
| recording_time: Optional[str] | ||
| owner_timezone: Optional[str] | ||
| language: Optional[str] | ||
| download_link: Optional[str] | ||
| context_language: Optional[str] = None | ||
| recording_start_at: Optional[str] = None | ||
| recording_end_at: Optional[str] = None |
There was a problem hiding this comment.
metadata_filename is now strictly required — confirm backend invariant.
metadata_filename: str (no default) makes this a hard contract: any v2 caller that doesn't provide a metadata path now gets a 422. Backend currently always builds the path from METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER, but that setting can be unset (see notification.py:207). If you want speaker assignment to be opt-in, model this as Optional[str] = None and have the worker simply skip metadata loading when absent.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/summary/summary/api/route/tasks.py` around lines 22 - 33, The
model/schema currently makes metadata_filename a required field
(metadata_filename: str) which forces all v2 callers to supply it; change
metadata_filename to Optional[str] = None and update the downstream
metadata-loading logic (the worker/code that reads
METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER and performs speaker assignment) to
simply skip loading/assigning speakers when metadata_filename is None or when
METADATA_COLLECTOR_AWS_S3_OUTPUT_FOLDER is unset; ensure any code paths that
assumed metadata_filename exists (speaker assignment routines) handle the None
case gracefully and do not raise a 422 or attempt to read S3.
| if recording_start_dt is not None and recording_end_dt is not None: | ||
| logger.debug("Running assign_speakers") | ||
|
|
||
| try: | ||
| metadata = file_service.read_json(metadata_filename) | ||
| assignment_result = assign_speakers( | ||
| metadata, | ||
| transcription, | ||
| recording_start_dt, | ||
| recording_end_dt, | ||
| ) | ||
| transcription = assignment_result.apply( | ||
| { | ||
| "segments": transcription.segments, | ||
| "word_segments": transcription.word_segments, | ||
| } | ||
| ) | ||
| except FileServiceException as exc: | ||
| logger.error("Error reading metadata: %s. Skipping speaker assignment", exc) | ||
| else: | ||
| logger.debug("Skipping assign_speakers") |
There was a problem hiding this comment.
assignment_result.apply is fed an incomplete dict and rebinds transcription, dropping fields.
You build a 2-key dict (segments, word_segments) from the Transcription, run apply on it, and then assign the result back to transcription. AssignmentResult.apply preserves only the keys it receives, so language, text, and any other attributes the original Transcription carried are silently lost on the assigned-speakers branch but kept on the skipped branch — making transcription a heterogeneous shape downstream. Pass the full transcription dump to apply (and access it consistently afterwards):
🛡️ Suggested fix
- try:
- metadata = file_service.read_json(metadata_filename)
- assignment_result = assign_speakers(
- metadata,
- transcription,
- recording_start_dt,
- recording_end_dt,
- )
- transcription = assignment_result.apply(
- {
- "segments": transcription.segments,
- "word_segments": transcription.word_segments,
- }
- )
- except FileServiceException as exc:
- logger.error("Error reading metadata: %s. Skipping speaker assignment", exc)
+ try:
+ metadata = file_service.read_json(metadata_filename)
+ assignment_result = assign_speakers(
+ metadata,
+ transcription,
+ recording_start_dt,
+ recording_end_dt,
+ )
+ transcription = assignment_result.apply(transcription.model_dump())
+ except FileServiceException as exc:
+ logger.error(
+ "Error reading metadata: %s. Skipping speaker assignment", exc
+ )📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if recording_start_dt is not None and recording_end_dt is not None: | |
| logger.debug("Running assign_speakers") | |
| try: | |
| metadata = file_service.read_json(metadata_filename) | |
| assignment_result = assign_speakers( | |
| metadata, | |
| transcription, | |
| recording_start_dt, | |
| recording_end_dt, | |
| ) | |
| transcription = assignment_result.apply( | |
| { | |
| "segments": transcription.segments, | |
| "word_segments": transcription.word_segments, | |
| } | |
| ) | |
| except FileServiceException as exc: | |
| logger.error("Error reading metadata: %s. Skipping speaker assignment", exc) | |
| else: | |
| logger.debug("Skipping assign_speakers") | |
| if recording_start_dt is not None and recording_end_dt is not None: | |
| logger.debug("Running assign_speakers") | |
| try: | |
| metadata = file_service.read_json(metadata_filename) | |
| assignment_result = assign_speakers( | |
| metadata, | |
| transcription, | |
| recording_start_dt, | |
| recording_end_dt, | |
| ) | |
| transcription = assignment_result.apply(transcription.model_dump()) | |
| except FileServiceException as exc: | |
| logger.error( | |
| "Error reading metadata: %s. Skipping speaker assignment", exc | |
| ) | |
| else: | |
| logger.debug("Skipping assign_speakers") |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/summary/summary/core/celery_worker.py` around lines 288 - 308, The
current branch calls assignment_result.apply with only
{"segments","word_segments"}, which causes AssignmentResult.apply to return an
object missing other Transcription fields (language, text, etc.), so replace
that call to pass the full transcription data structure (the complete
transcription mapping/dump) to assignment_result.apply (use the existing
Transcription export/dump method or the full dict representation) and reassign
transcription to the apply result; ensure subsequent code accesses transcription
fields consistently (e.g., transcription.language, transcription.text) as before
so the object shape remains the same whether assign_speakers ran or was skipped.
There was a problem hiding this comment.
done with proposed change
| def read_json(self, object_name: str) -> dict: | ||
| """Read and parse a JSON file from MinIO storage.""" | ||
| logger.info("Reading JSON: %s", object_name) | ||
|
|
||
| if not object_name: | ||
| raise ValueError("Invalid object_name") | ||
|
|
||
| response = None | ||
| try: | ||
| response = self._minio_client.get_object(self._bucket_name, object_name) | ||
| return json.loads(response.read()) | ||
| except (MinioException, S3Error) as e: | ||
| raise FileServiceException( | ||
| "Unexpected error while reading JSON object." | ||
| ) from e | ||
| finally: | ||
| if response: | ||
| response.close() | ||
| response.release_conn() |
There was a problem hiding this comment.
Handle invalid JSON content and enforce dict payload shape.
At Line 242, JSON parsing/type errors are not wrapped and may leak as unexpected exceptions; also non-object JSON violates the dict contract. This can break downstream metadata processing paths.
Proposed fix
def read_json(self, object_name: str) -> dict:
"""Read and parse a JSON file from MinIO storage."""
logger.info("Reading JSON: %s", object_name)
@@
response = None
try:
response = self._minio_client.get_object(self._bucket_name, object_name)
- return json.loads(response.read())
+ payload = json.loads(response.read())
+ if not isinstance(payload, dict):
+ raise FileServiceException("Unexpected JSON root type; expected object.")
+ return payload
except (MinioException, S3Error) as e:
raise FileServiceException(
"Unexpected error while reading JSON object."
) from e
+ except (json.JSONDecodeError, UnicodeDecodeError) as e:
+ raise FileServiceException("Invalid JSON content.") from e
finally:
if response:
response.close()
response.release_conn()📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def read_json(self, object_name: str) -> dict: | |
| """Read and parse a JSON file from MinIO storage.""" | |
| logger.info("Reading JSON: %s", object_name) | |
| if not object_name: | |
| raise ValueError("Invalid object_name") | |
| response = None | |
| try: | |
| response = self._minio_client.get_object(self._bucket_name, object_name) | |
| return json.loads(response.read()) | |
| except (MinioException, S3Error) as e: | |
| raise FileServiceException( | |
| "Unexpected error while reading JSON object." | |
| ) from e | |
| finally: | |
| if response: | |
| response.close() | |
| response.release_conn() | |
| def read_json(self, object_name: str) -> dict: | |
| """Read and parse a JSON file from MinIO storage.""" | |
| logger.info("Reading JSON: %s", object_name) | |
| if not object_name: | |
| raise ValueError("Invalid object_name") | |
| response = None | |
| try: | |
| response = self._minio_client.get_object(self._bucket_name, object_name) | |
| payload = json.loads(response.read()) | |
| if not isinstance(payload, dict): | |
| raise FileServiceException("Unexpected JSON root type; expected object.") | |
| return payload | |
| except (MinioException, S3Error) as e: | |
| raise FileServiceException( | |
| "Unexpected error while reading JSON object." | |
| ) from e | |
| except (json.JSONDecodeError, UnicodeDecodeError) as e: | |
| raise FileServiceException("Invalid JSON content.") from e | |
| finally: | |
| if response: | |
| response.close() | |
| response.release_conn() |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/summary/summary/core/file_service.py` around lines 232 - 250, In
read_json, catch JSON decoding and invalid payload types so malformed or
non-object JSON doesn't leak: after calling
self._minio_client.get_object(self._bucket_name, object_name) and reading the
response, wrap json.loads in a try/except that catches json.JSONDecodeError (and
TypeError/ValueError if needed) and raise FileServiceException with a clear
message; also validate that the parsed value is a dict and raise
FileServiceException if not; keep the existing exception handling for
MinioException/S3Error and retain the finally block that closes/release_conn on
response.
There was a problem hiding this comment.
implemented as suggested
| ref_epoch = recording_start_datetime.timestamp() | ||
|
|
||
| open_starts: dict[str, float] = {} | ||
| intervals: dict[str, list[Interval]] = {} | ||
|
|
||
| for event in events: | ||
| pid = event["participant_id"] | ||
| ts = datetime.fromisoformat(event["timestamp"]).timestamp() - ref_epoch | ||
| etype = event["type"] | ||
|
|
||
| if etype == "speech_start": | ||
| open_starts[pid] = max(ts, 0.0) | ||
| elif etype == "speech_end": | ||
| start = open_starts.pop(pid, None) | ||
| if start is not None: | ||
| end = max(ts, 0.0) | ||
| if end > start: | ||
| intervals.setdefault(pid, []).append(Interval(start, end)) | ||
|
|
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Look at how metadata event timestamps are produced to confirm tz handling
fd -e py metadata_collector
rg -nP --type=py -C3 '"timestamp"\s*:' -g '!**/tests/**'
rg -nP --type=py -C3 'speech_(start|end)' -g '!**/tests/**'Repository: suitenumerique/meet
Length of output: 3370
🏁 Script executed:
# Check the MetadataEvent class definition and timestamp field
rg -n "class MetadataEvent" -A 10 --type=py
# Check how timestamp is set in metadata_collector
head -150 src/agents/metadata_collector.py | tail -70
# Check notification.py line 152 for recording_start_at creation
sed -n '145,160p' src/backend/core/recording/services/notification.pyRepository: suitenumerique/meet
Length of output: 3448
🏁 Script executed:
# Find notification.py
find . -name "notification.py" -type f
# Check how MetadataEvent is serialized (asdict converts to dict)
# and look at the serialize() method
sed -n '59,75p' src/agents/metadata_collector.py
# Check how metadata dict is loaded/deserialized in user_assign.py
rg -n "def _build_participant_timelines" -A 25 src/summary/summary/core/user_assign.pyRepository: suitenumerique/meet
Length of output: 1839
🏁 Script executed:
# Check notification.py for recording_start_at
sed -n '1,200p' src/backend/core/recording/event/notification.py | grep -n -C 5 "recording_start"
# Check how metadata is loaded in user_assign.py or its callers
rg -n "metadata.*=" -B 3 -A 3 src/summary/summary/core/user_assign.py | head -50
# Search for where metadata dict is created/loaded from JSON
rg -n "json.load\|json.loads\|\.json()" -B 3 -A 3 --type=py | head -80Repository: suitenumerique/meet
Length of output: 45
🏁 Script executed:
# Search broadly for recording_start_datetime usage
rg -n "recording_start_datetime" --type=py
# Search for where _build_participant_timelines is called
rg -n "_build_participant_timelines" --type=py
# Look for metadata loading in summary module
rg -n "metadata" src/summary/summary/core/user_assign.py | head -20Repository: suitenumerique/meet
Length of output: 1454
🏁 Script executed:
# Read the function around line 240 to see where recording_start_datetime comes from
sed -n '210,250p' src/summary/summary/core/user_assign.py
# Search for calls to the public API (likely assign_speakers)
rg -n "def assign_speakers" -A 20 src/summary/summary/core/user_assign.py
# Search for where assign_speakers is called to see what values are passed
rg -n "assign_speakers\(" --type=py -B 2 -A 2Repository: suitenumerique/meet
Length of output: 6926
🏁 Script executed:
# Check celery_worker.py to see how recording_start_datetime is created
sed -n '280,310p' src/summary/summary/core/celery_worker.py
# Check test metadata to see if timestamps are tz-aware or naive
rg -n "RECORDING_START\|METADATA_" src/summary/tests/unit/test_user_assign.py | head -20
# Look at actual test metadata definitions
sed -n '1,100p' src/summary/tests/unit/test_user_assign.pyRepository: suitenumerique/meet
Length of output: 3853
🏁 Script executed:
# Check where recording_start_at comes from in celery_worker
sed -n '260,290p' src/summary/summary/core/celery_worker.py
# Check the parameters passed to the celery task
rg -n "def.*process\|recording_start_at\|recording_end_at" src/summary/summary/core/celery_worker.py | head -20
# Check if there's any JSON schema or deserialization in file_service
rg -n "read_json" src/summary --type=pyRepository: suitenumerique/meet
Length of output: 1235
🏁 Script executed:
# Check file_service.read_json to see if it does any special deserialization
sed -n '232,250p' src/summary/summary/core/file_service.py
# Check the celery task signature to see where recording_start_at comes from
rg -n "def.*task\|recording_start_at" src/summary/summary/core/celery_worker.py | head -10
# Search for where this celery task is called from the backend
rg -n "process_recording\|celery" src/backend --type=py | head -20Repository: suitenumerique/meet
Length of output: 764
🏁 Script executed:
# Check how metadata is written/serialized by metadata_collector
rg -n "serialize\|json.dump" src/agents/metadata_collector.py -A 5
# Check if there's any JSON encoding that strips timezone info
rg -n "JSONEncoder\|json.dump" src/agents --type=py -B 2 -A 5
# Verify what format is actually stored in the JSON - search for how events list is built
rg -n "self.events\|serialize" src/agents/metadata_collector.py -B 2 -A 3Repository: suitenumerique/meet
Length of output: 45
🏁 Script executed:
# Look for where metadata is converted to JSON and stored
fd -e py metadata_collector | xargs cat | grep -n "json\|serialize" | head -30
# Check the full metadata_collector.py for serialization
cat src/agents/metadata_collector.py | grep -n "serialize\|json\|dump" | head -20
# Read the entire metadata_collector.py to understand the flow
cat src/agents/metadata_collector.pyRepository: suitenumerique/meet
Length of output: 13458
🏁 Script executed:
# Check notification.py for how recording_start_at is documented or sourced
cat src/backend/core/recording/event/notification.py | head -100
# Search for where recording_start_at is set/created in the backend
rg -n "recording_start_at\|recording_start_datetime" src/backend --type=py | head -20
# Check test constants for RECORDING_START to see if they include timezone
grep -n "RECORDING_START\|RECORDING_END" src/summary/tests/unit/test_user_assign.pyRepository: suitenumerique/meet
Length of output: 4430
🏁 Script executed:
# Check if RECORDING_START has timezone info when parsed
python3 << 'EOF'
from datetime import datetime
dt = datetime.fromisoformat("2026-03-17T15:30:33.000001")
print(f"tzinfo: {dt.tzinfo}")
print(f"timestamp(): {dt.timestamp()}")
EOF
# Check a sample metadata event timestamp from test
python3 << 'EOF'
from datetime import datetime
event_ts = datetime.fromisoformat("2026-03-17T15:30:36.039456")
print(f"Event tzinfo: {event_ts.tzinfo}")
print(f"Event timestamp(): {event_ts.timestamp()}")
EOF
# Check what isoformat() produces for tz-aware UTC datetime
python3 << 'EOF'
from datetime import datetime, timezone
dt_aware = datetime.now(timezone.utc)
iso = dt_aware.isoformat()
print(f"isoformat with UTC: {iso}")
dt_parsed = datetime.fromisoformat(iso)
print(f"Parsed tzinfo: {dt_parsed.tzinfo}")
EOF
# Search backend code to see how recording_start_at is created
rg -n "egress_ended\|FileInfo\|recording_start" src/backend --type=py | head -20Repository: suitenumerique/meet
Length of output: 235
🏁 Script executed:
# Check what LiveKit FileInfo provides for start_time
rg -n "FileInfo\|start_time\|recording_start" src/backend/core/recording/event/notification.py -B 3 -A 3
# Look for the actual webhook payload or FileInfo documentation references
rg -n "webhook\|egress_ended\|recording_start_at" src/backend --type=py -B 2 -A 5 | head -50Repository: suitenumerique/meet
Length of output: 45
🏁 Script executed:
# Check if there's any custom JSON encoder that might strip timezone
rg -n "json.dump\|JSONEncoder\|default=" src/backend src/summary --type=py
# Look at the actual backend code that creates recording_start_at from LiveKit
rg -n "class.*Recording\|def.*egress\|start_time\|FileInfo" src/backend/core/recording/models.py --type=py | head -30
# Check if recording_start_at is documented anywhere
rg -n "recording_start_at\|start_time" src/backend/core/recording/models.py -B 2 -A 2Repository: suitenumerique/meet
Length of output: 217
🏁 Script executed:
# Find where recording_start_at is used in the backend
find src/backend -name "*.py" -type f | xargs grep -l "recording_start_at" | head -5
# List the structure of src/backend/core/recording
ls -la src/backend/core/recording/
# Search for where recording_start_at is set from LiveKit data
rg -n "recording_start_at\|start_time" src/backend --type=py -B 3 -A 3 | head -40Repository: suitenumerique/meet
Length of output: 511
🏁 Script executed:
# Search for where recording_start_at comes from in notification.py
rg -n "recording_start_at" src/backend/core/recording/event/notification.py -B 5 -A 5
# Check the complete flow in celery_worker.py for recording_start_at
rg -n "recording_start_at\|recording_start_dt" src/summary/summary/core/celery_worker.py -B 5 -A 5
# Check task signature to see what parameters are passed
sed -n '240,280p' src/summary/summary/core/celery_worker.pyRepository: suitenumerique/meet
Length of output: 2437
Normalize datetime timezone info before computing POSIX timestamps to prevent silent misalignment.
In production, recording_start_datetime is parsed via datetime.fromisoformat(recording_start_at) where recording_start_at is the ISO 8601 string from LiveKit FileInfo.started_at (celery_worker.py:277). Metadata event timestamps are created via datetime.now(timezone.utc) (metadata_collector.py:88), serialized with isoformat() (which includes +00:00), and deserialized back as tz-aware UTC strings.
However, calling .timestamp() on a naive datetime interprets it as local time, while on a tz-aware datetime it uses UTC. If awareness differs (due to missing or inconsistent timezone info from LiveKit or custom timezone handling), ts - ref_epoch silently shifts by the worker host's UTC offset—hours of misalignment.
The test suite masks this: test constants use naive strings ("2026-03-17T15:30:33.000001" without +00:00), so both sides are naive and mismatch never occurs. Production may work today if LiveKit always provides tz-aware UTC, but the code is fragile.
Normalize both sides defensively before computing epoch offset:
Suggested fix
- ref_epoch = recording_start_datetime.timestamp()
+ def _to_utc(dt: datetime) -> datetime:
+ if dt.tzinfo is None:
+ return dt.replace(tzinfo=timezone.utc)
+ return dt.astimezone(timezone.utc)
+
+ ref_epoch = _to_utc(recording_start_datetime).timestamp()
@@
- ts = datetime.fromisoformat(event["timestamp"]).timestamp() - ref_epoch
+ ts = _to_utc(datetime.fromisoformat(event["timestamp"])).timestamp() - ref_epoch(Add from datetime import timezone to imports if not present.)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/summary/summary/core/user_assign.py` around lines 163 - 181, The
timestamp math is fragile because naive vs tz-aware datetimes yield different
POSIX timestamps; normalize both recording_start_datetime and event timestamps
to UTC before calling .timestamp(). Specifically, ensure
recording_start_datetime is converted to a tz-aware UTC datetime (use
datetime.fromisoformat(...); if it's naive, set tzinfo=timezone.utc or call
.astimezone(timezone.utc)), and when computing ts from
datetime.fromisoformat(event["timestamp"]) do the same normalization (if the
parsed event dt is naive set tzinfo=timezone.utc; otherwise
.astimezone(timezone.utc)), then compute .timestamp() and subtract ref_epoch;
add from datetime import timezone to imports if missing and update the loop that
builds open_starts/intervals (the code around recording_start_datetime,
datetime.fromisoformat(event["timestamp"]) and Interval(...)) to use these
normalized datetimes.
| best_pid: str | None = None | ||
| best_score: float = 0.0 | ||
|
|
||
| for pid, part_intervals in participant_timelines.items(): | ||
| overlap = _overlap_duration(speaker_intervals, part_intervals) | ||
| score = overlap / speaker_duration | ||
| if score > best_score: | ||
| best_score = score | ||
| best_pid = pid | ||
|
|
||
| if best_pid is not None and best_score >= overlap_threshold: | ||
| result.assignments.append( | ||
| SpeakerAssignment( | ||
| speaker_label=speaker, | ||
| participant_id=best_pid, | ||
| participant_name=participant_names.get(best_pid, best_pid), | ||
| score=best_score, | ||
| ) | ||
| ) | ||
| logger.info( | ||
| "Assigned %s -> %s (score=%.3f)", | ||
| speaker, | ||
| participant_names.get(best_pid, best_pid), | ||
| best_score, | ||
| ) | ||
| else: | ||
| result.unassigned_speakers.append(speaker) | ||
| logger.info( | ||
| "Speaker %s unassigned (best=%.3f, threshold=%.3f)", | ||
| speaker, | ||
| best_score, | ||
| overlap_threshold, | ||
| ) |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Tie-breaking on score is sensitive to participant ordering.
if score > best_score keeps the first iterated participant on ties, and participant_timelines iteration order depends on insertion order from metadata["participants"]. For practical inputs ties at the threshold are rare, but consider logging a warning when the top two scores are within a small epsilon, so multi-tenant/multi-speaker recordings don't silently flip assignments based on payload ordering.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/summary/summary/core/user_assign.py` around lines 258 - 290, The current
tie-breaking uses strict `if score > best_score` which keeps the first iterated
participant on ties; update the loop that iterates `participant_timelines` (and
calls `_overlap_duration`) to also track the runner-up score and id (e.g.,
maintain `second_best_score: float = 0.0` and `second_best_pid: str | None =
None`), updating them whenever you replace or beat the best (on `score >
best_score` set `second_best_* = best_*` before assigning new best; on `elif
score > second_best_score` update second best), then after the loop but before
creating `SpeakerAssignment` check a small epsilon (e.g., eps = 1e-6) and if
`best_score - second_best_score <= eps` emit a `logger.warning` that includes
`speaker`, `best_pid`, `second_best_pid`, `best_score`, and `second_best_score`;
keep the existing assignment logic and logging otherwise.
There was a problem hiding this comment.
the probability of this happening seems too low to bother
| RECORDING_START = datetime.fromisoformat("2026-03-17T15:30:33.000001") | ||
| RECORDING_END = datetime.fromisoformat("2026-03-17T15:31:33.000001") |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Test fixtures use naive datetimes, but production wires tz-aware UTC datetimes.
RECORDING_START/RECORDING_END and every metadata event timestamp here are naive. In production, recording_start_at/recording_end_at come from datetime.fromtimestamp(ns/1e9, tz=timezone.utc).isoformat() (notification.py), so datetime.fromisoformat(...) in the worker yields tz-aware datetimes. The current tests therefore never exercise the tz-aware path through _build_participant_timelines. Add at least one test that mirrors the production wiring (aware RECORDING_START/RECORDING_END with aware/naive event timestamps) so the timezone semantics in user_assign.py are pinned down.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/summary/tests/unit/test_user_assign.py` around lines 25 - 26, Tests in
test_user_assign.py use naive datetimes but production uses UTC-aware datetimes,
so add a unit test that mirrors production by constructing RECORDING_START and
RECORDING_END as tz-aware UTC datetimes (e.g., using datetime.fromisoformat or
datetime.fromtimestamp with tz=timezone.utc) and include a mix of aware and
naive event timestamps to exercise _build_participant_timelines in
user_assign.py; update the fixture constants (RECORDING_START/RECORDING_END) or
add a new test case that passes these aware datetimes into the existing test
helpers and asserts the same expected timelines to ensure timezone handling is
covered.
|
Note Autofix is a beta feature. Expect some limitations and changes as we gather feedback and continue to improve it. ❌ Cannot run autofix: This PR has merge conflicts. Please resolve the conflicts with the base branch and try again. Alternatively, use |
|
Note Autofix is a beta feature. Expect some limitations and changes as we gather feedback and continue to improve it. ❌ Cannot run autofix: This PR has merge conflicts. Please resolve the conflicts with the base branch and try again. Alternatively, use |
| egress_list = await asyncio.wait_for( | ||
| lkapi.egress.list_egress( | ||
| livekit_api.ListEgressRequest(egress_id=worker_id) # pylint: disable=no-member | ||
| ), | ||
| timeout=10, | ||
| ) |
There was a problem hiding this comment.
I guess you don't need to wrap it in asyncio.wait_for and just pass the timeout while instantiating the client.
Wdyt?
| def _ns_to_utc(ns): | ||
| return datetime.fromtimestamp(ns / 1e9, tz=timezone.utc) if ns else None | ||
|
|
||
| @async_to_sync | ||
| async def _fetch(): |
There was a problem hiding this comment.
I'd suggest hoisting both helpers out of the method body.
| recording_start_dt = None | ||
| recording_end_dt = None | ||
| if recording_start_at: | ||
| recording_start_dt = datetime.fromisoformat(recording_start_at) | ||
| if recording_end_at: | ||
| recording_end_dt = datetime.fromisoformat(recording_end_at) |
There was a problem hiding this comment.
make them one-liners:
a = datetime.fromisoformat(a) if a else None
| recording_start_dt = None | ||
| recording_end_dt = None | ||
| if recording_start_at: | ||
| recording_start_dt = datetime.fromisoformat(recording_start_at) | ||
| if recording_end_at: | ||
| recording_end_dt = datetime.fromisoformat(recording_end_at) | ||
|
|
||
| logger.debug( | ||
| "recording_start_dt: %s ; recording_end_dt: %s", | ||
| recording_start_dt, | ||
| recording_end_dt, | ||
| ) | ||
| if recording_start_dt is not None and recording_end_dt is not None: | ||
| logger.debug("Running assign_speakers") | ||
|
|
||
| try: | ||
| metadata = file_service.read_json(metadata_filename) | ||
| assignment_result = assign_speakers( | ||
| metadata, | ||
| transcription, | ||
| recording_start_dt, | ||
| recording_end_dt, | ||
| ) | ||
| transcription = assignment_result.rewrite_diarization( | ||
| transcription = assignment_result.apply(transcription.model_dump()) | ||
| ) | ||
| except FileServiceException as exc: | ||
| logger.error( | ||
| "Error reading metadata for task %s; skipping speaker assignment. Error: %s", | ||
| task_id, exc | ||
| ) | ||
| except Exception as exc: # noqa: BLE001 | ||
| logger.exception( | ||
| "assign_speakers failed for task %s; skipping speaker assignment. Error: %s", | ||
| task_id, exc | ||
| ) | ||
| else: | ||
| logger.debug("Skipping assign_speakers") |
There was a problem hiding this comment.
this would benefit from being encapsulated in a well-scoped service, with early returns, logging, etc., to avoid adding yet more logic to the celery worker function.
|



Purpose
Users
Proposal
Build upon: #741
overlap / speaker_duration). Assign name for largest score (if score > threshold). Unmatched speakers keep their diarization label, e.g. charlie (SPEAKER_2))
Two speakers, single user/microphone (named Cameledev)