From 6170762fb6aac461fc2b8ddad18ea6d07ef9822e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 16:08:59 -0500 Subject: [PATCH 01/27] Add logcontext to looping calls --- synapse/storage/database.py | 2 +- synapse/util/__init__.py | 44 ++++++++++++++++++++++++++++++++----- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index cfec36e0fa1..020ef0893c1 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -708,7 +708,7 @@ def loop() -> None: "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters ) - self._clock.looping_call(loop, 10000) + self._clock.looping_call(description="db.profiling_loop", msec=10000) def new_transaction( self, diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 36129c3a67b..bf09395941f 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -135,9 +135,11 @@ def time_msec(self) -> int: def looping_call( self, + *, + description: str, + server_name: str, f: Callable[P, object], msec: float, - *args: P.args, **kwargs: P.kwargs, ) -> LoopingCall: """Call a function repeatedly. @@ -153,18 +155,30 @@ def looping_call( other than trivial, you probably want to wrap it in run_as_background_process. Args: + description: Description the of the task, for logging purposes. + server_name: The homeserver name that this looping task is being run for + (this should be `hs.hostname`). f: The function to call repeatedly. msec: How long to wait between calls in milliseconds. *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. """ - return self._looping_call_common(f, msec, False, *args, **kwargs) + return self._looping_call_common( + description=description, + server_name=server_name, + f=f, + msec=msec, + now=False, + **kwargs, + ) def looping_call_now( self, + *, + description: str, + server_name: str, f: Callable[P, object], msec: float, - *args: P.args, **kwargs: P.kwargs, ) -> LoopingCall: """Call a function immediately, and then repeatedly thereafter. @@ -176,23 +190,41 @@ def looping_call_now( you probably want to wrap it in `run_as_background_process`. Args: + description: Description the of the task, for logging purposes. + server_name: The homeserver name that this looping task is being run for + (this should be `hs.hostname`). f: The function to call repeatedly. msec: How long to wait between calls in milliseconds. *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. """ - return self._looping_call_common(f, msec, True, *args, **kwargs) + return self._looping_call_common( + description=description, + server_name=server_name, + f=f, + msec=msec, + now=True, + **kwargs, + ) def _looping_call_common( self, + *, + description: str, + server_name: str, f: Callable[P, object], msec: float, now: bool, - *args: P.args, **kwargs: P.kwargs, ) -> LoopingCall: """Common functionality for `looping_call` and `looping_call_now`""" - call = task.LoopingCall(f, *args, **kwargs) + + def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> object: + with context.PreserveLoggingContext(): + with context.LoggingContext(description): + return f(*args, **kwargs) + + call = task.LoopingCall(wrapped_f, **kwargs) call.clock = self._reactor d = call.start(msec / 1000.0, now=now) d.addErrback(log_failure, "Looping call died", consumeErrors=False) From 5d2e4c41de67905f1d533c1c5d625ae24d5a3f9a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 16:12:20 -0500 Subject: [PATCH 02/27] Remove `server_name` arg We will need this in https://github.com/element-hq/synapse/pull/18868 but not for now --- synapse/util/__init__.py | 35 ++++++++++++++--------------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index bf09395941f..1d2de0b04f7 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -135,11 +135,10 @@ def time_msec(self) -> int: def looping_call( self, - *, description: str, - server_name: str, f: Callable[P, object], msec: float, + *args: P.args, **kwargs: P.kwargs, ) -> LoopingCall: """Call a function repeatedly. @@ -156,29 +155,26 @@ def looping_call( Args: description: Description the of the task, for logging purposes. - server_name: The homeserver name that this looping task is being run for - (this should be `hs.hostname`). f: The function to call repeatedly. msec: How long to wait between calls in milliseconds. *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. """ return self._looping_call_common( - description=description, - server_name=server_name, - f=f, - msec=msec, - now=False, + description, + f, + msec, + False, + *args, **kwargs, ) def looping_call_now( self, - *, description: str, - server_name: str, f: Callable[P, object], msec: float, + *args: P.args, **kwargs: P.kwargs, ) -> LoopingCall: """Call a function immediately, and then repeatedly thereafter. @@ -191,30 +187,27 @@ def looping_call_now( Args: description: Description the of the task, for logging purposes. - server_name: The homeserver name that this looping task is being run for - (this should be `hs.hostname`). f: The function to call repeatedly. msec: How long to wait between calls in milliseconds. *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. """ return self._looping_call_common( - description=description, - server_name=server_name, - f=f, - msec=msec, - now=True, + description, + f, + msec, + True, + *args, **kwargs, ) def _looping_call_common( self, - *, description: str, - server_name: str, f: Callable[P, object], msec: float, now: bool, + *args: P.args, **kwargs: P.kwargs, ) -> LoopingCall: """Common functionality for `looping_call` and `looping_call_now`""" @@ -224,7 +217,7 @@ def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> object: with context.LoggingContext(description): return f(*args, **kwargs) - call = task.LoopingCall(wrapped_f, **kwargs) + call = task.LoopingCall(wrapped_f, *args, **kwargs) call.clock = self._reactor d = call.start(msec / 1000.0, now=now) d.addErrback(log_failure, "Looping call died", consumeErrors=False) From 414b178d53e27a3e99362fe26526d17c76584e82 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 16:30:18 -0500 Subject: [PATCH 03/27] No need for a description --- synapse/util/__init__.py | 37 ++++++++++++------------------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 1d2de0b04f7..ab801e45fa2 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -135,7 +135,6 @@ def time_msec(self) -> int: def looping_call( self, - description: str, f: Callable[P, object], msec: float, *args: P.args, @@ -151,27 +150,18 @@ def looping_call( `twisted.internet.task.LoopingCall`. Note that the function will be called with no logcontext, so if it is anything - other than trivial, you probably want to wrap it in run_as_background_process. + other than trivial, you probably want to wrap it in `run_as_background_process`. Args: - description: Description the of the task, for logging purposes. f: The function to call repeatedly. msec: How long to wait between calls in milliseconds. *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. """ - return self._looping_call_common( - description, - f, - msec, - False, - *args, - **kwargs, - ) + return self._looping_call_common(f, msec, False, *args, **kwargs) def looping_call_now( self, - description: str, f: Callable[P, object], msec: float, *args: P.args, @@ -186,24 +176,15 @@ def looping_call_now( you probably want to wrap it in `run_as_background_process`. Args: - description: Description the of the task, for logging purposes. f: The function to call repeatedly. msec: How long to wait between calls in milliseconds. *args: Positional arguments to pass to function. **kwargs: Key arguments to pass to function. """ - return self._looping_call_common( - description, - f, - msec, - True, - *args, - **kwargs, - ) + return self._looping_call_common(f, msec, True, *args, **kwargs) def _looping_call_common( self, - description: str, f: Callable[P, object], msec: float, now: bool, @@ -213,9 +194,15 @@ def _looping_call_common( """Common functionality for `looping_call` and `looping_call_now`""" def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> object: - with context.PreserveLoggingContext(): - with context.LoggingContext(description): - return f(*args, **kwargs) + # Because this is a callback from the reactor, we will be using the + # `sentinel` log context at this point. We want to log with some logcontext + # as we want to know which server the logs came from. This also ensures that + # we return to the `sentinel` context when we exit this function and yield + # control back to the reactor to avoid leaking the current logcontext to the + # reactor (which would then get picked up and associated with the next thing + # the reactor does) + with context.LoggingContext("looping_call"): + return f(*args, **kwargs) call = task.LoopingCall(wrapped_f, *args, **kwargs) call.clock = self._reactor From 788cd1965d42587dcaccd4b83c5c1458d92f9a9c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 16:31:18 -0500 Subject: [PATCH 04/27] Fix db loop --- synapse/storage/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 020ef0893c1..3ec7b40f88d 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -708,7 +708,7 @@ def loop() -> None: "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters ) - self._clock.looping_call(description="db.profiling_loop", msec=10000) + self._clock.looping_call(loop, msec=10000) def new_transaction( self, From 77b32282d5017da93cc2093a08de8cde24f31d15 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 16:31:36 -0500 Subject: [PATCH 05/27] Revert keyword arg changes --- synapse/storage/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 3ec7b40f88d..cfec36e0fa1 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -708,7 +708,7 @@ def loop() -> None: "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters ) - self._clock.looping_call(loop, msec=10000) + self._clock.looping_call(loop, 10000) def new_transaction( self, From 91f7bb353bd742d24464bd398bfdf01e849c6bcd Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 16:32:21 -0500 Subject: [PATCH 06/27] Multiple paragraphs --- synapse/util/__init__.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index ab801e45fa2..df4a49bef38 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -196,11 +196,12 @@ def _looping_call_common( def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> object: # Because this is a callback from the reactor, we will be using the # `sentinel` log context at this point. We want to log with some logcontext - # as we want to know which server the logs came from. This also ensures that - # we return to the `sentinel` context when we exit this function and yield - # control back to the reactor to avoid leaking the current logcontext to the - # reactor (which would then get picked up and associated with the next thing - # the reactor does) + # as we want to know which server the logs came from. + # + # This also ensures that we return to the `sentinel` context when we exit + # this function and yield control back to the reactor to avoid leaking the + # current logcontext to the reactor (which would then get picked up and + # associated with the next thing the reactor does) with context.LoggingContext("looping_call"): return f(*args, **kwargs) From 2e88eb35d2392da5820bb3c46c1e9fbb0089eeb2 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 16:38:21 -0500 Subject: [PATCH 07/27] Update docstring --- synapse/util/__init__.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index df4a49bef38..59717ca5f6e 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -149,8 +149,9 @@ def looping_call( this functionality thanks to this function being a thin wrapper around `twisted.internet.task.LoopingCall`. - Note that the function will be called with no logcontext, so if it is anything - other than trivial, you probably want to wrap it in `run_as_background_process`. + Note that the function will be called with generic `looping_call` logcontext, so + if it is anything other than a trivial task, you probably want to wrap it in + `run_as_background_process` to give it more specific label and track metrics. Args: f: The function to call repeatedly. @@ -172,8 +173,9 @@ def looping_call_now( As with `looping_call`: subsequent calls are not scheduled until after the the Awaitable returned by a previous call has finished. - Also as with `looping_call`: the function is called with no logcontext and - you probably want to wrap it in `run_as_background_process`. + Note that the function will be called with generic `looping_call` logcontext, so + if it is anything other than a trivial task, you probably want to wrap it in + `run_as_background_process` to give it more specific label and track metrics. Args: f: The function to call repeatedly. From fbf59465d4f0a58a4a6440d608225a73d253fba5 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 16:39:03 -0500 Subject: [PATCH 08/27] Apply the same treatment to `call_later` --- synapse/util/__init__.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 59717ca5f6e..e23183aa213 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -218,8 +218,9 @@ def call_later( ) -> IDelayedCall: """Call something later - Note that the function will be called with no logcontext, so if it is anything - other than trivial, you probably want to wrap it in run_as_background_process. + Note that the function will be called with generic `looping_call` logcontext, so + if it is anything other than a trivial task, you probably want to wrap it in + `run_as_background_process` to give it more specific label and track metrics. Args: delay: How long to wait in seconds. @@ -229,11 +230,18 @@ def call_later( """ def wrapped_callback(*args: Any, **kwargs: Any) -> None: - with context.PreserveLoggingContext(): + # Because this is a callback from the reactor, we will be using the + # `sentinel` log context at this point. We want to log with some logcontext + # as we want to know which server the logs came from. + # + # This also ensures that we return to the `sentinel` context when we exit + # this function and yield control back to the reactor to avoid leaking the + # current logcontext to the reactor (which would then get picked up and + # associated with the next thing the reactor does) + with context.LoggingContext("call_later"): callback(*args, **kwargs) - with context.PreserveLoggingContext(): - return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) + return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None: try: From 3809f3f4cd619787e7e09b39fa1a315509581725 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 17:19:41 -0500 Subject: [PATCH 09/27] Add comments for why we `PreserveLoggingContext()` --- synapse/util/__init__.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index e23183aa213..81e40e6eb6a 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -121,6 +121,8 @@ class Clock: async def sleep(self, seconds: float) -> None: d: defer.Deferred[float] = defer.Deferred() + # Start task in the `sentinel` logcontext, to avoid leaking the current context + # into the reactor once it finishes. with context.PreserveLoggingContext(): self._reactor.callLater(seconds, d.callback, seconds) await d @@ -207,11 +209,15 @@ def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> object: with context.LoggingContext("looping_call"): return f(*args, **kwargs) - call = task.LoopingCall(wrapped_f, *args, **kwargs) - call.clock = self._reactor - d = call.start(msec / 1000.0, now=now) - d.addErrback(log_failure, "Looping call died", consumeErrors=False) - return call + # Start task in the `sentinel` logcontext, to avoid leaking the current context + # into the reactor if `d` ever finishes (perhaps someone cancels the looping + # call) + with context.PreserveLoggingContext(): + call = task.LoopingCall(wrapped_f, *args, **kwargs) + call.clock = self._reactor + d = call.start(msec / 1000.0, now=now) + d.addErrback(log_failure, "Looping call died", consumeErrors=False) + return call def call_later( self, delay: float, callback: Callable, *args: Any, **kwargs: Any @@ -241,7 +247,10 @@ def wrapped_callback(*args: Any, **kwargs: Any) -> None: with context.LoggingContext("call_later"): callback(*args, **kwargs) - return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) + # Start task in the `sentinel` logcontext, to avoid leaking the current context + # into the reactor once it finishes. + with context.PreserveLoggingContext(): + return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None: try: From 442dbc0d622de64cc8f2f248f0730051641402ef Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 17:29:10 -0500 Subject: [PATCH 10/27] Add changelog --- changelog.d/18907.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/18907.misc diff --git a/changelog.d/18907.misc b/changelog.d/18907.misc new file mode 100644 index 00000000000..4fca9ec8fbe --- /dev/null +++ b/changelog.d/18907.misc @@ -0,0 +1 @@ +Remove `sentinel` logcontext usage in `Clock` utilities like `looping_call` and `call_later`. From 13b938ff854b95de505b55275a788f28b9fb0f45 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 18:29:53 -0500 Subject: [PATCH 11/27] Sanity check that we start in the sentinel logcontext --- synapse/util/__init__.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 81e40e6eb6a..664cae405a5 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -198,6 +198,12 @@ def _looping_call_common( """Common functionality for `looping_call` and `looping_call_now`""" def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> object: + assert context.current_context() is context.SENTINEL_CONTEXT, ( + "Expected `call_later` callback from the reactor to start with the sentinel logcontext " + f"but saw {context.current_context()}. In other words, another task shouldn't have " + "leaked their logcontext to us." + ) + # Because this is a callback from the reactor, we will be using the # `sentinel` log context at this point. We want to log with some logcontext # as we want to know which server the logs came from. @@ -236,6 +242,12 @@ def call_later( """ def wrapped_callback(*args: Any, **kwargs: Any) -> None: + assert context.current_context() is context.SENTINEL_CONTEXT, ( + "Expected `call_later` callback from the reactor to start with the sentinel logcontext " + f"but saw {context.current_context()}. In other words, another task shouldn't have " + "leaked their logcontext to us." + ) + # Because this is a callback from the reactor, we will be using the # `sentinel` log context at this point. We want to log with some logcontext # as we want to know which server the logs came from. From 04825eb2cf39f33b46b5425011ae9acc06b16ba3 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 18:30:17 -0500 Subject: [PATCH 12/27] Improve `Clock.sleep` test --- tests/util/test_logcontext.py | 35 +++++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index af36e685d7c..2406e30bcca 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -28,6 +28,7 @@ SENTINEL_CONTEXT, LoggingContext, PreserveLoggingContext, + _Sentinel, current_context, make_deferred_yieldable, nested_logging_context, @@ -44,26 +45,56 @@ class LoggingContextTestCase(unittest.TestCase): def _check_test_key(self, value: str) -> None: context = current_context() - assert isinstance(context, LoggingContext) - self.assertEqual(context.name, value) + assert isinstance(context, LoggingContext) or isinstance(context, _Sentinel), ( + f"Expected LoggingContext({value}) but saw {context}" + ) + self.assertEqual( + str(context), value, f"Expected LoggingContext({value}) but saw {context}" + ) def test_with_context(self) -> None: with LoggingContext("test"): self._check_test_key("test") async def test_sleep(self) -> None: + """ + Test `Clock.sleep` + """ clock = Clock(reactor) + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") + + callback_finished = False + async def competing_callback() -> None: + nonlocal callback_finished + + # A callback from the reactor should start with the sentinel context. In + # other words, another task shouldn't have leaked their context to us. + self._check_test_key("sentinel") + with LoggingContext("competing"): await clock.sleep(0) self._check_test_key("competing") + callback_finished = True + reactor.callLater(0, lambda: defer.ensureDeferred(competing_callback())) with LoggingContext("one"): await clock.sleep(0) self._check_test_key("one") + await clock.sleep(0) + self._check_test_key("one") + + # Back to the sentinel context + self._check_test_key("sentinel") + + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) def _test_run_in_background(self, function: Callable[[], object]) -> defer.Deferred: sentinel_context = current_context() From f4ad07dde410b3ae5c46f3b5f43afc9a2ce2919d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 18:57:15 -0500 Subject: [PATCH 13/27] Fix copy/paste typo --- synapse/util/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 664cae405a5..269708811ad 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -230,7 +230,7 @@ def call_later( ) -> IDelayedCall: """Call something later - Note that the function will be called with generic `looping_call` logcontext, so + Note that the function will be called with generic `call_later` logcontext, so if it is anything other than a trivial task, you probably want to wrap it in `run_as_background_process` to give it more specific label and track metrics. From a8e66e7c6eab062e6f93d96e0364672a9b7ab22c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 20:01:37 -0500 Subject: [PATCH 14/27] Fixup `looping_call` --- synapse/util/__init__.py | 35 +++++++++++++++-------- tests/util/test_logcontext.py | 54 +++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 12 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 269708811ad..e5ba2f39385 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -205,25 +205,36 @@ def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> object: ) # Because this is a callback from the reactor, we will be using the - # `sentinel` log context at this point. We want to log with some logcontext - # as we want to know which server the logs came from. + # `sentinel` log context at this point. We want the function to log with + # some logcontext as we want to know which server the logs came from. + # + # We use `PreserveLoggingContext` to prevent our new `looping_call` + # logcontext from finishing as soon as we exit this function, in case `f` + # returns an awaitable/deferred which would continue running and may try to + # restore the `loop_call` context when it's done (because it's trying to + # adhere to the Synapse logcontext rules.) # # This also ensures that we return to the `sentinel` context when we exit # this function and yield control back to the reactor to avoid leaking the # current logcontext to the reactor (which would then get picked up and # associated with the next thing the reactor does) - with context.LoggingContext("looping_call"): - return f(*args, **kwargs) - - # Start task in the `sentinel` logcontext, to avoid leaking the current context - # into the reactor if `d` ever finishes (perhaps someone cancels the looping - # call) + with context.PreserveLoggingContext(context.LoggingContext("looping_call")): + # We use `run_in_background` to reset the logcontext after `f` (or the + # awaitable returned by `f`) completes + return context.run_in_background(f, *args, **kwargs) + + call = task.LoopingCall(wrapped_f, *args, **kwargs) + call.clock = self._reactor + # If `now=true`, the function will be called here immediately so we need to be + # in the sentinel context now. + # + # We want to start the task in the `sentinel` logcontext, to avoid leaking the + # current context into the reactor after the function finishes. TODO: Or perhaps + # someone cancels the looping call (does this matter?). with context.PreserveLoggingContext(): - call = task.LoopingCall(wrapped_f, *args, **kwargs) - call.clock = self._reactor d = call.start(msec / 1000.0, now=now) - d.addErrback(log_failure, "Looping call died", consumeErrors=False) - return call + d.addErrback(log_failure, "Looping call died", consumeErrors=False) + return call def call_later( self, delay: float, callback: Callable, *args: Any, **kwargs: Any diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 2406e30bcca..33520d31e6e 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -19,6 +19,7 @@ # # +import logging from typing import Callable, Generator, cast import twisted.python.failure @@ -39,6 +40,8 @@ from .. import unittest +logger = logging.getLogger(__name__) + reactor = cast(ISynapseReactor, _reactor) @@ -96,6 +99,57 @@ async def competing_callback() -> None: "Callback never finished which means the test probably didn't wait long enough", ) + async def test_looping_call(self) -> None: + """ + Test `Clock.looping_call` + """ + # TODO + + async def test_looping_call_now(self) -> None: + """ + Test `Clock.looping_call_now` + """ + clock = Clock(reactor) + + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") + + callback_finished = False + + async def competing_callback() -> None: + nonlocal callback_finished + # A `looping_call` callback should have *some* logcontext since we should know + # which server spawned this loop and which server the logs came from. + self._check_test_key("looping_call") + + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") + + self._check_test_key("looping_call") + callback_finished = True + + with LoggingContext("one"): + lc = clock.looping_call_now( + lambda: defer.ensureDeferred(competing_callback()), 0 + ) + self._check_test_key("one") + await clock.sleep(0) + self._check_test_key("one") + await clock.sleep(0) + self._check_test_key("one") + + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) + + # Back to the sentinel context + self._check_test_key("sentinel") + + # Stop the looping call to prevent "Reactor was unclean" errors + lc.stop() + def _test_run_in_background(self, function: Callable[[], object]) -> defer.Deferred: sentinel_context = current_context() From 078018312802dc310e27a2499e59c67a03dad4b8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 20:06:32 -0500 Subject: [PATCH 15/27] All logcontext tests should use `@logcontext_clean` --- tests/util/test_logcontext.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 33520d31e6e..e9ce7dd12cb 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -38,7 +38,8 @@ from synapse.types import ISynapseReactor from synapse.util import Clock -from .. import unittest +from tests import unittest +from tests.unittest import logcontext_clean logger = logging.getLogger(__name__) @@ -55,10 +56,12 @@ def _check_test_key(self, value: str) -> None: str(context), value, f"Expected LoggingContext({value}) but saw {context}" ) + @logcontext_clean def test_with_context(self) -> None: with LoggingContext("test"): self._check_test_key("test") + @logcontext_clean async def test_sleep(self) -> None: """ Test `Clock.sleep` @@ -99,12 +102,14 @@ async def competing_callback() -> None: "Callback never finished which means the test probably didn't wait long enough", ) + @logcontext_clean async def test_looping_call(self) -> None: """ Test `Clock.looping_call` """ # TODO + @logcontext_clean async def test_looping_call_now(self) -> None: """ Test `Clock.looping_call_now` @@ -190,12 +195,14 @@ def check_logcontext() -> None: # test is done once d2 finishes return d2 + @logcontext_clean def test_run_in_background_with_blocking_fn(self) -> defer.Deferred: async def blocking_function() -> None: await Clock(reactor).sleep(0) return self._test_run_in_background(blocking_function) + @logcontext_clean def test_run_in_background_with_non_blocking_fn(self) -> defer.Deferred: @defer.inlineCallbacks def nonblocking_function() -> Generator["defer.Deferred[object]", object, None]: @@ -204,6 +211,7 @@ def nonblocking_function() -> Generator["defer.Deferred[object]", object, None]: return self._test_run_in_background(nonblocking_function) + @logcontext_clean def test_run_in_background_with_chained_deferred(self) -> defer.Deferred: # a function which returns a deferred which looks like it has been # called, but is actually paused @@ -212,6 +220,7 @@ def testfunc() -> defer.Deferred: return self._test_run_in_background(testfunc) + @logcontext_clean def test_run_in_background_with_coroutine(self) -> defer.Deferred: async def testfunc() -> None: self._check_test_key("one") @@ -222,12 +231,14 @@ async def testfunc() -> None: return self._test_run_in_background(testfunc) + @logcontext_clean def test_run_in_background_with_nonblocking_coroutine(self) -> defer.Deferred: async def testfunc() -> None: self._check_test_key("one") return self._test_run_in_background(testfunc) + @logcontext_clean @defer.inlineCallbacks def test_make_deferred_yieldable( self, @@ -251,6 +262,7 @@ def blocking_function() -> defer.Deferred: # now it should be restored self._check_test_key("one") + @logcontext_clean @defer.inlineCallbacks def test_make_deferred_yieldable_with_chained_deferreds( self, @@ -267,6 +279,7 @@ def test_make_deferred_yieldable_with_chained_deferreds( # now it should be restored self._check_test_key("one") + @logcontext_clean def test_nested_logging_context(self) -> None: with LoggingContext("foo"): nested_context = nested_logging_context(suffix="bar") From c535d8a1e2dcab26d30c03092b4cdbf211ef8948 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 20:08:23 -0500 Subject: [PATCH 16/27] Add `test_looping_call` --- tests/util/test_logcontext.py | 41 ++++++++++++++++++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index e9ce7dd12cb..c449e3e4885 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -107,7 +107,46 @@ async def test_looping_call(self) -> None: """ Test `Clock.looping_call` """ - # TODO + clock = Clock(reactor) + + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") + + callback_finished = False + + async def competing_callback() -> None: + nonlocal callback_finished + # A `looping_call` callback should have *some* logcontext since we should know + # which server spawned this loop and which server the logs came from. + self._check_test_key("looping_call") + + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") + + self._check_test_key("looping_call") + callback_finished = True + + with LoggingContext("one"): + lc = clock.looping_call( + lambda: defer.ensureDeferred(competing_callback()), 0 + ) + self._check_test_key("one") + await clock.sleep(0) + self._check_test_key("one") + await clock.sleep(0) + self._check_test_key("one") + + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) + + # Back to the sentinel context + self._check_test_key("sentinel") + + # Stop the looping call to prevent "Reactor was unclean" errors + lc.stop() @logcontext_clean async def test_looping_call_now(self) -> None: From aec706558ee6a674ebb2996596eee313b80a57e5 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 20:19:17 -0500 Subject: [PATCH 17/27] Add `test_call_later` and align `call_later` with `looping_call` --- synapse/util/__init__.py | 21 ++++++++++------- tests/util/test_logcontext.py | 43 +++++++++++++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 10 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index e5ba2f39385..442c8380763 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -260,20 +260,25 @@ def wrapped_callback(*args: Any, **kwargs: Any) -> None: ) # Because this is a callback from the reactor, we will be using the - # `sentinel` log context at this point. We want to log with some logcontext - # as we want to know which server the logs came from. + # `sentinel` log context at this point. We want the function to log with + # some logcontext as we want to know which server the logs came from. + # + # We use `PreserveLoggingContext` to prevent our new `call_later` + # logcontext from finishing as soon as we exit this function, in case `f` + # returns an awaitable/deferred which would continue running and may try to + # restore the `loop_call` context when it's done (because it's trying to + # adhere to the Synapse logcontext rules.) # # This also ensures that we return to the `sentinel` context when we exit # this function and yield control back to the reactor to avoid leaking the # current logcontext to the reactor (which would then get picked up and # associated with the next thing the reactor does) - with context.LoggingContext("call_later"): - callback(*args, **kwargs) + with context.PreserveLoggingContext(context.LoggingContext("call_later")): + # We use `run_in_background` to reset the logcontext after `f` (or the + # awaitable returned by `f`) completes + return context.run_in_background(callback, *args, **kwargs) - # Start task in the `sentinel` logcontext, to avoid leaking the current context - # into the reactor once it finishes. - with context.PreserveLoggingContext(): - return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) + return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None: try: diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index c449e3e4885..f7f3a451a99 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -180,8 +180,6 @@ async def competing_callback() -> None: self._check_test_key("one") await clock.sleep(0) self._check_test_key("one") - await clock.sleep(0) - self._check_test_key("one") self.assertTrue( callback_finished, @@ -194,6 +192,47 @@ async def competing_callback() -> None: # Stop the looping call to prevent "Reactor was unclean" errors lc.stop() + @logcontext_clean + async def test_call_later(self) -> None: + """ + Test `Clock.call_later` + """ + clock = Clock(reactor) + + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") + + callback_finished = False + + async def competing_callback() -> None: + nonlocal callback_finished + # A `call_later` callback should have *some* logcontext since we should know + # which server spawned this loop and which server the logs came from. + self._check_test_key("call_later") + + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") + + self._check_test_key("call_later") + callback_finished = True + + with LoggingContext("one"): + clock.call_later(0, lambda: defer.ensureDeferred(competing_callback())) + self._check_test_key("one") + await clock.sleep(0) + self._check_test_key("one") + await clock.sleep(0) + self._check_test_key("one") + + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) + + # Back to the sentinel context + self._check_test_key("sentinel") + def _test_run_in_background(self, function: Callable[[], object]) -> defer.Deferred: sentinel_context = current_context() From 2aa15b09db4463caba63d09cc4fae62b40c15e24 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 20:26:16 -0500 Subject: [PATCH 18/27] Align tests --- tests/util/test_logcontext.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index f7f3a451a99..8b97e09f8e7 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -84,6 +84,7 @@ async def competing_callback() -> None: await clock.sleep(0) self._check_test_key("competing") + self._check_test_key("sentinel") callback_finished = True reactor.callLater(0, lambda: defer.ensureDeferred(competing_callback())) @@ -94,14 +95,14 @@ async def competing_callback() -> None: await clock.sleep(0) self._check_test_key("one") - # Back to the sentinel context - self._check_test_key("sentinel") - self.assertTrue( callback_finished, "Callback never finished which means the test probably didn't wait long enough", ) + # Back to the sentinel context + self._check_test_key("sentinel") + @logcontext_clean async def test_looping_call(self) -> None: """ From 1c001c95eca79962ecb5777daeb20b4317b03191 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 20:48:42 -0500 Subject: [PATCH 19/27] No need to return something --- synapse/util/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 442c8380763..90498be8fc0 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -197,7 +197,7 @@ def _looping_call_common( ) -> LoopingCall: """Common functionality for `looping_call` and `looping_call_now`""" - def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> object: + def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> None: assert context.current_context() is context.SENTINEL_CONTEXT, ( "Expected `call_later` callback from the reactor to start with the sentinel logcontext " f"but saw {context.current_context()}. In other words, another task shouldn't have " @@ -221,7 +221,7 @@ def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> object: with context.PreserveLoggingContext(context.LoggingContext("looping_call")): # We use `run_in_background` to reset the logcontext after `f` (or the # awaitable returned by `f`) completes - return context.run_in_background(f, *args, **kwargs) + context.run_in_background(f, *args, **kwargs) call = task.LoopingCall(wrapped_f, *args, **kwargs) call.clock = self._reactor From 13a5a36fcf32297a1091b88834d804b2b5cc8fa9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 20:49:08 -0500 Subject: [PATCH 20/27] Revert "No need to return something" This reverts commit 1c001c95eca79962ecb5777daeb20b4317b03191. --- synapse/util/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 90498be8fc0..442c8380763 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -197,7 +197,7 @@ def _looping_call_common( ) -> LoopingCall: """Common functionality for `looping_call` and `looping_call_now`""" - def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> None: + def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> object: assert context.current_context() is context.SENTINEL_CONTEXT, ( "Expected `call_later` callback from the reactor to start with the sentinel logcontext " f"but saw {context.current_context()}. In other words, another task shouldn't have " @@ -221,7 +221,7 @@ def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> None: with context.PreserveLoggingContext(context.LoggingContext("looping_call")): # We use `run_in_background` to reset the logcontext after `f` (or the # awaitable returned by `f`) completes - context.run_in_background(f, *args, **kwargs) + return context.run_in_background(f, *args, **kwargs) call = task.LoopingCall(wrapped_f, *args, **kwargs) call.clock = self._reactor From eae83e8389d3966f85342cee2499cf34a1f60e5a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 20:51:08 -0500 Subject: [PATCH 21/27] Be more specific on what to return --- synapse/util/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 442c8380763..565f327ced2 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -41,6 +41,7 @@ from typing_extensions import ParamSpec from twisted.internet import defer, task +from twisted.internet.defer import Deferred from twisted.internet.interfaces import IDelayedCall, IReactorTime from twisted.internet.task import LoopingCall from twisted.python.failure import Failure @@ -197,7 +198,7 @@ def _looping_call_common( ) -> LoopingCall: """Common functionality for `looping_call` and `looping_call_now`""" - def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> object: + def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred: assert context.current_context() is context.SENTINEL_CONTEXT, ( "Expected `call_later` callback from the reactor to start with the sentinel logcontext " f"but saw {context.current_context()}. In other words, another task shouldn't have " @@ -276,7 +277,7 @@ def wrapped_callback(*args: Any, **kwargs: Any) -> None: with context.PreserveLoggingContext(context.LoggingContext("call_later")): # We use `run_in_background` to reset the logcontext after `f` (or the # awaitable returned by `f`) completes - return context.run_in_background(callback, *args, **kwargs) + context.run_in_background(callback, *args, **kwargs) return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) From 3797515723c142af9b20d24455438946e41f0bc8 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 10 Sep 2025 23:51:24 -0500 Subject: [PATCH 22/27] Fix typo in assertion message --- synapse/util/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 565f327ced2..91da8863088 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -200,7 +200,7 @@ def _looping_call_common( def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred: assert context.current_context() is context.SENTINEL_CONTEXT, ( - "Expected `call_later` callback from the reactor to start with the sentinel logcontext " + "Expected `looping_call` callback from the reactor to start with the sentinel logcontext " f"but saw {context.current_context()}. In other words, another task shouldn't have " "leaked their logcontext to us." ) From 9116e74640873eefa03bcfa6bf8b918616963016 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 11 Sep 2025 00:54:02 -0500 Subject: [PATCH 23/27] Fix logcontext leak in email pusher test --- tests/push/test_email.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 4d885c78ebf..c851e110300 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -31,6 +31,7 @@ import synapse.rest.admin from synapse.api.errors import Codes, SynapseError +from synapse.logging.context import make_deferred_yieldable from synapse.push.emailpusher import EmailPusher from synapse.rest.client import login, room from synapse.rest.synapse.client.unsubscribe import UnsubscribeResource @@ -88,7 +89,7 @@ def sendmail(*args: Any, **kwargs: Any) -> Deferred: # This mocks out synapse.reactor.send_email._sendmail. d: Deferred = Deferred() self.email_attempts.append((d, args, kwargs)) - return d + return make_deferred_yieldable(d) hs.get_send_email_handler()._sendmail = sendmail # type: ignore[assignment] From 57cc6750c02ce16117ba9b3ca4ba800a127b056d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 12 Sep 2025 18:03:47 -0500 Subject: [PATCH 24/27] Clarify why See https://github.com/element-hq/synapse/pull/18907#discussion_r2345427657 --- synapse/util/__init__.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 91da8863088..7390823344a 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -221,7 +221,8 @@ def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred: # associated with the next thing the reactor does) with context.PreserveLoggingContext(context.LoggingContext("looping_call")): # We use `run_in_background` to reset the logcontext after `f` (or the - # awaitable returned by `f`) completes + # awaitable returned by `f`) completes to avoid leaking the current + # logcontext to the reactor return context.run_in_background(f, *args, **kwargs) call = task.LoopingCall(wrapped_f, *args, **kwargs) @@ -276,7 +277,8 @@ def wrapped_callback(*args: Any, **kwargs: Any) -> None: # associated with the next thing the reactor does) with context.PreserveLoggingContext(context.LoggingContext("call_later")): # We use `run_in_background` to reset the logcontext after `f` (or the - # awaitable returned by `f`) completes + # awaitable returned by `f`) completes to avoid leaking the current + # logcontext to the reactor context.run_in_background(callback, *args, **kwargs) return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) From 45e6b78d44ba546812f74dcc6a6c5eb9cc09d255 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 12 Sep 2025 18:04:51 -0500 Subject: [PATCH 25/27] Use "foo" instead of "one" See https://github.com/element-hq/synapse/pull/18907#discussion_r2345536446 Prior art here uses `one` here but I find it difficult to write about in a comment. For example, "in one context" is kinda generic sounding when I'm talking about the specific context. --- tests/util/test_logcontext.py | 46 +++++++++++++++++------------------ 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 8b97e09f8e7..fbfd627478c 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -89,11 +89,11 @@ async def competing_callback() -> None: reactor.callLater(0, lambda: defer.ensureDeferred(competing_callback())) - with LoggingContext("one"): + with LoggingContext("foo"): await clock.sleep(0) - self._check_test_key("one") + self._check_test_key("foo") await clock.sleep(0) - self._check_test_key("one") + self._check_test_key("foo") self.assertTrue( callback_finished, @@ -128,15 +128,15 @@ async def competing_callback() -> None: self._check_test_key("looping_call") callback_finished = True - with LoggingContext("one"): + with LoggingContext("foo"): lc = clock.looping_call( lambda: defer.ensureDeferred(competing_callback()), 0 ) - self._check_test_key("one") + self._check_test_key("foo") await clock.sleep(0) - self._check_test_key("one") + self._check_test_key("foo") await clock.sleep(0) - self._check_test_key("one") + self._check_test_key("foo") self.assertTrue( callback_finished, @@ -174,13 +174,13 @@ async def competing_callback() -> None: self._check_test_key("looping_call") callback_finished = True - with LoggingContext("one"): + with LoggingContext("foo"): lc = clock.looping_call_now( lambda: defer.ensureDeferred(competing_callback()), 0 ) - self._check_test_key("one") + self._check_test_key("foo") await clock.sleep(0) - self._check_test_key("one") + self._check_test_key("foo") self.assertTrue( callback_finished, @@ -218,13 +218,13 @@ async def competing_callback() -> None: self._check_test_key("call_later") callback_finished = True - with LoggingContext("one"): + with LoggingContext("foo"): clock.call_later(0, lambda: defer.ensureDeferred(competing_callback())) - self._check_test_key("one") + self._check_test_key("foo") await clock.sleep(0) - self._check_test_key("one") + self._check_test_key("foo") await clock.sleep(0) - self._check_test_key("one") + self._check_test_key("foo") self.assertTrue( callback_finished, @@ -239,7 +239,7 @@ def _test_run_in_background(self, function: Callable[[], object]) -> defer.Defer callback_completed = False - with LoggingContext("one"): + with LoggingContext("foo"): # fire off function, but don't wait on it. d2 = run_in_background(function) @@ -250,7 +250,7 @@ def cb(res: object) -> object: d2.addCallback(cb) - self._check_test_key("one") + self._check_test_key("foo") # now wait for the function under test to have run, and check that # the logcontext is left in a sane state. @@ -302,18 +302,18 @@ def testfunc() -> defer.Deferred: @logcontext_clean def test_run_in_background_with_coroutine(self) -> defer.Deferred: async def testfunc() -> None: - self._check_test_key("one") + self._check_test_key("foo") d = defer.ensureDeferred(Clock(reactor).sleep(0)) self.assertIs(current_context(), SENTINEL_CONTEXT) await d - self._check_test_key("one") + self._check_test_key("foo") return self._test_run_in_background(testfunc) @logcontext_clean def test_run_in_background_with_nonblocking_coroutine(self) -> defer.Deferred: async def testfunc() -> None: - self._check_test_key("one") + self._check_test_key("foo") return self._test_run_in_background(testfunc) @@ -331,7 +331,7 @@ def blocking_function() -> defer.Deferred: sentinel_context = current_context() - with LoggingContext("one"): + with LoggingContext("foo"): d1 = make_deferred_yieldable(blocking_function()) # make sure that the context was reset by make_deferred_yieldable self.assertIs(current_context(), sentinel_context) @@ -339,7 +339,7 @@ def blocking_function() -> defer.Deferred: yield d1 # now it should be restored - self._check_test_key("one") + self._check_test_key("foo") @logcontext_clean @defer.inlineCallbacks @@ -348,7 +348,7 @@ def test_make_deferred_yieldable_with_chained_deferreds( ) -> Generator["defer.Deferred[object]", object, None]: sentinel_context = current_context() - with LoggingContext("one"): + with LoggingContext("foo"): d1 = make_deferred_yieldable(_chained_deferred_function()) # make sure that the context was reset by make_deferred_yieldable self.assertIs(current_context(), sentinel_context) @@ -356,7 +356,7 @@ def test_make_deferred_yieldable_with_chained_deferreds( yield d1 # now it should be restored - self._check_test_key("one") + self._check_test_key("foo") @logcontext_clean def test_nested_logging_context(self) -> None: From 41451602e3b9e04eb972839f92a91aff81c1e948 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 12 Sep 2025 18:06:44 -0500 Subject: [PATCH 26/27] Wrap callback to see underlying failure See https://github.com/element-hq/synapse/pull/18907#discussion_r2345537031 > We should wrap this in a `try: ... finally: callback_finished = True` so we can see any potential underlying error that occurs in the `competing_callback()` --- tests/util/test_logcontext.py | 85 ++++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 35 deletions(-) diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index fbfd627478c..a7706e685d8 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -75,17 +75,20 @@ async def test_sleep(self) -> None: async def competing_callback() -> None: nonlocal callback_finished + try: + # A callback from the reactor should start with the sentinel context. In + # other words, another task shouldn't have leaked their context to us. + self._check_test_key("sentinel") - # A callback from the reactor should start with the sentinel context. In - # other words, another task shouldn't have leaked their context to us. - self._check_test_key("sentinel") - - with LoggingContext("competing"): - await clock.sleep(0) - self._check_test_key("competing") + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") - self._check_test_key("sentinel") - callback_finished = True + self._check_test_key("sentinel") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True reactor.callLater(0, lambda: defer.ensureDeferred(competing_callback())) @@ -117,16 +120,20 @@ async def test_looping_call(self) -> None: async def competing_callback() -> None: nonlocal callback_finished - # A `looping_call` callback should have *some* logcontext since we should know - # which server spawned this loop and which server the logs came from. - self._check_test_key("looping_call") + try: + # A `looping_call` callback should have *some* logcontext since we should know + # which server spawned this loop and which server the logs came from. + self._check_test_key("looping_call") - with LoggingContext("competing"): - await clock.sleep(0) - self._check_test_key("competing") + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") - self._check_test_key("looping_call") - callback_finished = True + self._check_test_key("looping_call") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True with LoggingContext("foo"): lc = clock.looping_call( @@ -163,16 +170,20 @@ async def test_looping_call_now(self) -> None: async def competing_callback() -> None: nonlocal callback_finished - # A `looping_call` callback should have *some* logcontext since we should know - # which server spawned this loop and which server the logs came from. - self._check_test_key("looping_call") + try: + # A `looping_call` callback should have *some* logcontext since we should know + # which server spawned this loop and which server the logs came from. + self._check_test_key("looping_call") - with LoggingContext("competing"): - await clock.sleep(0) - self._check_test_key("competing") + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") - self._check_test_key("looping_call") - callback_finished = True + self._check_test_key("looping_call") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True with LoggingContext("foo"): lc = clock.looping_call_now( @@ -207,16 +218,20 @@ async def test_call_later(self) -> None: async def competing_callback() -> None: nonlocal callback_finished - # A `call_later` callback should have *some* logcontext since we should know - # which server spawned this loop and which server the logs came from. - self._check_test_key("call_later") - - with LoggingContext("competing"): - await clock.sleep(0) - self._check_test_key("competing") - - self._check_test_key("call_later") - callback_finished = True + try: + # A `call_later` callback should have *some* logcontext since we should know + # which server spawned this loop and which server the logs came from. + self._check_test_key("call_later") + + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") + + self._check_test_key("call_later") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True with LoggingContext("foo"): clock.call_later(0, lambda: defer.ensureDeferred(competing_callback())) From c8d0f977ec01ebe7181b8bb3456ad3d498f658df Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 22 Sep 2025 11:27:57 -0500 Subject: [PATCH 27/27] Cancelling the deferred doesn't matter AFACT See https://github.com/element-hq/synapse/pull/18914 for more docs on how deferreds interact with logcontexts --- synapse/util/clock.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/util/clock.py b/synapse/util/clock.py index 062e956751d..23e8705be9c 100644 --- a/synapse/util/clock.py +++ b/synapse/util/clock.py @@ -156,8 +156,7 @@ def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred: # in the sentinel context now. # # We want to start the task in the `sentinel` logcontext, to avoid leaking the - # current context into the reactor after the function finishes. TODO: Or perhaps - # someone cancels the looping call (does this matter?). + # current context into the reactor after the function finishes. with context.PreserveLoggingContext(): d = call.start(msec / 1000.0, now=now) d.addErrback(log_failure, "Looping call died", consumeErrors=False)