|
21 | 21 | from synapse.api.errors import ShadowBanError, SynapseError |
22 | 22 | from synapse.api.ratelimiting import Ratelimiter |
23 | 23 | from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME |
| 24 | +from synapse.http.site import SynapseRequest |
24 | 25 | from synapse.logging.context import make_deferred_yieldable |
25 | 26 | from synapse.logging.opentracing import set_tag |
26 | 27 | from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions |
|
29 | 30 | ) |
30 | 31 | from synapse.storage.databases.main.delayed_events import ( |
31 | 32 | DelayedEventDetails, |
32 | | - DelayID, |
33 | 33 | EventType, |
34 | 34 | StateKey, |
35 | 35 | Timestamp, |
36 | | - UserLocalpart, |
37 | 36 | ) |
38 | 37 | from synapse.storage.databases.main.state_deltas import StateDelta |
39 | 38 | from synapse.types import ( |
@@ -399,96 +398,63 @@ def on_added(self, next_send_ts: int) -> None: |
399 | 398 | if self._next_send_ts_changed(next_send_ts): |
400 | 399 | self._schedule_next_at(next_send_ts) |
401 | 400 |
|
402 | | - async def cancel(self, requester: Requester, delay_id: str) -> None: |
| 401 | + async def cancel(self, request: SynapseRequest, delay_id: str) -> None: |
403 | 402 | """ |
404 | 403 | Cancels the scheduled delivery of the matching delayed event. |
405 | 404 |
|
406 | | - Args: |
407 | | - requester: The owner of the delayed event to act on. |
408 | | - delay_id: The ID of the delayed event to act on. |
409 | | -
|
410 | 405 | Raises: |
411 | 406 | NotFoundError: if no matching delayed event could be found. |
412 | 407 | """ |
413 | 408 | assert self._is_master |
414 | 409 | await self._delayed_event_mgmt_ratelimiter.ratelimit( |
415 | | - requester, |
416 | | - (requester.user.to_string(), requester.device_id), |
| 410 | + None, request.getClientAddress().host |
417 | 411 | ) |
418 | 412 | await make_deferred_yieldable(self._initialized_from_db) |
419 | 413 |
|
420 | | - next_send_ts = await self._store.cancel_delayed_event( |
421 | | - delay_id=delay_id, |
422 | | - user_localpart=requester.user.localpart, |
423 | | - ) |
| 414 | + next_send_ts = await self._store.cancel_delayed_event(delay_id) |
424 | 415 |
|
425 | 416 | if self._next_send_ts_changed(next_send_ts): |
426 | 417 | self._schedule_next_at_or_none(next_send_ts) |
427 | 418 |
|
428 | | - async def restart(self, requester: Requester, delay_id: str) -> None: |
| 419 | + async def restart(self, request: SynapseRequest, delay_id: str) -> None: |
429 | 420 | """ |
430 | 421 | Restarts the scheduled delivery of the matching delayed event. |
431 | 422 |
|
432 | | - Args: |
433 | | - requester: The owner of the delayed event to act on. |
434 | | - delay_id: The ID of the delayed event to act on. |
435 | | -
|
436 | 423 | Raises: |
437 | 424 | NotFoundError: if no matching delayed event could be found. |
438 | 425 | """ |
439 | 426 | assert self._is_master |
440 | 427 | await self._delayed_event_mgmt_ratelimiter.ratelimit( |
441 | | - requester, |
442 | | - (requester.user.to_string(), requester.device_id), |
| 428 | + None, request.getClientAddress().host |
443 | 429 | ) |
444 | 430 | await make_deferred_yieldable(self._initialized_from_db) |
445 | 431 |
|
446 | 432 | next_send_ts = await self._store.restart_delayed_event( |
447 | | - delay_id=delay_id, |
448 | | - user_localpart=requester.user.localpart, |
449 | | - current_ts=self._get_current_ts(), |
| 433 | + delay_id, self._get_current_ts() |
450 | 434 | ) |
451 | 435 |
|
452 | 436 | if self._next_send_ts_changed(next_send_ts): |
453 | 437 | self._schedule_next_at(next_send_ts) |
454 | 438 |
|
455 | | - async def send(self, requester: Requester, delay_id: str) -> None: |
| 439 | + async def send(self, request: SynapseRequest, delay_id: str) -> None: |
456 | 440 | """ |
457 | 441 | Immediately sends the matching delayed event, instead of waiting for its scheduled delivery. |
458 | 442 |
|
459 | | - Args: |
460 | | - requester: The owner of the delayed event to act on. |
461 | | - delay_id: The ID of the delayed event to act on. |
462 | | -
|
463 | 443 | Raises: |
464 | 444 | NotFoundError: if no matching delayed event could be found. |
465 | 445 | """ |
466 | 446 | assert self._is_master |
467 | | - # Use standard request limiter for sending delayed events on-demand, |
468 | | - # as an on-demand send is similar to sending a regular event. |
469 | | - await self._request_ratelimiter.ratelimit(requester) |
| 447 | + await self._delayed_event_mgmt_ratelimiter.ratelimit( |
| 448 | + None, request.getClientAddress().host |
| 449 | + ) |
470 | 450 | await make_deferred_yieldable(self._initialized_from_db) |
471 | 451 |
|
472 | | - event, next_send_ts = await self._store.process_target_delayed_event( |
473 | | - delay_id=delay_id, |
474 | | - user_localpart=requester.user.localpart, |
475 | | - ) |
| 452 | + event, next_send_ts = await self._store.process_target_delayed_event(delay_id) |
476 | 453 |
|
477 | 454 | if self._next_send_ts_changed(next_send_ts): |
478 | 455 | self._schedule_next_at_or_none(next_send_ts) |
479 | 456 |
|
480 | | - await self._send_event( |
481 | | - DelayedEventDetails( |
482 | | - delay_id=DelayID(delay_id), |
483 | | - user_localpart=UserLocalpart(requester.user.localpart), |
484 | | - room_id=event.room_id, |
485 | | - type=event.type, |
486 | | - state_key=event.state_key, |
487 | | - origin_server_ts=event.origin_server_ts, |
488 | | - content=event.content, |
489 | | - device_id=event.device_id, |
490 | | - ) |
491 | | - ) |
| 457 | + await self._send_event(event) |
492 | 458 |
|
493 | 459 | async def _send_on_timeout(self) -> None: |
494 | 460 | self._next_delayed_event_call = None |
@@ -611,9 +577,7 @@ async def _send_event( |
611 | 577 | finally: |
612 | 578 | # TODO: If this is a temporary error, retry. Otherwise, consider notifying clients of the failure |
613 | 579 | try: |
614 | | - await self._store.delete_processed_delayed_event( |
615 | | - event.delay_id, event.user_localpart |
616 | | - ) |
| 580 | + await self._store.delete_processed_delayed_event(event.delay_id) |
617 | 581 | except Exception: |
618 | 582 | logger.exception("Failed to delete processed delayed event") |
619 | 583 |
|
|
0 commit comments