Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
285f192
Fix `LaterGauge` metrics to collect from all servers (#18751)
MadLittleMods Aug 5, 2025
4dda2b1
First pass: keep track of hooks per server
MadLittleMods Aug 7, 2025
505263d
Some fix-ups
MadLittleMods Aug 7, 2025
46e4053
Add `server_name` to `register_hook`
MadLittleMods Aug 7, 2025
2e5ac44
Move metric clean-up to catch all servers
MadLittleMods Aug 7, 2025
2f25722
Update changelog number
MadLittleMods Aug 7, 2025
8b98536
Merge branch 'develop' into madlittlemods/re-introduce-18751
MadLittleMods Aug 7, 2025
a34122f
Try fix `synapse/_scripts/synapse_port_db.py` script
MadLittleMods Aug 7, 2025
b226f63
Cleanup homeservers when we `synapse/_scripts/generate_workers_map.py`
MadLittleMods Aug 7, 2025
75e7463
Fix lints
MadLittleMods Aug 7, 2025
68bb036
Use `instance_id` instead of `server_name` to track metrics
MadLittleMods Aug 7, 2025
87cc52f
Fix `BaseStreamTestCase`
MadLittleMods Aug 7, 2025
c7d1a78
Fix lints
MadLittleMods Aug 7, 2025
14aee2f
Fix multiple databases registering metrics
MadLittleMods Aug 8, 2025
5baa576
Merge branch 'develop' into madlittlemods/re-introduce-18751
MadLittleMods Aug 12, 2025
d7946f4
Merge branch 'develop' into madlittlemods/re-introduce-18751
MadLittleMods Aug 19, 2025
635dcce
Merge branch 'develop' into madlittlemods/re-introduce-18751
MadLittleMods Aug 27, 2025
59701ed
Create consistent `instance_id` in `MockHomeserver`
MadLittleMods Aug 27, 2025
309a72d
Note when `cleanup` should be called
MadLittleMods Aug 27, 2025
371aa51
`cleanup` when homeserver is garbage collected
MadLittleMods Aug 27, 2025
08755ae
Only yield the metric once when we `collect`
MadLittleMods Aug 27, 2025
31ad15a
Continue to return metrics that aren't broken
MadLittleMods Aug 27, 2025
b4f06b2
Add tests to ensure we get all metrics even if one hook throws exception
MadLittleMods Aug 27, 2025
9e07d37
Merge branch 'develop' into madlittlemods/re-introduce-18751
MadLittleMods Sep 2, 2025
1c1d6c2
Don't double yield the same gauge
MadLittleMods Sep 2, 2025
595c174
Fix `generate_workers_map` script erroring out
MadLittleMods Sep 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
3 changes: 2 additions & 1 deletion synapse/federation/send_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ def __init__(self, hs: "HomeServer"):
# changes. ARGH.
def register(queue_name: QueueNames, queue: Sized) -> None:
queue_name_to_gauge_map[queue_name].register_hook(
lambda: {(self.server_name,): len(queue)}
server_name=self.server_name,
hook=lambda: {(self.server_name,): len(queue)},
)

for queue_name in QueueNames:
Expand Down
15 changes: 9 additions & 6 deletions synapse/federation/sender/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,27 +417,30 @@ def __init__(self, hs: "HomeServer"):
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}

transaction_queue_pending_destinations_gauge.register_hook(
lambda: {
server_name=self.server_name,
hook=lambda: {
(self.server_name,): sum(
1
for d in self._per_destination_queues.values()
if d.transmission_loop_running
)
}
},
)
transaction_queue_pending_pdus_gauge.register_hook(
lambda: {
server_name=self.server_name,
hook=lambda: {
(self.server_name,): sum(
d.pending_pdu_count() for d in self._per_destination_queues.values()
)
}
},
)
transaction_queue_pending_edus_gauge.register_hook(
lambda: {
server_name=self.server_name,
hook=lambda: {
(self.server_name,): sum(
d.pending_edu_count() for d in self._per_destination_queues.values()
)
}
},
)

