@@ -398,12 +398,6 @@ def _reset(self):
398398 # map room IDs to sets of users currently typing
399399 self ._room_typing = {}
400400
401- def stream_positions (self ):
402- # We must update this typing token from the response of the previous
403- # sync. In particular, the stream id may "reset" back to zero/a low
404- # value which we *must* use for the next replication request.
405- return {"typing" : self ._latest_room_serial }
406-
407401 def process_replication_rows (self , token , rows ):
408402 if self ._latest_room_serial > token :
409403 # The master has gone backwards. To prevent inconsistent data, just
@@ -643,13 +637,6 @@ async def on_rdata(self, stream_name, token, rows):
643637 )
644638 await self .process_and_notify (stream_name , token , rows )
645639
646- def get_streams_to_replicate (self ):
647- args = super (GenericWorkerReplicationHandler , self ).get_streams_to_replicate ()
648- args .update (self .typing_handler .stream_positions ())
649- if self .send_handler :
650- args .update (self .send_handler .stream_positions ())
651- return args
652-
653640 async def process_and_notify (self , stream_name , token , rows ):
654641 try :
655642 if self .send_handler :
@@ -784,9 +771,6 @@ def on_start(self):
784771 def wake_destination (self , server : str ):
785772 self .federation_sender .wake_destination (server )
786773
787- def stream_positions (self ):
788- return {"federation" : self .federation_position }
789-
790774 async def process_replication_rows (self , stream_name , token , rows ):
791775 # The federation stream contains things that we want to send out, e.g.
792776 # presence, typing, etc.
0 commit comments