Skip to content

Commit 49f57ad

Browse files
authored
Fix sync group member playing out of sync after concurrent group changes (#4189)
1 parent 35af34d commit 49f57ad

3 files changed

Lines changed: 141 additions & 4 deletions

File tree

music_assistant/providers/sync_group/constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@
1515
# from the user without disrupting the live sync session.
1616
IDLE_GRACE_SECONDS: Final[float] = 10.0
1717

18+
# Maximum seconds to wait for the sync leader to confirm it has actually started
19+
# playing after a (re)form. The group's playback lock is held for the duration so
20+
# a concurrent (un)group command can't race a start that is still in flight at the
21+
# device — which would otherwise strand a player playing outside the group.
22+
PLAYBACK_START_TIMEOUT: Final[float] = 5.0
23+
1824
CONF_ENTRY_SGP_NOTE = ConfigEntry(
1925
key="sgp_note",
2026
type=ConfigEntryType.ALERT,

music_assistant/providers/sync_group/player.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import asyncio
6+
from contextlib import asynccontextmanager
67
from typing import TYPE_CHECKING, Any, cast
78

89
from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
@@ -25,10 +26,13 @@
2526
CONF_ENTRY_SGP_NOTE,
2627
EXTRA_FEATURES_FROM_MEMBERS,
2728
IDLE_GRACE_SECONDS,
29+
PLAYBACK_START_TIMEOUT,
2830
PROVIDERS_WITH_DYNAMIC_LEADER_SWITCH,
2931
)
3032

3133
if TYPE_CHECKING:
34+
from collections.abc import AsyncIterator
35+
3236
from music_assistant_models.player import PlayerSource
3337

3438
from .provider import SyncGroupProvider
@@ -395,9 +399,13 @@ async def play(self) -> None:
395399
# formed (e.g. after _dissolve_and_reform left us powered with no leader).
396400
# _form_syncgroup is idempotent so calling it here is cheap when already formed.
397401
await self._form_syncgroup()
398-
await self.mass.players.cmd_resume(
399-
self.player_id, self._attr_active_source, self._attr_current_media
400-
)
402+
# Hold the group's playback lock until the leader actually reports playing
403+
# so a concurrent (un)group command can't race the in-flight start — which
404+
# would otherwise leave a player streaming outside the group.
405+
async with self._await_leader_playback():
406+
await self.mass.players.cmd_resume(
407+
self.player_id, self._attr_active_source, self._attr_current_media
408+
)
401409

402410
async def poll(self) -> None:
403411
"""Poll player for state updates."""
@@ -415,7 +423,10 @@ async def play_media(self, media: PlayerMedia) -> None:
415423
if sync_leader := self.sync_leader:
416424
# Use internal handler to target the sync leader directly,
417425
# bypassing group/sync redirect that would loop back to this player.
418-
await self.mass.players._handle_play_media(sync_leader.player_id, media)
426+
# Hold the group's playback lock until the leader confirms playback
427+
# (see play()) so a concurrent (un)group command can't race the start.
428+
async with self._await_leader_playback():
429+
await self.mass.players._handle_play_media(sync_leader.player_id, media)
419430
else:
420431
raise RuntimeError("An empty group cannot play media, consider adding members first")
421432

@@ -644,6 +655,28 @@ async def _form_syncgroup(self) -> None:
644655
self.sync_leader, player_ids_to_add=members_to_sync
645656
)
646657

658+
@asynccontextmanager
659+
async def _await_leader_playback(self) -> AsyncIterator[None]:
660+
"""
661+
Wait for the sync leader to confirm playback for the command run in the body.
662+
663+
Wrap the play/resume call that targets the leader in this context manager.
664+
The group's playback lock (held by the caller) then stays acquired until the
665+
leader actually reports playing, so a concurrent (un)group command cannot
666+
race a start that has not yet taken effect at the device. A no-op when there
667+
is no leader to wait on.
668+
"""
669+
if (leader := self.sync_leader) is None:
670+
yield
671+
return
672+
async with self.mass.players.wait_for_player_update(
673+
leader.player_id,
674+
attribute_name="playback_state",
675+
attribute_value=PlaybackState.PLAYING,
676+
timeout=PLAYBACK_START_TIMEOUT,
677+
):
678+
yield
679+
647680
async def _dissolve_syncgroup(self) -> None:
648681
"""Dissolve the current syncgroup by ungrouping all members."""
649682
# a dissolve is happening now — any pending grace timer is no longer needed

