Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 51f7eaf

Browse files
authored
Add ability to run replication protocol over redis. (#7040)
This is configured via the `redis` config options.
1 parent 5308239 commit 51f7eaf

12 files changed

Lines changed: 342 additions & 36 deletions

File tree

changelog.d/7040.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add support for running replication over Redis when using workers.

stubs/txredisapi.pyi

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2020 The Matrix.org Foundation C.I.C.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""Contains *incomplete* type hints for txredisapi.
17+
"""
18+
19+
from typing import List, Optional, Union
20+
21+
class RedisProtocol:
22+
def publish(self, channel: str, message: bytes): ...
23+
24+
class SubscriberProtocol:
25+
def subscribe(self, channels: Union[str, List[str]]): ...
26+
27+
def lazyConnection(
28+
host: str = ...,
29+
port: int = ...,
30+
dbid: Optional[int] = ...,
31+
reconnect: bool = ...,
32+
charset: str = ...,
33+
password: Optional[str] = ...,
34+
connectTimeout: Optional[int] = ...,
35+
replyTimeout: Optional[int] = ...,
36+
convertNumbers: bool = ...,
37+
) -> RedisProtocol: ...
38+
39+
class SubscriberFactory:
40+
def buildProtocol(self, addr): ...

synapse/app/homeserver.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,12 @@ def _configure_named_resource(self, name, compress=False):
273273
def start_listening(self, listeners):
274274
config = self.get_config()
275275

276+
if config.redis_enabled:
277+
# If redis is enabled we connect via the replication command handler
278+
# in the same way as the workers (since we're effectively a client
279+
# rather than a server).
280+
self.get_tcp_replication().start_replication(self)
281+
276282
for listener in listeners:
277283
if listener["type"] == "http":
278284
self._listening_services.extend(self._listener_http(config, listener))

synapse/config/homeserver.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from .password_auth_providers import PasswordAuthProviderConfig
3232
from .push import PushConfig
3333
from .ratelimiting import RatelimitConfig
34+
from .redis import RedisConfig
3435
from .registration import RegistrationConfig
3536
from .repository import ContentRepositoryConfig
3637
from .room_directory import RoomDirectoryConfig
@@ -82,4 +83,5 @@ class HomeServerConfig(RootConfig):
8283
RoomDirectoryConfig,
8384
ThirdPartyRulesConfig,
8485
TracerConfig,
86+
RedisConfig,
8587
]

synapse/config/redis.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2020 The Matrix.org Foundation C.I.C.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from synapse.config._base import Config
17+
from synapse.python_dependencies import check_requirements
18+
19+
20+
class RedisConfig(Config):
21+
section = "redis"
22+
23+
def read_config(self, config, **kwargs):
24+
redis_config = config.get("redis", {})
25+
self.redis_enabled = redis_config.get("enabled", False)
26+
27+
if not self.redis_enabled:
28+
return
29+
30+
check_requirements("redis")
31+
32+
self.redis_host = redis_config.get("host", "localhost")
33+
self.redis_port = redis_config.get("port", 6379)
34+
self.redis_dbid = redis_config.get("dbid")
35+
self.redis_password = redis_config.get("password")

synapse/python_dependencies.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
"sentry": ["sentry-sdk>=0.7.2"],
9999
"opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
100100
"jwt": ["pyjwt>=1.6.4"],
101+
"redis": ["txredisapi>=1.4.7"],
101102
}
102103

103104
ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str]

synapse/replication/tcp/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
logger = logging.getLogger(__name__)
3131

3232

33-
class ReplicationClientFactory(ReconnectingClientFactory):
33+
class DirectTcpReplicationClientFactory(ReconnectingClientFactory):
3434
"""Factory for building connections to the master. Will reconnect if the
3535
connection is lost.
3636

synapse/replication/tcp/commands.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,3 +454,21 @@ class RemoteServerUpCommand(_SimpleCommand):
454454
ErrorCommand.NAME,
455455
RemoteServerUpCommand.NAME,
456456
)
457+
458+
459+
def parse_command_from_line(line: str) -> Command:
460+
"""Parses a command from a received line.
461+
462+
Line should already be stripped of whitespace and be checked if blank.
463+
"""
464+
465+
idx = line.index(" ")
466+
if idx >= 0:
467+
cmd_name = line[:idx]
468+
rest_of_line = line[idx + 1 :]
469+
else:
470+
cmd_name = line
471+
rest_of_line = ""
472+
473+
cmd_cls = COMMAND_MAP[cmd_name]
474+
return cmd_cls.from_line(rest_of_line)

synapse/replication/tcp/handler.py

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030

3131
from prometheus_client import Counter
3232

