-
Notifications
You must be signed in to change notification settings - Fork 522
Faster redis replication handling #19138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
74ce8c1
Add BackgroundQueue
erikjohnston 955c143
Use new BackgroundQueue
erikjohnston cae42be
Newsfile
erikjohnston 9f8a540
Move to separate file
erikjohnston 4c9db58
Comments and rename _event
erikjohnston b9a1611
Use constant
erikjohnston d04778b
Comment on why we set event to none
erikjohnston 4437b4a
Comment on unsafety
erikjohnston 75fc400
Lint
erikjohnston 6f2c518
Add tests
erikjohnston 98d8abc
Fix trial-olddeps
erikjohnston 4740afa
Fix lint
erikjohnston File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Minor speed up of processing of inbound replication. | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,139 @@ | ||
| # | ||
| # This file is licensed under the Affero General Public License (AGPL) version 3. | ||
| # | ||
| # Copyright (C) 2025 Element Creations Ltd | ||
| # | ||
| # This program is free software: you can redistribute it and/or modify | ||
| # it under the terms of the GNU Affero General Public License as | ||
| # published by the Free Software Foundation, either version 3 of the | ||
| # License, or (at your option) any later version. | ||
| # | ||
| # See the GNU Affero General Public License for more details: | ||
| # <https://www.gnu.org/licenses/agpl-3.0.html>. | ||
| # | ||
| # | ||
| # | ||
|
|
||
| import collections | ||
| import logging | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Awaitable, | ||
| Callable, | ||
| Generic, | ||
| Optional, | ||
| TypeVar, | ||
| ) | ||
|
|
||
| from synapse.util.async_helpers import DeferredEvent | ||
| from synapse.util.constants import MILLISECONDS_PER_SECOND | ||
|
|
||
| if TYPE_CHECKING: | ||
| from synapse.server import HomeServer | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| T = TypeVar("T") | ||
|
|
||
|
|
||
| class BackgroundQueue(Generic[T]): | ||
| """A single-producer single-consumer async queue processing items in the | ||
| background. | ||
|
|
||
| This is optimised for the case where we receive many items, but processing | ||
| each one takes a short amount of time. In this case we don't want to pay the | ||
| overhead of a new background process each time. Instead, we spawn a | ||
| background process that will wait for new items to arrive. | ||
|
|
||
| If the background process has been idle for a while, it will exit, and a new | ||
| background process will be spawned when new items arrive. | ||
|
|
||
| Args: | ||
| hs: The homeserver. | ||
| name: The name of the background process. | ||
| callback: The async callback to process each item. | ||
| timeout_ms: The time in milliseconds to wait for new items before | ||
| exiting the background process. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| hs: "HomeServer", | ||
| name: str, | ||
| callback: Callable[[T], Awaitable[None]], | ||
| timeout_ms: int = 1000, | ||
| ) -> None: | ||
| self._hs = hs | ||
| self._name = name | ||
| self._callback = callback | ||
| self._timeout_ms = timeout_ms | ||
|
|
||
| # The queue of items to process. | ||
| self._queue: collections.deque[T] = collections.deque() | ||
|
|
||
| # Indicates if a background process is running, and if so whether there | ||
| # is new data in the queue. Used to signal to an existing background | ||
| # process that there is new data added to the queue. | ||
| self._wakeup_event: Optional[DeferredEvent] = None | ||
|
|
||
| def add(self, item: T) -> None: | ||
| """Add an item into the queue.""" | ||
|
|
||
| self._queue.append(item) | ||
| if self._wakeup_event is None: | ||
| self._hs.run_as_background_process(self._name, self._process_queue) | ||
| else: | ||
| self._wakeup_event.set() | ||
|
|
||
| async def _process_queue(self) -> None: | ||
| """Process items in the queue until it is empty.""" | ||
|
|
||
| # Make sure we're the only background process. | ||
| if self._wakeup_event is not None: | ||
| # If there is already a background process then we signal it to wake | ||
| # up and exit. We do not want multiple background processes running | ||
| # at a time. | ||
| self._wakeup_event.set() | ||
| return | ||
|
Comment on lines
+93
to
+97
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This doesn't seem accurate. It will wakeup the other background process which will loop around to process more data. While this call exits to prevent multiple background processes from running. |
||
|
|
||
| self._wakeup_event = DeferredEvent(self._hs.get_clock()) | ||
|
|
||
| try: | ||
| while True: | ||
| # Clear the event before checking the queue. If we cleared after | ||
| # we run the risk of the wakeup signal racing with us checking | ||
| # the queue. (This can't really happen in Python due to the | ||
| # single threaded nature, but let's be a bit defensive anyway.) | ||
| self._wakeup_event.clear() | ||
|
|
||
| while self._queue: | ||
| item = self._queue.popleft() | ||
| try: | ||
| await self._callback(item) | ||
| except Exception: | ||
| logger.exception("Error processing background queue item") | ||
|
|
||
| # Wait for new data to arrive, timing out after a while to avoid | ||
| # keeping the background process alive forever. | ||
| # | ||
| # New data may have arrived and been processed while we were | ||
| # pulling from the queue, so this may return that there is new | ||
| # data immediately even though there isn't. That's fine, we'll | ||
| # just loop round, clear the event, recheck the queue, and then | ||
| # wait here again. | ||
| new_data = await self._wakeup_event.wait( | ||
| timeout_seconds=self._timeout_ms / MILLISECONDS_PER_SECOND | ||
| ) | ||
| if not new_data: | ||
| # Timed out waiting for new data, so exit the loop | ||
| break | ||
| finally: | ||
| # This background process is exiting, so clear the wakeup event to | ||
| # indicate that a new one should be started when new data arrives. | ||
| self._wakeup_event = None | ||
|
|
||
| # The queue must be empty here. | ||
| assert not self._queue | ||
|
|
||
| def __len__(self) -> int: | ||
| return len(self._queue) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,105 @@ | ||
| # | ||
| # This file is licensed under the Affero General Public License (AGPL) version 3. | ||
| # | ||
| # Copyright (C) 2025 Element Creations Ltd | ||
| # | ||
| # This program is free software: you can redistribute it and/or modify | ||
| # it under the terms of the GNU Affero General Public License as | ||
| # published by the Free Software Foundation, either version 3 of the | ||
| # License, or (at your option) any later version. | ||
| # | ||
| # See the GNU Affero General Public License for more details: | ||
| # <https://www.gnu.org/licenses/agpl-3.0.html>. | ||
| # | ||
|
|
||
|
|
||
| from unittest.mock import Mock | ||
|
|
||
| from twisted.internet.defer import Deferred | ||
| from twisted.internet.testing import MemoryReactor | ||
|
|
||
| from synapse.server import HomeServer | ||
| from synapse.util.bacckground_queue import BackgroundQueue | ||
| from synapse.util.clock import Clock | ||
|
|
||
| from tests.unittest import HomeserverTestCase | ||
|
|
||
|
|
||
| class BackgroundQueueTests(HomeserverTestCase): | ||
| def prepare( | ||
| self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer | ||
| ) -> None: | ||
| self._process_item_mock = Mock(spec_set=[]) | ||
|
|
||
| self.queue = BackgroundQueue[int]( | ||
| hs=homeserver, | ||
| name="test_queue", | ||
| callback=self._process_item_mock, | ||
| timeout_ms=1000, | ||
| ) | ||
|
|
||
| def test_simple_call(self) -> None: | ||
| """Test that items added to the queue are processed.""" | ||
| # Register a deferred to be the return value of the callback. | ||
| callback_result_deferred: Deferred[None] = Deferred() | ||
| self._process_item_mock.side_effect = callback_result_deferred | ||
|
|
||
| # Adding an item should cause the callback to be invoked. | ||
| self.queue.add(1) | ||
|
|
||
| self._process_item_mock.assert_called_once_with(1) | ||
| self._process_item_mock.reset_mock() | ||
|
|
||
| # Adding another item should not cause the callback to be invoked again | ||
| # until the previous one has completed. | ||
| self.queue.add(2) | ||
| self._process_item_mock.assert_not_called() | ||
|
|
||
| # Once the first callback completes, the second item should be | ||
| # processed. | ||
| callback_result_deferred.callback(None) | ||
| self._process_item_mock.assert_called_once_with(2) | ||
|
|
||
| def test_timeout(self) -> None: | ||
| """Test that the background process wakes up if its idle, and that it | ||
| times out after being idle.""" | ||
|
|
||
| # Register a deferred to be the return value of the callback. | ||
| callback_result_deferred: Deferred[None] = Deferred() | ||
| self._process_item_mock.side_effect = callback_result_deferred | ||
|
|
||
| # Adding an item should cause the callback to be invoked. | ||
| self.queue.add(1) | ||
|
|
||
| self._process_item_mock.assert_called_once_with(1) | ||
| self._process_item_mock.reset_mock() | ||
|
|
||
| # Let the callback complete. | ||
| callback_result_deferred.callback(None) | ||
|
|
||
| # Advance the clock by less than the timeout, and add another item. | ||
| self.reactor.advance(0.5) | ||
| self.assertIsNotNone(self.queue._wakeup_event) | ||
| self.queue.add(2) | ||
|
|
||
| # The callback should be invoked again. | ||
| callback_result_deferred = Deferred() | ||
| self._process_item_mock.side_effect = callback_result_deferred | ||
| self._process_item_mock.assert_called_once_with(2) | ||
| self._process_item_mock.reset_mock() | ||
|
|
||
| # Let the callback complete. | ||
| callback_result_deferred.callback(None) | ||
|
|
||
| # Advance the clock by more than the timeout. | ||
| self.reactor.advance(1.5) | ||
|
|
||
| # The background process should have exited, we check this by checking | ||
| # the internal wakeup event has been removed. | ||
| self.assertIsNone(self.queue._wakeup_event) | ||
|
|
||
| # Add another item. This should cause a new background process to be | ||
| # started. | ||
| self.queue.add(3) | ||
|
|
||
| self._process_item_mock.assert_called_once_with(3) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.