Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 82498ee

Browse files
authored
Move server command handling out of TCP protocol (#7187)
This completes the merging of server and client command processing.
1 parent 7195313 commit 82498ee

4 files changed

Lines changed: 237 additions & 269 deletions

File tree

changelog.d/7187.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Move server command handling out of TCP protocol.

synapse/replication/tcp/handler.py

Lines changed: 159 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,23 @@
1919

2020
from prometheus_client import Counter
2121

22+
from synapse.metrics import LaterGauge
2223
from synapse.replication.tcp.client import ReplicationClientFactory
2324
from synapse.replication.tcp.commands import (
25+
ClearUserSyncsCommand,
2426
Command,
2527
FederationAckCommand,
2628
InvalidateCacheCommand,
2729
PositionCommand,
2830
RdataCommand,
2931
RemoteServerUpCommand,
3032
RemovePusherCommand,
33+
ReplicateCommand,
3134
SyncCommand,
3235
UserIpCommand,
3336
UserSyncCommand,
3437
)
38+
from synapse.replication.tcp.protocol import AbstractConnection
3539
from synapse.replication.tcp.streams import STREAMS_MAP, Stream
3640
from synapse.util.async_helpers import Linearizer
3741

@@ -42,6 +46,13 @@
4246
inbound_rdata_count = Counter(
4347
"synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
4448
)
49+
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
50+
federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
51+
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
52+
invalidate_cache_counter = Counter(
53+
"synapse_replication_tcp_resource_invalidate_cache", ""
54+
)
55+
user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
4556

4657

4758
class ReplicationCommandHandler:
@@ -52,6 +63,10 @@ class ReplicationCommandHandler:
5263
def __init__(self, hs):
5364
self._replication_data_handler = hs.get_replication_data_handler()
5465
self._presence_handler = hs.get_presence_handler()
66+
self._store = hs.get_datastore()
67+
self._notifier = hs.get_notifier()
68+
self._clock = hs.get_clock()
69+
self._instance_id = hs.get_instance_id()
5570

5671
# Set of streams that we've caught up with.
5772
self._streams_connected = set() # type: Set[str]
@@ -69,8 +84,26 @@ def __init__(self, hs):
6984
# The factory used to create connections.
7085
self._factory = None # type: Optional[ReplicationClientFactory]
7186

72-
# The current connection. None if we are currently (re)connecting
73-
self._connection = None
87+
# The currently connected connections.
88+
self._connections = [] # type: List[AbstractConnection]
89+
90+
LaterGauge(
91+
"synapse_replication_tcp_resource_total_connections",
92+
"",
93+
[],
94+
lambda: len(self._connections),
95+
)
96+
97+
self._is_master = hs.config.worker_app is None
98+
99+
self._federation_sender = None
100+
if self._is_master and not hs.config.send_federation:
101+
self._federation_sender = hs.get_federation_sender()
102+
103+
self._server_notices_sender = None
104+
if self._is_master:
105+
self._server_notices_sender = hs.get_server_notices_sender()
106+
self._notifier.add_remote_server_up_callback(self.send_remote_server_up)
74107

75108
def start_replication(self, hs):
76109
"""Helper method to start a replication connection to the remote server
@@ -82,6 +115,70 @@ def start_replication(self, hs):
82115
port = hs.config.worker_replication_port
83116
hs.get_reactor().connectTCP(host, port, self._factory)
84117

118+
async def on_REPLICATE(self, cmd: ReplicateCommand):
119+
# We only want to announce positions by the writer of the streams.
120+
# Currently this is just the master process.
121+
if not self._is_master:
122+
return
123+
124+
for stream_name, stream in self._streams.items():
125+
current_token = stream.current_token()
126+
self.send_command(PositionCommand(stream_name, current_token))
127+
128+
async def on_USER_SYNC(self, cmd: UserSyncCommand):
129+
user_sync_counter.inc()
130+
131+
if self._is_master:
132+
await self._presence_handler.update_external_syncs_row(
133+
cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
134+
)
135+
136+
async def on_CLEAR_USER_SYNC(self, cmd: ClearUserSyncsCommand):
137+
if self._is_master:
138+
await self._presence_handler.update_external_syncs_clear(cmd.instance_id)
139+
140+
async def on_FEDERATION_ACK(self, cmd: FederationAckCommand):
141+
federation_ack_counter.inc()
142+
143+
if self._federation_sender:
144+
self._federation_sender.federation_ack(cmd.token)
145+
146+
async def on_REMOVE_PUSHER(self, cmd: RemovePusherCommand):
147+
remove_pusher_counter.inc()
148+
149+
if self._is_master:
150+
await self._store.delete_pusher_by_app_id_pushkey_user_id(
151+
app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id
152+
)
153+
154+
self._notifier.on_new_replication_data()
155+
156+
async def on_INVALIDATE_CACHE(self, cmd: InvalidateCacheCommand):
157+
invalidate_cache_counter.inc()
158+
159+
if self._is_master:
160+
# We invalidate the cache locally, but then also stream that to other
161+
# workers.
162+
await self._store.invalidate_cache_and_stream(
163+
cmd.cache_func, tuple(cmd.keys)
164+
)
165+
166+
async def on_USER_IP(self, cmd: UserIpCommand):
167+
user_ip_cache_counter.inc()
168+
169+
if self._is_master:
170+
await self._store.insert_client_ip(
171+
cmd.user_id,
172+
cmd.access_token,
173+
cmd.ip,
174+
cmd.user_agent,
175+
cmd.device_id,
176+
cmd.last_seen,
177+
)
178+
179+
if self._server_notices_sender:
180+
await self._server_notices_sender.on_user_ip(cmd.user_id)
181+
85182
async def on_RDATA(self, cmd: RdataCommand):
86183
stream_name = cmd.stream_name
87184
inbound_rdata_count.labels(stream_name).inc()
@@ -174,36 +271,73 @@ async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
174271
""""Called when get a new REMOTE_SERVER_UP command."""
175272
self._replication_data_handler.on_remote_server_up(cmd.data)
176273

274+
if self._is_master:
275+
self._notifier.notify_remote_server_up(cmd.data)
276+
177277
def get_currently_syncing_users(self):
178278
"""Get the list of currently syncing users (if any). This is called
179279
when a connection has been established and we need to send the
180280
currently syncing users.
181281
"""
182282
return self._presence_handler.get_currently_syncing_users()
183283

184-
def update_connection(self, connection):
185-
"""Called when a connection has been established (or lost with None).
284+
def new_connection(self, connection: AbstractConnection):
285+
"""Called when we have a new connection.
186286
"""
187-
self._connection = connection
287+
self._connections.append(connection)
288+
289+
# If we are connected to replication as a client (rather than a server)
290+
# we need to reset the reconnection delay on the client factory (which
291+
# is used to do exponential back off when the connection drops).
292+
#
293+
# Ideally we would reset the delay when we've "fully established" the
294+
# connection (for some definition thereof) to stop us from tightlooping
295+
# on reconnection if something fails after this point and we drop the
296+
# connection. Unfortunately, we don't really have a better definition of
297+
# "fully established" than the connection being established.
298+
if self._factory:
299+
self._factory.resetDelay()
300+
301+
# Tell the server if we have any users currently syncing (should only
302+
# happen on synchrotrons)
303+
currently_syncing = self.get_currently_syncing_users()
304+
now = self._clock.time_msec()
305+
for user_id in currently_syncing:
306+
connection.send_command(
307+
UserSyncCommand(self._instance_id, user_id, True, now)
308+
)
188309

189-
def finished_connecting(self):
190-
"""Called when we have successfully subscribed and caught up to all
191-
streams we're interested in.
310+
def lost_connection(self, connection: AbstractConnection):
311+
"""Called when a connection is closed/lost.
192312
"""
193-
logger.info("Finished connecting to server")
313+
try:
314+
self._connections.remove(connection)
315+
except ValueError:
316+
pass
194317

195-
# We don't reset the delay any earlier as otherwise if there is a
196-
# problem during start up we'll end up tight looping connecting to the
197-
# server.
198-
if self._factory:
199-
self._factory.resetDelay()
318+
def connected(self) -> bool:
319+
"""Do we have any replication connections open?
320+
321+
Is used by e.g. `ReplicationStreamer` to no-op if nothing is connected.
322+
"""
323+
return bool(self._connections)
200324

201325
def send_command(self, cmd: Command):
202-
"""Send a command to master (when we get establish a connection if we
203-
don't have one already.)
326+
"""Send a command to all connected connections.
204327
"""
205-
if self._connection:
206-
self._connection.send_command(cmd)
328+
if self._connections:
329+
for connection in self._connections:
330+
try:
331+
connection.send_command(cmd)
332+
except Exception:
333+
# We probably want to catch some types of exceptions here
334+
# and log them as warnings (e.g. connection gone), but I
335+
# can't find what those exception types they would be.
336+
logger.exception(
337+
"Failed to write command %s to connection %s",
338+
cmd.NAME,
339+
connection,
340+
)
207341
else:
208342
logger.warning("Dropping command as not connected: %r", cmd.NAME)
209343

@@ -250,3 +384,10 @@ def send_user_ip(
250384

251385
def send_remote_server_up(self, server: str):
252386
self.send_command(RemoteServerUpCommand(server))
387+
388+
def stream_update(self, stream_name: str, token: str, data: Any):
389+
"""Called when a new update is available to stream to clients.
390+
391+
We need to check if the client is interested in the stream or not
392+
"""
393+
self.send_command(RdataCommand(stream_name, token, data))

0 commit comments

Comments
 (0)