33+
from twisted.internet.protocol import ReconnectingClientFactory
34+
3335
from synapse.metrics import LaterGauge
34-
from synapse.replication.tcp.client import ReplicationClientFactory
36+
from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
3537
from synapse.replication.tcp.commands import (
3638
ClearUserSyncsCommand,
3739
Command,
@@ -92,7 +94,7 @@ def __init__(self, hs):
9294
self._pending_batches = {} # type: Dict[str, List[Any]]
9395

9496
# The factory used to create connections.
95-
self._factory = None # type: Optional[ReplicationClientFactory]
97+
self._factory = None # type: Optional[ReconnectingClientFactory]
9698

9799
# The currently connected connections.
98100
self._connections = [] # type: List[AbstractConnection]
@@ -119,11 +121,45 @@ def start_replication(self, hs):
119121
"""Helper method to start a replication connection to the remote server
120122
using TCP.
121123
"""
122-
client_name = hs.config.worker_name
123-
self._factory = ReplicationClientFactory(hs, client_name, self)
124-
host = hs.config.worker_replication_host
125-
port = hs.config.worker_replication_port
126-
hs.get_reactor().connectTCP(host, port, self._factory)
124+
if hs.config.redis.redis_enabled:
125+
from synapse.replication.tcp.redis import (
126+
RedisDirectTcpReplicationClientFactory,
127+
)
128+
import txredisapi
129+
130+
logger.info(
131+
"Connecting to redis (host=%r port=%r DBID=%r)",
132+
hs.config.redis_host,
133+
hs.config.redis_port,
134+
hs.config.redis_dbid,
135+
)
136+
137+
# We need two connections to redis, one for the subscription stream and
138+
# one to send commands to (as you can't send further redis commands to a
139+
# connection after SUBSCRIBE is called).
140+
141+
# First create the connection for sending commands.
142+
outbound_redis_connection = txredisapi.lazyConnection(
143+
host=hs.config.redis_host,
144+
port=hs.config.redis_port,
145+
dbid=hs.config.redis_dbid,
146+
password=hs.config.redis.redis_password,
147+
reconnect=True,
148+
)
149+
150+
# Now create the factory/connection for the subscription stream.
151+
self._factory = RedisDirectTcpReplicationClientFactory(
152+
hs, outbound_redis_connection
153+
)
154+
hs.get_reactor().connectTCP(
155+
hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory,
156+
)
157+
else:
158+
client_name = hs.config.worker_name
159+
self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
160+
host = hs.config.worker_replication_host
161+
port = hs.config.worker_replication_port
162+
hs.get_reactor().connectTCP(host, port, self._factory)
127163

128164
async def on_REPLICATE(self, cmd: ReplicateCommand):
129165
# We only want to announce positions by the writer of the streams.

synapse/replication/tcp/protocol.py

Lines changed: 12 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
from synapse.metrics import LaterGauge
6464
from synapse.metrics.background_process_metrics import run_as_background_process
6565
from synapse.replication.tcp.commands import (
66-
COMMAND_MAP,
6766
VALID_CLIENT_COMMANDS,
6867
VALID_SERVER_COMMANDS,
6968
Command,
@@ -72,6 +71,7 @@
7271
PingCommand,
7372
ReplicateCommand,
7473
ServerCommand,
74+
parse_command_from_line,
7575
)
7676
from synapse.types import Collection
7777
from synapse.util import Clock
@@ -210,38 +210,24 @@ def lineReceived(self, line: bytes):
210210

211211
linestr = line.decode("utf-8")
212212

213-
# split at the first " ", handling one-word commands
214-
idx = linestr.index(" ")
215-
if idx >= 0:
216-
cmd_name = linestr[:idx]
217-
rest_of_line = linestr[idx + 1 :]
218-
else:
219-
cmd_name = linestr
220-
rest_of_line = ""
213+
try:
214+
cmd = parse_command_from_line(linestr)
215+
except Exception as e:
216+
logger.exception("[%s] failed to parse line: %r", self.id(), linestr)
217+
self.send_error("failed to parse line: %r (%r):" % (e, linestr))
218+
return
221219

222-
if cmd_name not in self.VALID_INBOUND_COMMANDS:
223-
logger.error("[%s] invalid command %s", self.id(), cmd_name)
224-
self.send_error("invalid command: %s", cmd_name)
220+
if cmd.NAME not in self.VALID_INBOUND_COMMANDS:
221+
logger.error("[%s] invalid command %s", self.id(), cmd.NAME)
222+
self.send_error("invalid command: %s", cmd.NAME)
225223
return
226224

227225
self.last_received_command = self.clock.time_msec()
228226

229-
self.inbound_commands_counter[cmd_name] = (
230-
self.inbound_commands_counter[cmd_name] + 1
227+
self.inbound_commands_counter[cmd.NAME] = (
228+
self.inbound_commands_counter[cmd.NAME] + 1
231229
)
232230

233-
cmd_cls = COMMAND_MAP[cmd_name]
234-
try:
235-
cmd = cmd_cls.from_line(rest_of_line)
236-
except Exception as e:
237-
logger.exception(
238-
"[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
239-
)
240-
self.send_error(
241-
"failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line)
242-
)
243-
return
244-
245231
# Now lets try and call on_<CMD_NAME> function
246232
run_as_background_process(
247233
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd

0 commit comments

Comments
 (0)