Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18751.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor `LaterGauge` metrics to be homeserver-scoped.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same changelog as #18714 so they merge

40 changes: 25 additions & 15 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"""

import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
Dict,
Expand Down Expand Up @@ -67,6 +68,25 @@
logger = logging.getLogger(__name__)


class QueueNames(str, Enum):
PRESENCE_MAP = "presence_map"
KEYED_EDU = "keyed_edu"
KEYED_EDU_CHANGED = "keyed_edu_changed"
EDUS = "edus"
POS_TIME = "pos_time"
PRESENCE_DESTINATIONS = "presence_destinations"


queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}

for queue_name in QueueNames:
queue_name_to_gauge_map[queue_name] = LaterGauge(
name="synapse_federation_send_queue_%s_size" % (queue_name,),
Comment thread
MadLittleMods marked this conversation as resolved.
Outdated
desc="",
labelnames=[SERVER_NAME_LABEL],
)


class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""

Expand Down Expand Up @@ -111,23 +131,13 @@ def __init__(self, hs: "HomeServer"):
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name: str, queue: Sized) -> None:
LaterGauge(
name="synapse_federation_send_queue_%s_size" % (queue_name,),
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(queue)},
def register(queue_name: QueueNames, queue: Sized) -> None:
queue_name_to_gauge_map[queue_name].register_hook(
lambda: {(self.server_name,): len(queue)}
)

for queue_name in [
"presence_map",
"keyed_edu",
"keyed_edu_changed",
"edus",
"pos_time",
"presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
for queue_name in QueueNames:
register(queue_name, queue=getattr(self, queue_name))

self.clock.looping_call(self._clear_queue, 30 * 1000)

Expand Down
46 changes: 27 additions & 19 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,24 @@
labelnames=[SERVER_NAME_LABEL],
)

transaction_queue_pending_destinations_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

transaction_queue_pending_pdus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

transaction_queue_pending_edus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding.
# Please note that rate limiting still applies, so while the loop is
Expand Down Expand Up @@ -398,38 +416,28 @@ def __init__(self, hs: "HomeServer"):
# map from destination to PerDestinationQueue
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_destinations_gauge.register_hook(
lambda: {
(self.server_name,): sum(
1
for d in self._per_destination_queues.values()
if d.transmission_loop_running
)
},
}
)

LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_pdus_gauge.register_hook(
lambda: {
(self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
)
},
}
)
LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_edus_gauge.register_hook(
lambda: {
(self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
)
},
}
)

self._is_processing = False
Expand Down
26 changes: 16 additions & 10 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
)

presence_user_to_current_state_size_gauge = LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

presence_wheel_timer_size_gauge = LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000
Expand Down Expand Up @@ -779,11 +791,8 @@ def __init__(self, hs: "HomeServer"):
EduTypes.PRESENCE, self.incoming_presence
)

LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
presence_user_to_current_state_size_gauge.register_hook(
lambda: {(self.server_name,): len(self.user_to_current_state)}
)

# The per-device presence state, maps user to devices to per-device presence state.
Expand Down Expand Up @@ -882,11 +891,8 @@ def __init__(self, hs: "HomeServer"):
60 * 1000,
)

LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
presence_wheel_timer_size_gauge.register_hook(
lambda: {(self.server_name,): len(self.wheel_timer)}
)

# Used to handle sending of presence to newly joined users/servers
Expand Down
4 changes: 2 additions & 2 deletions synapse/http/request_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
return counts


LaterGauge(
in_flight_requests = LaterGauge(
name="synapse_http_server_in_flight_requests_count",
desc="",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
caller=_get_in_flight_counts,
)
in_flight_requests.register_hook(_get_in_flight_counts)


class RequestMetrics:
Expand Down
48 changes: 31 additions & 17 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
Dict,
Generic,
Iterable,
List,
Mapping,
Optional,
Sequence,
Expand Down Expand Up @@ -163,29 +164,42 @@ class LaterGauge(Collector):
name: str
Comment thread
devonh marked this conversation as resolved.
desc: str
labelnames: Optional[StrSequence] = attr.ib(hash=False)
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
# List of callbacks: each callback should either return a value (if there are no
# labels for this metric), or dict mapping from a label tuple to a value
_hooks: List[
Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
] = attr.ib(factory=list, hash=False)

def collect(self) -> Iterable[Metric]:
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)

try:
calls = self.caller()
except Exception:
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return
for hook in self._hooks:
try:
hook_result = hook()
except Exception:
logger.exception(
"Exception running callback for LaterGauge(%s)", self.name
)
yield g
return

if isinstance(hook_result, (int, float)):
g.add_metric([], hook_result)
else:
for k, v in hook_result.items():
g.add_metric(k, v)

if isinstance(calls, (int, float)):
g.add_metric([], calls)
else:
for k, v in calls.items():
g.add_metric(k, v)
yield g

yield g
def register_hook(
self,
hook: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
],
) -> None:
self._hooks.append(hook)

def __attrs_post_init__(self) -> None:
self._register()
Comment thread
devonh marked this conversation as resolved.
Outdated
Expand Down
42 changes: 24 additions & 18 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,24 @@
labelnames=["stream", SERVER_NAME_LABEL],
)


notifier_listeners_gauge = LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

notifier_rooms_gauge = LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
)
notifier_users_gauge = LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

T = TypeVar("T")


Expand Down Expand Up @@ -281,28 +299,16 @@ def count_listeners() -> Mapping[Tuple[str, ...], int]:
)
}

LaterGauge(
name="synapse_notifier_listeners",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=count_listeners,
)

LaterGauge(
name="synapse_notifier_rooms",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
notifier_listeners_gauge.register_hook(count_listeners)
notifier_rooms_gauge.register_hook(
lambda: {
(self.server_name,): count(
bool, list(self.room_to_user_streams.values())
)
},
}
)
LaterGauge(
name="synapse_notifier_users",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
notifier_users_gauge.register_hook(
lambda: {(self.server_name,): len(self.user_to_user_stream)}
)

def add_replication_callback(self, cb: Callable[[], None]) -> None:
Expand Down
28 changes: 17 additions & 11 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@
"synapse_replication_tcp_resource_user_ip_cache", "", labelnames=[SERVER_NAME_LABEL]
)

tcp_resource_total_connections_gauge = LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

tcp_command_queue_gauge = LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
)


# the type of the entries in _command_queues_by_stream
_StreamCommandQueue = Deque[
Expand Down Expand Up @@ -243,11 +255,8 @@ def __init__(self, hs: "HomeServer"):
# outgoing replication commands to.)
self._connections: List[IReplicationConnection] = []

LaterGauge(
name="synapse_replication_tcp_resource_total_connections",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self._connections)},
tcp_resource_total_connections_gauge.register_hook(
lambda: {(self.server_name,): len(self._connections)}
)

# When POSITION or RDATA commands arrive, we stick them in a queue and process
Expand All @@ -266,14 +275,11 @@ def __init__(self, hs: "HomeServer"):
# from that connection.
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}

LaterGauge(
name="synapse_replication_tcp_command_queue",
desc="Number of inbound RDATA/POSITION commands queued for processing",
labelnames=["stream_name", SERVER_NAME_LABEL],
caller=lambda: {
tcp_command_queue_gauge.register_hook(
lambda: {
(stream_name, self.server_name): len(queue)
for stream_name, queue in self._command_queues_by_stream.items()
},
}
)

self._is_master = hs.config.worker.worker_app is None
Expand Down
Loading
Loading