Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 9558236

Browse files
authored
Merge pull request #3127 from matrix-org/rav/deferred_timeout
Use deferred.addTimeout instead of time_bound_deferred
2 parents d2737c1 + 6146332 commit 9558236

7 files changed

Lines changed: 135 additions & 130 deletions

File tree

synapse/http/__init__.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
22
# Copyright 2014-2016 OpenMarket Ltd
3+
# Copyright 2018 New Vector Ltd
34
#
45
# Licensed under the Apache License, Version 2.0 (the "License");
56
# you may not use this file except in compliance with the License.
@@ -12,3 +13,24 @@
1213
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1314
# See the License for the specific language governing permissions and
1415
# limitations under the License.
16+
from twisted.internet.defer import CancelledError
17+
from twisted.python import failure
18+
19+
from synapse.api.errors import SynapseError
20+
21+
22+
class RequestTimedOutError(SynapseError):
23+
"""Exception representing timeout of an outbound request"""
24+
def __init__(self):
25+
super(RequestTimedOutError, self).__init__(504, "Timed out")
26+
27+
28+
def cancelled_to_request_timed_out_error(value):
29+
"""Turns CancelledErrors into RequestTimedOutErrors.
30+
31+
For use with async.add_timeout_to_deferred
32+
"""
33+
if isinstance(value, failure.Failure):
34+
value.trap(CancelledError)
35+
raise RequestTimedOutError()
36+
return value

synapse/http/client.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
22
# Copyright 2014-2016 OpenMarket Ltd
3+
# Copyright 2018 New Vector Ltd
34
#
45
# Licensed under the Apache License, Version 2.0 (the "License");
56
# you may not use this file except in compliance with the License.
@@ -18,9 +19,10 @@
1819
from synapse.api.errors import (
1920
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
2021
)
22+
from synapse.http import cancelled_to_request_timed_out_error
23+
from synapse.util.async import add_timeout_to_deferred
2124
from synapse.util.caches import CACHE_SIZE_FACTOR
2225
from synapse.util.logcontext import make_deferred_yieldable
23-
from synapse.util import logcontext
2426
import synapse.metrics
2527
from synapse.http.endpoint import SpiderEndpoint
2628

@@ -95,21 +97,17 @@ def request(self, method, uri, *args, **kwargs):
9597
# counters to it
9698
outgoing_requests_counter.inc(method)
9799

