Skip to content

Commit ca1202f

Browse files
committed
Fix Twisted looking things
1 parent 726dc2e commit ca1202f

3 files changed

Lines changed: 48 additions & 22 deletions

File tree

synapse/http/client.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,11 @@
7777
from synapse.http.proxyagent import ProxyAgent
7878
from synapse.http.replicationagent import ReplicationAgent
7979
from synapse.http.types import QueryParams
80-
from synapse.logging.context import make_deferred_yieldable, run_in_background
80+
from synapse.logging.context import (
81+
make_deferred_yieldable,
82+
run_in_background,
83+
PreserveLoggingContext,
84+
)
8185
from synapse.logging.opentracing import set_tag, start_active_span, tags
8286
from synapse.metrics import SERVER_NAME_LABEL
8387
from synapse.types import ISynapseReactor, StrSequence
@@ -1036,7 +1040,8 @@ def _maybe_fail(self) -> None:
10361040
Report a max size exceed error and disconnect the first time this is called.
10371041
"""
10381042
if not self.deferred.called:
1039-
self.deferred.errback(BodyExceededMaxSize())
1043+
with PreserveLoggingContext():
1044+
self.deferred.errback(BodyExceededMaxSize())
10401045
# Close the connection (forcefully) since all the data will get
10411046
# discarded anyway.
10421047
assert self.transport is not None
@@ -1135,7 +1140,8 @@ def on_part_data(data: bytes, start: int, end: int) -> None:
11351140
logger.warning(
11361141
"Exception encountered writing file data to stream: %s", e
11371142
)
1138-
self.deferred.errback()
1143+
with PreserveLoggingContext():
1144+
self.deferred.errback()
11391145
self.file_length += end - start
11401146

11411147
callbacks: "multipart.MultipartCallbacks" = {
@@ -1147,7 +1153,8 @@ def on_part_data(data: bytes, start: int, end: int) -> None:
11471153

11481154
self.total_length += len(incoming_data)
11491155
if self.max_length is not None and self.total_length >= self.max_length:
1150-
self.deferred.errback(BodyExceededMaxSize())
1156+
with PreserveLoggingContext():
1157+
self.deferred.errback(BodyExceededMaxSize())
11511158
# Close the connection (forcefully) since all the data will get
11521159
# discarded anyway.
11531160
assert self.transport is not None
@@ -1157,7 +1164,8 @@ def on_part_data(data: bytes, start: int, end: int) -> None:
11571164
self.parser.write(incoming_data)
11581165
except Exception as e:
11591166
logger.warning("Exception writing to multipart parser: %s", e)
1160-
self.deferred.errback()
1167+
with PreserveLoggingContext():
1168+
self.deferred.errback()
11611169
return
11621170

11631171
def connectionLost(self, reason: Failure = connectionDone) -> None:
@@ -1167,9 +1175,11 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
11671175

11681176
if reason.check(ResponseDone):
11691177
self.multipart_response.length = self.file_length
1170-
self.deferred.callback(self.multipart_response)
1178+
with PreserveLoggingContext():
1179+
self.deferred.callback(self.multipart_response)
11711180
else:
1172-
self.deferred.errback(reason)
1181+
with PreserveLoggingContext():
1182+
self.deferred.errback(reason)
11731183

11741184

11751185
class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
@@ -1193,15 +1203,17 @@ def dataReceived(self, data: bytes) -> None:
11931203
try:
11941204
self.stream.write(data)
11951205
except Exception:
1196-
self.deferred.errback()
1206+
with PreserveLoggingContext():
1207+
self.deferred.errback()
11971208
return
11981209

11991210
self.length += len(data)
12001211
# The first time the maximum size is exceeded, error and cancel the
12011212
# connection. dataReceived might be called again if data was received
12021213
# in the meantime.
12031214
if self.max_size is not None and self.length >= self.max_size:
1204-
self.deferred.errback(BodyExceededMaxSize())
1215+
with PreserveLoggingContext():
1216+
self.deferred.errback(BodyExceededMaxSize())
12051217
# Close the connection (forcefully) since all the data will get
12061218
# discarded anyway.
12071219
assert self.transport is not None
@@ -1213,7 +1225,8 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
12131225
return
12141226

12151227
if reason.check(ResponseDone):
1216-
self.deferred.callback(self.length)
1228+
with PreserveLoggingContext():
1229+
self.deferred.callback(self.length)
12171230
elif reason.check(PotentialDataLoss):
12181231
# This applies to requests which don't set `Content-Length` or a
12191232
# `Transfer-Encoding` in the response because in this case the end of the
@@ -1222,9 +1235,11 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
12221235
# behavior is expected of some servers (like YouTube), let's ignore it.
12231236
# Stolen from https://github.com/twisted/treq/pull/49/files
12241237
# http://twistedmatrix.com/trac/ticket/4840
1225-
self.deferred.callback(self.length)
1238+
with PreserveLoggingContext():
1239+
self.deferred.callback(self.length)
12261240
else:
1227-
self.deferred.errback(reason)
1241+
with PreserveLoggingContext():
1242+
self.deferred.errback(reason)
12281243

12291244

12301245
def read_body_with_max_size(

synapse/http/connectproxyclient.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
from twisted.python.failure import Failure
4242
from twisted.web import http
4343

44+
from synapse.logging.context import PreserveLoggingContext
45+
4446
logger = logging.getLogger(__name__)
4547

4648

@@ -176,14 +178,16 @@ def buildProtocol(self, addr: IAddress) -> "HTTPConnectProtocol":
176178
def clientConnectionFailed(self, connector: IConnector, reason: Failure) -> None:
177179
logger.debug("Connection to proxy failed: %s", reason)
178180
if not self.on_connection.called:
179-
self.on_connection.errback(reason)
181+
with PreserveLoggingContext():
182+
self.on_connection.errback(reason)
180183
if isinstance(self.wrapped_factory, ClientFactory):
181184
return self.wrapped_factory.clientConnectionFailed(connector, reason)
182185

183186
def clientConnectionLost(self, connector: IConnector, reason: Failure) -> None:
184187
logger.debug("Connection to proxy lost: %s", reason)
185188
if not self.on_connection.called:
186-
self.on_connection.errback(reason)
189+
with PreserveLoggingContext():
190+
self.on_connection.errback(reason)
187191
if isinstance(self.wrapped_factory, ClientFactory):
188192
return self.wrapped_factory.clientConnectionLost(connector, reason)
189193

@@ -238,14 +242,16 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
238242
self.http_setup_client.connectionLost(reason)
239243

240244
if not self.connected_deferred.called:
241-
self.connected_deferred.errback(reason)
245+
with PreserveLoggingContext():
246+
self.connected_deferred.errback(reason)
242247

243248
def proxyConnected(self, _: Union[None, "defer.Deferred[None]"]) -> None:
244249
self.wrapped_connection_started = True
245250
assert self.transport is not None
246251
self.wrapped_protocol.makeConnection(self.transport)
247252

248-
self.connected_deferred.callback(self.wrapped_protocol)
253+
with PreserveLoggingContext():
254+
self.connected_deferred.callback(self.wrapped_protocol)
249255

250256
# Get any pending data from the http buf and forward it to the original protocol
251257
buf = self.http_setup_client.clearLineBuffer()
@@ -303,7 +309,8 @@ def handleStatus(self, version: bytes, status: bytes, message: bytes) -> None:
303309

304310
def handleEndHeaders(self) -> None:
305311
logger.debug("End Headers")
306-
self.on_connected.callback(None)
312+
with PreserveLoggingContext():
313+
self.on_connected.callback(None)
307314

308315
def handleResponse(self, body: bytes) -> None:
309316
pass

synapse/media/_base.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
defer_to_threadpool,
4949
make_deferred_yieldable,
5050
run_in_background,
51+
PreserveLoggingContext,
5152
)
5253
from synapse.util.async_helpers import DeferredEvent
5354
from synapse.util.clock import Clock
@@ -753,9 +754,10 @@ def stopProducing(self) -> None:
753754
self.wakeup_event.set()
754755

755756
if not self.deferred.called:
756-
self.deferred.errback(
757-
ConsumerRequestedStopError("Consumer asked us to stop producing")
758-
)
757+
with PreserveLoggingContext():
758+
self.deferred.errback(
759+
ConsumerRequestedStopError("Consumer asked us to stop producing")
760+
)
759761

760762
async def start_read_loop(self) -> None:
761763
"""This is the loop that drives reading/writing"""
@@ -809,7 +811,8 @@ def _error(self, failure: Failure) -> None:
809811
self.consumer = None
810812

811813
if not self.deferred.called:
812-
self.deferred.errback(failure)
814+
with PreserveLoggingContext():
815+
self.deferred.errback(failure)
813816

814817
def _finish(self) -> None:
815818
"""Called when we have finished writing (either on success or
@@ -823,4 +826,5 @@ def _finish(self) -> None:
823826
self.consumer = None
824827

825828
if not self.deferred.called:
826-
self.deferred.callback(None)
829+
with PreserveLoggingContext():
830+
self.deferred.callback(None)

0 commit comments

Comments
 (0)