From 315cab49391b558df0ca2b1da818acbfa4211f13 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Nov 2025 10:44:11 -0600 Subject: [PATCH 1/8] Fix `WorkerLocksHandler`/`WaitingLock` --- synapse/handlers/worker_lock.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/worker_lock.py b/synapse/handlers/worker_lock.py index 3e097d21f2d..0e3fab292f9 100644 --- a/synapse/handlers/worker_lock.py +++ b/synapse/handlers/worker_lock.py @@ -184,9 +184,7 @@ def _wake_all_locks( locks: Collection[WaitingLock | WaitingMultiLock], ) -> None: for lock in locks: - deferred = lock.deferred - if not deferred.called: - deferred.callback(None) + lock.release_lock() self._clock.call_later( 0, @@ -215,6 +213,12 @@ class WaitingLock: lambda: start_active_span("WaitingLock.lock") ) + def release_lock(self) -> None: + """Release the lock (by resolving the deferred)""" + if not self.deferred.called: + with PreserveLoggingContext(): + self.deferred.callback(None) + async def __aenter__(self) -> None: self._lock_span.__enter__() @@ -298,6 +302,12 @@ class WaitingMultiLock: lambda: start_active_span("WaitingLock.lock") ) + def release_lock(self) -> None: + """Release the lock (by resolving the deferred)""" + if not self.deferred.called: + with PreserveLoggingContext(): + self.deferred.callback(None) + async def __aenter__(self) -> None: self._lock_span.__enter__() From f6ea74052a436f83dab0f3e1ab38548398070651 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Nov 2025 10:54:14 -0600 Subject: [PATCH 2/8] Fix `AwakenableSleeper` Same as https://github.com/element-hq/synapse/pull/19146 --- synapse/util/async_helpers.py | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 825fb10acfd..fb47407ff23 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -978,15 +978,6 @@ async def sleep(self, name: str, delay_ms: int) -> None: """Sleep for the given number of milliseconds, or return if the given `name` is explicitly woken up. """ - - # Create a deferred that gets called in N seconds - sleep_deferred: "defer.Deferred[None]" = defer.Deferred() - call = self._clock.call_later( - delay_ms / 1000, - sleep_deferred.callback, - None, - ) - # Create a deferred that will get called if `wake` is called with # the same `name`. stream_set = self._streams.setdefault(name, set()) @@ -996,13 +987,14 @@ async def sleep(self, name: str, delay_ms: int) -> None: try: # Wait for either the delay or for `wake` to be called. await make_deferred_yieldable( - defer.DeferredList( - [sleep_deferred, notify_deferred], - fireOnOneCallback=True, - fireOnOneErrback=True, - consumeErrors=True, + timeout_deferred( + deferred=stop_cancellation(notify_deferred), + timeout=delay_ms / 1000, + clock=self._clock, ) ) + except defer.TimeoutError: + pass finally: # Clean up the state curr_stream_set = self._streams.get(name) @@ -1011,10 +1003,6 @@ async def sleep(self, name: str, delay_ms: int) -> None: if len(curr_stream_set) == 0: self._streams.pop(name) - # Cancel the sleep if we were woken up - if call.active(): - call.cancel() - class DeferredEvent: """Like threading.Event but for async code""" From e60eb82c1c0e888c5ce40efe5a4b11d4793b78c9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Nov 2025 10:56:51 -0600 Subject: [PATCH 3/8] Fix `CacheMultipleEntries` --- synapse/util/caches/deferred_cache.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index a1601cd4e9a..e7e21d0de94 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -39,6 +39,7 @@ from twisted.internet import defer from twisted.python.failure import Failure +from synapse.logging.context import PreserveLoggingContext from synapse.metrics import SERVER_NAME_LABEL from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.lrucache import LruCache @@ -514,7 +515,8 @@ def complete_bulk( cache._completed_callback(value, self, key) if self._deferred: - self._deferred.callback(result) + with PreserveLoggingContext(): + self._deferred.callback(result) def error_bulk( self, cache: DeferredCache[KT, VT], keys: Collection[KT], failure: Failure @@ -524,4 +526,5 @@ def error_bulk( cache._error_callback(failure, self, key) if self._deferred: - self._deferred.errback(failure) + with PreserveLoggingContext(): + self._deferred.errback(failure) From 7da7726b26eac97dc076016aa0020cea240f208f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Nov 2025 11:05:59 -0600 Subject: [PATCH 4/8] Be extra careful with `timeout_deferred` --- synapse/util/async_helpers.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index fb47407ff23..d308dd2d3f8 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -813,7 +813,8 @@ def time_it_out() -> None: # will have errbacked new_d, but in case it hasn't, errback it now. if not new_d.called: - new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,))) + with PreserveLoggingContext(): + new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,))) # We don't track these calls since they are short. delayed_call = clock.call_later( @@ -840,11 +841,13 @@ def cancel_timeout(result: _T) -> _T: def success_cb(val: _T) -> None: if not new_d.called: - new_d.callback(val) + with PreserveLoggingContext(): + new_d.callback(val) def failure_cb(val: Failure) -> None: if not new_d.called: - new_d.errback(val) + with PreserveLoggingContext(): + new_d.errback(val) deferred.addCallbacks(success_cb, failure_cb) From ade9ef00977516e24ea2a437f6eefa65524e326f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Nov 2025 11:18:54 -0600 Subject: [PATCH 5/8] Fix Twisted looking things --- synapse/http/client.py | 39 +++++++++++++++++++++--------- synapse/http/connectproxyclient.py | 17 +++++++++---- synapse/media/_base.py | 14 +++++++---- 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/synapse/http/client.py b/synapse/http/client.py index 9971accccd5..b433490b663 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -77,7 +77,11 @@ from synapse.http.proxyagent import ProxyAgent from synapse.http.replicationagent import ReplicationAgent from synapse.http.types import QueryParams -from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.context import ( + make_deferred_yieldable, + run_in_background, + PreserveLoggingContext, +) from synapse.logging.opentracing import set_tag, start_active_span, tags from synapse.metrics import SERVER_NAME_LABEL from synapse.types import ISynapseReactor, StrSequence @@ -1036,7 +1040,8 @@ def _maybe_fail(self) -> None: Report a max size exceed error and disconnect the first time this is called. """ if not self.deferred.called: - self.deferred.errback(BodyExceededMaxSize()) + with PreserveLoggingContext(): + self.deferred.errback(BodyExceededMaxSize()) # Close the connection (forcefully) since all the data will get # discarded anyway. assert self.transport is not None @@ -1135,7 +1140,8 @@ def on_part_data(data: bytes, start: int, end: int) -> None: logger.warning( "Exception encountered writing file data to stream: %s", e ) - self.deferred.errback() + with PreserveLoggingContext(): + self.deferred.errback() self.file_length += end - start callbacks: "multipart.MultipartCallbacks" = { @@ -1147,7 +1153,8 @@ def on_part_data(data: bytes, start: int, end: int) -> None: self.total_length += len(incoming_data) if self.max_length is not None and self.total_length >= self.max_length: - self.deferred.errback(BodyExceededMaxSize()) + with PreserveLoggingContext(): + self.deferred.errback(BodyExceededMaxSize()) # Close the connection (forcefully) since all the data will get # discarded anyway. assert self.transport is not None @@ -1157,7 +1164,8 @@ def on_part_data(data: bytes, start: int, end: int) -> None: self.parser.write(incoming_data) except Exception as e: logger.warning("Exception writing to multipart parser: %s", e) - self.deferred.errback() + with PreserveLoggingContext(): + self.deferred.errback() return def connectionLost(self, reason: Failure = connectionDone) -> None: @@ -1167,9 +1175,11 @@ def connectionLost(self, reason: Failure = connectionDone) -> None: if reason.check(ResponseDone): self.multipart_response.length = self.file_length - self.deferred.callback(self.multipart_response) + with PreserveLoggingContext(): + self.deferred.callback(self.multipart_response) else: - self.deferred.errback(reason) + with PreserveLoggingContext(): + self.deferred.errback(reason) class _ReadBodyWithMaxSizeProtocol(protocol.Protocol): @@ -1193,7 +1203,8 @@ def dataReceived(self, data: bytes) -> None: try: self.stream.write(data) except Exception: - self.deferred.errback() + with PreserveLoggingContext(): + self.deferred.errback() return self.length += len(data) @@ -1201,7 +1212,8 @@ def dataReceived(self, data: bytes) -> None: # connection. dataReceived might be called again if data was received # in the meantime. if self.max_size is not None and self.length >= self.max_size: - self.deferred.errback(BodyExceededMaxSize()) + with PreserveLoggingContext(): + self.deferred.errback(BodyExceededMaxSize()) # Close the connection (forcefully) since all the data will get # discarded anyway. assert self.transport is not None @@ -1213,7 +1225,8 @@ def connectionLost(self, reason: Failure = connectionDone) -> None: return if reason.check(ResponseDone): - self.deferred.callback(self.length) + with PreserveLoggingContext(): + self.deferred.callback(self.length) elif reason.check(PotentialDataLoss): # This applies to requests which don't set `Content-Length` or a # `Transfer-Encoding` in the response because in this case the end of the @@ -1222,9 +1235,11 @@ def connectionLost(self, reason: Failure = connectionDone) -> None: # behavior is expected of some servers (like YouTube), let's ignore it. # Stolen from https://github.com/twisted/treq/pull/49/files # http://twistedmatrix.com/trac/ticket/4840 - self.deferred.callback(self.length) + with PreserveLoggingContext(): + self.deferred.callback(self.length) else: - self.deferred.errback(reason) + with PreserveLoggingContext(): + self.deferred.errback(reason) def read_body_with_max_size( diff --git a/synapse/http/connectproxyclient.py b/synapse/http/connectproxyclient.py index 094655f91a2..285e6d3546a 100644 --- a/synapse/http/connectproxyclient.py +++ b/synapse/http/connectproxyclient.py @@ -41,6 +41,8 @@ from twisted.python.failure import Failure from twisted.web import http +from synapse.logging.context import PreserveLoggingContext + logger = logging.getLogger(__name__) @@ -176,14 +178,16 @@ def buildProtocol(self, addr: IAddress) -> "HTTPConnectProtocol": def clientConnectionFailed(self, connector: IConnector, reason: Failure) -> None: logger.debug("Connection to proxy failed: %s", reason) if not self.on_connection.called: - self.on_connection.errback(reason) + with PreserveLoggingContext(): + self.on_connection.errback(reason) if isinstance(self.wrapped_factory, ClientFactory): return self.wrapped_factory.clientConnectionFailed(connector, reason) def clientConnectionLost(self, connector: IConnector, reason: Failure) -> None: logger.debug("Connection to proxy lost: %s", reason) if not self.on_connection.called: - self.on_connection.errback(reason) + with PreserveLoggingContext(): + self.on_connection.errback(reason) if isinstance(self.wrapped_factory, ClientFactory): return self.wrapped_factory.clientConnectionLost(connector, reason) @@ -238,14 +242,16 @@ def connectionLost(self, reason: Failure = connectionDone) -> None: self.http_setup_client.connectionLost(reason) if not self.connected_deferred.called: - self.connected_deferred.errback(reason) + with PreserveLoggingContext(): + self.connected_deferred.errback(reason) def proxyConnected(self, _: Union[None, "defer.Deferred[None]"]) -> None: self.wrapped_connection_started = True assert self.transport is not None self.wrapped_protocol.makeConnection(self.transport) - self.connected_deferred.callback(self.wrapped_protocol) + with PreserveLoggingContext(): + self.connected_deferred.callback(self.wrapped_protocol) # Get any pending data from the http buf and forward it to the original protocol buf = self.http_setup_client.clearLineBuffer() @@ -303,7 +309,8 @@ def handleStatus(self, version: bytes, status: bytes, message: bytes) -> None: def handleEndHeaders(self) -> None: logger.debug("End Headers") - self.on_connected.callback(None) + with PreserveLoggingContext(): + self.on_connected.callback(None) def handleResponse(self, body: bytes) -> None: pass diff --git a/synapse/media/_base.py b/synapse/media/_base.py index e0313d2893b..7daf973f54c 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -48,6 +48,7 @@ defer_to_threadpool, make_deferred_yieldable, run_in_background, + PreserveLoggingContext, ) from synapse.util.async_helpers import DeferredEvent from synapse.util.clock import Clock @@ -753,9 +754,10 @@ def stopProducing(self) -> None: self.wakeup_event.set() if not self.deferred.called: - self.deferred.errback( - ConsumerRequestedStopError("Consumer asked us to stop producing") - ) + with PreserveLoggingContext(): + self.deferred.errback( + ConsumerRequestedStopError("Consumer asked us to stop producing") + ) async def start_read_loop(self) -> None: """This is the loop that drives reading/writing""" @@ -809,7 +811,8 @@ def _error(self, failure: Failure) -> None: self.consumer = None if not self.deferred.called: - self.deferred.errback(failure) + with PreserveLoggingContext(): + self.deferred.errback(failure) def _finish(self) -> None: """Called when we have finished writing (either on success or @@ -823,4 +826,5 @@ def _finish(self) -> None: self.consumer = None if not self.deferred.called: - self.deferred.callback(None) + with PreserveLoggingContext(): + self.deferred.callback(None) From f123e095dd459be43f3ba45b72a547be07c73583 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Nov 2025 11:19:28 -0600 Subject: [PATCH 6/8] Fix `delay_cancellation` --- synapse/util/async_helpers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index d308dd2d3f8..6f9bbcac678 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -949,7 +949,8 @@ def handle_cancel(new_deferred: "defer.Deferred[T]") -> None: # propagating. we then `unpause` it once the wrapped deferred completes, to # propagate the exception. new_deferred.pause() - new_deferred.errback(Failure(CancelledError())) + with PreserveLoggingContext(): + new_deferred.errback(Failure(CancelledError())) deferred.addBoth(lambda _: new_deferred.unpause()) From 8495f338708ad34f03c074c9ed06fbafce70db38 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Nov 2025 11:23:46 -0600 Subject: [PATCH 7/8] Add changelog --- changelog.d/19180.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/19180.misc diff --git a/changelog.d/19180.misc b/changelog.d/19180.misc new file mode 100644 index 00000000000..6de107d8dc3 --- /dev/null +++ b/changelog.d/19180.misc @@ -0,0 +1 @@ +Fix bad deferred logcontext handling across the codebase. From 13dd110a000f963477240c4cb1423358c4ae1383 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Nov 2025 11:24:32 -0600 Subject: [PATCH 8/8] Fix automatic lint fixes --- synapse/http/client.py | 2 +- synapse/media/_base.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/http/client.py b/synapse/http/client.py index b433490b663..cb9b8cd6831 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -78,9 +78,9 @@ from synapse.http.replicationagent import ReplicationAgent from synapse.http.types import QueryParams from synapse.logging.context import ( + PreserveLoggingContext, make_deferred_yieldable, run_in_background, - PreserveLoggingContext, ) from synapse.logging.opentracing import set_tag, start_active_span, tags from synapse.metrics import SERVER_NAME_LABEL diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 7daf973f54c..0fe2e5b529c 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -45,10 +45,10 @@ from synapse.http.server import finish_request, respond_with_json from synapse.http.site import SynapseRequest from synapse.logging.context import ( + PreserveLoggingContext, defer_to_threadpool, make_deferred_yieldable, run_in_background, - PreserveLoggingContext, ) from synapse.util.async_helpers import DeferredEvent from synapse.util.clock import Clock