Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19180.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bad deferred logcontext handling across the codebase.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given how often this goes wrong and how easy it is to get wrong, ideally we'd have a lint for it. But not for now ⏩

16 changes: 13 additions & 3 deletions synapse/handlers/worker_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,7 @@ def _wake_all_locks(
locks: Collection[WaitingLock | WaitingMultiLock],
) -> None:
for lock in locks:
deferred = lock.deferred
if not deferred.called:
deferred.callback(None)
lock.release_lock()

self._clock.call_later(
0,
Expand Down Expand Up @@ -215,6 +213,12 @@ class WaitingLock:
lambda: start_active_span("WaitingLock.lock")
)

def release_lock(self) -> None:
"""Release the lock (by resolving the deferred)"""
if not self.deferred.called:
with PreserveLoggingContext():
self.deferred.callback(None)

async def __aenter__(self) -> None:
self._lock_span.__enter__()

Expand Down Expand Up @@ -298,6 +302,12 @@ class WaitingMultiLock:
lambda: start_active_span("WaitingLock.lock")
)

def release_lock(self) -> None:
"""Release the lock (by resolving the deferred)"""
if not self.deferred.called:
with PreserveLoggingContext():
self.deferred.callback(None)

async def __aenter__(self) -> None:
self._lock_span.__enter__()

