Skip to content

Commit 6790312

Browse files
authored
Fixup logcontexts after replication PR. (#19146)
Fixes logcontext leaks introduced in #19138.
1 parent d3ffd04 commit 6790312

4 files changed

Lines changed: 32 additions & 25 deletions

File tree

changelog.d/19146.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Minor speed up of processing of inbound replication.

synapse/util/async_helpers.py

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1029,7 +1029,8 @@ def __init__(self, clock: Clock) -> None:
10291029

10301030
def set(self) -> None:
10311031
if not self._deferred.called:
1032-
self._deferred.callback(None)
1032+
with PreserveLoggingContext():
1033+
self._deferred.callback(None)
10331034

10341035
def clear(self) -> None:
10351036
if self._deferred.called:
@@ -1042,26 +1043,15 @@ async def wait(self, timeout_seconds: float) -> bool:
10421043
if self.is_set():
10431044
return True
10441045

1045-
# Create a deferred that gets called in N seconds
1046-
sleep_deferred: "defer.Deferred[None]" = defer.Deferred()
1047-
call = self._clock.call_later(
1048-
timeout_seconds,
1049-
sleep_deferred.callback,
1050-
None,
1051-
)
1052-
10531046
try:
10541047
await make_deferred_yieldable(
1055-
defer.DeferredList(
1056-
[sleep_deferred, self._deferred],
1057-
fireOnOneCallback=True,
1058-
fireOnOneErrback=True,
1059-
consumeErrors=True,
1048+
timeout_deferred(
1049+
deferred=stop_cancellation(self._deferred),
1050+
timeout=timeout_seconds,
1051+
clock=self._clock,
10601052
)
10611053
)
1062-
finally:
1063-
# Cancel the sleep if we were woken up
1064-
if call.active():
1065-
call.cancel()
1054+
except defer.TimeoutError:
1055+
pass
10661056

10671057
return self.is_set()

synapse/util/background_queue.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
TypeVar,
2626
)
2727

28+
from twisted.internet import defer
29+
2830
from synapse.util.async_helpers import DeferredEvent
2931
from synapse.util.constants import MILLISECONDS_PER_SECOND
3032

@@ -110,6 +112,8 @@ async def _process_queue(self) -> None:
110112
item = self._queue.popleft()
111113
try:
112114
await self._callback(item)
115+
except defer.CancelledError:
116+
raise
113117
except Exception:
114118
logger.exception("Error processing background queue item")
115119

tests/util/test_background_queue.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
from twisted.internet.defer import Deferred
1919
from twisted.internet.testing import MemoryReactor
2020

21+
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
2122
from synapse.server import HomeServer
2223
from synapse.util.background_queue import BackgroundQueue
2324
from synapse.util.clock import Clock
2425

25-
from tests.unittest import HomeserverTestCase
26+
from tests.unittest import HomeserverTestCase, logcontext_clean
2627

2728

2829
class BackgroundQueueTests(HomeserverTestCase):
@@ -38,11 +39,14 @@ def prepare(
3839
timeout_ms=1000,
3940
)
4041

42+
@logcontext_clean
4143
def test_simple_call(self) -> None:
4244
"""Test that items added to the queue are processed."""
4345
# Register a deferred to be the return value of the callback.
4446
callback_result_deferred: Deferred[None] = Deferred()
45-
self._process_item_mock.side_effect = callback_result_deferred
47+
self._process_item_mock.side_effect = lambda _: make_deferred_yieldable(
48+
callback_result_deferred
49+
)
4650

4751
# Adding an item should cause the callback to be invoked.
4852
self.queue.add(1)
@@ -57,16 +61,20 @@ def test_simple_call(self) -> None:
5761

5862
# Once the first callback completes, the second item should be
5963
# processed.
60-
callback_result_deferred.callback(None)
64+
with PreserveLoggingContext():
65+
callback_result_deferred.callback(None)
6166
self._process_item_mock.assert_called_once_with(2)
6267

68+
@logcontext_clean
6369
def test_timeout(self) -> None:
6470
"""Test that the background process wakes up if its idle, and that it
6571
times out after being idle."""
6672

6773
# Register a deferred to be the return value of the callback.
6874
callback_result_deferred: Deferred[None] = Deferred()
69-
self._process_item_mock.side_effect = callback_result_deferred
75+
self._process_item_mock.side_effect = lambda _: make_deferred_yieldable(
76+
callback_result_deferred
77+
)
7078

7179
# Adding an item should cause the callback to be invoked.
7280
self.queue.add(1)
@@ -75,7 +83,8 @@ def test_timeout(self) -> None:
7583
self._process_item_mock.reset_mock()
7684

7785
# Let the callback complete.
78-
callback_result_deferred.callback(None)
86+
with PreserveLoggingContext():
87+
callback_result_deferred.callback(None)
7988

8089
# Advance the clock by less than the timeout, and add another item.
8190
self.reactor.advance(0.5)
@@ -84,12 +93,15 @@ def test_timeout(self) -> None:
8493

8594
# The callback should be invoked again.
8695
callback_result_deferred = Deferred()
87-
self._process_item_mock.side_effect = callback_result_deferred
96+
self._process_item_mock.side_effect = lambda _: make_deferred_yieldable(
97+
callback_result_deferred
98+
)
8899
self._process_item_mock.assert_called_once_with(2)
89100
self._process_item_mock.reset_mock()
90101

91102
# Let the callback complete.
92-
callback_result_deferred.callback(None)
103+
with PreserveLoggingContext():
104+
callback_result_deferred.callback(None)
93105

94106
# Advance the clock by more than the timeout.
95107
self.reactor.advance(1.5)

0 commit comments

Comments
 (0)