1515import logging
1616
1717from synapse .module_api import ModuleApi
18+ from synapse .util .duration import Duration
1819
1920from famedly_control_synapse .client import DiffRecord , FamedlyControlClient , Membership
2021from famedly_control_synapse .config import FamedlyControlConfig
2425logger = logging .getLogger (__name__ )
2526
2627
28+ class NoTokenException (Exception ):
29+ """Raise when a sync token was not provided"""
30+
31+
2732class GroupMembershipSyncer :
2833 """Background task that polls Famedly Control for group membership changes
2934 and updates managed room memberships accordingly."""
@@ -57,6 +62,7 @@ def start(self) -> None:
5762 """
5863 if self ._is_running or not self .is_enabled :
5964 return
65+ # Mark the loop as starting early, to avoid multiple background loops
6066 self ._is_running = True
6167 self .api .run_as_background_process (
6268 "famedly_control_group_membership_sync" , self .start_sync_loop
@@ -67,24 +73,43 @@ async def start_sync_loop(self) -> None:
6773
6874 If the sync token hasn't been loaded yet, tries to read it from the
6975 database. If it's still not available (no managed room created yet),
70- returns and will be retried on the next api request. Otherwise sets
76+ returns and will be retried on the next api request. Otherwise, sets
7177 up the periodic ``_process_sync``.
7278 """
7379 entry = await self .repository .get_sync_token_entry ()
7480 if entry is None :
7581 self ._is_running = False
7682 return
83+
84+ logger .info ("Starting loop to poll /get_all_groups_diffs" )
85+
7786 self ._sync_token_user_id , self ._sync_token = entry
78- self .api .looping_background_call (
79- self ._process_sync ,
80- self .polling_interval_seconds * 1000 ,
81- desc = "famedly_control_group_membership_sync" ,
82- )
87+
88+ while self ._is_running :
89+ try :
90+ await self ._process_sync ()
91+ # Several kinds of exceptions can be raised here. Timeouts, Cancellations,
92+ # Network Exceptions, HTTP Exceptions, general exceptions from database or
93+ # IO, etc. I believe the CancelledError from twisted.defer can be raised
94+ # from the sleep() below. Perhaps in the future that should be caught, it
95+ # should only occur during server shutdown though and can safely ignored
96+ # (although it may look rather scary in the logs)
97+ except Exception as e :
98+ logger .error ("Exception during loop: %r" , e )
99+ await self .api ._hs .get_clock ().sleep (
100+ Duration (seconds = self .polling_interval_seconds )
101+ )
83102
84103 async def _process_sync (self ) -> None :
85- """Execute a single sync iteration: fetch diffs and apply membership changes."""
104+ """
105+ Execute a single sync iteration: fetch diffs and apply membership changes.
106+
107+ Long poll for the response. If a 'next_sync' token is returned, use that on the
108+ next request.
109+ If no token is returned, wait for the configured amount and try again.
110+ """
86111 response = await self .client .get_all_groups_diffs (
87- sync = self ._sync_token , timeout = 30
112+ sync = self ._sync_token , timeout = self . polling_interval_seconds
88113 )
89114
90115 sync_succeeded = True
@@ -111,8 +136,15 @@ async def _process_sync(self) -> None:
111136 )
112137 return
113138
139+ if not response .next_sync :
140+ # If there was not a 'next_sync' token, long polling is not used. Raise, and
141+ # the loop can handle the wait
142+ raise NoTokenException ("No token in response" )
143+
114144 self ._sync_token = response .next_sync
115- if self ._sync_token_user_id is not None :
145+ if self ._sync_token_user_id is None :
146+ logger .error ("Loop was running but there was no user id for the sync token" )
147+ else :
116148 await self .repository .set_sync_token (
117149 self ._sync_token_user_id , self ._sync_token
118150 )
0 commit comments