Skip to content

Commit 1d728e4

Browse files
committed
♻️(backend) refactor backend recording state management
Instead of relying on the egress_started event—which fires when egress is starting, not actually started—I now rely on egress_updated for more accurate status updates. This is especially important for the active status, which triggers after egress has truly joined the room. Using this avoids prematurely stopping client-side listening to room.isRecording updates. A further refactoring may remove reliance on room updates entirely. The goal is to minimize handling metadata in the mediator class. egress_starting is still used for simplicity, but egress_started could be considered in the future. Note: if the API to start egress hasn’t responded yet, the webhook may fail to find the recording because it currently matches by worker ID. This is unstable. A better approach would be to pass the database ID in the egress metadata and recover the recording from it in the webhook.
1 parent a9f50e5 commit 1d728e4

5 files changed

Lines changed: 49 additions & 41 deletions

File tree

src/backend/core/recording/services/recording_events.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
"""Recording-related LiveKit Events Service"""
22

3+
# pylint: disable=no-member
4+
35
from logging import getLogger
46

7+
from livekit import api
8+
59
from core import models, utils
610

711
logger = getLogger(__name__)
@@ -14,6 +18,27 @@ class RecordingEventsError(Exception):
1418
class RecordingEventsService:
1519
"""Handles recording-related Livekit webhook events."""
1620

21+
@staticmethod
22+
def handle_update(recording, egress_status):
23+
"""Handle egress status updates and sync recording state to room metadata."""
24+
25+
room_name = str(recording.room.id)
26+
27+
status_mapping = {
28+
api.EgressStatus.EGRESS_ACTIVE: "started",
29+
api.EgressStatus.EGRESS_ENDING: "saving",
30+
api.EgressStatus.EGRESS_ABORTED: "aborted",
31+
}
32+
33+
recording_status = status_mapping.get(egress_status)
34+
if recording_status:
35+
try:
36+
utils.update_room_metadata(
37+
room_name, {"recording_status": recording_status}
38+
)
39+
except utils.MetadataUpdateException as e:
40+
logger.exception("Failed to update room's metadata: %s", e)
41+
1742
@staticmethod
1843
def handle_limit_reached(recording):
1944
"""Stop recording and notify participants when limit is reached."""

src/backend/core/recording/worker/mediator.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,4 @@ def stop(self, recording: Recording):
105105
finally:
106106
recording.save()
107107

108-
try:
109-
room_name = str(recording.room.id)
110-
utils.update_room_metadata(room_name, {"recording_status": "saving"})
111-
except utils.MetadataUpdateException as e:
112-
logger.exception("Failed to update room's metadata: %s", e)
113-
114108
logger.info("Worker stopped for room %s", recording.room)

src/backend/core/services/livekit_events.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -138,23 +138,19 @@ def receive(self, request):
138138
# pylint: disable=not-callable
139139
handler(data)
140140

141-
def _handle_egress_started(self, data):
142-
"""Handle 'egress_started' event."""
141+
def _handle_egress_updated(self, data):
142+
"""Handle 'egress_updated' event."""
143143

144+
egress_id = data.egress_info.egress_id
144145
try:
145-
recording = models.Recording.objects.get(
146-
worker_id=data.egress_info.egress_id
147-
)
146+
recording = models.Recording.objects.get(worker_id=egress_id)
148147
except models.Recording.DoesNotExist as err:
149148
raise ActionFailedError(
150-
f"Recording with worker ID {data.egress_info.egress_id} does not exist"
149+
f"Recording with worker ID {egress_id} does not exist"
151150
) from err
152151

153-
try:
154-
room_name = str(recording.room.id)
155-
utils.update_room_metadata(room_name, {"recording_status": "started"})
156-
except utils.MetadataUpdateException as e:
157-
logger.exception("Failed to update room's metadata: %s", e)
152+
egress_status = data.egress_info.status
153+
self.recording_events.handle_update(recording, egress_status)
158154

159155
def _handle_egress_ended(self, data):
160156
"""Handle 'egress_ended' event."""

