1414#
1515
1616
17+ import logging
1718from typing import (
1819 Any ,
1920 Callable ,
3031from synapse .logging import context
3132from synapse .types import ISynapseThreadlessReactor
3233from synapse .util import log_failure
34+ from synapse .util .stringutils import random_string_insecure_fast
3335
3436P = ParamSpec ("P" )
3537
3638
39+ logger = logging .getLogger (__name__ )
40+
41+
3742class Clock :
3843 """
3944 A Clock wraps a Twisted reactor and provides utilities on top of it.
@@ -64,7 +69,12 @@ def __init__(self, reactor: ISynapseThreadlessReactor, server_name: str) -> None
6469 """List of active looping calls"""
6570
6671 self ._call_id_to_delayed_call : dict [int , IDelayedCall ] = {}
67- """Mapping from unique call ID to delayed call"""
72+ """
73+ Mapping from unique call ID to delayed call.
74+
75+ For "performance", this only tracks a subset of delayed calls: those created
76+ with `call_later` with `call_later_cancel_on_shutdown=True`.
77+ """
6878
6979 self ._is_shutdown = False
7080 """Whether shutdown has been requested by the HomeServer"""
@@ -153,11 +163,20 @@ def _looping_call_common(
153163 ** kwargs : P .kwargs ,
154164 ) -> LoopingCall :
155165 """Common functionality for `looping_call` and `looping_call_now`"""
166+ instance_id = random_string_insecure_fast (5 )
156167
157168 if self ._is_shutdown :
158169 raise Exception ("Cannot start looping call. Clock has been shutdown" )
159170
171+ looping_call_context_string = "looping_call"
172+ if now :
173+ looping_call_context_string = "looping_call_now"
174+
160175 def wrapped_f (* args : P .args , ** kwargs : P .kwargs ) -> Deferred :
176+ logger .debug (
177+ "%s(%s): Executing callback" , looping_call_context_string , instance_id
178+ )
179+
161180 assert context .current_context () is context .SENTINEL_CONTEXT , (
162181 "Expected `looping_call` callback from the reactor to start with the sentinel logcontext "
163182 f"but saw { context .current_context ()} . In other words, another task shouldn't have "
@@ -201,6 +220,17 @@ def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred:
201220 d = call .start (msec / 1000.0 , now = now )
202221 d .addErrback (log_failure , "Looping call died" , consumeErrors = False )
203222 self ._looping_calls .append (call )
223+
224+ logger .debug (
225+ "%s(%s): Scheduled looping call every %sms later" ,
226+ looping_call_context_string ,
227+ instance_id ,
228+ msec ,
229+ # Find out who is scheduling the call which makes it easy to follow in the
230+ # logs.
231+ stack_info = True ,
232+ )
233+
204234 return call
205235
206236 def cancel_all_looping_calls (self , consumeErrors : bool = True ) -> None :
@@ -226,7 +256,7 @@ def call_later(
226256 * args : Any ,
227257 call_later_cancel_on_shutdown : bool = True ,
228258 ** kwargs : Any ,
229- ) -> IDelayedCall :
259+ ) -> "DelayedCallWrapper" :
230260 """Call something later
231261
232262 Note that the function will be called with generic `call_later` logcontext, so
@@ -245,74 +275,79 @@ def call_later(
245275 issue, we can just track all delayed calls.
246276 **kwargs: Key arguments to pass to function.
247277 """
278+ call_id = self ._delayed_call_id
279+ self ._delayed_call_id = self ._delayed_call_id + 1
248280
249281 if self ._is_shutdown :
250282 raise Exception ("Cannot start delayed call. Clock has been shutdown" )
251283
252- def create_wrapped_callback (
253- track_for_shutdown_cancellation : bool ,
254- ) -> Callable [P , None ]:
255- def wrapped_callback (* args : Any , ** kwargs : Any ) -> None :
256- assert context .current_context () is context .SENTINEL_CONTEXT , (
257- "Expected `call_later` callback from the reactor to start with the sentinel logcontext "
258- f"but saw { context .current_context ()} . In other words, another task shouldn't have "
259- "leaked their logcontext to us."
260- )
284+ def wrapped_callback (* args : Any , ** kwargs : Any ) -> None :
285+ logger .debug ("call_later(%s): Executing callback" , call_id )
261286
262- # Because this is a callback from the reactor, we will be using the
263- # `sentinel` log context at this point. We want the function to log with
264- # some logcontext as we want to know which server the logs came from.
265- #
266- # We use `PreserveLoggingContext` to prevent our new `call_later`
267- # logcontext from finishing as soon as we exit this function, in case `f`
268- # returns an awaitable/deferred which would continue running and may try to
269- # restore the `call_later` context when it's done (because it's trying to
270- # adhere to the Synapse logcontext rules.)
271- #
272- # This also ensures that we return to the `sentinel` context when we exit
273- # this function and yield control back to the reactor to avoid leaking the
274- # current logcontext to the reactor (which would then get picked up and
275- # associated with the next thing the reactor does)
276- try :
277- with context .PreserveLoggingContext (
278- context .LoggingContext (
279- name = "call_later" , server_name = self ._server_name
280- )
281- ):
282- # We use `run_in_background` to reset the logcontext after `f` (or the
283- # awaitable returned by `f`) completes to avoid leaking the current
284- # logcontext to the reactor
285- context .run_in_background (callback , * args , ** kwargs )
286- finally :
287- if track_for_shutdown_cancellation :
288- # We still want to remove the call from the tracking map. Even if
289- # the callback raises an exception.
290- self ._call_id_to_delayed_call .pop (call_id )
291-
292- return wrapped_callback
287+ assert context .current_context () is context .SENTINEL_CONTEXT , (
288+ "Expected `call_later` callback from the reactor to start with the sentinel logcontext "
289+ f"but saw { context .current_context ()} . In other words, another task shouldn't have "
290+ "leaked their logcontext to us."
291+ )
293292
293+ # Because this is a callback from the reactor, we will be using the
294+ # `sentinel` log context at this point. We want the function to log with
295+ # some logcontext as we want to know which server the logs came from.
296+ #
297+ # We use `PreserveLoggingContext` to prevent our new `call_later`
298+ # logcontext from finishing as soon as we exit this function, in case `f`
299+ # returns an awaitable/deferred which would continue running and may try to
300+ # restore the `call_later` context when it's done (because it's trying to
301+ # adhere to the Synapse logcontext rules.)
302+ #
303+ # This also ensures that we return to the `sentinel` context when we exit
304+ # this function and yield control back to the reactor to avoid leaking the
305+ # current logcontext to the reactor (which would then get picked up and
306+ # associated with the next thing the reactor does)
307+ try :
308+ with context .PreserveLoggingContext (
309+ context .LoggingContext (
310+ name = "call_later" , server_name = self ._server_name
311+ )
312+ ):
313+ # We use `run_in_background` to reset the logcontext after `f` (or the
314+ # awaitable returned by `f`) completes to avoid leaking the current
315+ # logcontext to the reactor
316+ context .run_in_background (callback , * args , ** kwargs )
317+ finally :
318+ if call_later_cancel_on_shutdown :
319+ # We still want to remove the call from the tracking map. Even if
320+ # the callback raises an exception.
321+ self ._call_id_to_delayed_call .pop (call_id )
322+
323+ # We can ignore the lint here since this class is the one location callLater should
324+ # be called.
325+ call = self ._reactor .callLater (delay , wrapped_callback , * args , ** kwargs ) # type: ignore[call-later-not-tracked]
326+
327+ logger .debug (
328+ "call_later(%s): Scheduled call for %ss later (tracked for shutdown: %s)" ,
329+ call_id ,
330+ delay ,
331+ call_later_cancel_on_shutdown ,
332+ # Find out who is scheduling the call which makes it easy to follow in the
333+ # logs.
334+ stack_info = True ,
335+ )
336+
337+ wrapped_call = DelayedCallWrapper (call , call_id , self )
294338 if call_later_cancel_on_shutdown :
295- call_id = self ._delayed_call_id
296- self ._delayed_call_id = self ._delayed_call_id + 1
297-
298- # We can ignore the lint here since this class is the one location callLater
299- # should be called.
300- call = self ._reactor .callLater (
301- delay , create_wrapped_callback (True ), * args , ** kwargs
302- ) # type: ignore[call-later-not-tracked]
303- call = DelayedCallWrapper (call , call_id , self )
304- self ._call_id_to_delayed_call [call_id ] = call
305- return call
306- else :
307- # We can ignore the lint here since this class is the one location callLater should
308- # be called.
309- return self ._reactor .callLater (
310- delay , create_wrapped_callback (False ), * args , ** kwargs
311- ) # type: ignore[call-later-not-tracked]
339+ self ._call_id_to_delayed_call [call_id ] = wrapped_call
340+
341+ return wrapped_call
312342
313- def cancel_call_later (self , timer : IDelayedCall , ignore_errs : bool = False ) -> None :
343+ def cancel_call_later (
344+ self , wrapped_call : "DelayedCallWrapper" , ignore_errs : bool = False
345+ ) -> None :
314346 try :
315- timer .cancel ()
347+ logger .debug (
348+ "cancel_call_later: cancelling scheduled call %s" , wrapped_call .call_id
349+ )
350+ wrapped_call .delayed_call .cancel ()
316351 except Exception :
317352 if not ignore_errs :
318353 raise
@@ -327,8 +362,11 @@ def cancel_all_delayed_calls(self, ignore_errs: bool = True) -> None:
327362 """
328363 # We make a copy here since calling `cancel()` on a delayed_call
329364 # will result in the call removing itself from the map mid-iteration.
330- for call in list (self ._call_id_to_delayed_call .values ()):
365+ for call_id , call in list (self ._call_id_to_delayed_call .items ()):
331366 try :
367+ logger .debug (
368+ "cancel_all_delayed_calls: cancelling scheduled call %s" , call_id
369+ )
332370 call .cancel ()
333371 except Exception :
334372 if not ignore_errs :
@@ -352,8 +390,11 @@ def call_when_running(
352390 *args: Postional arguments to pass to function.
353391 **kwargs: Key arguments to pass to function.
354392 """
393+ instance_id = random_string_insecure_fast (5 )
355394
356395 def wrapped_callback (* args : Any , ** kwargs : Any ) -> None :
396+ logger .debug ("call_when_running(%s): Executing callback" , instance_id )
397+
357398 # Since this callback can be invoked immediately if the reactor is already
358399 # running, we can't always assume that we're running in the sentinel
359400 # logcontext (i.e. we can't assert that we're in the sentinel context like
@@ -392,6 +433,14 @@ def wrapped_callback(*args: Any, **kwargs: Any) -> None:
392433 # callWhenRunning should be called.
393434 self ._reactor .callWhenRunning (wrapped_callback , * args , ** kwargs ) # type: ignore[prefer-synapse-clock-call-when-running]
394435
436+ logger .debug (
437+ "call_when_running(%s): Scheduled call" ,
438+ instance_id ,
439+ # Find out who is scheduling the call which makes it easy to follow in the
440+ # logs.
441+ stack_info = True ,
442+ )
443+
395444 def add_system_event_trigger (
396445 self ,
397446 phase : str ,
@@ -417,8 +466,16 @@ def add_system_event_trigger(
417466 Returns:
418467 an ID that can be used to remove this call with `reactor.removeSystemEventTrigger`.
419468 """
469+ instance_id = random_string_insecure_fast (5 )
420470
421471 def wrapped_callback (* args : Any , ** kwargs : Any ) -> None :
472+ logger .debug (
473+ "add_system_event_trigger(%s): Executing %s %s callback" ,
474+ instance_id ,
475+ phase ,
476+ event_type ,
477+ )
478+
422479 assert context .current_context () is context .SENTINEL_CONTEXT , (
423480 "Expected `add_system_event_trigger` callback from the reactor to start with the sentinel logcontext "
424481 f"but saw { context .current_context ()} . In other words, another task shouldn't have "
@@ -449,6 +506,16 @@ def wrapped_callback(*args: Any, **kwargs: Any) -> None:
449506 # logcontext to the reactor
450507 context .run_in_background (callback , * args , ** kwargs )
451508
509+ logger .debug (
510+ "add_system_event_trigger(%s) for %s %s" ,
511+ instance_id ,
512+ phase ,
513+ event_type ,
514+ # Find out who is scheduling the call which makes it easy to follow in the
515+ # logs.
516+ stack_info = True ,
517+ )
518+
452519 # We can ignore the lint here since this class is the one location
453520 # `addSystemEventTrigger` should be called.
454521 return self ._reactor .addSystemEventTrigger (
0 commit comments