Expand Down
39 changes: 27 additions & 12 deletions synapse/http/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@
from synapse.http.proxyagent import ProxyAgent
from synapse.http.replicationagent import ReplicationAgent
from synapse.http.types import QueryParams
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.context import (
PreserveLoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.metrics import SERVER_NAME_LABEL
from synapse.types import ISynapseReactor, StrSequence
Expand Down Expand Up @@ -1036,7 +1040,8 @@ def _maybe_fail(self) -> None:
Report a max size exceed error and disconnect the first time this is called.
"""
if not self.deferred.called:
self.deferred.errback(BodyExceededMaxSize())
with PreserveLoggingContext():
self.deferred.errback(BodyExceededMaxSize())
Comment on lines +1043 to +1044
Copy link
Copy Markdown
Contributor Author

@MadLittleMods MadLittleMods Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I painted with a wide brush, I think this is necessary because the deferred is passed in and could have an arbitrary callback chain attached, which could change the logcontext.

In practice, I don't think these really had callback chains to worry about, which is probably why we didn't experience problems in this area before.

The same applies to much of the Twisted HTTP machinery here.

# Close the connection (forcefully) since all the data will get
# discarded anyway.
assert self.transport is not None
Expand Down Expand Up @@ -1135,7 +1140,8 @@ def on_part_data(data: bytes, start: int, end: int) -> None:
logger.warning(
"Exception encountered writing file data to stream: %s", e
)
self.deferred.errback()
with PreserveLoggingContext():
self.deferred.errback()
self.file_length += end - start

callbacks: "multipart.MultipartCallbacks" = {
Expand All @@ -1147,7 +1153,8 @@ def on_part_data(data: bytes, start: int, end: int) -> None:

self.total_length += len(incoming_data)
if self.max_length is not None and self.total_length >= self.max_length:
self.deferred.errback(BodyExceededMaxSize())
with PreserveLoggingContext():
self.deferred.errback(BodyExceededMaxSize())
# Close the connection (forcefully) since all the data will get
# discarded anyway.
assert self.transport is not None
Expand All @@ -1157,7 +1164,8 @@ def on_part_data(data: bytes, start: int, end: int) -> None:
self.parser.write(incoming_data)
except Exception as e:
logger.warning("Exception writing to multipart parser: %s", e)
self.deferred.errback()
with PreserveLoggingContext():
self.deferred.errback()
return

def connectionLost(self, reason: Failure = connectionDone) -> None:
Expand All @@ -1167,9 +1175,11 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:

if reason.check(ResponseDone):
self.multipart_response.length = self.file_length
self.deferred.callback(self.multipart_response)
with PreserveLoggingContext():
self.deferred.callback(self.multipart_response)
else:
self.deferred.errback(reason)
with PreserveLoggingContext():
self.deferred.errback(reason)


class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
Expand All @@ -1193,15 +1203,17 @@ def dataReceived(self, data: bytes) -> None:
try:
self.stream.write(data)
except Exception:
self.deferred.errback()
with PreserveLoggingContext():
self.deferred.errback()
return

self.length += len(data)
# The first time the maximum size is exceeded, error and cancel the
# connection. dataReceived might be called again if data was received
# in the meantime.
if self.max_size is not None and self.length >= self.max_size:
self.deferred.errback(BodyExceededMaxSize())
with PreserveLoggingContext():
self.deferred.errback(BodyExceededMaxSize())
# Close the connection (forcefully) since all the data will get
# discarded anyway.
assert self.transport is not None
Expand All @@ -1213,7 +1225,8 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
return

if reason.check(ResponseDone):
self.deferred.callback(self.length)
with PreserveLoggingContext():
self.deferred.callback(self.length)
elif reason.check(PotentialDataLoss):
# This applies to requests which don't set `Content-Length` or a
# `Transfer-Encoding` in the response because in this case the end of the
Expand All @@ -1222,9 +1235,11 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
# behavior is expected of some servers (like YouTube), let's ignore it.
# Stolen from https://github.com/twisted/treq/pull/49/files
# http://twistedmatrix.com/trac/ticket/4840
self.deferred.callback(self.length)
with PreserveLoggingContext():
self.deferred.callback(self.length)
else:
self.deferred.errback(reason)
with PreserveLoggingContext():
self.deferred.errback(reason)


def read_body_with_max_size(
Expand Down
17 changes: 12 additions & 5 deletions synapse/http/connectproxyclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
from twisted.python.failure import Failure
from twisted.web import http

from synapse.logging.context import PreserveLoggingContext

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -176,14 +178,16 @@ def buildProtocol(self, addr: IAddress) -> "HTTPConnectProtocol":
def clientConnectionFailed(self, connector: IConnector, reason: Failure) -> None:
logger.debug("Connection to proxy failed: %s", reason)
if not self.on_connection.called:
self.on_connection.errback(reason)
with PreserveLoggingContext():
self.on_connection.errback(reason)
if isinstance(self.wrapped_factory, ClientFactory):
return self.wrapped_factory.clientConnectionFailed(connector, reason)

def clientConnectionLost(self, connector: IConnector, reason: Failure) -> None:
logger.debug("Connection to proxy lost: %s", reason)
if not self.on_connection.called:
self.on_connection.errback(reason)
with PreserveLoggingContext():
self.on_connection.errback(reason)
if isinstance(self.wrapped_factory, ClientFactory):
return self.wrapped_factory.clientConnectionLost(connector, reason)

Expand Down Expand Up @@ -238,14 +242,16 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
self.http_setup_client.connectionLost(reason)

if not self.connected_deferred.called:
self.connected_deferred.errback(reason)
with PreserveLoggingContext():
self.connected_deferred.errback(reason)

def proxyConnected(self, _: Union[None, "defer.Deferred[None]"]) -> None:
self.wrapped_connection_started = True
assert self.transport is not None
self.wrapped_protocol.makeConnection(self.transport)

self.connected_deferred.callback(self.wrapped_protocol)
with PreserveLoggingContext():
self.connected_deferred.callback(self.wrapped_protocol)

# Get any pending data from the http buf and forward it to the original protocol
buf = self.http_setup_client.clearLineBuffer()
Expand Down Expand Up @@ -303,7 +309,8 @@ def handleStatus(self, version: bytes, status: bytes, message: bytes) -> None:

def handleEndHeaders(self) -> None:
logger.debug("End Headers")
self.on_connected.callback(None)
with PreserveLoggingContext():
self.on_connected.callback(None)

def handleResponse(self, body: bytes) -> None:
pass
14 changes: 9 additions & 5 deletions synapse/media/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from synapse.http.server import finish_request, respond_with_json
from synapse.http.site import SynapseRequest
from synapse.logging.context import (
PreserveLoggingContext,
defer_to_threadpool,
make_deferred_yieldable,
run_in_background,
Expand Down Expand Up @@ -753,9 +754,10 @@ def stopProducing(self) -> None:
self.wakeup_event.set()

if not self.deferred.called:
self.deferred.errback(
ConsumerRequestedStopError("Consumer asked us to stop producing")
)
with PreserveLoggingContext():
self.deferred.errback(
ConsumerRequestedStopError("Consumer asked us to stop producing")
)

async def start_read_loop(self) -> None:
"""This is the loop that drives reading/writing"""
Expand Down Expand Up @@ -809,7 +811,8 @@ def _error(self, failure: Failure) -> None:
self.consumer = None

if not self.deferred.called:
self.deferred.errback(failure)
with PreserveLoggingContext():
self.deferred.errback(failure)

def _finish(self) -> None:
"""Called when we have finished writing (either on success or
Expand All @@ -823,4 +826,5 @@ def _finish(self) -> None:
self.consumer = None

if not self.deferred.called:
self.deferred.callback(None)
with PreserveLoggingContext():
self.deferred.callback(None)
36 changes: 14 additions & 22 deletions synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,8 @@ def time_it_out() -> None:
# will have errbacked new_d, but in case it hasn't, errback it now.

if not new_d.called:
new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,)))
with PreserveLoggingContext():
new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,)))
Comment on lines +816 to +817
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems necessary as we return new_d and people can chain whatever on top of it.

