Skip to content

Commit edc0de9

Browse files
Fix bad deferred logcontext handling (#19180)
These aren't really something personally experienced but I just went around the codebase looking for all of the Deferred `.callback`, `.errback`, and `.cancel` and wrapped them with `PreserveLoggingContext()` Spawning from wanting to solve #19165 but unconfirmed whether this has any effect. To explain the fix, see the [*Deferred callbacks*](https://github.com/element-hq/synapse/blob/3b59ac3b69f6a2f73a504699b30313d8dcfe4709/docs/log_contexts.md#deferred-callbacks) section of our logcontext docs for more info (specifically using solution 2).
1 parent 8da8d4b commit edc0de9

7 files changed

Lines changed: 81 additions & 49 deletions

File tree

changelog.d/19180.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix bad deferred logcontext handling across the codebase.

synapse/handlers/worker_lock.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,7 @@ def _wake_all_locks(
184184
locks: Collection[WaitingLock | WaitingMultiLock],
185185
) -> None:
186186
for lock in locks:
187-
deferred = lock.deferred
188-
if not deferred.called:
189-
deferred.callback(None)
187+
lock.release_lock()
190188

191189
self._clock.call_later(
192190
0,
@@ -215,6 +213,12 @@ class WaitingLock:
215213
lambda: start_active_span("WaitingLock.lock")
216214
)
217215

216+
def release_lock(self) -> None:
217+
"""Release the lock (by resolving the deferred)"""
218+
if not self.deferred.called:
219+
with PreserveLoggingContext():
220+
self.deferred.callback(None)
221+
218222
async def __aenter__(self) -> None:
219223
self._lock_span.__enter__()
220224

@@ -298,6 +302,12 @@ class WaitingMultiLock:
298302
lambda: start_active_span("WaitingLock.lock")
299303
)
300304

305+
def release_lock(self) -> None:
306+
"""Release the lock (by resolving the deferred)"""
307+
if not self.deferred.called:
308+
with PreserveLoggingContext():
309+
self.deferred.callback(None)
310+
301311
async def __aenter__(self) -> None:
302312
self._lock_span.__enter__()
303313

