Skip to content

Commit b602d39

Browse files
erikjohnston authored and phil-flex committed
Fix replication metrics when using redis (matrix-org#7325)
1 parent 1ee7215 commit b602d39

3 files changed

Lines changed: 30 additions & 37 deletions

File tree

changelog.d/7325.feature

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1 @@
1+
Add support for running replication over Redis when using workers.

synapse/replication/tcp/protocol.py

Lines changed: 16 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -50,10 +50,7 @@
5050
import fcntl
5151
import logging
5252
import struct
53-
from collections import defaultdict
54-
from typing import TYPE_CHECKING, DefaultDict, List
55-
56-
from six import iteritems
53+
from typing import TYPE_CHECKING, List
5754

5855
from prometheus_client import Counter
5956

@@ -86,6 +83,18 @@
8683
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"]
8784
)
8885

86+
tcp_inbound_commands_counter = Counter(
87+
"synapse_replication_tcp_protocol_inbound_commands",
88+
"Number of commands received from replication, by command and name of process connected to",
89+
["command", "name"],
90+
)
91+
92+
tcp_outbound_commands_counter = Counter(
93+
"synapse_replication_tcp_protocol_outbound_commands",
94+
"Number of commands sent to replication, by command and name of process connected to",
95+
["command", "name"],
96+
)
97+
8998
# A list of all connected protocols. This allows us to send metrics about the
9099
# connections.
91100
connected_connections = []
@@ -151,9 +160,6 @@ def __init__(self, clock: Clock, handler: "ReplicationCommandHandler"):
151160
# The LoopingCall for sending pings.
152161
self._send_ping_loop = None
153162

154-
self.inbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]
155-
self.outbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]
156-
157163
def connectionMade(self):
158164
logger.info("[%s] Connection established", self.id())
159165

@@ -224,9 +230,7 @@ def lineReceived(self, line: bytes):
224230

225231
self.last_received_command = self.clock.time_msec()
226232

227-
self.inbound_commands_counter[cmd.NAME] = (
228-
self.inbound_commands_counter[cmd.NAME] + 1
229-
)
233+
tcp_inbound_commands_counter.labels(cmd.NAME, self.name).inc()
230234

231235
# Now lets try and call on_<CMD_NAME> function
232236
run_as_background_process(
@@ -292,9 +296,8 @@ def send_command(self, cmd, do_buffer=True):
292296
self._queue_command(cmd)
293297
return
294298

295-
self.outbound_commands_counter[cmd.NAME] = (
296-
self.outbound_commands_counter[cmd.NAME] + 1
297-
)
299+
tcp_outbound_commands_counter.labels(cmd.NAME, self.name).inc()
300+
298301
string = "%s %s" % (cmd.NAME, cmd.to_line())
299302
if "\n" in string:
300303
raise Exception("Unexpected newline in command: %r", string)
@@ -546,26 +549,3 @@ def transport_kernel_read_buffer_size(protocol, read=True):
546549
for p in connected_connections
547550
},
548551
)
549-
550-
551-
tcp_inbound_commands = LaterGauge(
552-
"synapse_replication_tcp_protocol_inbound_commands",
553-
"",
554-
["command", "name"],
555-
lambda: {
556-
(k, p.name): count
557-
for p in connected_connections
558-
for k, count in iteritems(p.inbound_commands_counter)
559-
},
560-
)
561-
562-
tcp_outbound_commands = LaterGauge(
563-
"synapse_replication_tcp_protocol_outbound_commands",
564-
"",
565-
["command", "name"],
566-
lambda: {
567-
(k, p.name): count
568-
for p in connected_connections
569-
for k, count in iteritems(p.outbound_commands_counter)
570-
},
571-
)

synapse/replication/tcp/redis.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@
2525
ReplicateCommand,
2626
parse_command_from_line,
2727
)
28-
from synapse.replication.tcp.protocol import AbstractConnection
28+
from synapse.replication.tcp.protocol import (
29+
AbstractConnection,
30+
tcp_inbound_commands_counter,
31+
tcp_outbound_commands_counter,
32+
)
2933

3034
if TYPE_CHECKING:
3135
from synapse.replication.tcp.handler import ReplicationCommandHandler
@@ -79,6 +83,10 @@ def messageReceived(self, pattern: str, channel: str, message: str):
7983
)
8084
return
8185

86+
# We use "redis" as the name here as we don't have 1:1 connections to
87+
# remote instances.
88+
tcp_inbound_commands_counter.labels(cmd.NAME, "redis").inc()
89+
8290
# Now lets try and call on_<CMD_NAME> function
8391
run_as_background_process(
8492
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
@@ -126,6 +134,10 @@ def send_command(self, cmd: Command):
126134

127135
encoded_string = string.encode("utf-8")
128136

137+
# We use "redis" as the name here as we don't have 1:1 connections to
138+
# remote instances.
139+
tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()
140+
129141
async def _send():
130142
with PreserveLoggingContext():
131143
# Note that we use the other connection as we can't send

0 commit comments

Comments
 (0)