Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 106 additions & 8 deletions synapse/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,53 +20,55 @@
#
#

import itertools
import logging
import os
import platform
import threading
from importlib import metadata
from typing import (
Callable,
Dict,
Generic,
Iterable,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
)

import attr
from pkg_resources import parse_version
from prometheus_client import (
CollectorRegistry,
Counter,
Gauge,
Histogram,
Metric,
generate_latest,
)
from prometheus_client.core import (
REGISTRY,
GaugeHistogramMetricFamily,
GaugeMetricFamily,
Timestamp,
Sample,
)

from twisted.python.threadpool import ThreadPool
from twisted.web.resource import Resource
from twisted.web.server import Request

# This module is imported for its side effects; flake8 needn't warn that it's unused.
import synapse.metrics._reactor_metrics # noqa: F401
from synapse.metrics._gc import MIN_TIME_BETWEEN_GCS, install_gc_manager
from synapse.metrics._types import Collector
from synapse.types import StrSequence
from synapse.util import SYNAPSE_VERSION

Check failure on line 71 in synapse/metrics/__init__.py

View workflow job for this annotation

GitHub Actions / lint

Ruff (I001)

synapse/metrics/__init__.py:23:1: I001 Import block is un-sorted or un-formatted

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -342,6 +344,85 @@
all_gauges[self.name] = self


class GaugeHistogramMetricFamilyWithLabels(Metric):
    """
    Custom version of `GaugeHistogramMetricFamily` from `prometheus_client` that allows
    specifying labels and label values.

    A single gauge histogram and its samples.

    For use by custom collectors.
    """

    def __init__(
        self,
        *,
        name: str,
        documentation: str,
        buckets: Optional[Sequence[Tuple[str, float]]] = None,
        gsum_value: Optional[float] = None,
        labelnames: StrSequence = (),
        labelvalues: StrSequence = (),
        unit: str = "",
    ):
        Metric.__init__(self, name, documentation, "gaugehistogram", unit)

        # Every declared label name must be paired with exactly one value.
        if len(labelvalues) != len(labelnames):
            raise ValueError("Incorrect label count")

        self._labelnames = tuple(labelnames)

        # If bucket data was supplied up front, record the samples immediately.
        if buckets is not None:
            self.add_metric(
                labelvalues=labelvalues, buckets=buckets, gsum_value=gsum_value
            )

    def add_metric(
        self,
        labelvalues: StrSequence,
        buckets: Sequence[Tuple[str, float]],
        gsum_value: Optional[float],
        timestamp: Optional[Union[float, Timestamp]] = None,
    ) -> None:
        """Add a metric to the metric family.

        Args:
            labelvalues: A list of label values
            buckets: A list of pairs of bucket names and values.
                The buckets must be sorted, and +Inf present.
            gsum_value: The sum value of the metric.
            timestamp: Optional timestamp attached to every emitted sample.
        """
        base_labels = dict(zip(self._labelnames, labelvalues))

        # Emit one `_bucket` sample per bucket, each tagged with its `le` bound.
        for upper_bound, cumulative_value in buckets:
            bucket_labels = dict(base_labels)
            bucket_labels["le"] = upper_bound
            self.samples.append(
                Sample(
                    self.name + "_bucket",
                    bucket_labels,
                    cumulative_value,
                    timestamp,
                )
            )

        # The last bucket (+Inf) holds the total observation count.
        self.samples.append(
            Sample(
                self.name + "_gcount",
                dict(base_labels),
                buckets[-1][1],
                timestamp,
            )
        )
        # TODO: Handle None gsum_value correctly. Currently a None will fail exposition but is allowed here.
        self.samples.append(
            Sample(
                self.name + "_gsum",
                dict(base_labels),
                gsum_value,
                timestamp,
            )  # type: ignore
        )


