-
Notifications
You must be signed in to change notification settings - Fork 522
Expand file tree
/
Copy path__init__.py
More file actions
700 lines (567 loc) · 23.2 KB
/
__init__.py
File metadata and controls
700 lines (567 loc) · 23.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2022 The Matrix.org Foundation C.I.C.
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
import itertools
import logging
import os
import platform
import threading
from importlib import metadata
from typing import (
Callable,
Dict,
Generic,
Iterable,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
)
import attr
from pkg_resources import parse_version
from prometheus_client import (
CollectorRegistry,
Counter,
Gauge,
Histogram,
Metric,
generate_latest,
)
from prometheus_client.core import (
REGISTRY,
GaugeHistogramMetricFamily,
GaugeMetricFamily,
)
from twisted.python.threadpool import ThreadPool
from twisted.web.resource import Resource
from twisted.web.server import Request
# This module is imported for its side effects; flake8 needn't warn that it's unused.
import synapse.metrics._reactor_metrics # noqa: F401
from synapse.metrics._gc import MIN_TIME_BETWEEN_GCS, install_gc_manager
from synapse.metrics._types import Collector
from synapse.types import StrSequence
from synapse.util import SYNAPSE_VERSION
logger = logging.getLogger(__name__)
METRICS_PREFIX = "/_synapse/metrics"
all_gauges: Dict[str, Collector] = {}
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
SERVER_NAME_LABEL = "server_name"
"""
The `server_name` label is used to identify the homeserver that the metrics correspond
to. Because we support multiple instances of Synapse running in the same process and all
metrics are in a single global `REGISTRY`, we need to manually label any metrics.
In the case of a Synapse homeserver, this should be set to the homeserver name
(`hs.hostname`).
We're purposely not using the `instance` label for this purpose as that should be "The
<host>:<port> part of the target's URL that was scraped.". Also: "In Prometheus
terms, an endpoint you can scrape is called an *instance*, usually corresponding to a
single process." (source: https://prometheus.io/docs/concepts/jobs_instances/)
"""
CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
"""
Content type of the latest text format for Prometheus metrics.
Pulled directly from the prometheus_client library.
"""
def _set_prometheus_client_use_created_metrics(new_value: bool) -> None:
"""
Sets whether prometheus_client should expose `_created`-suffixed metrics for
all gauges, histograms and summaries.
There is no programmatic way in the old versions of `prometheus_client` to disable
this without poking at internals; the proper way in the old `prometheus_client`
versions (> `0.14.0` < `0.18.0`) is to use an environment variable which
prometheus_client loads at import time. For versions > `0.18.0`, we can use the
dedicated `disable_created_metrics()`/`enable_created_metrics()`.
The motivation for disabling these `_created` metrics is that they're a waste of
space as they're not useful but they take up space in Prometheus. It's not the end
of the world if this doesn't work.
"""
import prometheus_client.metrics
if hasattr(prometheus_client.metrics, "_use_created"):
prometheus_client.metrics._use_created = new_value
# Just log an error for old versions that don't support disabling the unecessary
# metrics. It's not the end of the world if this doesn't work as it just means extra
# wasted space taken up in Prometheus but things keep working.
elif parse_version(metadata.version("prometheus_client")) < parse_version("0.14.0"):
logger.error(
"Can't disable `_created` metrics in prometheus_client (unsupported `prometheus_client` version, too old)"
)
# If the attribute doesn't exist on a newer version, this is a sign that the brittle
# hack is broken. We should consider updating the minimum version of
# `prometheus_client` to a version (> `0.18.0`) where we can use dedicated
# `disable_created_metrics()`/`enable_created_metrics()` functions.
else:
raise Exception(
"Can't disable `_created` metrics in prometheus_client (brittle hack broken?)"
)
# Set this globally so it applies wherever we generate/collect metrics
_set_prometheus_client_use_created_metrics(False)
class _RegistryProxy:
@staticmethod
def collect() -> Iterable[Metric]:
for metric in REGISTRY.collect():
if not metric.name.startswith("__"):
yield metric
# A little bit nasty, but collect() above is static so a Protocol doesn't work.
# _RegistryProxy matches the signature of a CollectorRegistry instance enough
# for it to be usable in the contexts in which we use it.
# TODO Do something nicer about this.
RegistryProxy = cast(CollectorRegistry, _RegistryProxy)
@attr.s(slots=True, hash=True, auto_attribs=True, kw_only=True)
class LaterGauge(Collector):
"""A Gauge which periodically calls a user-provided callback to produce metrics."""
name: str
desc: str
labelnames: Optional[StrSequence] = attr.ib(hash=False)
# callback: should either return a value (if there are no labels for this metric),
# or dict mapping from a label tuple to a value
caller: Callable[
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
]
def collect(self) -> Iterable[Metric]:
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
# (we don't enforce it here, one level up).
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
try:
calls = self.caller()
except Exception:
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return
if isinstance(calls, (int, float)):
g.add_metric([], calls)
else:
for k, v in calls.items():
g.add_metric(k, v)
yield g
def __attrs_post_init__(self) -> None:
self._register()
def _register(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
# `MetricsEntry` only makes sense when it is a `Protocol`,
# but `Protocol` can't be used as a `TypeVar` bound.
MetricsEntry = TypeVar("MetricsEntry")
class InFlightGauge(Generic[MetricsEntry], Collector):
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight
at any given time.
Each InFlightGauge will create a metric called `<name>_total` that counts
the number of in flight blocks, as well as a metrics for each item in the
given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
callbacks.
Args:
name
desc
labels
sub_metrics: A list of sub metrics that the callbacks will update.
"""
def __init__(
self,
name: str,
desc: str,
labels: StrSequence,
sub_metrics: StrSequence,
):
self.name = name
self.desc = desc
self.labels = labels
self.sub_metrics = sub_metrics
# Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class: Type[MetricsEntry] = attr.make_class(
"_MetricsEntry",
attrs={x: attr.ib(default=0) for x in sub_metrics},
slots=True,
)
# Counts number of in flight blocks for a given set of label values
self._registrations: Dict[
Tuple[str, ...], Set[Callable[[MetricsEntry], None]]
] = {}
# Protects access to _registrations
self._lock = threading.Lock()
self._register_with_collector()
def register(
self,
key: Tuple[str, ...],
callback: Callable[[MetricsEntry], None],
) -> None:
"""Registers that we've entered a new block with labels `key`.
`callback` gets called each time the metrics are collected. The same
value must also be given to `unregister`.
`callback` gets called with an object that has an attribute per
sub_metric, which should be updated with the necessary values. Note that
the metrics object is shared between all callbacks registered with the
same key.
Note that `callback` may be called on a separate thread.
Args:
key: A tuple of label values, which must match the order of the
`labels` given to the constructor.
callback
"""
assert len(key) == len(self.labels), (
f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}"
)
with self._lock:
self._registrations.setdefault(key, set()).add(callback)
def unregister(
self,
key: Tuple[str, ...],
callback: Callable[[MetricsEntry], None],
) -> None:
"""
Registers that we've exited a block with labels `key`.
Args:
key: A tuple of label values, which must match the order of the
`labels` given to the constructor.
callback
"""
assert len(key) == len(self.labels), (
f"Expected {len(self.labels)} labels in `key`, got {len(key)}: {key}"
)
with self._lock:
self._registrations.setdefault(key, set()).discard(callback)
def collect(self) -> Iterable[Metric]:
"""Called by prometheus client when it reads metrics.
Note: may be called by a separate thread.
"""
# The decision to add `SERVER_NAME_LABEL` is from the `GaugeBucketCollector`
# usage itself (we don't enforce it here, one level up).
in_flight = GaugeMetricFamily( # type: ignore[missing-server-name-label]
self.name + "_total", self.desc, labels=self.labels
)
metrics_by_key = {}
# We copy so that we don't mutate the list while iterating
with self._lock:
keys = list(self._registrations)
for key in keys:
with self._lock:
callbacks = set(self._registrations[key])
in_flight.add_metric(labels=key, value=len(callbacks))
metrics = self._metrics_class()
metrics_by_key[key] = metrics
for callback in callbacks:
callback(metrics)
yield in_flight
for name in self.sub_metrics:
# The decision to add `SERVER_NAME_LABEL` is from the `InFlightGauge` usage
# itself (we don't enforce it here, one level up).
gauge = GaugeMetricFamily( # type: ignore[missing-server-name-label]
"_".join([self.name, name]), "", labels=self.labels
)
for key, metrics in metrics_by_key.items():
gauge.add_metric(labels=key, value=getattr(metrics, name))
yield gauge
def _register_with_collector(self) -> None:
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering", self.name)
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
"""
Custom version of `GaugeHistogramMetricFamily` from `prometheus_client` that allows
specifying labels and label values.
A single gauge histogram and its samples.
For use by custom collectors.
"""
def __init__(
self,
*,
name: str,
documentation: str,
gsum_value: float,
buckets: Optional[Sequence[Tuple[str, float]]] = None,
labelnames: StrSequence = (),
labelvalues: StrSequence = (),
unit: str = "",
):
# Sanity check the number of label values matches the number of label names.
if len(labelvalues) != len(labelnames):
raise ValueError(
"The number of label values must match the number of label names"
)
# Call the super to validate and set the labelnames. We use this stable API
# instead of setting the internal `_labelnames` field directly.
super().__init__(
name=name,
documentation=documentation,
labels=labelnames,
# Since `GaugeHistogramMetricFamily` doesn't support supplying `labels` and
# `buckets` at the same time (artificial limitation), we will just set these
# as `None` and set up the buckets ourselves just below.
buckets=None,
gsum_value=None,
)
# Create a gauge for each bucket.
if buckets is not None:
self.add_metric(labels=labelvalues, buckets=buckets, gsum_value=gsum_value)
class GaugeBucketCollector(Collector):
"""Like a Histogram, but the buckets are Gauges which are updated atomically.
The data is updated by calling `update_data` with an iterable of measurements.
We assume that the data is updated less frequently than it is reported to
Prometheus, and optimise for that case.
"""
__slots__ = (
"_name",
"_documentation",
"_labelnames",
"_bucket_bounds",
"_metric",
)
def __init__(
self,
*,
name: str,
documentation: str,
labelnames: Optional[StrSequence],
buckets: Iterable[float],
registry: CollectorRegistry = REGISTRY,
):
"""
Args:
name: base name of metric to be exported to Prometheus. (a _bucket suffix
will be added.)
documentation: help text for the metric
buckets: The top bounds of the buckets to report
registry: metric registry to register with
"""
self._name = name
self._documentation = documentation
self._labelnames = labelnames if labelnames else ()
# the tops of the buckets
self._bucket_bounds = [float(b) for b in buckets]
if self._bucket_bounds != sorted(self._bucket_bounds):
raise ValueError("Buckets not in sorted order")
if self._bucket_bounds[-1] != float("inf"):
self._bucket_bounds.append(float("inf"))
# We initially set this to None. We won't report metrics until
# this has been initialised after a successful data update
self._metric: Optional[GaugeHistogramMetricFamilyWithLabels] = None
registry.register(self)
def collect(self) -> Iterable[Metric]:
# Don't report metrics unless we've already collected some data
if self._metric is not None:
yield self._metric
def update_data(self, values: Iterable[float], labels: StrSequence = ()) -> None:
"""Update the data to be reported by the metric
The existing data is cleared, and each measurement in the input is assigned
to the relevant bucket.
Args:
values
labels
"""
self._metric = self._values_to_metric(values, labels)
def _values_to_metric(
self, values: Iterable[float], labels: StrSequence = ()
) -> GaugeHistogramMetricFamilyWithLabels:
"""
Args:
values
labels
"""
total = 0.0
bucket_values = [0 for _ in self._bucket_bounds]
for v in values:
# assign each value to a bucket
for i, bound in enumerate(self._bucket_bounds):
if v <= bound:
bucket_values[i] += 1
break
# ... and increment the sum
total += v
# now, aggregate the bucket values so that they count the number of entries in
# that bucket or below.
accumulated_values = itertools.accumulate(bucket_values)
# The decision to add `SERVER_NAME_LABEL` is from the `GaugeBucketCollector`
# usage itself (we don't enforce it here, one level up).
return GaugeHistogramMetricFamilyWithLabels( # type: ignore[missing-server-name-label]
name=self._name,
documentation=self._documentation,
labelnames=self._labelnames,
labelvalues=labels,
buckets=list(
zip((str(b) for b in self._bucket_bounds), accumulated_values)
),
gsum_value=total,
)
#
# Detailed CPU metrics
#
class CPUMetrics(Collector):
def __init__(self) -> None:
ticks_per_sec = 100
try:
# Try and get the system config
ticks_per_sec = os.sysconf("SC_CLK_TCK")
except (ValueError, TypeError, AttributeError):
pass
self.ticks_per_sec = ticks_per_sec
def collect(self) -> Iterable[Metric]:
if not HAVE_PROC_SELF_STAT:
return
with open("/proc/self/stat") as s:
line = s.read()
raw_stats = line.split(") ", 1)[1].split(" ")
# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
user = GaugeMetricFamily("process_cpu_user_seconds_total", "") # type: ignore[missing-server-name-label]
user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
yield user
# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
sys = GaugeMetricFamily("process_cpu_system_seconds_total", "") # type: ignore[missing-server-name-label]
sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
yield sys
# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`.
REGISTRY.register(CPUMetrics()) # type: ignore[missing-server-name-label]
#
# Federation Metrics
#
sent_transactions_counter = Counter(
"synapse_federation_client_sent_transactions", "", labelnames=[SERVER_NAME_LABEL]
)
events_processed_counter = Counter(
"synapse_federation_client_events_processed", "", labelnames=[SERVER_NAME_LABEL]
)
event_processing_loop_counter = Counter(
"synapse_event_processing_loop_count",
"Event processing loop iterations",
labelnames=["name", SERVER_NAME_LABEL],
)
event_processing_loop_room_count = Counter(
"synapse_event_processing_loop_room_count",
"Rooms seen per event processing loop iteration",
labelnames=["name", SERVER_NAME_LABEL],
)
# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = Gauge(
"synapse_event_processing_positions", "", labelnames=["name", SERVER_NAME_LABEL]
)
# Used to track the current max events stream position
event_persisted_position = Gauge(
"synapse_event_persisted_position", "", labelnames=[SERVER_NAME_LABEL]
)
# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = Gauge(
"synapse_event_processing_last_ts", "", labelnames=["name", SERVER_NAME_LABEL]
)
# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = Gauge(
"synapse_event_processing_lag", "", labelnames=["name", SERVER_NAME_LABEL]
)
event_processing_lag_by_event = Histogram(
"synapse_event_processing_lag_by_event",
"Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
labelnames=["name", SERVER_NAME_LABEL],
)
# Build info of the running server.
#
# This is a process-level metric, so it does not have the `SERVER_NAME_LABEL`. We
# consider this process-level because all Synapse homeservers running in the process
# will use the same Synapse version.
build_info = Gauge( # type: ignore[missing-server-name-label]
"synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
)
build_info.labels(
" ".join([platform.python_implementation(), platform.python_version()]),
SYNAPSE_VERSION,
" ".join([platform.system(), platform.release()]),
).set(1)
# 3PID send info
threepid_send_requests = Histogram(
"synapse_threepid_send_requests_with_tries",
documentation="Number of requests for a 3pid token by try count. Note if"
" there is a request with try count of 4, then there would have been one"
" each for 1, 2 and 3",
buckets=(1, 2, 3, 4, 5, 10),
labelnames=("type", "reason", SERVER_NAME_LABEL),
)
threadpool_total_threads = Gauge(
"synapse_threadpool_total_threads",
"Total number of threads currently in the threadpool",
labelnames=["name", SERVER_NAME_LABEL],
)
threadpool_total_working_threads = Gauge(
"synapse_threadpool_working_threads",
"Number of threads currently working in the threadpool",
labelnames=["name", SERVER_NAME_LABEL],
)
threadpool_total_min_threads = Gauge(
"synapse_threadpool_min_threads",
"Minimum number of threads configured in the threadpool",
labelnames=["name", SERVER_NAME_LABEL],
)
threadpool_total_max_threads = Gauge(
"synapse_threadpool_max_threads",
"Maximum number of threads configured in the threadpool",
labelnames=["name", SERVER_NAME_LABEL],
)
def register_threadpool(*, name: str, server_name: str, threadpool: ThreadPool) -> None:
"""
Add metrics for the threadpool.
Args:
name: The name of the threadpool, used to identify it in the metrics.
server_name: The homeserver name (used to label metrics) (this should be `hs.hostname`).
threadpool: The threadpool to register metrics for.
"""
threadpool_total_min_threads.labels(
name=name, **{SERVER_NAME_LABEL: server_name}
).set(threadpool.min)
threadpool_total_max_threads.labels(
name=name, **{SERVER_NAME_LABEL: server_name}
).set(threadpool.max)
threadpool_total_threads.labels(
name=name, **{SERVER_NAME_LABEL: server_name}
).set_function(lambda: len(threadpool.threads))
threadpool_total_working_threads.labels(
name=name, **{SERVER_NAME_LABEL: server_name}
).set_function(lambda: len(threadpool.working))
class MetricsResource(Resource):
"""
Twisted ``Resource`` that serves prometheus metrics.
"""
isLeaf = True
def __init__(self, registry: CollectorRegistry = REGISTRY):
self.registry = registry
def render_GET(self, request: Request) -> bytes:
request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii"))
response = generate_latest(self.registry)
request.setHeader(b"Content-Length", str(len(response)))
return response
__all__ = [
"Collector",
"MetricsResource",
"generate_latest",
"LaterGauge",
"InFlightGauge",
"GaugeBucketCollector",
"MIN_TIME_BETWEEN_GCS",
"install_gc_manager",
]