Skip to content

Commit c283db8

Browse files
committed
More shutdown cleanup
1 parent 7a9725c commit c283db8

82 files changed

Lines changed: 329 additions & 185 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

synapse/api/auth_blocking.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636

3737
class AuthBlocking:
3838
def __init__(self, hs: "HomeServer"):
39-
self.store = hs.get_datastores().main
39+
self.store = weakref.proxy(hs.get_datastores().main)
4040

4141
self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
4242
self._hs_disabled = hs.config.server.hs_disabled

synapse/api/ratelimiting.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#
2222

2323
from typing import TYPE_CHECKING, Dict, Hashable, Optional, Tuple
24+
import weakref
2425

2526
from synapse.api.errors import LimitExceededError
2627
from synapse.config.ratelimiting import RatelimitSettings
@@ -33,6 +34,7 @@
3334
from synapse.module_api.callbacks.ratelimit_callbacks import (
3435
RatelimitModuleApiCallbacks,
3536
)
37+
from synapse.server import HomeServer
3638

3739

3840
class Ratelimiter:
@@ -75,6 +77,7 @@ class Ratelimiter:
7577

7678
def __init__(
7779
self,
80+
hs: "HomeServer",
7881
store: DataStore,
7982
clock: Clock,
8083
cfg: RatelimitSettings,
@@ -83,7 +86,7 @@ def __init__(
8386
self.clock = clock
8487
self.rate_hz = cfg.per_second
8588
self.burst_count = cfg.burst_count
86-
self.store = store
89+
self.store = weakref.proxy(store)
8790
self._limiter_name = cfg.key
8891
self._ratelimit_callbacks = ratelimit_callbacks
8992

@@ -94,7 +97,7 @@ def __init__(
9497
# * The rate_hz (leak rate) of this particular bucket.
9598
self.actions: Dict[Hashable, Tuple[float, float, float]] = {}
9699

97-
self.clock.looping_call(self._prune_message_counts, 60 * 1000)
100+
hs.register_looping_call(self.clock.looping_call(self._prune_message_counts, 60 * 1000))
98101

99102
def _get_key(
100103
self, requester: Optional[Requester], key: Optional[Hashable]
@@ -348,6 +351,7 @@ async def ratelimit(
348351
class RequestRatelimiter:
349352
def __init__(
350353
self,
354+
hs: "HomeServer",
351355
store: DataStore,
352356
clock: Clock,
353357
rc_message: RatelimitSettings,
@@ -358,6 +362,7 @@ def __init__(
358362

359363
# The rate_hz and burst_count are overridden on a per-user basis
360364
self.request_ratelimiter = Ratelimiter(
365+
hs=hs,
361366
store=self.store,
362367
clock=self.clock,
363368
cfg=RatelimitSettings(key=rc_message.key, per_second=0, burst_count=0),
@@ -368,6 +373,7 @@ def __init__(
368373
# by the presence of rate limits in the config
369374
if rc_admin_redaction:
370375
self.admin_redaction_ratelimiter: Optional[Ratelimiter] = Ratelimiter(
376+
hs=hs,
371377
store=self.store,
372378
clock=self.clock,
373379
cfg=rc_admin_redaction,

synapse/app/homeserver.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
from synapse.rest.well_known import well_known_resource
7070
from synapse.server import HomeServer
7171
from synapse.storage import DataStore
72+
from synapse.types import ISynapseReactor
7273
from synapse.util.check_dependencies import VERSION, check_requirements
7374
from synapse.util.httpresourcetree import create_resource_tree
7475
from synapse.util.module_loader import load_module
@@ -83,6 +84,14 @@ def gz_wrap(r: Resource) -> Resource:
8384
class SynapseHomeServer(HomeServer):
8485
DATASTORE_CLASS = DataStore
8586

87+
def shutdown(self) -> None:
88+
super().shutdown()
89+
for listener in self._listening_services:
90+
listener.loseConnection()
91+
self._listening_services.clear()
92+
93+
self._reactor.stop()
94+
8695
def _listener_http(
8796
self,
8897
config: HomeServerConfig,
@@ -308,7 +317,8 @@ def start_listening(self) -> None:
308317
logger.warning("Unrecognized listener type: %s", listener.type)
309318

310319

311-
def setup(config_options: List[str]) -> SynapseHomeServer:
320+
from typing import Optional
321+
def setup(config_options: List[str], reactor: Optional[ISynapseReactor]=None) -> SynapseHomeServer:
312322
"""
313323
Args:
314324
config_options_options: The options passed to Synapse. Usually `sys.argv[1:]`.
@@ -365,6 +375,7 @@ def setup(config_options: List[str]) -> SynapseHomeServer:
365375
config.server.server_name,
366376
config=config,
367377
version_string=f"Synapse/{VERSION}",
378+
reactor=reactor,
368379
)
369380

370381
synapse.config.logger.setup_logging(hs, config, use_worker_options=False)

synapse/app/phone_stats_home.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,11 +206,12 @@ def performance_stats_init() -> None:
206206
)
207207

208208
# monthly active user limiting functionality
209-
clock.looping_call(
209+
hs.register_looping_call(clock.looping_call(
210210
hs.get_datastores().main.reap_monthly_active_users,
211211
ONE_HOUR_SECONDS * MILLISECONDS_PER_SECOND,
212-
)
213-
hs.get_datastores().main.reap_monthly_active_users()
212+
))
213+
# TODO: (devon) how does this hold onto a DB pool reference?
214+
#hs.register_background_process(hs.get_datastores().main.reap_monthly_active_users())
214215

215216
@wrap_as_background_process("generate_monthly_active_users")
216217
async def generate_monthly_active_users() -> None:
@@ -234,7 +235,7 @@ async def generate_monthly_active_users() -> None:
234235

235236
if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
236237
generate_monthly_active_users()
237-
clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
238+
hs.register_looping_call(clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000))
238239
# End of monthly active user settings
239240

240241
if hs.config.metrics.report_stats:

synapse/appservice/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ def force_retry(self) -> None:
523523
if self.scheduled_recovery:
524524
self.clock.cancel_call_later(self.scheduled_recovery)
525525
# Run a retry, which will resechedule a recovery if it fails.
526-
run_as_background_process(
526+
deferred = run_as_background_process(
527527
"retry",
528528
self.retry,
529529
)

synapse/federation/federation_client.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def __init__(self, hs: "HomeServer"):
133133
super().__init__(hs)
134134

135135
self.pdu_destination_tried: Dict[str, Dict[str, int]] = {}
136-
self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
136+
hs.register_looping_call(self._clock.looping_call(self._clear_tried_cache, 60 * 1000))
137137
self.state = hs.get_state_handler()
138138
self.transport_layer = hs.get_federation_transport_client()
139139

@@ -143,6 +143,7 @@ def __init__(self, hs: "HomeServer"):
143143
# Cache mapping `event_id` to a tuple of the event itself and the `pull_origin`
144144
# (which server we pulled the event from)
145145
self._get_pdu_cache: ExpiringCache[str, Tuple[EventBase, str]] = ExpiringCache(
146+
hs=hs,
146147
cache_name="get_pdu_cache",
147148
server_name=self.server_name,
148149
clock=self._clock,
@@ -162,6 +163,7 @@ def __init__(self, hs: "HomeServer"):
162163
Tuple[str, bool],
163164
Tuple[JsonDict, Sequence[JsonDict], Sequence[JsonDict], Sequence[str]],
164165
] = ExpiringCache(
166+
hs=hs,
165167
cache_name="get_room_hierarchy_cache",
166168
server_name=self.server_name,
167169
clock=self._clock,

synapse/federation/federation_server.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ async def on_incoming_transaction(
300300
# Start a periodic check for old staged events. This is to handle
301301
# the case where locks time out, e.g. if another process gets killed
302302
# without dropping its locks.
303-
self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)
303+
self.hs.register_looping_call(self._clock.looping_call(self._handle_old_staged_events, 60 * 1000))
304304

305305
# keep this as early as possible to make the calculated origin ts as
306306
# accurate as possible.

synapse/federation/persistence.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import logging
3030
from typing import Optional, Tuple
31+
import weakref
3132

3233
from synapse.federation.units import Transaction
3334
from synapse.storage.databases.main import DataStore
@@ -40,7 +41,7 @@ class TransactionActions:
4041
"""Defines persistence actions that relate to handling Transactions."""
4142

4243
def __init__(self, datastore: DataStore):
43-
self.store = datastore
44+
self.store = weakref.proxy(datastore)
4445

4546
async def have_responded(
4647
self, origin: str, transaction: Transaction

synapse/federation/send_queue.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,12 @@ def __init__(self, hs: "HomeServer"):
112112
# lambda binds to the queue rather than to the name of the queue which
113113
# changes. ARGH.
114114
def register(name: str, queue: Sized) -> None:
115-
LaterGauge(
115+
hs.register_later_gauge(LaterGauge(
116116
"synapse_federation_send_queue_%s_size" % (queue_name,),
117117
"",
118118
[],
119119
lambda: len(queue),
120-
)
120+
))
121121

122122
for queue_name in [
123123
"presence_map",
@@ -129,7 +129,7 @@ def register(name: str, queue: Sized) -> None:
129129
]:
130130
register(queue_name, getattr(self, queue_name))
131131

132-
self.clock.looping_call(self._clear_queue, 30 * 1000)
132+
hs.register_looping_call(self.clock.looping_call(self._clear_queue, 30 * 1000))
133133

134134
def _next_pos(self) -> int:
135135
pos = self.pos

synapse/federation/sender/__init__.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ def __init__(self, hs: "HomeServer"):
389389
# map from destination to PerDestinationQueue
390390
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
391391

392-
LaterGauge(
392+
hs.register_later_gauge(LaterGauge(
393393
"synapse_federation_transaction_queue_pending_destinations",
394394
"",
395395
[],
@@ -398,24 +398,24 @@ def __init__(self, hs: "HomeServer"):
398398
for d in self._per_destination_queues.values()
399399
if d.transmission_loop_running
400400
),
401-
)
401+
))
402402

403-
LaterGauge(
403+
hs.register_later_gauge(LaterGauge(
404404
"synapse_federation_transaction_queue_pending_pdus",
405405
"",
406406
[],
407407
lambda: sum(
408408
d.pending_pdu_count() for d in self._per_destination_queues.values()
409409
),
410-
)
411-
LaterGauge(
410+
))
411+
hs.register_later_gauge(LaterGauge(
412412
"synapse_federation_transaction_queue_pending_edus",
413413
"",
414414
[],
415415
lambda: sum(
416416
d.pending_edu_count() for d in self._per_destination_queues.values()
417417
),
418-
)
418+
))
419419

420420
self._is_processing = False
421421
self._last_poked_id = -1
@@ -426,16 +426,16 @@ def __init__(self, hs: "HomeServer"):
426426
1.0 / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
427427
)
428428
self._destination_wakeup_queue = _DestinationWakeupQueue(
429-
self, self.clock, max_delay_s=rr_txn_interval_per_room_s
429+
weakref.proxy(self), self.clock, max_delay_s=rr_txn_interval_per_room_s
430430
)
431431

432432
# Regularly wake up destinations that have outstanding PDUs to be caught up
433-
self.clock.looping_call_now(
433+
hs.register_looping_call(self.clock.looping_call_now(
434434
run_as_background_process,
435435
WAKEUP_RETRY_PERIOD_SEC * 1000.0,
436436
"wake_destinations_needing_catchup",
437437
self._wake_destinations_needing_catchup,
438-
)
438+
))
439439

440440
def _get_per_destination_queue(
441441
self, destination: str

0 commit comments

Comments
 (0)