Skip to content

Commit 6c091b1

Browse files
committed
Fill in get_retry_limiter
1 parent 03b7d9b commit 6c091b1

4 files changed

Lines changed: 95 additions & 22 deletions

File tree

synapse/federation/sender/per_destination_queue.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def __init__(
9191
transaction_manager: "synapse.federation.sender.TransactionManager",
9292
destination: str,
9393
):
94-
self._server_name = hs.hostname
94+
self.server_name = hs.hostname
9595
self._clock = hs.get_clock()
9696
self._storage_controllers = hs.get_storage_controllers()
9797
self._store = hs.get_datastores().main
@@ -323,7 +323,12 @@ async def _transaction_transmission_loop(self) -> None:
323323
# This will throw if we wouldn't retry. We do this here so we fail
324324
# quickly, but we will later check this again in the http client,
325325
# hence why we throw the result away.
326-
await get_retry_limiter(self._destination, self._clock, self._store)
326+
await get_retry_limiter(
327+
destination=self._destination,
328+
our_server_name=self.server_name,
329+
clock=self._clock,
330+
store=self._store,
331+
)
327332

328333
if self._catching_up:
329334
# we potentially need to catch-up first
@@ -567,7 +572,7 @@ async def _catch_up_transmission_loop(self) -> None:
567572
new_pdus = await filter_events_for_server(
568573
self._storage_controllers,
569574
self._destination,
570-
self._server_name,
575+
self.server_name,
571576
new_pdus,
572577
redact=False,
573578
filter_out_erased_senders=True,
@@ -614,7 +619,7 @@ def _get_receipt_edus(self, limit: int) -> Iterable[Edu]:
614619
# Send at most limit EDUs for receipts.
615620
for content in self._pending_receipt_edus[:limit]:
616621
yield Edu(
617-
origin=self._server_name,
622+
origin=self.server_name,
618623
destination=self._destination,
619624
edu_type=EduTypes.RECEIPT,
620625
content=content,
@@ -640,7 +645,7 @@ async def _get_device_update_edus(self, limit: int) -> Tuple[List[Edu], int]:
640645
)
641646
edus = [
642647
Edu(
643-
origin=self._server_name,
648+
origin=self.server_name,
644649
destination=self._destination,
645650
edu_type=edu_type,
646651
content=content,
@@ -667,7 +672,7 @@ async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]
667672

668673
edus = [
669674
Edu(
670-
origin=self._server_name,
675+
origin=self.server_name,
671676
destination=self._destination,
672677
edu_type=EduTypes.DIRECT_TO_DEVICE,
673678
content=content,

synapse/http/matrixfederationclient.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -619,9 +619,10 @@ async def _send_request(
619619
raise FederationDeniedError(request.destination)
620620

621621
limiter = await synapse.util.retryutils.get_retry_limiter(
622-
request.destination,
623-
self.clock,
624-
self._store,
622+
destination=request.destination,
623+
our_server_name=self.server_name,
624+
clock=self.clock,
625+
store=self._store,
625626
backoff_on_404=backoff_on_404,
626627
ignore_backoff=ignore_backoff,
627628
notifier=self.hs.get_notifier(),

synapse/util/retryutils.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ def __init__(self, retry_last_ts: int, retry_interval: int, destination: str):
5959

6060

6161
async def get_retry_limiter(
62+
*,
6263
destination: str,
6364
our_server_name: str,
6465
clock: Clock,
@@ -84,7 +85,12 @@ async def get_retry_limiter(
8485
Example usage:
8586
8687
try:
87-
limiter = await get_retry_limiter(destination, clock, store)
88+
limiter = await get_retry_limiter(
89+
destination=destination,
90+
our_server_name=self.server_name,
91+
clock=clock,
92+
store=store,
93+
)
8894
with limiter:
8995
response = await do_request()
9096
except NotRetryingDestination:

tests/util/test_retryutils.py

Lines changed: 73 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,14 @@ class RetryLimiterTestCase(HomeserverTestCase):
3131
def test_new_destination(self) -> None:
3232
"""A happy-path case with a new destination and a successful operation"""
3333
store = self.hs.get_datastores().main
34-
limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
34+
limiter = self.get_success(
35+
get_retry_limiter(
36+
destination="test_dest",
37+
our_server_name=self.hs.hostname,
38+
clock=self.clock,
39+
store=store,
40+
)
41+
)
3542

3643
# advance the clock a bit before making the request
3744
self.pump(1)
@@ -46,7 +53,14 @@ def test_limiter(self) -> None:
4653
"""General test case which walks through the process of a failing request"""
4754
store = self.hs.get_datastores().main
4855

49-
limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
56+
limiter = self.get_success(
57+
get_retry_limiter(
58+
destination="test_dest",
59+
our_server_name=self.hs.hostname,
60+
clock=self.clock,
61+
store=store,
62+
)
63+
)
5064

5165
min_retry_interval_ms = (
5266
self.hs.config.federation.destination_min_retry_interval_ms
@@ -72,15 +86,28 @@ def test_limiter(self) -> None:
7286

7387
# now if we try again we should get a failure
7488
self.get_failure(
75-
get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
89+
get_retry_limiter(
90+
destination="test_dest",
91+
our_server_name=self.hs.hostname,
92+
clock=self.clock,
93+
store=store,
94+
),
95+
NotRetryingDestination,
7696
)
7797

7898
#
7999
# advance the clock and try again
80100
#
81101

82102
self.pump(min_retry_interval_ms)
83-
limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
103+
limiter = self.get_success(
104+
get_retry_limiter(
105+
destination="test_dest",
106+
our_server_name=self.hs.hostname,
107+
clock=self.clock,
108+
store=store,
109+
)
110+
)
84111

85112
self.pump(1)
86113
try:
@@ -108,7 +135,14 @@ def test_limiter(self) -> None:
108135
# one more go, with success
109136
#
110137
self.reactor.advance(min_retry_interval_ms * retry_multiplier * 2.0)
111-
limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
138+
limiter = self.get_success(
139+
get_retry_limiter(
140+
destination="test_dest",
141+
our_server_name=self.hs.hostname,
142+
clock=self.clock,
143+
store=store,
144+
)
145+
)
112146

113147
self.pump(1)
114148
with limiter:
@@ -129,9 +163,10 @@ def test_notifier_replication(self) -> None:
129163

130164
limiter = self.get_success(
131165
get_retry_limiter(
132-
"test_dest",
133-
self.clock,
134-
store,
166+
destination="test_dest",
167+
our_server_name=self.hs.hostname,
168+
clock=self.clock,
169+
store=store,
135170
notifier=notifier,
136171
replication_client=replication_client,
137172
)
@@ -199,7 +234,14 @@ def test_max_retry_interval(self) -> None:
199234
self.hs.config.federation.destination_max_retry_interval_ms
200235
)
201236

202-
self.get_success(get_retry_limiter("test_dest", self.clock, store))
237+
self.get_success(
238+
get_retry_limiter(
239+
destination="test_dest",
240+
our_server_name=self.hs.hostname,
241+
clock=self.clock,
242+
store=store,
243+
)
244+
)
203245
self.pump(1)
204246

205247
failure_ts = self.clock.time_msec()
@@ -216,12 +258,25 @@ def test_max_retry_interval(self) -> None:
216258

217259
# Check it fails
218260
self.get_failure(
219-
get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
261+
get_retry_limiter(
262+
destination="test_dest",
263+
our_server_name=self.hs.hostname,
264+
clock=self.clock,
265+
store=store,
266+
),
267+
NotRetryingDestination,
220268
)
221269

222270
# Get past retry_interval and we can try again, and still throw an error to continue the backoff
223271
self.reactor.advance(destination_max_retry_interval_ms / 1000 + 1)
224-
limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
272+
limiter = self.get_success(
273+
get_retry_limiter(
274+
destination="test_dest",
275+
our_server_name=self.hs.hostname,
276+
clock=self.clock,
277+
store=store,
278+
)
279+
)
225280
self.pump(1)
226281
try:
227282
with limiter:
@@ -239,5 +294,11 @@ def test_max_retry_interval(self) -> None:
239294

240295
# Check it fails
241296
self.get_failure(
242-
get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
297+
get_retry_limiter(
298+
destination="test_dest",
299+
our_server_name=self.hs.hostname,
300+
clock=self.clock,
301+
store=store,
302+
),
303+
NotRetryingDestination,
243304
)

0 commit comments

Comments
 (0)