Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19138.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Minor speed up of processing of inbound replication.
Comment thread
erikjohnston marked this conversation as resolved.
50 changes: 15 additions & 35 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#
#
import logging
from collections import deque
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -71,6 +70,7 @@
DeviceListsStream,
ThreadSubscriptionsStream,
)
from synapse.util.async_helpers import BackgroundQueue

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -115,8 +115,8 @@


# the type of the entries in _command_queues_by_stream
_StreamCommandQueue = deque[
tuple[Union[RdataCommand, PositionCommand], IReplicationConnection]
_StreamCommandQueueItem = tuple[
Union[RdataCommand, PositionCommand], IReplicationConnection
]


Expand Down Expand Up @@ -265,7 +265,12 @@ def __init__(self, hs: "HomeServer"):
# for each stream, a queue of commands that are awaiting processing, and the
# connection that they arrived on.
self._command_queues_by_stream = {
stream_name: _StreamCommandQueue() for stream_name in self._streams
stream_name: BackgroundQueue[_StreamCommandQueueItem](
hs,
"process-replication-data",
self._unsafe_process,
)
for stream_name in self._streams
}

# For each connection, the incoming stream names that have received a POSITION
Expand Down Expand Up @@ -349,38 +354,13 @@ def _add_command_to_stream_queue(
logger.error("Got %s for unknown stream: %s", cmd.NAME, stream_name)
return

queue.append((cmd, conn))

# if we're already processing this stream, there's nothing more to do:
# the new entry on the queue will get picked up in due course
if stream_name in self._processing_streams:
return

# fire off a background process to start processing the queue.
self.hs.run_as_background_process(
"process-replication-data",
self._unsafe_process_queue,
stream_name,
)

async def _unsafe_process_queue(self, stream_name: str) -> None:
"""Processes the command queue for the given stream, until it is empty
queue.add((cmd, conn))

Does not check if there is already a thread processing the queue, hence "unsafe"
"""
assert stream_name not in self._processing_streams

self._processing_streams.add(stream_name)
try:
queue = self._command_queues_by_stream.get(stream_name)
while queue:
cmd, conn = queue.popleft()
try:
await self._process_command(cmd, conn, stream_name)
except Exception:
logger.exception("Failed to handle command %s", cmd)
finally:
self._processing_streams.discard(stream_name)
async def _unsafe_process(self, item: _StreamCommandQueueItem) -> None:
    """Process a single command from the stream queue.

    "Unsafe" because calls to this must be strictly serialised per stream:
    it must not be invoked concurrently for the same stream. The per-stream
    BackgroundQueue that this is registered with is responsible for that
    serialisation; do not call it directly.
    """
    cmd, conn = item
    # The command carries the stream it belongs to, so we no longer need the
    # stream name threaded through as a separate argument.
    stream_name = cmd.stream_name
    await self._process_command(cmd, conn, stream_name)

async def _process_command(
self,
Expand Down
95 changes: 95 additions & 0 deletions synapse/util/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import typing
from contextlib import asynccontextmanager
from typing import (
TYPE_CHECKING,
Any,
AsyncContextManager,
AsyncIterator,
Expand Down Expand Up @@ -61,6 +62,9 @@
)
from synapse.util.clock import Clock

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)

_T = TypeVar("_T")
Expand Down Expand Up @@ -1065,3 +1069,94 @@ async def wait(self, timeout_seconds: float) -> bool:
call.cancel()

return self.is_set()


class BackgroundQueue(Generic[T]):
    """A single-producer single-consumer async queue processing items in the
    background.

    This is optimised for the case where we receive many items, but processing
    each one takes a short amount of time. In this case we don't want to pay the
    overhead of a new background process each time. Instead, we spawn a
    background process that will wait for new items to arrive.

    If the background process has been idle for a while, it will exit, and a new
    background process will be spawned when new items arrive.

    Args:
        hs: The homeserver.
        name: The name of the background process.
        callback: The async callback to process each item.
        timeout_ms: The time in milliseconds to wait for new items before
            exiting the background process.
    """

    def __init__(
        self,
        hs: "HomeServer",
        name: str,
        callback: Callable[[T], Awaitable[None]],
        timeout_ms: int = 1000,
    ) -> None:
        self._hs = hs
        self._name = name
        self._callback = callback
        self._timeout_ms = timeout_ms

        # The queue of items to process.
        self._queue: collections.deque[T] = collections.deque()

        # Indicates if a background process is running, and if so whether there
        # is new data in the queue.
        self._event: Optional[DeferredEvent] = None

    def add(self, item: T) -> None:
        """Add an item into the queue."""

        self._queue.append(item)
        if self._event is None:
            # No background process is currently running, so spawn one to
            # drain the queue.
            self._hs.run_as_background_process(self._name, self._process_queue)
        else:
            # A background process is already running; wake it up so it sees
            # the new item.
            self._event.set()

    async def _process_queue(self) -> None:
        """Process items in the queue until it is empty."""

        # Make sure we're the only background process. If another process got
        # started before us, just poke it and bail out.
        if self._event is not None:
            self._event.set()
            return

        self._event = DeferredEvent(self._hs.get_clock())

        try:
            while True:
                # Clear the event before checking the queue, so that a `set()`
                # racing with the drain below is not lost.
                self._event.clear()

                while self._queue:
                    item = self._queue.popleft()
                    try:
                        await self._callback(item)
                    except Exception:
                        logger.exception("Error processing background queue item")

                # Wait for new data to arrive, timing out after a while to avoid
                # keeping the background process alive forever.
                #
                # New data may have arrived and been processed while we were
                # pulling from the queue, so this may return that there is new
                # data immediately even though there isn't. That's fine, we'll
                # just loop round, clear the event, recheck the queue, and then
                # wait here again.
                new_data = await self._event.wait(
                    timeout_seconds=self._timeout_ms / 1000
                )
                if not new_data:
                    # Timed out waiting for new data, so exit the loop
                    break
        finally:
            # Signal that no background process is running any more, so the
            # next `add` spawns a fresh one.
            self._event = None

    def __len__(self) -> int:
        return len(self._queue)
Loading