class GaugeBucketCollector(Collector):
"""Like a Histogram, but the buckets are Gauges which are updated atomically.

Expand All @@ -354,14 +435,17 @@
__slots__ = (
"_name",
"_documentation",
"_labelnames",
"_bucket_bounds",
"_metric",
)

def __init__(
self,
*,
name: str,
documentation: str,
labelnames: Optional[StrSequence],
buckets: Iterable[float],
registry: CollectorRegistry = REGISTRY,
):
Expand All @@ -375,6 +459,7 @@
"""
self._name = name
self._documentation = documentation
self._labelnames = labelnames

# the tops of the buckets
self._bucket_bounds = [float(b) for b in buckets]
Expand All @@ -386,7 +471,7 @@

# We initially set this to None. We won't report metrics until
# this has been initialised after a successful data update
self._metric: Optional[GaugeHistogramMetricFamily] = None
self._metric: Optional[GaugeHistogramMetricFamilyWithLabels] = None

registry.register(self)

Expand All @@ -395,15 +480,26 @@
if self._metric is not None:
yield self._metric

def update_data(self, values: Iterable[float], labels: StrSequence = ()) -> None:
    """Update the data to be reported by the metric

    The existing data is cleared, and each measurement in the input is assigned
    to the relevant bucket.

    Args:
        values: the raw measurements to be bucketed
        labels: label values to attach to the reported metric
    """
    # Replace the previously-published metric wholesale with a freshly
    # bucketed one; `collect` will yield the new object from now on.
    self._metric = self._values_to_metric(values, labels)

def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
def _values_to_metric(
self, values: Iterable[float], labels: StrSequence = ()
) -> GaugeHistogramMetricFamilyWithLabels:
"""
Args:
values
labels
"""
total = 0.0
bucket_values = [0 for _ in self._bucket_bounds]

Expand All @@ -421,9 +517,11 @@
# that bucket or below.
accumulated_values = itertools.accumulate(bucket_values)

return GaugeHistogramMetricFamily(
self._name,
self._documentation,
return GaugeHistogramMetricFamilyWithLabels(
name=self._name,
documentation=self._documentation,
labelnames=self._labelnames,
labelvalues=labels,
buckets=list(
zip((str(b) for b in self._bucket_bounds), accumulated_values)
),
Expand Down
18 changes: 11 additions & 7 deletions synapse/storage/databases/main/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import time
from typing import TYPE_CHECKING, Dict, List, Tuple, cast

from synapse.metrics import GaugeBucketCollector
from synapse.metrics import SERVER_NAME_LABEL, GaugeBucketCollector
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
Expand All @@ -42,9 +42,10 @@

# Collect metrics on the number of forward extremities that exist.
_extremities_collecter = GaugeBucketCollector(
"synapse_forward_extremities",
"Number of rooms on the server with the given number of forward extremities"
name="synapse_forward_extremities",
documentation="Number of rooms on the server with the given number of forward extremities"
" or fewer",
labelnames=[SERVER_NAME_LABEL],
buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500],
)

Expand All @@ -54,9 +55,10 @@
# we could remove from state resolution by reducing the graph to a single
# forward extremity.
_excess_state_events_collecter = GaugeBucketCollector(
"synapse_excess_extremity_events",
"Number of rooms on the server with the given number of excess extremity "
name="synapse_excess_extremity_events",
documentation="Number of rooms on the server with the given number of excess extremity "
"events, or fewer",
labelnames=[SERVER_NAME_LABEL],
buckets=[0] + [1 << n for n in range(12)],
)

Expand Down Expand Up @@ -100,10 +102,12 @@ def fetch(txn: LoggingTransaction) -> List[Tuple[int, int]]:

res = await self.db_pool.runInteraction("read_forward_extremities", fetch)

_extremities_collecter.update_data(x[0] for x in res)
_extremities_collecter.update_data(
values=(x[0] for x in res), labels=(self.server_name,)
)

_excess_state_events_collecter.update_data(
(x[0] - 1) * x[1] for x in res if x[1]
values=((x[0] - 1) * x[1] for x in res if x[1]), labels=(self.server_name,)
)

async def count_daily_e2ee_messages(self) -> int:
Expand Down
Loading