4242from synapse .http .server import JsonResource
4343from synapse .http .servlet import RestServlet , parse_json_object_from_request
4444from synapse .http .site import SynapseSite
45- from synapse .logging .context import LoggingContext , run_in_background
45+ from synapse .logging .context import LoggingContext
4646from synapse .metrics import METRICS_PREFIX , MetricsResource , RegistryProxy
4747from synapse .metrics .background_process_metrics import run_as_background_process
4848from synapse .replication .slave .storage ._base import BaseSlavedStore , __func__
@@ -641,7 +641,7 @@ async def on_rdata(self, stream_name, token, rows):
641641 await super (GenericWorkerReplicationHandler , self ).on_rdata (
642642 stream_name , token , rows
643643 )
644- run_in_background ( self .process_and_notify , stream_name , token , rows )
644+ await self .process_and_notify ( stream_name , token , rows )
645645
646646 def get_streams_to_replicate (self ):
647647 args = super (GenericWorkerReplicationHandler , self ).get_streams_to_replicate ()
@@ -656,7 +656,9 @@ def get_currently_syncing_users(self):
656656 async def process_and_notify (self , stream_name , token , rows ):
657657 try :
658658 if self .send_handler :
659- self .send_handler .process_replication_rows (stream_name , token , rows )
659+ await self .send_handler .process_replication_rows (
660+ stream_name , token , rows
661+ )
660662
661663 if stream_name == EventsStream .NAME :
662664 # We shouldn't get multiple rows per token for events stream, so
@@ -788,22 +790,20 @@ def wake_destination(self, server: str):
788790 def stream_positions (self ):
789791 return {"federation" : self .federation_position }
790792
791- def process_replication_rows (self , stream_name , token , rows ):
793+ async def process_replication_rows (self , stream_name , token , rows ):
792794 # The federation stream contains things that we want to send out, e.g.
793795 # presence, typing, etc.
794796 if stream_name == "federation" :
795797 send_queue .process_rows_for_federation (self .federation_sender , rows )
796- run_in_background ( self .update_token , token )
798+ await self .update_token ( token )
797799
798800 # We also need to poke the federation sender when new events happen
799801 elif stream_name == "events" :
800802 self .federation_sender .notify_new_events (token )
801803
802804 # ... and when new receipts happen
803805 elif stream_name == ReceiptsStream .NAME :
804- run_as_background_process (
805- "process_receipts_for_federation" , self ._on_new_receipts , rows
806- )
806+ await self ._on_new_receipts (rows )
807807
808808 # ... as well as device updates and messages
809809 elif stream_name == DeviceListsStream .NAME :
0 commit comments