4242from synapse .http .server import JsonResource
4343from synapse .http .servlet import RestServlet , parse_json_object_from_request
4444from synapse .http .site import SynapseSite
45- from synapse .logging .context import LoggingContext , run_in_background
45+ from synapse .logging .context import LoggingContext
4646from synapse .metrics import METRICS_PREFIX , MetricsResource , RegistryProxy
4747from synapse .metrics .background_process_metrics import run_as_background_process
4848from synapse .replication .slave .storage ._base import BaseSlavedStore , __func__
@@ -635,7 +635,7 @@ async def on_rdata(self, stream_name, token, rows):
635635 await super (GenericWorkerReplicationHandler , self ).on_rdata (
636636 stream_name , token , rows
637637 )
638- run_in_background ( self .process_and_notify , stream_name , token , rows )
638+ await self .process_and_notify ( stream_name , token , rows )
639639
640640 def get_streams_to_replicate (self ):
641641 args = super (GenericWorkerReplicationHandler , self ).get_streams_to_replicate ()
@@ -650,7 +650,9 @@ def get_currently_syncing_users(self):
650650 async def process_and_notify (self , stream_name , token , rows ):
651651 try :
652652 if self .send_handler :
653- self .send_handler .process_replication_rows (stream_name , token , rows )
653+ await self .send_handler .process_replication_rows (
654+ stream_name , token , rows
655+ )
654656
655657 if stream_name == EventsStream .NAME :
656658 # We shouldn't get multiple rows per token for events stream, so
@@ -782,22 +784,20 @@ def wake_destination(self, server: str):
782784 def stream_positions (self ):
783785 return {"federation" : self .federation_position }
784786
785- def process_replication_rows (self , stream_name , token , rows ):
787+ async def process_replication_rows (self , stream_name , token , rows ):
786788 # The federation stream contains things that we want to send out, e.g.
787789 # presence, typing, etc.
788790 if stream_name == "federation" :
789791 send_queue .process_rows_for_federation (self .federation_sender , rows )
790- run_in_background ( self .update_token , token )
792+ await self .update_token ( token )
791793
792794 # We also need to poke the federation sender when new events happen
793795 elif stream_name == "events" :
794796 self .federation_sender .notify_new_events (token )
795797
796798 # ... and when new receipts happen
797799 elif stream_name == ReceiptsStream .NAME :
798- run_as_background_process (
799- "process_receipts_for_federation" , self ._on_new_receipts , rows
800- )
800+ await self ._on_new_receipts (rows )
801801
802802 # ... as well as device updates and messages
803803 elif stream_name == DeviceListsStream .NAME :
0 commit comments