Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
c07d310
Move a couple metrics to homeserver specific `CollectorRegistry`
MadLittleMods May 15, 2025
9c6863e
Remove `metrics` listener type in favor of `http` listener with `metr…
MadLittleMods May 15, 2025
a6dd8a9
Refactor our custom `Collector` to accept a `registry`
MadLittleMods May 15, 2025
72b3bec
Refactor more custom `Collector` to be stubbed to use `hs.metrics_col…
MadLittleMods May 15, 2025
86f39b0
Partial `Counter` update
MadLittleMods May 16, 2025
3215897
Try organize some of the global metrics
MadLittleMods May 16, 2025
3de2c09
Move jemalloc metrics further along
MadLittleMods May 19, 2025
d3c23f8
Partial changes for `CacheManager`
MadLittleMods May 19, 2025
d3d228e
More cache updates (`LruCache` not finished)
MadLittleMods May 19, 2025
9943cd3
Fix return lints
MadLittleMods May 20, 2025
f49789b
Fix `LruCache` `@overload`
MadLittleMods May 20, 2025
4018aa4
Fix incorrect `@cache_in_self` underscore prefix in tests
MadLittleMods May 20, 2025
ccd0a71
Revert `LruCache` `CacheManager` changes for now
MadLittleMods May 20, 2025
1400e15
Fix most lints
MadLittleMods May 20, 2025
65080f8
More cleanup
MadLittleMods May 20, 2025
efa6018
Move `LaterGauge` further along
MadLittleMods May 20, 2025
e72d782
Fix current lints
MadLittleMods May 20, 2025
2521a17
Add changelog
MadLittleMods May 20, 2025
fc04be1
Merge branch 'develop' into madlittlemods/hs-specific-metrics
MadLittleMods May 20, 2025
4a5c2f8
Fix lints
MadLittleMods May 20, 2025
6314f6c
Revert "Fix lints"
MadLittleMods May 20, 2025
627a77b
Update `Measure` and `@measure_func`
MadLittleMods May 20, 2025
a707a25
Revert "Revert `LruCache` `CacheManager` changes for now"
MadLittleMods May 20, 2025
f68211b
WIP: `@cached` progress
MadLittleMods May 20, 2025
f75ea5c
Fix `DictionaryCache`
MadLittleMods May 20, 2025
356792f
Add to correct registry
MadLittleMods May 20, 2025
b4ed25e
Revert "WIP: `@cached` progress"
MadLittleMods May 20, 2025
3b12305
Reapply "Revert `LruCache` `CacheManager` changes for now"
MadLittleMods May 20, 2025
9f82f66
Fixup after `LruCache` changes revert
MadLittleMods May 20, 2025
57546bf
Fix in-flight gauges
MadLittleMods May 20, 2025
827b785
Reapply "WIP: `@cached` progress"
MadLittleMods May 21, 2025
e36c6f7
Revert "Reapply "Revert `LruCache` `CacheManager` changes for now""
MadLittleMods May 21, 2025
962d5ce
Revert "Fixup after `LruCache` changes revert"
MadLittleMods May 21, 2025
6a11964
Interim comment
MadLittleMods May 21, 2025
c5bbff9
WIP: Idea on managing caches
MadLittleMods May 21, 2025
e433a68
Revert "WIP: Idea on managing caches"
MadLittleMods May 21, 2025
5f5b349
Reapply "Fixup after `LruCache` changes revert"
MadLittleMods May 21, 2025
d4d59a4
Reapply "Reapply "Revert `LruCache` `CacheManager` changes for now""
MadLittleMods May 21, 2025
6e49db7
Revert "Reapply "WIP: `@cached` progress""
MadLittleMods May 21, 2025
00140fc
Merge branch 'develop' into madlittlemods/hs-specific-metrics
MadLittleMods Jun 11, 2025
ec16224
Remove unrelated comment from reviewing another PR
MadLittleMods Jun 13, 2025
1c51b74
Merge branch 'develop' into madlittlemods/hs-specific-metrics
MadLittleMods Jun 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18443.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Refactor metrics to collect in homeserver-specific registry.
6 changes: 4 additions & 2 deletions docs/metrics-howto.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,17 @@
type: http
x_forwarded: true
bind_addresses: ['::1', '127.0.0.1']