tests/providers/test_sync_group.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,3 +1166,101 @@ def _get_raw(_player_id: str, key: str, default: object = None) -> object:
11661166
mass.config.get_raw_player_config_value = MagicMock(side_effect=_get_raw)
11671167
sgp = _make_sync_group(mass)
11681168
assert PlayerFeature.POWER in sgp.supported_features
1169+
1170+
1171+
def _recording_wait(order: list[str]) -> MagicMock:
1172+
"""
1173+
Build a wait_for_player_update replacement that records call/enter/exit order.
1174+
1175+
The returned mock records ``wait_for:<player_id>:<value>`` when invoked,
1176+
``subscribe`` on context enter and ``await`` on context exit, so a test can
1177+
assert that the playback start was wrapped (subscribe before the command is
1178+
issued) rather than awaited after the fact.
1179+
"""
1180+
1181+
class _RecordingWait:
1182+
async def __aenter__(self) -> None:
1183+
order.append("subscribe")
1184+
1185+
async def __aexit__(self, *_exc: object) -> bool:
1186+
order.append("await")
1187+
return False
1188+
1189+
def _wait(player_id: str, **kwargs: Any) -> _RecordingWait:
1190+
order.append(f"wait_for:{player_id}:{kwargs.get('attribute_value')}")
1191+
return _RecordingWait()
1192+
1193+
return MagicMock(side_effect=_wait)
1194+
1195+
1196+
class TestLeaderPlaybackAwaited:
1197+
"""A (re)form must not return until the leader confirms it has started playing."""
1198+
1199+
@pytest.mark.asyncio
1200+
async def test_play_waits_for_leader_before_returning(self) -> None:
1201+
"""play() wraps the resume in a wait so the group lock isn't released early."""
1202+
mass = _make_mock_mass()
1203+
sgp = _make_sync_group(mass)
1204+
leader = _make_mock_player("leader", playback_state=PlaybackState.IDLE)
1205+
mass.players.get_player = _player_lookup({"leader": leader})
1206+
sgp.sync_leader = leader
1207+
sgp._attr_group_members = ["leader"]
1208+
1209+
order: list[str] = []
1210+
mass.players.wait_for_player_update = _recording_wait(order)
1211+
mass.players.cmd_resume = AsyncMock(side_effect=lambda *_a, **_k: order.append("resume"))
1212+
1213+
with patch.object(sgp, "_form_syncgroup", new=AsyncMock()):
1214+
await sgp.play()
1215+
1216+
# subscribe happens before the resume command, and the wait completes
1217+
# after — i.e. the start is wrapped, not fire-and-forget.
1218+
assert order == [
1219+
f"wait_for:leader:{PlaybackState.PLAYING}",
1220+
"subscribe",
1221+
"resume",
1222+
"await",
1223+
]
1224+
1225+
@pytest.mark.asyncio
1226+
async def test_play_media_waits_for_leader_before_returning(self) -> None:
1227+
"""play_media() wraps the leader start so a concurrent (un)group can't race it."""
1228+
mass = _make_mock_mass()
1229+
sgp = _make_sync_group(mass)
1230+
leader = _make_mock_player("leader", playback_state=PlaybackState.IDLE)
1231+
mass.players.get_player = _player_lookup({"leader": leader})
1232+
sgp.sync_leader = leader
1233+
sgp._attr_group_members = ["leader"]
1234+
1235+
order: list[str] = []
1236+
mass.players.wait_for_player_update = _recording_wait(order)
1237+
mass.players._handle_play_media = AsyncMock(
1238+
side_effect=lambda *_a, **_k: order.append("play_media")
1239+
)
1240+
1241+
media = MagicMock()
1242+
media.source_id = "syncgroup_test"
1243+
with patch.object(sgp, "_form_syncgroup", new=AsyncMock()):
1244+
await sgp.play_media(media)
1245+
1246+
assert order == [
1247+
f"wait_for:leader:{PlaybackState.PLAYING}",
1248+
"subscribe",
1249+
"play_media",
1250+
"await",
1251+
]
1252+
1253+
@pytest.mark.asyncio
1254+
async def test_no_wait_when_group_has_no_leader(self) -> None:
1255+
"""With no leader to wait on, play() resumes without arming a playback wait."""
1256+
mass = _make_mock_mass()
1257+
sgp = _make_sync_group(mass)
1258+
sgp.sync_leader = None
1259+
1260+
mass.players.cmd_resume = AsyncMock()
1261+
1262+
with patch.object(sgp, "_form_syncgroup", new=AsyncMock()):
1263+
await sgp.play()
1264+
1265+
mass.players.wait_for_player_update.assert_not_called()
1266+
mass.players.cmd_resume.assert_awaited_once()

0 commit comments

Comments
 (0)