Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 5f77ba3

Browse files
committed
Add redis support
1 parent 2dd2754 commit 5f77ba3

7 files changed

Lines changed: 234 additions & 4 deletions

File tree

mypy.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,6 @@ ignore_missing_imports = True
7575

7676
[mypy-jwt.*]
7777
ignore_missing_imports = True
78+
79+
[mypy-txredisapi]
80+
ignore_missing_imports = True

synapse/app/generic_worker.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -599,10 +599,19 @@ def start_listening(self, listeners):
599599
else:
600600
logger.warning("Unrecognized listener type: %s", listener["type"])
601601

602-
factory = ReplicationClientFactory(self, self.config.worker_name)
603-
host = self.config.worker_replication_host
604-
port = self.config.worker_replication_port
605-
self.get_reactor().connectTCP(host, port, factory)
602+
if self.config.redis.redis_enabled:
603+
from synapse.replication.tcp.redis import RedisFactory
604+
605+
logger.info("Connecting to redis.")
606+
factory = RedisFactory(self)
607+
self.get_reactor().connectTCP(
608+
self.config.redis.redis_host, self.config.redis.redis_port, factory
609+
)
610+
else:
611+
factory = ReplicationClientFactory(self, self.config.worker_name)
612+
host = self.config.worker_replication_host
613+
port = self.config.worker_replication_port
614+
self.get_reactor().connectTCP(host, port, factory)
606615

607616
def remove_pusher(self, app_id, push_key, user_id):
608617
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)

synapse/app/homeserver.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,15 @@ def _configure_named_resource(self, name, compress=False):
263263
def start_listening(self, listeners):
264264
config = self.get_config()
265265

266+
if config.redis_enabled:
267+
from synapse.replication.tcp.redis import RedisFactory
268+
269+
logger.info("Connecting to redis.")
270+
factory = RedisFactory(self)
271+
self.get_reactor().connectTCP(
272+
self.config.redis.redis_host, self.config.redis.redis_port, factory
273+
)
274+
266275
for listener in listeners:
267276
if listener["type"] == "http":
268277
self._listening_services.extend(self._listener_http(config, listener))
@@ -282,6 +291,7 @@ def start_listening(self, listeners):
282291
)
283292
for s in services:
284293
reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
294+
285295
elif listener["type"] == "metrics":
286296
if not self.get_config().enable_metrics:
287297
logger.warning(

synapse/config/homeserver.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from .password_auth_providers import PasswordAuthProviderConfig
3232
from .push import PushConfig
3333
from .ratelimiting import RatelimitConfig
34+
from .redis import RedisConfig
3435
from .registration import RegistrationConfig
3536
from .repository import ContentRepositoryConfig
3637
from .room_directory import RoomDirectoryConfig
@@ -82,4 +83,5 @@ class HomeServerConfig(RootConfig):
8283
RoomDirectoryConfig,
8384
ThirdPartyRulesConfig,
8485
TracerConfig,
86+
RedisConfig,
8587
]

synapse/config/redis.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2020 The Matrix.org Foundation C.I.C.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
from synapse.config._base import Config, ConfigError
17+
18+
try:
19+
import txredisapi
20+
except ImportError:
21+
txredisapi = None
22+
23+
24+
MISSING_REDIS = """Missing 'txredisapi' library. This is required for redis support.
25+
26+
Install by running:
27+
pip install txredisapi
28+
"""
29+
30+
31+
class RedisConfig(Config):
32+
section = "redis"
33+
34+
def read_config(self, config, **kwargs):
35+
redis_config = config.get("redis", {})
36+
self.redis_enabled = redis_config.get("enabled", False)
37+
38+
if not self.redis_enabled:
39+
return
40+
41+
if txredisapi is None:
42+
raise ConfigError(MISSING_REDIS)
43+
44+
self.redis_host = redis_config.get("host", "localhost")
45+
self.redis_port = redis_config.get("port", 6379)
46+
self.redis_dbid = redis_config.get("dbid")
47+
self.redis_password = redis_config.get("password")

synapse/python_dependencies.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
"sentry": ["sentry-sdk>=0.7.2"],
9999
"opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
100100
"jwt": ["pyjwt>=1.6.4"],
101+
"redis": ["txredisapi>=1.4.7"],
101102
}
102103

103104
ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str]