synapse/http/client.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@
7777
from synapse.http.proxyagent import ProxyAgent
7878
from synapse.http.replicationagent import ReplicationAgent
7979
from synapse.http.types import QueryParams
80-
from synapse.logging.context import make_deferred_yieldable, run_in_background
80+
from synapse.logging.context import (
81+
PreserveLoggingContext,
82+
make_deferred_yieldable,
83+
run_in_background,
84+
)
8185
from synapse.logging.opentracing import set_tag, start_active_span, tags
8286
from synapse.metrics import SERVER_NAME_LABEL
8387
from synapse.types import ISynapseReactor, StrSequence
@@ -1036,7 +1040,8 @@ def _maybe_fail(self) -> None:
10361040
Report a max size exceed error and disconnect the first time this is called.
10371041
"""
10381042
if not self.deferred.called:
1039-
self.deferred.errback(BodyExceededMaxSize())
1043+
with PreserveLoggingContext():
1044+
self.deferred.errback(BodyExceededMaxSize())
10401045
# Close the connection (forcefully) since all the data will get
10411046
# discarded anyway.
10421047
assert self.transport is not None
@@ -1135,7 +1140,8 @@ def on_part_data(data: bytes, start: int, end: int) -> None:
11351140
logger.warning(
11361141
"Exception encountered writing file data to stream: %s", e
11371142
)
1138-
self.deferred.errback()
1143+
with PreserveLoggingContext():
1144+
self.deferred.errback()
11391145
self.file_length += end - start
11401146

11411147
callbacks: "multipart.MultipartCallbacks" = {
@@ -1147,7 +1153,8 @@ def on_part_data(data: bytes, start: int, end: int) -> None:
11471153

11481154
self.total_length += len(incoming_data)
11491155
if self.max_length is not None and self.total_length >= self.max_length:
1150-
self.deferred.errback(BodyExceededMaxSize())
1156+
with PreserveLoggingContext():
1157+
self.deferred.errback(BodyExceededMaxSize())
11511158
# Close the connection (forcefully) since all the data will get
11521159
# discarded anyway.
11531160
assert self.transport is not None
@@ -1157,7 +1164,8 @@ def on_part_data(data: bytes, start: int, end: int) -> None:
11571164
self.parser.write(incoming_data)
11581165
except Exception as e:
11591166
logger.warning("Exception writing to multipart parser: %s", e)
1160-
self.deferred.errback()
1167+
with PreserveLoggingContext():
1168+
self.deferred.errback()
11611169
return
11621170

11631171
def connectionLost(self, reason: Failure = connectionDone) -> None:
@@ -1167,9 +1175,11 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
11671175

11681176
if reason.check(ResponseDone):
11691177
self.multipart_response.length = self.file_length
1170-
self.deferred.callback(self.multipart_response)
1178+
with PreserveLoggingContext():
1179+
self.deferred.callback(self.multipart_response)
11711180
else:
1172-
self.deferred.errback(reason)
1181+
with PreserveLoggingContext():
1182+
self.deferred.errback(reason)
11731183

11741184

11751185
class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
@@ -1193,15 +1203,17 @@ def dataReceived(self, data: bytes) -> None:
11931203
try:
11941204
self.stream.write(data)
11951205
except Exception:
1196-
self.deferred.errback()
1206+
with PreserveLoggingContext():
1207+
self.deferred.errback()
11971208
return
11981209

11991210
self.length += len(data)
12001211
# The first time the maximum size is exceeded, error and cancel the
12011212
# connection. dataReceived might be called again if data was received
12021213
# in the meantime.
12031214
if self.max_size is not None and self.length >= self.max_size:
1204-
self.deferred.errback(BodyExceededMaxSize())
1215+
with PreserveLoggingContext():
1216+
self.deferred.errback(BodyExceededMaxSize())
12051217
# Close the connection (forcefully) since all the data will get
12061218
# discarded anyway.
12071219
assert self.transport is not None
@@ -1213,7 +1225,8 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
12131225
return
12141226

12151227
if reason.check(ResponseDone):
1216-
self.deferred.callback(self.length)
1228+
with PreserveLoggingContext():
1229+
self.deferred.callback(self.length)
12171230
elif reason.check(PotentialDataLoss):
12181231
# This applies to requests which don't set `Content-Length` or a
12191232
# `Transfer-Encoding` in the response because in this case the end of the
@@ -1222,9 +1235,11 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
12221235
# behavior is expected of some servers (like YouTube), let's ignore it.
12231236
# Stolen from https://github.com/twisted/treq/pull/49/files
12241237
# http://twistedmatrix.com/trac/ticket/4840
1225-
self.deferred.callback(self.length)
1238+
with PreserveLoggingContext():
1239+
self.deferred.callback(self.length)
12261240
else:
1227-
self.deferred.errback(reason)
1241+
with PreserveLoggingContext():
1242+
self.deferred.errback(reason)
12281243

12291244

12301245
def read_body_with_max_size(

synapse/http/connectproxyclient.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
from twisted.python.failure import Failure
4242
from twisted.web import http
4343

44+
from synapse.logging.context import PreserveLoggingContext
45+
4446
logger = logging.getLogger(__name__)
4547

4648

@@ -176,14 +178,16 @@ def buildProtocol(self, addr: IAddress) -> "HTTPConnectProtocol":
176178
def clientConnectionFailed(self, connector: IConnector, reason: Failure) -> None:
177179
logger.debug("Connection to proxy failed: %s", reason)
178180
if not self.on_connection.called:
179-
self.on_connection.errback(reason)
181+
with PreserveLoggingContext():
182+
self.on_connection.errback(reason)
180183
if isinstance(self.wrapped_factory, ClientFactory):
181184
return self.wrapped_factory.clientConnectionFailed(connector, reason)
182185

183186
def clientConnectionLost(self, connector: IConnector, reason: Failure) -> None:
184187
logger.debug("Connection to proxy lost: %s", reason)
185188
if not self.on_connection.called:
186-
self.on_connection.errback(reason)
189+
with PreserveLoggingContext():
190+
self.on_connection.errback(reason)
187191
if isinstance(self.wrapped_factory, ClientFactory):
188192
return self.wrapped_factory.clientConnectionLost(connector, reason)
189193

@@ -238,14 +242,16 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
238242
self.http_setup_client.connectionLost(reason)
239243

240244
if not self.connected_deferred.called:
241-
self.connected_deferred.errback(reason)
245+
with PreserveLoggingContext():
246+
self.connected_deferred.errback(reason)
242247

243248
def proxyConnected(self, _: Union[None, "defer.Deferred[None]"]) -> None:
244249
self.wrapped_connection_started = True
245250
assert self.transport is not None
246251
self.wrapped_protocol.makeConnection(self.transport)
247252

248-
self.connected_deferred.callback(self.wrapped_protocol)
253+
with PreserveLoggingContext():
254+
self.connected_deferred.callback(self.wrapped_protocol)
249255

250256
# Get any pending data from the http buf and forward it to the original protocol
251257
buf = self.http_setup_client.clearLineBuffer()
@@ -303,7 +309,8 @@ def handleStatus(self, version: bytes, status: bytes, message: bytes) -> None:
303309

304310
def handleEndHeaders(self) -> None:
305311
logger.debug("End Headers")
306-
self.on_connected.callback(None)
312+
with PreserveLoggingContext():
313+
self.on_connected.callback(None)
307314

308315
def handleResponse(self, body: bytes) -> None:
309316
pass

synapse/media/_base.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
from synapse.http.server import finish_request, respond_with_json
4646
from synapse.http.site import SynapseRequest
4747
from synapse.logging.context import (
48+
PreserveLoggingContext,
4849
defer_to_threadpool,
4950
make_deferred_yieldable,
5051
run_in_background,
@@ -753,9 +754,10 @@ def stopProducing(self) -> None:
753754
self.wakeup_event.set()
754755

755756
if not self.deferred.called:
756-
self.deferred.errback(
757-
ConsumerRequestedStopError("Consumer asked us to stop producing")
758-
)
757+
with PreserveLoggingContext():
758+
self.deferred.errback(
759+
ConsumerRequestedStopError("Consumer asked us to stop producing")
760+
)
759761

760762
async def start_read_loop(self) -> None:
761763
"""This is the loop that drives reading/writing"""
@@ -809,7 +811,8 @@ def _error(self, failure: Failure) -> None:
809811
self.consumer = None
810812

811813
if not self.deferred.called:
812-
self.deferred.errback(failure)
814+
with PreserveLoggingContext():
815+
self.deferred.errback(failure)
813816

814817
def _finish(self) -> None:
815818
"""Called when we have finished writing (either on success or
@@ -823,4 +826,5 @@ def _finish(self) -> None:
823826
self.consumer = None
824827

825828
if not self.deferred.called:
826-
self.deferred.callback(None)
829+
with PreserveLoggingContext():
830+
self.deferred.callback(None)

synapse/util/async_helpers.py

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,8 @@ def time_it_out() -> None:
813813
# will have errbacked new_d, but in case it hasn't, errback it now.
814814

815815
if not new_d.called:
816-
new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,)))
816+
with PreserveLoggingContext():
817+
new_d.errback(defer.TimeoutError("Timed out after %gs" % (timeout,)))
817818