Usually, there are things chained that change the logcontext because of this pattern: await make_deferred_yieldable(timeout_deferred(...)) (make_deferred_yieldable changes the logcontext)


# We don't track these calls since they are short.
delayed_call = clock.call_later(
Expand All @@ -840,11 +841,13 @@ def cancel_timeout(result: _T) -> _T:

def success_cb(val: _T) -> None:
if not new_d.called:
new_d.callback(val)
with PreserveLoggingContext():
new_d.callback(val)

def failure_cb(val: Failure) -> None:
if not new_d.called:
new_d.errback(val)
with PreserveLoggingContext():
new_d.errback(val)

deferred.addCallbacks(success_cb, failure_cb)

Expand Down Expand Up @@ -946,7 +949,8 @@ def handle_cancel(new_deferred: "defer.Deferred[T]") -> None:
# propagating. we then `unpause` it once the wrapped deferred completes, to
# propagate the exception.
new_deferred.pause()
new_deferred.errback(Failure(CancelledError()))
with PreserveLoggingContext():
new_deferred.errback(Failure(CancelledError()))

deferred.addBoth(lambda _: new_deferred.unpause())

Expand Down Expand Up @@ -978,15 +982,6 @@ async def sleep(self, name: str, delay_ms: int) -> None:
"""Sleep for the given number of milliseconds, or return if the given
`name` is explicitly woken up.
"""

# Create a deferred that gets called in N seconds
sleep_deferred: "defer.Deferred[None]" = defer.Deferred()
call = self._clock.call_later(
delay_ms / 1000,
sleep_deferred.callback,
None,
)

# Create a deferred that will get called if `wake` is called with
# the same `name`.
stream_set = self._streams.setdefault(name, set())
Expand All @@ -996,13 +991,14 @@ async def sleep(self, name: str, delay_ms: int) -> None:
try:
# Wait for either the delay or for `wake` to be called.
await make_deferred_yieldable(
defer.DeferredList(
[sleep_deferred, notify_deferred],
fireOnOneCallback=True,
fireOnOneErrback=True,
consumeErrors=True,
timeout_deferred(
deferred=stop_cancellation(notify_deferred),
timeout=delay_ms / 1000,
clock=self._clock,
)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same fix as #19146

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a fair amount of smoke here suggesting that fixing AwakenableSleeper may also fix #19165

PerDestinationQueue (federation_transaction_transmission_loop in the logs) may be intertwined with MatrixFederationHttpClient, which uses AwakenableSleeper (not sure; I haven't traced the layers concretely)

)
except defer.TimeoutError:
pass
finally:
# Clean up the state
curr_stream_set = self._streams.get(name)
Expand All @@ -1011,10 +1007,6 @@ async def sleep(self, name: str, delay_ms: int) -> None:
if len(curr_stream_set) == 0:
self._streams.pop(name)

# Cancel the sleep if we were woken up
if call.active():
call.cancel()


class DeferredEvent:
"""Like threading.Event but for async code"""
Expand Down
7 changes: 5 additions & 2 deletions synapse/util/caches/deferred_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from twisted.internet import defer
from twisted.python.failure import Failure

from synapse.logging.context import PreserveLoggingContext
from synapse.metrics import SERVER_NAME_LABEL
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.lrucache import LruCache
Expand Down Expand Up @@ -514,7 +515,8 @@ def complete_bulk(
cache._completed_callback(value, self, key)

if self._deferred:
self._deferred.callback(result)
with PreserveLoggingContext():
self._deferred.callback(result)

def error_bulk(
self, cache: DeferredCache[KT, VT], keys: Collection[KT], failure: Failure
Expand All @@ -524,4 +526,5 @@ def error_bulk(
cache._error_callback(failure, self, key)

if self._deferred:
self._deferred.errback(failure)
with PreserveLoggingContext():
self._deferred.errback(failure)
Loading