synapse/replication/tcp/redis.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2020 The Matrix.org Foundation C.I.C.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import logging
17+
18+
import txredisapi
19+
20+
from synapse.logging.context import PreserveLoggingContext
21+
from synapse.metrics.background_process_metrics import run_as_background_process
22+
from synapse.replication.tcp.commands import (
23+
COMMAND_MAP,
24+
Command,
25+
RdataCommand,
26+
ReplicateCommand,
27+
)
28+
from synapse.util.stringutils import random_string
29+
30+
logger = logging.getLogger(__name__)
31+
32+
33+
class RedisSubscriber(txredisapi.SubscriberProtocol):
34+
"""Connection to redis subscribed to replication stream.
35+
"""
36+
37+
def connectionMade(self):
38+
logger.info("Connected to redis instance")
39+
self.subscribe(self.stream_name)
40+
self.send_command(ReplicateCommand())
41+
42+
self.handler.new_connection(self)
43+
44+
def messageReceived(self, pattern: str, channel: str, message: str):
45+
"""Received a message from redis.
46+
"""
47+
48+
if message.strip() == "":
49+
# Ignore blank lines
50+
return
51+
52+
line = message
53+
cmd_name, rest_of_line = line.split(" ", 1)
54+
55+
cmd_cls = COMMAND_MAP[cmd_name]
56+
try:
57+
cmd = cmd_cls.from_line(rest_of_line)
58+
except Exception as e:
59+
logger.exception(
60+
"[%s] failed to parse line %r: %r", self.id(), cmd_name, rest_of_line
61+
)
62+
self.send_error(
63+
"failed to parse line for %r: %r (%r):" % (cmd_name, e, rest_of_line)
64+
)
65+
return
66+
67+
# Now lets try and call on_<CMD_NAME> function
68+
run_as_background_process(
69+
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
70+
)
71+
72+
async def handle_command(self, cmd: Command):
73+
"""Handle a command we have received over the replication stream.
74+
75+
By default delegates to on_<COMMAND>, which should return an awaitable.
76+
77+
Args:
78+
cmd: received command
79+
"""
80+
handled = False
81+
82+
# First call any command handlers on this instance. These are for redis
83+
# specific handling.
84+
cmd_func = getattr(self, "on_%s" % (cmd.NAME,), None)
85+
if cmd_func:
86+
await cmd_func(cmd)
87+
handled = True
88+
89+
# Then call out to the handler.
90+
cmd_func = getattr(self.handler, "on_%s" % (cmd.NAME,), None)
91+
if cmd_func:
92+
await cmd_func(cmd)
93+
handled = True
94+
95+
if not handled:
96+
logger.warning("Unhandled command: %r", cmd)
97+
98+
def connectionLost(self, reason):
99+
logger.info("Lost connection to redis instance")
100+
self.handler.lost_connection(self)
101+
102+
def send_command(self, cmd):
103+
"""Send a command if connection has been established.
104+
105+
Args:
106+
cmd (Command)
107+
"""
108+
string = "%s %s" % (cmd.NAME, cmd.to_line())
109+
if "\n" in string:
110+
raise Exception("Unexpected newline in command: %r", string)
111+
112+
encoded_string = string.encode("utf-8")
113+
114+
async def _send():
115+
with PreserveLoggingContext():
116+
await self.redis_connection.publish(self.stream_name, encoded_string)
117+
118+
run_as_background_process("send-cmd", _send)
119+
120+
def stream_update(self, stream_name, token, data):
121+
"""Called when a new update is available to stream to clients.
122+
123+
We need to check if the client is interested in the stream or not
124+
"""
125+
self.send_command(RdataCommand(stream_name, token, data))
126+
127+
128+
class RedisFactory(txredisapi.SubscriberFactory):
129+
130+
maxDelay = 5
131+
continueTrying = True
132+
protocol = RedisSubscriber
133+
134+
def __init__(self, hs):
135+
super(RedisFactory, self).__init__()
136+
137+
self.password = hs.config.redis.redis_password
138+
139+
self.handler = hs.get_tcp_replication()
140+
self.stream_name = hs.hostname
141+
142+
self.redis_connection = txredisapi.lazyConnection(
143+
host=hs.config.redis_host,
144+
port=hs.config.redis_port,
145+
dbid=hs.config.redis_dbid,
146+
password=hs.config.redis.redis_password,
147+
reconnect=True,
148+
)
149+
150+
self.conn_id = random_string(5)
151+
152+
def buildProtocol(self, addr):
153+
p = super(RedisFactory, self).buildProtocol(addr)
154+
p.handler = self.handler
155+
p.redis_connection = self.redis_connection
156+
p.conn_id = self.conn_id
157+
p.stream_name = self.stream_name
158+
return p

0 commit comments

Comments
 (0)