resources:
- names: [client, federation]
compress: false

# beginning of the new metrics listener
- port: 9000
type: metrics
type: http
bind_addresses: ['::1', '127.0.0.1']
resources:
- names: [metrics]
compress: false
```

1. Restart Synapse.
Expand Down
3 changes: 1 addition & 2 deletions schema/synapse-config.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,7 @@ properties:
type: string
description: >-
The type of listener. Normally `http`, but other valid options are
[`manhole`](../../manhole.md) and
[`metrics`](../../metrics-howto.md).
[`manhole`](../../manhole.md).
enum:
- http
- manhole
Expand Down
1 change: 1 addition & 0 deletions synapse/api/auth/msc3861_delegated.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ def __init__(self, hs: "HomeServer"):
# the old access token to continue working for a short time.
self._introspection_cache: ResponseCache[str] = ResponseCache(
self._clock,
hs.get_cache_manager(),
"token_introspection",
timeout_ms=120_000,
# don't log because the keys are access tokens
Expand Down
40 changes: 22 additions & 18 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,13 @@
from synapse.http.site import SynapseSite
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import init_tracer
from synapse.metrics import install_gc_manager, register_threadpool
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.metrics import CPUMetrics, install_gc_manager, register_threadpool
from synapse.metrics._gc import GCCounts, PyPyGCStats, running_on_pypy
from synapse.metrics._reactor_metrics import setup_reactor_metrics
from synapse.metrics.background_process_metrics import (
BackgroundProcessCollector,
wrap_as_background_process,
)
from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers
from synapse.module_api.callbacks.third_party_event_rules_callbacks import (
load_legacy_third_party_event_rules,
Expand Down Expand Up @@ -178,7 +182,6 @@ def start_reactor(

def run() -> None:
logger.info("Running")
setup_jemalloc_stats()
change_resource_limit(soft_file_limit)
if gc_thresholds:
gc.set_threshold(*gc_thresholds)
Expand Down Expand Up @@ -283,20 +286,6 @@ async def wrapper() -> None:
reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))


def listen_metrics(bind_addresses: StrCollection, port: int) -> None:
    """
    Start Prometheus metrics server.

    Spins up one standalone `prometheus_client` HTTP server per address in
    `bind_addresses`, each listening on `port` and serving the global
    `RegistryProxy`.

    Args:
        bind_addresses: Host addresses to bind the metrics listeners to.
        port: TCP port to serve metrics on (same port for every address).
    """
    # Imported lazily so that merely importing this module does not pull in
    # prometheus_client / the metrics machinery.
    from prometheus_client import start_http_server as start_http_server_prometheus

    from synapse.metrics import RegistryProxy

    for host in bind_addresses:
        logger.info("Starting metrics listener on %s:%d", host, port)
        # Disable `_created`-suffixed gauges before the server starts serving.
        # NOTE(review): this call is loop-invariant — it could be hoisted above
        # the loop; harmless to repeat per host.
        _set_prometheus_client_use_created_metrics(False)
        start_http_server_prometheus(port, addr=host, registry=RegistryProxy)


def _set_prometheus_client_use_created_metrics(new_value: bool) -> None:
"""
Sets whether prometheus_client should expose `_created`-suffixed metrics for
Expand Down Expand Up @@ -607,6 +596,7 @@ def run_sighup(*args: Any, **kwargs: Any) -> None:
)

setup_sentry(hs)
setup_global_metrics(hs)
setup_sdnotify(hs)

# If background tasks are running on the main process or this is the worker in
Expand Down Expand Up @@ -694,6 +684,20 @@ def setup_sentry(hs: "HomeServer") -> None:
global_scope.set_tag("worker_name", name)


def setup_global_metrics(hs: "HomeServer") -> None:
    """Register the process-wide metric collectors on this homeserver's registry.

    Called once the homeserver has been set up, but before any listeners
    are started.
    """
    registry = hs.metrics_collector_registry

    # Interpreter/runtime collectors.
    CPUMetrics(registry=registry)
    if running_on_pypy:
        # GC statistics are exposed differently under PyPy.
        PyPyGCStats(registry=registry)
    GCCounts(registry=registry)

    # Synapse-specific collectors.
    BackgroundProcessCollector(registry=registry)
    setup_reactor_metrics(registry=registry)


def setup_sdnotify(hs: "HomeServer") -> None:
"""Adds process state hooks to tell systemd what we are up to."""

Expand Down
23 changes: 4 additions & 19 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.server import JsonResource, OptionsResource
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics import METRICS_PREFIX, MetricsResource
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.rest import ClientRestResource, admin
from synapse.rest.health import HealthResource
Expand Down Expand Up @@ -186,7 +186,9 @@ def _listen_http(self, listener_config: ListenerConfig) -> None:
for res in listener_config.http_options.resources:
for name in res.names:
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
resources[METRICS_PREFIX] = MetricsResource(
self.metrics_collector_registry
)
elif name == "client":
resource: Resource = ClientRestResource(self)

Expand Down Expand Up @@ -283,23 +285,6 @@ def start_listening(self) -> None:
raise ConfigError(
"Can not using a unix socket for manhole at this time."
)

elif listener.type == "metrics":
if not self.config.metrics.enable_metrics:
logger.warning(
"Metrics listener configured, but enable_metrics is not True!"
)
else:
if isinstance(listener, TCPListenerConfig):
_base.listen_metrics(
listener.bind_addresses,
listener.port,
)
else:
raise ConfigError(
"Can not use a unix socket for metrics at this time."
)

else:
logger.warning("Unsupported listener type: %s", listener.type)

Expand Down
22 changes: 4 additions & 18 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
StaticResource,
)
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics import METRICS_PREFIX, MetricsResource
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.rest import ClientRestResource, admin
from synapse.rest.health import HealthResource
Expand Down Expand Up @@ -252,7 +252,9 @@ def _configure_named_resource(
resources[SERVER_KEY_PREFIX] = KeyResource(self)

if name == "metrics" and self.config.metrics.enable_metrics:
metrics_resource: Resource = MetricsResource(RegistryProxy)
metrics_resource: Resource = MetricsResource(
self.metrics_collector_registry
)
if compress:
metrics_resource = gz_wrap(metrics_resource)
resources[METRICS_PREFIX] = metrics_resource
Expand Down Expand Up @@ -286,22 +288,6 @@ def start_listening(self) -> None:
raise ConfigError(
"Can not use a unix socket for manhole at this time."
)
elif listener.type == "metrics":
if not self.config.metrics.enable_metrics:
logger.warning(
"Metrics listener configured, but enable_metrics is not True!"
)
else:
if isinstance(listener, TCPListenerConfig):
_base.listen_metrics(
listener.bind_addresses,
listener.port,
)
else:
raise ConfigError(
"Can not use a unix socket for metrics at this time."
)

else:
# this shouldn't happen, as the listener type should have been checked
# during parsing
Expand Down
77 changes: 44 additions & 33 deletions synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,33 +56,6 @@

logger = logging.getLogger(__name__)

sent_transactions_counter = Counter(
"synapse_appservice_api_sent_transactions",
"Number of /transactions/ requests sent",
["service"],
)

failed_transactions_counter = Counter(
"synapse_appservice_api_failed_transactions",
"Number of /transactions/ requests that failed to send",
["service"],
)

sent_events_counter = Counter(
"synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
)

sent_ephemeral_counter = Counter(
"synapse_appservice_api_sent_ephemeral",
"Number of ephemeral events sent to the AS",
["service"],
)

sent_todevice_counter = Counter(
"synapse_appservice_api_sent_todevice",
"Number of todevice messages sent to the AS",
["service"],
)

HOUR_IN_MS = 60 * 60 * 1000

Expand Down Expand Up @@ -130,7 +103,45 @@ def __init__(self, hs: "HomeServer"):
self.config = hs.config.appservice

self.protocol_meta_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
hs.get_clock(),
hs.get_cache_manager(),
"as_protocol_meta",
timeout_ms=HOUR_IN_MS,
)

self.sent_transactions_counter = Counter(
"synapse_appservice_api_sent_transactions",
"Number of /transactions/ requests sent",
["service"],
registry=hs.metrics_collector_registry,
)

self.failed_transactions_counter = Counter(
"synapse_appservice_api_failed_transactions",
"Number of /transactions/ requests that failed to send",
["service"],
registry=hs.metrics_collector_registry,
)

self.sent_events_counter = Counter(
"synapse_appservice_api_sent_events",
"Number of events sent to the AS",
["service"],
registry=hs.metrics_collector_registry,
)

self.sent_ephemeral_counter = Counter(
"synapse_appservice_api_sent_ephemeral",
"Number of ephemeral events sent to the AS",
["service"],
registry=hs.metrics_collector_registry,
)

self.sent_todevice_counter = Counter(
"synapse_appservice_api_sent_todevice",
"Number of todevice messages sent to the AS",
["service"],
registry=hs.metrics_collector_registry,
)

def _get_headers(self, service: "ApplicationService") -> Dict[bytes, List[bytes]]:
Expand Down Expand Up @@ -395,10 +406,10 @@ async def push_bulk(
service.url,
[event.get("event_id") for event in events],
)
sent_transactions_counter.labels(service.id).inc()
sent_events_counter.labels(service.id).inc(len(serialized_events))
sent_ephemeral_counter.labels(service.id).inc(len(ephemeral))
sent_todevice_counter.labels(service.id).inc(len(to_device_messages))
self.sent_transactions_counter.labels(service.id).inc()
self.sent_events_counter.labels(service.id).inc(len(serialized_events))
self.sent_ephemeral_counter.labels(service.id).inc(len(ephemeral))
self.sent_todevice_counter.labels(service.id).inc(len(to_device_messages))
return True
except CodeMessageException as e:
logger.warning(
Expand All @@ -417,7 +428,7 @@ async def push_bulk(
ex.args,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
failed_transactions_counter.labels(service.id).inc()
self.failed_transactions_counter.labels(service.id).inc()
return False

async def claim_client_keys(
Expand Down
19 changes: 13 additions & 6 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@

logger = logging.getLogger(__name__)

sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])


PDU_RETRY_TIME_MS = 1 * 60 * 1000

Expand Down Expand Up @@ -145,6 +143,7 @@ def __init__(self, hs: "HomeServer"):
self._get_pdu_cache: ExpiringCache[str, Tuple[EventBase, str]] = ExpiringCache(
cache_name="get_pdu_cache",
clock=self._clock,
cache_manager=hs.get_cache_manager(),
max_len=1000,
expiry_ms=120 * 1000,
reset_expiry_on_get=False,
Expand All @@ -163,11 +162,19 @@ def __init__(self, hs: "HomeServer"):
] = ExpiringCache(
cache_name="get_room_hierarchy_cache",
clock=self._clock,
cache_manager=hs.get_cache_manager(),
max_len=1000,
expiry_ms=5 * 60 * 1000,
reset_expiry_on_get=False,
)

self.sent_queries_counter = Counter(
"synapse_federation_client_sent_queries",
"",
["type"],
registry=hs.metrics_collector_registry,
)

def _clear_tried_cache(self) -> None:
"""Clear pdu_destination_tried cache"""
now = self._clock.time_msec()
Expand Down Expand Up @@ -207,7 +214,7 @@ async def make_query(
Returns:
The JSON object from the response
"""
sent_queries_counter.labels(query_type).inc()
self.sent_queries_counter.labels(query_type).inc()

return await self.transport_layer.make_query(
destination,
Expand All @@ -229,7 +236,7 @@ async def query_client_keys(
Returns:
The JSON object from the response
"""
sent_queries_counter.labels("client_device_keys").inc()
self.sent_queries_counter.labels("client_device_keys").inc()
return await self.transport_layer.query_client_keys(
destination, content, timeout
)
Expand All @@ -240,7 +247,7 @@ async def query_user_devices(
"""Query the device keys for a list of user ids hosted on a remote
server.
"""
sent_queries_counter.labels("user_devices").inc()
self.sent_queries_counter.labels("user_devices").inc()
return await self.transport_layer.query_user_devices(
destination, user_id, timeout
)
Expand All @@ -262,7 +269,7 @@ async def claim_client_keys(
Returns:
The JSON object from the response
"""
sent_queries_counter.labels("client_one_time_keys").inc()
self.sent_queries_counter.labels("client_one_time_keys").inc()

# Convert the query with counts into a stable and unstable query and check
# if attempting to claim more than 1 OTK.
Expand Down
Loading
Loading