98-
def send_request():
100+
logger.info("Sending request %s %s", method, uri)
101+
102+
try:
99103
request_deferred = self.agent.request(
100104
method, uri, *args, **kwargs
101105
)
102-
103-
return self.clock.time_bound_deferred(
106+
add_timeout_to_deferred(
104107
request_deferred,
105-
time_out=60,
108+
60, cancelled_to_request_timed_out_error,
106109
)
107-
108-
logger.info("Sending request %s %s", method, uri)
109-
110-
try:
111-
with logcontext.PreserveLoggingContext():
112-
response = yield send_request()
110+
response = yield make_deferred_yieldable(request_deferred)
113111

114112
incoming_responses_counter.inc(method, response.code)
115113
logger.info(

synapse/http/matrixfederationclient.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
22
# Copyright 2014-2016 OpenMarket Ltd
3+
# Copyright 2018 New Vector Ltd
34
#
45
# Licensed under the Apache License, Version 2.0 (the "License");
56
# you may not use this file except in compliance with the License.
@@ -12,17 +13,19 @@
1213
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1314
# See the License for the specific language governing permissions and
1415
# limitations under the License.
15-
import synapse.util.retryutils
1616
from twisted.internet import defer, reactor, protocol
1717
from twisted.internet.error import DNSLookupError
1818
from twisted.web.client import readBody, HTTPConnectionPool, Agent
1919
from twisted.web.http_headers import Headers
2020
from twisted.web._newclient import ResponseDone
2121

22+
from synapse.http import cancelled_to_request_timed_out_error
2223
from synapse.http.endpoint import matrix_federation_endpoint
23-
from synapse.util.async import sleep
24-
from synapse.util import logcontext
2524
import synapse.metrics
25+
from synapse.util.async import sleep, add_timeout_to_deferred
26+
from synapse.util import logcontext
27+
from synapse.util.logcontext import make_deferred_yieldable
28+
import synapse.util.retryutils
2629

2730
from canonicaljson import encode_canonical_json
2831

@@ -184,21 +187,20 @@ def _request(self, destination, method, path,
184187
producer = body_callback(method, http_url_bytes, headers_dict)
185188

186189
try:
187-
def send_request():
188-
request_deferred = self.agent.request(
189-
method,
190-
url_bytes,
191-
Headers(headers_dict),
192-
producer
193-
)
194-
195-
return self.clock.time_bound_deferred(
196-
request_deferred,
197-
time_out=timeout / 1000. if timeout else 60,
198-
)
199-
200-
with logcontext.PreserveLoggingContext():
201-
response = yield send_request()
190+
request_deferred = self.agent.request(
191+
method,
192+
url_bytes,
193+
Headers(headers_dict),
194+
producer
195+
)
196+
add_timeout_to_deferred(
197+
request_deferred,
198+
timeout / 1000. if timeout else 60,
199+
cancelled_to_request_timed_out_error,
200+
)
201+
response = yield make_deferred_yieldable(
202+
request_deferred,
203+
)
202204

203205
log_result = "%d %s" % (response.code, response.phrase,)
204206
break

synapse/notifier.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414
# limitations under the License.
1515

1616
from twisted.internet import defer
17+
1718
from synapse.api.constants import EventTypes, Membership
1819
from synapse.api.errors import AuthError
1920
from synapse.handlers.presence import format_user_presence_state
2021

21-
from synapse.util import DeferredTimedOutError
2222
from synapse.util.logutils import log_function
23-
from synapse.util.async import ObservableDeferred
23+
from synapse.util.async import (
24+
ObservableDeferred, add_timeout_to_deferred,
25+
DeferredTimeoutError,
26+
)
2427
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
2528
from synapse.util.metrics import Measure
2629
from synapse.types import StreamToken
@@ -336,11 +339,12 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
336339
# Now we wait for the _NotifierUserStream to be told there
337340
# is a new token.
338341
listener = user_stream.new_listener(prev_token)
342+
add_timeout_to_deferred(
343+
listener.deferred,
344+
(end_time - now) / 1000.,
345+
)
339346
with PreserveLoggingContext():
340-
yield self.clock.time_bound_deferred(
341-
listener.deferred,
342-
time_out=(end_time - now) / 1000.
343-
)
347+
yield listener.deferred
344348

345349
current_token = user_stream.current_token
346350

@@ -351,7 +355,7 @@ def wait_for_events(self, user_id, timeout, callback, room_ids=None,
351355
# Update the prev_token to the current_token since nothing
352356
# has happened between the old prev_token and the current_token
353357
prev_token = current_token
354-
except DeferredTimedOutError:
358+
except DeferredTimeoutError:
355359
break
356360
except defer.CancelledError:
357361
break
@@ -556,13 +560,14 @@ def wait_for_replication(self, callback, timeout):
556560
if end_time <= now:
557561
break
558562

563+
add_timeout_to_deferred(
564+
listener.deferred.addTimeout,
565+
(end_time - now) / 1000.,
566+
)
559567
try:
560568
with PreserveLoggingContext():
561-
yield self.clock.time_bound_deferred(
562-
listener.deferred,
563-
time_out=(end_time - now) / 1000.
564-
)
565-
except DeferredTimedOutError:
569+
yield listener.deferred
570+
except DeferredTimeoutError:
566571
break
567572
except defer.CancelledError:
568573
break

synapse/util/__init__.py

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
from synapse.api.errors import SynapseError
1716
from synapse.util.logcontext import PreserveLoggingContext
1817

1918
from twisted.internet import defer, reactor, task
@@ -24,11 +23,6 @@
2423
logger = logging.getLogger(__name__)
2524

2625

27-
class DeferredTimedOutError(SynapseError):
28-
def __init__(self):
29-
super(DeferredTimedOutError, self).__init__(504, "Timed out")
30-
31-
3226
def unwrapFirstError(failure):
3327
# defer.gatherResults and DeferredLists wrap failures.
3428
failure.trap(defer.FirstError)
@@ -85,53 +79,3 @@ def cancel_call_later(self, timer, ignore_errs=False):
8579
except Exception:
8680
if not ignore_errs:
8781
raise
88-
89-
def time_bound_deferred(self, given_deferred, time_out):
90-
if given_deferred.called:
91-
return given_deferred
92-
93-
ret_deferred = defer.Deferred()
94-
95-
def timed_out_fn():
96-
e = DeferredTimedOutError()
97-
98-
try:
99-
ret_deferred.errback(e)
100-
except Exception:
101-
pass
102-
103-
try:
104-
given_deferred.cancel()
105-
except Exception:
106-
pass
107-
108-
timer = None
109-
110-
def cancel(res):
111-
try:
112-
self.cancel_call_later(timer)
113-
except Exception:
114-
pass
115-
return res
116-
117-
ret_deferred.addBoth(cancel)
118-
119-
def success(res):
120-
try:
121-
ret_deferred.callback(res)
122-
except Exception:
123-
pass
124-
125-
return res
126-
127-
def err(res):
128-
try:
129-
ret_deferred.errback(res)
130-
except Exception:
131-
pass
132-
133-
given_deferred.addCallbacks(callback=success, errback=err)
134-
135-
timer = self.call_later(time_out, timed_out_fn)
136-
137-
return ret_deferred

synapse/util/async.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616

1717
from twisted.internet import defer, reactor
18+
from twisted.internet.defer import CancelledError
19+
from twisted.python import failure
1820

1921
from .logcontext import (
2022
PreserveLoggingContext, make_deferred_yieldable, preserve_fn
@@ -392,3 +394,68 @@ def _ctx_manager():
392394
self.key_to_current_writer.pop(key)
393395

394396
defer.returnValue(_ctx_manager())
397+
398+
399+
class DeferredTimeoutError(Exception):
400+
"""
401+
This error is raised by default when a L{Deferred} times out.
402+
"""
403+
404+
405+
def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
406+
"""
407+
Add a timeout to a deferred by scheduling it to be cancelled after
408+
timeout seconds.
409+
410+
This is essentially a backport of deferred.addTimeout, which was introduced
411+
in twisted 16.5.
412+
413+
If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
414+
unless a cancelable function was passed to its initialization or unless
415+
a different on_timeout_cancel callable is provided.
416+
417+
Args:
418+
deferred (defer.Deferred): deferred to be timed out
419+
timeout (Number): seconds to time out after
420+
421+
on_timeout_cancel (callable): A callable which is called immediately
422+
after the deferred times out, and not if this deferred is
423+
otherwise cancelled before the timeout.
424+
425+
It takes an arbitrary value, which is the value of the deferred at
426+
that exact point in time (probably a CancelledError Failure), and
427+
the timeout.
428+
429+
The default callable (if none is provided) will translate a
430+
CancelledError Failure into a DeferredTimeoutError.
431+
"""
432+
timed_out = [False]
433+
434+
def time_it_out():
435+
timed_out[0] = True
436+
deferred.cancel()
437+
438+
delayed_call = reactor.callLater(timeout, time_it_out)
439+
440+
def convert_cancelled(value):
441+
if timed_out[0]:
442+
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
443+
return to_call(value, timeout)
444+
return value
445+
446+
deferred.addBoth(convert_cancelled)
447+
448+
def cancel_timeout(result):
449+
# stop the pending call to cancel the deferred if it's been fired
450+
if delayed_call.active():
451+
delayed_call.cancel()
452+
return result
453+
454+
deferred.addBoth(cancel_timeout)
455+
456+
457+
def _cancelled_to_timed_out_error(value, timeout):
458+
if isinstance(value, failure.Failure):
459+
value.trap(CancelledError)
460+
raise DeferredTimeoutError(timeout, "Deferred")
461+
return value

tests/util/test_clock.py

Lines changed: 0 additions & 33 deletions
This file was deleted.

0 commit comments

Comments
 (0)