self._is_processing = False
Expand Down
6 changes: 4 additions & 2 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,8 @@ def __init__(self, hs: "HomeServer"):
)

presence_user_to_current_state_size_gauge.register_hook(
lambda: {(self.server_name,): len(self.user_to_current_state)}
server_name=self.server_name,
hook=lambda: {(self.server_name,): len(self.user_to_current_state)},
)

# The per-device presence state, maps user to devices to per-device presence state.
Expand Down Expand Up @@ -892,7 +893,8 @@ def __init__(self, hs: "HomeServer"):
)

presence_wheel_timer_size_gauge.register_hook(
lambda: {(self.server_name,): len(self.wheel_timer)}
server_name=self.server_name,
hook=lambda: {(self.server_name,): len(self.wheel_timer)},
)

# Used to handle sending of presence to newly joined users/servers
Expand Down
2 changes: 1 addition & 1 deletion synapse/http/request_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
desc="",
labelnames=["method", "servlet", SERVER_NAME_LABEL],
)
in_flight_requests.register_hook(_get_in_flight_counts)
in_flight_requests.register_hook(server_name=None, hook=_get_in_flight_counts)


class RequestMetrics:
Expand Down
68 changes: 60 additions & 8 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
Dict,
Generic,
Iterable,
List,
Mapping,
Optional,
Sequence,
Expand Down Expand Up @@ -162,20 +161,23 @@ class LaterGauge(Collector):
name: str
desc: str
labelnames: Optional[StrSequence] = attr.ib(hash=False)
# List of callbacks: each callback should either return a value (if there are no
# labels for this metric), or dict mapping from a label tuple to a value
_hooks: List[
_server_name_to_hook_map: Dict[
Optional[str], # server_name
Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
] = attr.ib(factory=list, hash=False)
],
] = attr.ib(factory=dict, hash=False)
"""
Map from server_name to a callback. Each callback should either return a value (if
there are no labels for this metric), or dict mapping from a label tuple to a value
"""

def collect(self) -> Iterable[Metric]:
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
# (we don't enforce it here, one level up).
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]

for hook in self._hooks:
for hook in self._server_name_to_hook_map.values():
try:
hook_result = hook()
except Exception:
Expand All @@ -195,15 +197,65 @@ def collect(self) -> Iterable[Metric]:

def register_hook(
self,
*,
server_name: Optional[str],
hook: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
],
) -> None:
self._hooks.append(hook)
"""
Register a callback/hook that will be called to generate metric samples for
the gauge.

Args:
server_name: The homeserver name (`hs.hostname`) this hook is associated
with. This can be used later to look up all hooks associated with a given
server name in order to unregister them. This should only be `None`
for global hooks that work across all homeservers.
hook: A callback that should either return a value (if there are no
labels for this metric), or dict mapping from a label tuple to a value
"""
# We shouldn't have multiple hooks registered for the same `server_name`.
existing_hook = self._server_name_to_hook_map.get(server_name)
assert existing_hook is None, (
f"LaterGauge(name={self.name}) hook already registered for server_name={server_name}. "
"This is likely a Synapse bug and you forgot to unregister the previous hooks for "
"the server (especially in tests)."
)

self._server_name_to_hook_map[server_name] = hook

def unregister_hooks_for_server_name(self, server_name: str) -> None:
    """
    Remove the hook registered under `server_name`, if there is one.

    Intended to be called when a homeserver is shut down so that its hook does
    not linger on this gauge. Calling it for a `server_name` that has no hook
    registered is a no-op.

    Args:
        server_name: The homeserver name whose hook should be removed
            (`hs.hostname`).
    """
    hooks_by_server_name = self._server_name_to_hook_map
    # Supplying a default to `pop` keeps this silent when nothing is registered.
    hooks_by_server_name.pop(server_name, None)

