Skip to content

Commit 7b095dd

Browse files
committed
Fixup logcontexts after replication PR.
Fixes logcontext leaks introduced in #19138.
1 parent d3ffd04 commit 7b095dd

3 files changed

Lines changed: 32 additions & 25 deletions

File tree

synapse/util/async_helpers.py

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,7 +1029,8 @@ def __init__(self, clock: Clock) -> None:
10291029

10301030
def set(self) -> None:
10311031
if not self._deferred.called:
1032-
self._deferred.callback(None)
1032+
with PreserveLoggingContext():
1033+
self._deferred.callback(None)
10331034

10341035
def clear(self) -> None:
10351036
if self._deferred.called:
@@ -1042,26 +1043,15 @@ async def wait(self, timeout_seconds: float) -> bool:
10421043
if self.is_set():
10431044
return True
10441045

1045-
# Create a deferred that gets called in N seconds
1046-
sleep_deferred: "defer.Deferred[None]" = defer.Deferred()
1047-
call = self._clock.call_later(
1048-
timeout_seconds,
1049-
sleep_deferred.callback,
1050-
None,
1051-
)
1052-
10531046
try:
10541047
await make_deferred_yieldable(
1055-
defer.DeferredList(
1056-
[sleep_deferred, self._deferred],
1057-
fireOnOneCallback=True,
1058-
fireOnOneErrback=True,
1059-
consumeErrors=True,
1048+
timeout_deferred(
1049+
deferred=stop_cancellation(self._deferred),
1050+
timeout=timeout_seconds,
1051+
clock=self._clock,
10601052
)
10611053
)
1062-
finally:
1063-
# Cancel the sleep if we were woken up
1064-
if call.active():
1065-
call.cancel()
1054+
except defer.TimeoutError:
1055+
pass
10661056

10671057
return self.is_set()

synapse/util/background_queue.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
TypeVar,
2626
)
2727

28+
from twisted.internet import defer
29+
2830
from synapse.util.async_helpers import DeferredEvent
2931
from synapse.util.constants import MILLISECONDS_PER_SECOND
3032

@@ -110,6 +112,8 @@ async def _process_queue(self) -> None:
110112
item = self._queue.popleft()
111113
try:
112114
await self._callback(item)
115+
except defer.CancelledError:
116+
raise
113117
except Exception:
114118
logger.exception("Error processing background queue item")
115119

tests/util/test_background_queue.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
from twisted.internet.defer import Deferred
1919
from twisted.internet.testing import MemoryReactor
2020

21+
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
2122
from synapse.server import HomeServer
23+
from synapse.util.async_helpers import DoneAwaitable
2224
from synapse.util.background_queue import BackgroundQueue
2325
from synapse.util.clock import Clock
2426

25-
from tests.unittest import HomeserverTestCase
27+
from tests.unittest import HomeserverTestCase, logcontext_clean
2628

2729

2830
class BackgroundQueueTests(HomeserverTestCase):
@@ -38,11 +40,14 @@ def prepare(
3840
timeout_ms=1000,
3941
)
4042

43+
@logcontext_clean
4144
def test_simple_call(self) -> None:
4245
"""Test that items added to the queue are processed."""
4346
# Register a deferred to be the return value of the callback.
4447
callback_result_deferred: Deferred[None] = Deferred()
45-
self._process_item_mock.side_effect = callback_result_deferred
48+
self._process_item_mock.side_effect = lambda _: make_deferred_yieldable(
49+
callback_result_deferred
50+
)
4651

4752
# Adding an item should cause the callback to be invoked.
4853
self.queue.add(1)
@@ -57,16 +62,20 @@ def test_simple_call(self) -> None:
5762

5863
# Once the first callback completes, the second item should be
5964
# processed.
60-
callback_result_deferred.callback(None)
65+
with PreserveLoggingContext():
66+
callback_result_deferred.callback(None)
6167
self._process_item_mock.assert_called_once_with(2)
6268

69+
@logcontext_clean
6370
def test_timeout(self) -> None:
6471
"""Test that the background process wakes up if its idle, and that it
6572
times out after being idle."""
6673

6774
# Register a deferred to be the return value of the callback.
6875
callback_result_deferred: Deferred[None] = Deferred()
69-
self._process_item_mock.side_effect = callback_result_deferred
76+
self._process_item_mock.side_effect = lambda _: make_deferred_yieldable(
77+
callback_result_deferred
78+
)
7079

7180
# Adding an item should cause the callback to be invoked.
7281
self.queue.add(1)
@@ -75,7 +84,8 @@ def test_timeout(self) -> None:
7584
self._process_item_mock.reset_mock()
7685

7786
# Let the callback complete.
78-
callback_result_deferred.callback(None)
87+
with PreserveLoggingContext():
88+
callback_result_deferred.callback(None)
7989

8090
# Advance the clock by less than the timeout, and add another item.
8191
self.reactor.advance(0.5)
@@ -84,12 +94,15 @@ def test_timeout(self) -> None:
8494

8595
# The callback should be invoked again.
8696
callback_result_deferred = Deferred()
87-
self._process_item_mock.side_effect = callback_result_deferred
97+
self._process_item_mock.side_effect = lambda _: make_deferred_yieldable(
98+
callback_result_deferred
99+
)
88100
self._process_item_mock.assert_called_once_with(2)
89101
self._process_item_mock.reset_mock()
90102

91103
# Let the callback complete.
92-
callback_result_deferred.callback(None)
104+
with PreserveLoggingContext():
105+
callback_result_deferred.callback(None)
93106

94107
# Advance the clock by more than the timeout.
95108
self.reactor.advance(1.5)

0 commit comments

Comments
 (0)