src/backend/core/tests/recording/worker/test_mediator.py

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,7 @@ def test_mediator_start_recording_from_forbidden_status(
122122
mock_update_room_metadata.assert_not_called()
123123

124124

125-
@mock.patch("core.utils.update_room_metadata")
126-
def test_mediator_stop_recording_success(
127-
mock_update_room_metadata, mediator, mock_worker_service
128-
):
125+
def test_mediator_stop_recording_success(mediator, mock_worker_service):
129126
"""Test successful recording stop"""
130127
# Setup
131128
mock_recording = RecordingFactory(
@@ -143,15 +140,8 @@ def test_mediator_stop_recording_success(
143140
mock_recording.refresh_from_db()
144141
assert mock_recording.status == RecordingStatusChoices.STOPPED
145142

146-
mock_update_room_metadata.assert_called_once_with(
147-
str(mock_recording.room.id), {"recording_status": "saving"}
148-
)
149-
150143

151-
@mock.patch("core.utils.update_room_metadata")
152-
def test_mediator_stop_recording_aborted(
153-
mock_update_room_metadata, mediator, mock_worker_service
154-
):
144+
def test_mediator_stop_recording_aborted(mediator, mock_worker_service):
155145
"""Test recording stop when worker returns ABORTED"""
156146
# Setup
157147
mock_recording = RecordingFactory(
@@ -166,15 +156,10 @@ def test_mediator_stop_recording_aborted(
166156
mock_recording.refresh_from_db()
167157
assert mock_recording.status == RecordingStatusChoices.ABORTED
168158

169-
mock_update_room_metadata.assert_called_once_with(
170-
str(mock_recording.room.id), {"recording_status": "saving"}
171-
)
172-
173159

174160
@pytest.mark.parametrize("error_class", [WorkerConnectionError, WorkerResponseError])
175-
@mock.patch("core.utils.update_room_metadata")
176161
def test_mediator_stop_recording_worker_errors(
177-
mock_update_room_metadata, mediator, mock_worker_service, error_class
162+
mediator, mock_worker_service, error_class
178163
):
179164
"""Test handling of worker errors during stop"""
180165
# Setup
@@ -190,5 +175,3 @@ def test_mediator_stop_recording_worker_errors(
190175
# Verify recording updates
191176
mock_recording.refresh_from_db()
192177
assert mock_recording.status == RecordingStatusChoices.FAILED_TO_STOP
193-
194-
mock_update_room_metadata.assert_not_called()

src/backend/core/tests/services/test_livekit_events.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,19 +94,29 @@ def test_handle_egress_ended_success(
9494
assert recording.status == "stopped"
9595

9696

97+
@pytest.mark.parametrize(
98+
("egress_status", "status"),
99+
(
100+
(EgressStatus.EGRESS_ACTIVE, "started"),
101+
(EgressStatus.EGRESS_ENDING, "saving"),
102+
(EgressStatus.EGRESS_ABORTED, "aborted"),
103+
),
104+
)
97105
@mock.patch("core.utils.update_room_metadata")
98-
def test_handle_egress_started_success(mock_update_room_metadata, service):
106+
def test_handle_egress_updated_success(
107+
mock_update_room_metadata, egress_status, status, service
108+
):
99109
"""Should successfully start recording and update room's metadata."""
100110

101111
recording = RecordingFactory(worker_id="worker-1", status="initiated")
102112
mock_data = mock.MagicMock()
103113
mock_data.egress_info.egress_id = recording.worker_id
104-
mock_data.egress_info.status = EgressStatus.EGRESS_ACTIVE
114+
mock_data.egress_info.status = egress_status
105115

106-
service._handle_egress_started(mock_data)
116+
service._handle_egress_updated(mock_data)
107117

108118
mock_update_room_metadata.assert_called_once_with(
109-
str(recording.room.id), {"recording_status": "started"}
119+
str(recording.room.id), {"recording_status": status}
110120
)
111121

112122

0 commit comments

Comments
 (0)