def __attrs_post_init__(self) -> None:
    """
    Register this gauge with `REGISTRY` and record it in
    `all_later_gauges_to_clean_up_on_shutdown` under its `name`.

    Asserts that no gauge with the same name has been recorded already.
    """
    REGISTRY.register(self)

    # Metric names are expected to be unique: metrics are typically created
    # globally, so a clash here points at a mistake. The `REGISTRY.register(self)`
    # call above will also raise an error if the metric already exists, but we
    # check again here to make the expectation explicit.
    already_tracked = self.name in all_later_gauges_to_clean_up_on_shutdown
    assert not already_tracked, f"LaterGauge(name={self.name}) already exists. "

    # Remember the gauge so its hooks can be cleaned up later.
    all_later_gauges_to_clean_up_on_shutdown[self.name] = self


all_later_gauges_to_clean_up_on_shutdown: Dict[str, LaterGauge] = {}
"""
Track all `LaterGauge` instances so we can remove any associated hooks during homeserver
shutdown.
"""


# `MetricsEntry` only makes sense when it is a `Protocol`,
# but `Protocol` can't be used as a `TypeVar` bound.
Expand Down
12 changes: 8 additions & 4 deletions synapse/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,16 +299,20 @@ def count_listeners() -> Mapping[Tuple[str, ...], int]:
)
}

notifier_listeners_gauge.register_hook(count_listeners)
notifier_listeners_gauge.register_hook(
server_name=self.server_name, hook=count_listeners
)
notifier_rooms_gauge.register_hook(
lambda: {
server_name=self.server_name,
hook=lambda: {
(self.server_name,): count(
bool, list(self.room_to_user_streams.values())
)
}
},
)
notifier_users_gauge.register_hook(
lambda: {(self.server_name,): len(self.user_to_user_stream)}
server_name=self.server_name,
hook=lambda: {(self.server_name,): len(self.user_to_user_stream)},
)

def add_replication_callback(self, cb: Callable[[], None]) -> None:
Expand Down
8 changes: 5 additions & 3 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ def __init__(self, hs: "HomeServer"):
self._connections: List[IReplicationConnection] = []

tcp_resource_total_connections_gauge.register_hook(
lambda: {(self.server_name,): len(self._connections)}
server_name=self.server_name,
hook=lambda: {(self.server_name,): len(self._connections)},
)

# When POSITION or RDATA commands arrive, we stick them in a queue and process
Expand All @@ -276,10 +277,11 @@ def __init__(self, hs: "HomeServer"):
self._streams_by_connection: Dict[IReplicationConnection, Set[str]] = {}

tcp_command_queue_gauge.register_hook(
lambda: {
server_name=self.server_name,
hook=lambda: {
(stream_name, self.server_name): len(queue)
for stream_name, queue in self._command_queues_by_stream.items()
}
},
)

self._is_master = hs.config.worker.worker_app is None
Expand Down
20 changes: 12 additions & 8 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,10 @@ def replicate(self) -> None:
labelnames=["name", SERVER_NAME_LABEL],
)
pending_commands.register_hook(
lambda: {
server_name=None,
hook=lambda: {
(p.name, p.server_name): len(p.pending_commands) for p in connected_connections
}
},
)


Expand All @@ -548,9 +549,10 @@ def transport_buffer_size(protocol: BaseReplicationStreamProtocol) -> int:
labelnames=["name", SERVER_NAME_LABEL],
)
transport_send_buffer.register_hook(
lambda: {
server_name=None,
hook=lambda: {
(p.name, p.server_name): transport_buffer_size(p) for p in connected_connections
}
},
)


Expand All @@ -577,10 +579,11 @@ def transport_kernel_read_buffer_size(
labelnames=["name", SERVER_NAME_LABEL],
)
tcp_transport_kernel_send_buffer.register_hook(
lambda: {
server_name=None,
hook=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
}
},
)


