1818
1919import txredisapi
2020
21- from synapse .logging .context import PreserveLoggingContext
21+ from synapse .logging .context import make_deferred_yieldable
2222from synapse .metrics .background_process_metrics import run_as_background_process
2323from synapse .replication .tcp .commands import (
2424 Command ,
4141class RedisSubscriber (txredisapi .SubscriberProtocol , AbstractConnection ):
4242 """Connection to redis subscribed to replication stream.
4343
44- Parses incoming messages from redis into replication commands, and passes
45- them to `ReplicationCommandHandler`
44+ This class fulfils two functions:
45+
46+ (a) it implements the twisted Protocol API, where it handles the SUBSCRIBEd redis
47+ connection, parsing *incoming* messages into replication commands, and passing them
48+ to `ReplicationCommandHandler`
49+
50+ (b) it implements the AbstractConnection API, where it sends *outgoing* commands
51+ onto outbound_redis_connection.
4652
4753 Due to the vagaries of `txredisapi` we don't want to have a custom
4854 constructor, so instead we expect the defined attributes below to be set
4955 immediately after initialisation.
5056
5157 Attributes:
5258 handler: The command handler to handle incoming commands.
53- stream_name: The *redis* stream name to subscribe to (not anything to
54- do with Synapse replication streams).
59+ stream_name: The *redis* stream name to subscribe to and publish from
60+ (not anything to do with Synapse replication streams).
5561 outbound_redis_connection: The connection to redis to use to send
5662 commands.
5763 """
@@ -61,12 +67,22 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
6167 outbound_redis_connection = None # type: txredisapi.RedisProtocol
6268
6369 def connectionMade (self ):
64- logger .info ("Connected to redis instance" )
65- self .subscribe (self .stream_name )
66- self .send_command (ReplicateCommand ())
67-
70+ logger .info ("Connected to redis" )
71+ run_as_background_process ("subscribe-replication" , self ._send_subscribe )
6872 self .handler .new_connection (self )
6973
74+ async def _send_subscribe (self ):
75+ # it's important to make sure that we only send the REPLICATE command once we
76+ # have successfully subscribed to the stream - otherwise we might miss the
77+ # POSITION response sent back by the other end.
78+ logger .info ("Sending redis SUBSCRIBE for %s" , self .stream_name )
79+ await make_deferred_yieldable (self .subscribe (self .stream_name ))
80+ logger .info (
81+ "Successfully subscribed to redis stream, sending REPLICATE command"
82+ )
83+ await self ._async_send_command (ReplicateCommand ())
84+ logger .info ("REPLICATE successfully sent" )
85+
7086 def messageReceived (self , pattern : str , channel : str , message : str ):
7187 """Received a message from redis.
7288 """
@@ -119,7 +135,7 @@ async def handle_command(self, cmd: Command):
119135 logger .warning ("Unhandled command: %r" , cmd )
120136
121137 def connectionLost (self , reason ):
122- logger .info ("Lost connection to redis instance " )
138+ logger .info ("Lost connection to redis" )
123139 self .handler .lost_connection (self )
124140
125141 def send_command (self , cmd : Command ):
@@ -128,6 +144,10 @@ def send_command(self, cmd: Command):
128144 Args:
129145 cmd (Command)
130146 """
147+ run_as_background_process ("send-cmd" , self ._send_command , cmd )
148+
149+ async def _async_send_command (self , cmd : Command ):
150+ """Encode a replication command and send it over our outbound connection"""
131151 string = "%s %s" % (cmd .NAME , cmd .to_line ())
132152 if "\n " in string :
133153 raise Exception ("Unexpected newline in command: %r" , string )
@@ -138,15 +158,9 @@ def send_command(self, cmd: Command):
138158 # remote instances.
139159 tcp_outbound_commands_counter .labels (cmd .NAME , "redis" ).inc ()
140160
141- async def _send ():
142- with PreserveLoggingContext ():
143- # Note that we use the other connection as we can't send
144- # commands using the subscription connection.
145- await self .outbound_redis_connection .publish (
146- self .stream_name , encoded_string
147- )
148-
149- run_as_background_process ("send-cmd" , _send )
161+ await make_deferred_yieldable (
162+ self .outbound_redis_connection .publish (self .stream_name , encoded_string )
163+ )
150164
151165
152166class RedisDirectTcpReplicationClientFactory (txredisapi .SubscriberFactory ):
0 commit comments