Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
285f192
Fix `LaterGauge` metrics to collect from all servers (#18751)
MadLittleMods Aug 5, 2025
4dda2b1
First pass: keep track of hooks per server
MadLittleMods Aug 7, 2025
505263d
Some fix-ups
MadLittleMods Aug 7, 2025
46e4053
Add `server_name` to `register_hook`
MadLittleMods Aug 7, 2025
2e5ac44
Move metric clean-up to catch all servers
MadLittleMods Aug 7, 2025
2f25722
Update changelog number
MadLittleMods Aug 7, 2025
8b98536
Merge branch 'develop' into madlittlemods/re-introduce-18751
MadLittleMods Aug 7, 2025
a34122f
Try fix `synapse/_scripts/synapse_port_db.py` script
MadLittleMods Aug 7, 2025
b226f63
Cleanup homeservers when we `synapse/_scripts/generate_workers_map.py`
MadLittleMods Aug 7, 2025
75e7463
Fix lints
MadLittleMods Aug 7, 2025
68bb036
Use `instance_id` instead of `server_name` to track metrics
MadLittleMods Aug 7, 2025
87cc52f
Fix `BaseStreamTestCase`
MadLittleMods Aug 7, 2025
c7d1a78
Fix lints
MadLittleMods Aug 7, 2025
14aee2f
Fix multiple databases registering metrics
MadLittleMods Aug 8, 2025
5baa576
Merge branch 'develop' into madlittlemods/re-introduce-18751
MadLittleMods Aug 12, 2025
d7946f4
Merge branch 'develop' into madlittlemods/re-introduce-18751
MadLittleMods Aug 19, 2025
635dcce
Merge branch 'develop' into madlittlemods/re-introduce-18751
MadLittleMods Aug 27, 2025
59701ed
Create consistent `instance_id` in `MockHomeserver`
MadLittleMods Aug 27, 2025
309a72d
Note when `cleanup` should be called
MadLittleMods Aug 27, 2025
371aa51
`cleanup` when homeserver is garbage collected
MadLittleMods Aug 27, 2025
08755ae
Only yield the metric once when we `collect`
MadLittleMods Aug 27, 2025
31ad15a
Continue to return metrics that aren't broken
MadLittleMods Aug 27, 2025
b4f06b2
Add tests to ensure we get all metrics even if one hook throws exception
MadLittleMods Aug 27, 2025
9e07d37
Merge branch 'develop' into madlittlemods/re-introduce-18751
MadLittleMods Sep 2, 2025
1c1d6c2
Don't double yield the same gauge
MadLittleMods Sep 2, 2025
595c174
Fix `generate_workers_map` script erroring out
MadLittleMods Sep 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18791.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix `LaterGauge` metrics to collect from all servers.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should have another set of eyes go over the changes here?

-- @devonh, #18791 (review)

Sounds like a good idea, given the new cleanup pattern and that you were also the one who reviewed the original PR.

43 changes: 28 additions & 15 deletions synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"""

import logging
from enum import Enum
from typing import (
TYPE_CHECKING,
Dict,
Expand Down Expand Up @@ -67,6 +68,25 @@
logger = logging.getLogger(__name__)


class QueueNames(str, Enum):
    """
    Names of the send queues that get a size gauge.

    Each value doubles as the attribute name the queue is read from (via
    `getattr(self, queue_name.value)`) and as the middle part of the metric name
    `synapse_federation_send_queue_<value>_size`.
    """

    PRESENCE_MAP = "presence_map"
    KEYED_EDU = "keyed_edu"
    KEYED_EDU_CHANGED = "keyed_edu_changed"
    EDUS = "edus"
    POS_TIME = "pos_time"
    PRESENCE_DESTINATIONS = "presence_destinations"


# One module-level gauge per queue. The gauges are created once here; hooks that
# report the actual queue sizes are attached later via `register_hook`.
#
# Built with a comprehension so the loop variable does not linger as a
# module-level name.
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {
    queue_name: LaterGauge(
        name=f"synapse_federation_send_queue_{queue_name.value}_size",
        desc="",
        labelnames=[SERVER_NAME_LABEL],
    )
    for queue_name in QueueNames
}


class FederationRemoteSendQueue(AbstractFederationSender):
"""A drop in replacement for FederationSender"""

Expand Down Expand Up @@ -111,23 +131,16 @@ def __init__(self, hs: "HomeServer"):
# we make a new function, so we need to make a new function so the inner
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name: str, queue: Sized) -> None:
LaterGauge(
name="synapse_federation_send_queue_%s_size" % (queue_name,),
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(queue)},
def register(queue_name: QueueNames, queue: Sized) -> None:
queue_name_to_gauge_map[queue_name].register_hook(
server_name=self.server_name,
hook=lambda: {(self.server_name,): len(queue)},
)

for queue_name in [
"presence_map",
"keyed_edu",
"keyed_edu_changed",
"edus",
"pos_time",
"presence_destinations",
]:
register(queue_name, getattr(self, queue_name))
for queue_name in QueueNames:
queue = getattr(self, queue_name.value)
assert isinstance(queue, Sized)
register(queue_name, queue=queue)

self.clock.looping_call(self._clear_queue, 30 * 1000)

Expand Down
43 changes: 27 additions & 16 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,24 @@
labelnames=[SERVER_NAME_LABEL],
)

# The three gauges below are created once at module level without a callback.
# The values come from hooks attached with `register_hook(server_name=..., hook=...)`
# in `__init__` further down.

# Hook reports the number of per-destination queues whose transmission loop is
# currently running.
transaction_queue_pending_destinations_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

# Hook reports the sum of `pending_pdu_count()` over all per-destination queues.
transaction_queue_pending_pdus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

# Hook reports the sum of `pending_edu_count()` over all per-destination queues.
transaction_queue_pending_edus_gauge = LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

# Time (in s) to wait before trying to wake up destinations that have
# catch-up outstanding.
# Please note that rate limiting still applies, so while the loop is
Expand Down Expand Up @@ -398,34 +416,27 @@ def __init__(self, hs: "HomeServer"):
# map from destination to PerDestinationQueue
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

LaterGauge(
name="synapse_federation_transaction_queue_pending_destinations",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_destinations_gauge.register_hook(
server_name=self.server_name,
hook=lambda: {
(self.server_name,): sum(
1
for d in self._per_destination_queues.values()
if d.transmission_loop_running
)
},
)

LaterGauge(
name="synapse_federation_transaction_queue_pending_pdus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_pdus_gauge.register_hook(
server_name=self.server_name,
hook=lambda: {
(self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
)
},
)
LaterGauge(
name="synapse_federation_transaction_queue_pending_edus",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {
transaction_queue_pending_edus_gauge.register_hook(
server_name=self.server_name,
hook=lambda: {
(self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
)
Expand Down
28 changes: 18 additions & 10 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
)

# Both gauges are created once at module level without a callback. The values
# come from hooks attached with `register_hook(server_name=..., hook=...)` in
# `__init__` further down.

# Hook reports `len(self.user_to_current_state)`.
presence_user_to_current_state_size_gauge = LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

# Hook reports `len(self.wheel_timer)`.
presence_wheel_timer_size_gauge = LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
)

# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000
Expand Down Expand Up @@ -779,11 +791,9 @@ def __init__(self, hs: "HomeServer"):
EduTypes.PRESENCE, self.incoming_presence
)

LaterGauge(
name="synapse_handlers_presence_user_to_current_state_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
presence_user_to_current_state_size_gauge.register_hook(
server_name=self.server_name,
hook=lambda: {(self.server_name,): len(self.user_to_current_state)},
)

# The per-device presence state, maps user to devices to per-device presence state.
Expand Down Expand Up @@ -882,11 +892,9 @@ def __init__(self, hs: "HomeServer"):
60 * 1000,
)

LaterGauge(
name="synapse_handlers_presence_wheel_timer_size",
desc="",
labelnames=[SERVER_NAME_LABEL],
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
presence_wheel_timer_size_gauge.register_hook(
server_name=self.server_name,
hook=lambda: {(self.server_name,): len(self.wheel_timer)},
)

# Used to handle sending of presence to newly joined users/servers
Expand Down
4 changes: 2 additions & 2 deletions synapse/http/request_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
return counts


# Gauge of in-flight HTTP requests, labelled by method, servlet and server name.
# `LaterGauge` no longer takes a `caller=` argument, so the callback is attached
# separately. `server_name=None` marks it as a global hook: `_get_in_flight_counts`
# is a module-level function rather than something tied to one homeserver.
in_flight_requests = LaterGauge(
    name="synapse_http_server_in_flight_requests_count",
    desc="",
    labelnames=["method", "servlet", SERVER_NAME_LABEL],
)
in_flight_requests.register_hook(server_name=None, hook=_get_in_flight_counts)


class RequestMetrics:
Expand Down
116 changes: 82 additions & 34 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@

METRICS_PREFIX = "/_synapse/metrics"

all_gauges: Dict[str, Collector] = {}

HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

SERVER_NAME_LABEL = "server_name"
Expand Down Expand Up @@ -163,42 +161,100 @@ class LaterGauge(Collector):
name: str
desc: str
labelnames: Optional[StrSequence] = attr.ib(hash=False)
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
_server_name_to_hook_map: Dict[
Optional[str], # server_name
Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
],
] = attr.ib(factory=dict, hash=False)
"""
Map from server_name to a callback. Each callback should either return a value (if
there are no labels for this metric), or dict mapping from a label tuple to a value
"""

def collect(self) -> Iterable[Metric]:
    """
    Build this gauge's metric family from every registered hook.

    Every hook in `_server_name_to_hook_map` is called once. A hook may return a
    bare number (added as a single unlabelled sample) or a mapping from label
    tuple to number (one sample per entry).

    A hook that raises is logged and skipped; samples from the remaining hooks
    are still collected.

    Yields:
        Exactly one `GaugeMetricFamily` containing the samples from all hooks
        that ran successfully (possibly none).
    """
    # The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
    # (we don't enforce it here, one level up).
    g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames)  # type: ignore[missing-server-name-label]

    for hook in self._server_name_to_hook_map.values():
        try:
            hook_result = hook()
        except Exception:
            logger.exception(
                "Exception running callback for LaterGauge(%s)", self.name
            )
            # One broken hook must not hide the samples of every other server,
            # so carry on with the next hook instead of bailing out.
            continue

        if isinstance(hook_result, (int, float)):
            g.add_metric([], hook_result)
        else:
            for k, v in hook_result.items():
                g.add_metric(k, v)

    # Yield once, after all hooks have contributed. Yielding inside the loop would
    # hand the same metric family to the registry once per hook.
    yield g
def register_hook(
    self,
    *,
    server_name: Optional[str],
    hook: Callable[
        [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
    ],
) -> None:
    """
    Register a callback/hook that will be called to generate metric samples for
    the gauge.

    Args:
        server_name: The homeserver name (`hs.hostname`) this hook is associated
            with. This can be used later to lookup all hooks associated with a given
            server name in order to unregister them. Pass `None` only for global
            hooks that work across all homeservers.
        hook: A callback that should either return a value (if there are no
            labels for this metric), or dict mapping from a label tuple to a value

    Raises:
        AssertionError: If a hook is already registered for `server_name`.
    """
    # We shouldn't have multiple hooks registered for the same `server_name`.
    existing_hook = self._server_name_to_hook_map.get(server_name)
    assert existing_hook is None, (
        f"LaterGauge(name={self.name}) hook already registered for server_name={server_name}. "
        "This is likely a Synapse bug and you forgot to unregister the previous hooks for "
        "the server (especially in tests)."
    )

    self._server_name_to_hook_map[server_name] = hook

def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
def unregister_hooks_for_server_name(self, server_name: str) -> None:
    """
    Drop the hook registered on this gauge for `server_name`, if there is one.

    Meant to be called when a homeserver shuts down so its hook does not linger.
    Does nothing when no hook is registered under that name.

    Args:
        server_name: The homeserver name (`hs.hostname`) whose hook should go.
    """
    hooks_by_server = self._server_name_to_hook_map
    if server_name in hooks_by_server:
        del hooks_by_server[server_name]

def __attrs_post_init__(self) -> None:
    """
    Register the freshly constructed gauge with the Prometheus `REGISTRY` and
    record it in `all_later_gauges_to_clean_up_on_shutdown`, keyed by name.

    Raises:
        AssertionError: If a gauge with the same name is already tracked.
    """
    REGISTRY.register(self)

    # We shouldn't have multiple metrics with the same name. Typically, metrics
    # should be created globally so you shouldn't be running into this and this will
    # catch any stupid mistakes. The `REGISTRY.register(self)` call above will also
    # raise an error if the metric already exists but to make things explicit, we'll
    # also check here.
    existing_gauge = all_later_gauges_to_clean_up_on_shutdown.get(self.name)
    assert existing_gauge is None, f"LaterGauge(name={self.name}) already exists. "

    # Keep track of the gauge so we can clean it up later.
    all_later_gauges_to_clean_up_on_shutdown[self.name] = self


# Keyed by gauge name; entries are added by `LaterGauge.__attrs_post_init__`.
# NOTE(review): this is defined after the class that writes to it. That works as
# long as no `LaterGauge` is instantiated earlier in this module — confirm.
all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {}
"""
Track all `LaterGauge` instances so we can remove any associated hooks during homeserver
shutdown.
"""


# `MetricsEntry` only makes sense when it is a `Protocol`,
Expand Down Expand Up @@ -250,7 +306,7 @@ def __init__(
# Protects access to _registrations
self._lock = threading.Lock()

self._register_with_collector()
REGISTRY.register(self)

def register(
self,
Expand Down Expand Up @@ -341,14 +397,6 @@ def collect(self) -> Iterable[Metric]:
gauge.add_metric(labels=key, value=getattr(metrics, name))
yield gauge

def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))

REGISTRY.register(self)
all_gauges[self.name] = self


class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
"""
Expand Down
Loading
Loading