Expand All @@ -590,8 +593,9 @@ def transport_kernel_read_buffer_size(
labelnames=["name", SERVER_NAME_LABEL],
)
tcp_transport_kernel_read_buffer.register_hook(
lambda: {
server_name=None,
hook=lambda: {
(p.name, p.server_name): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
}
},
)
3 changes: 2 additions & 1 deletion synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,8 @@ def __init__(

self.updates = BackgroundUpdater(hs, self)
background_update_status.register_hook(
lambda: {(self.server_name,): self.updates.get_status()},
server_name=self.server_name,
hook=lambda: {(self.server_name,): self.updates.get_status()},
)

self._previous_txn_total_time = 0.0
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ def __init__(
self._count_known_servers,
)
federation_known_servers_gauge.register_hook(
lambda: {(self.server_name,): self._known_servers_count}
server_name=self.server_name,
hook=lambda: {(self.server_name,): self._known_servers_count},
)

@wrap_as_background_process("_count_known_servers")
Expand Down
10 changes: 6 additions & 4 deletions synapse/util/ratelimitutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,25 +137,27 @@ def _get_counts_from_rate_limiter_instance(
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
)
sleep_affected_hosts_gauge.register_hook(
lambda: _get_counts_from_rate_limiter_instance(
server_name=None,
hook=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_sleep()
for ratelimiter in rate_limiter_instance.ratelimiters.values()
)
)
),
)
reject_affected_hosts_gauge = LaterGauge(
name="synapse_rate_limit_reject_affected_hosts",
desc="Number of hosts that had requests rejected",
labelnames=["rate_limiter_name", SERVER_NAME_LABEL],
)
reject_affected_hosts_gauge.register_hook(
lambda: _get_counts_from_rate_limiter_instance(
server_name=None,
hook=lambda: _get_counts_from_rate_limiter_instance(
lambda rate_limiter_instance: sum(
ratelimiter.should_reject()
for ratelimiter in rate_limiter_instance.ratelimiters.values()
)
)
),
)


Expand Down
3 changes: 2 additions & 1 deletion synapse/util/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ def __init__(self, hs: "HomeServer"):
)

running_tasks_gauge.register_hook(
lambda: {(self.server_name,): len(self._running_tasks)}
server_name=self.server_name,
hook=lambda: {(self.server_name,): len(self._running_tasks)},
)

def register_action(
Expand Down
4 changes: 2 additions & 2 deletions tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,8 @@ def test_later_gauge_multiple_servers(self) -> None:
desc="",
labelnames=[SERVER_NAME_LABEL],
)
later_gauge.register_hook(lambda: {("hs1",): 1})
later_gauge.register_hook(lambda: {("hs2",): 2})
later_gauge.register_hook(server_name="hs1", hook=lambda: {("hs1",): 1})
later_gauge.register_hook(server_name="hs2", hook=lambda: {("hs2",): 2})

metrics_map = get_latest_metrics()

Expand Down
8 changes: 8 additions & 0 deletions tests/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
from synapse.handlers.auth import load_legacy_password_auth_providers
from synapse.http.site import SynapseRequest
from synapse.logging.context import ContextResourceUsage
from synapse.metrics import all_later_gauges_to_clean_up_on_shutdown
from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers
from synapse.module_api.callbacks.third_party_event_rules_callbacks import (
load_legacy_third_party_event_rules,
Expand Down Expand Up @@ -1215,6 +1216,13 @@ def cleanup() -> None:
# Register the cleanup hook
cleanup_func(cleanup)

def cleanup_metrics() -> None:
    """
    Unregister, on every tracked `LaterGauge`, the hook registered under the
    server name of `hs` (taken from the enclosing scope).
    """
    for tracked_gauge in all_later_gauges_to_clean_up_on_shutdown.values():
        tracked_gauge.unregister_hooks_for_server_name(
            hs.config.server.server_name
        )

# Register the cleanup hook for metrics
cleanup_func(cleanup_metrics)
Comment thread
MadLittleMods marked this conversation as resolved.
Outdated

# bcrypt is far too slow to be doing in unit tests
# Need to let the HS build an auth handler and then mess with it
# because AuthHandler's constructor requires the HS, so we can't make one
Expand Down
Loading