818819
# We don't track these calls since they are short.
819820
delayed_call = clock.call_later(
@@ -840,11 +841,13 @@ def cancel_timeout(result: _T) -> _T:
840841

841842
def success_cb(val: _T) -> None:
842843
if not new_d.called:
843-
new_d.callback(val)
844+
with PreserveLoggingContext():
845+
new_d.callback(val)
844846

845847
def failure_cb(val: Failure) -> None:
846848
if not new_d.called:
847-
new_d.errback(val)
849+
with PreserveLoggingContext():
850+
new_d.errback(val)
848851

849852
deferred.addCallbacks(success_cb, failure_cb)
850853

@@ -946,7 +949,8 @@ def handle_cancel(new_deferred: "defer.Deferred[T]") -> None:
946949
# propagating. we then `unpause` it once the wrapped deferred completes, to
947950
# propagate the exception.
948951
new_deferred.pause()
949-
new_deferred.errback(Failure(CancelledError()))
952+
with PreserveLoggingContext():
953+
new_deferred.errback(Failure(CancelledError()))
950954

951955
deferred.addBoth(lambda _: new_deferred.unpause())
952956

@@ -978,15 +982,6 @@ async def sleep(self, name: str, delay_ms: int) -> None:
978982
"""Sleep for the given number of milliseconds, or return if the given
979983
`name` is explicitly woken up.
980984
"""
981-
982-
# Create a deferred that gets called in N seconds
983-
sleep_deferred: "defer.Deferred[None]" = defer.Deferred()
984-
call = self._clock.call_later(
985-
delay_ms / 1000,
986-
sleep_deferred.callback,
987-
None,
988-
)
989-
990985
# Create a deferred that will get called if `wake` is called with
991986
# the same `name`.
992987
stream_set = self._streams.setdefault(name, set())
@@ -996,13 +991,14 @@ async def sleep(self, name: str, delay_ms: int) -> None:
996991
try:
997992
# Wait for either the delay or for `wake` to be called.
998993
await make_deferred_yieldable(
999-
defer.DeferredList(
1000-
[sleep_deferred, notify_deferred],
1001-
fireOnOneCallback=True,
1002-
fireOnOneErrback=True,
1003-
consumeErrors=True,
994+
timeout_deferred(
995+
deferred=stop_cancellation(notify_deferred),
996+
timeout=delay_ms / 1000,
997+
clock=self._clock,
1004998
)
1005999
)
1000+
except defer.TimeoutError:
1001+
pass
10061002
finally:
10071003
# Clean up the state
10081004
curr_stream_set = self._streams.get(name)
@@ -1011,10 +1007,6 @@ async def sleep(self, name: str, delay_ms: int) -> None:
10111007
if len(curr_stream_set) == 0:
10121008
self._streams.pop(name)
10131009

1014-
# Cancel the sleep if we were woken up
1015-
if call.active():
1016-
call.cancel()
1017-
10181010

10191011
class DeferredEvent:
10201012
"""Like threading.Event but for async code"""

synapse/util/caches/deferred_cache.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
from twisted.internet import defer
4040
from twisted.python.failure import Failure
4141

42+
from synapse.logging.context import PreserveLoggingContext
4243
from synapse.metrics import SERVER_NAME_LABEL
4344
from synapse.util.async_helpers import ObservableDeferred
4445
from synapse.util.caches.lrucache import LruCache
@@ -514,7 +515,8 @@ def complete_bulk(
514515
cache._completed_callback(value, self, key)
515516

516517
if self._deferred:
517-
self._deferred.callback(result)
518+
with PreserveLoggingContext():
519+
self._deferred.callback(result)
518520

519521
def error_bulk(
520522
self, cache: DeferredCache[KT, VT], keys: Collection[KT], failure: Failure
@@ -524,4 +526,5 @@ def error_bulk(
524526
cache._error_callback(failure, self, key)
525527

526528
if self._deferred:
527-
self._deferred.errback(failure)
529+
with PreserveLoggingContext():
530+
self._deferred.errback(failure)

0 commit comments

Comments
 (0)