|
| 1 | +from collections.abc import Iterable |
| 2 | +import json |
| 3 | +import os |
| 4 | +from urllib.parse import urlsplit |
| 5 | + |
| 6 | +from .metrics_core import Metric |
| 7 | +from .registry import Collector, CollectorRegistry |
| 8 | +from .samples import Sample |
| 9 | + |
| 10 | + |
def redis_client():
    """
    Create a redis client for PROMETHEUS_REDIS_URL.

    Configure the redis database via a URL in PROMETHEUS_REDIS_URL of the form
    redis://localhost:6379/0

    Returns:
        redis.Redis: a client for the configured host, port and database.

    Raises:
        KeyError: if PROMETHEUS_REDIS_URL is not set in the environment.
        ValueError: if the URL does not use the redis:// scheme or its path
            is not a database number.
    """
    parsed_url = urlsplit(os.environ["PROMETHEUS_REDIS_URL"])
    # Validate with explicit raises rather than assert: assertions are
    # stripped under ``python -O`` and would silently accept a bad URL.
    if parsed_url.scheme != "redis":
        raise ValueError(
            f"PROMETHEUS_REDIS_URL must be a redis:// URL, got scheme {parsed_url.scheme!r}"
        )
    if not parsed_url.path.startswith("/") or not parsed_url.path[1:].isdigit():
        raise ValueError(
            f"PROMETHEUS_REDIS_URL path must be /<db number>, got {parsed_url.path!r}"
        )

    # Imported lazily (after validation) so the redis package is only
    # required when redis mode is actually used.
    from redis import Redis

    port = parsed_url.port or 6379  # default redis port
    db = int(parsed_url.path[1:])
    return Redis(host=parsed_url.hostname, port=port, db=db)
| 27 | + |
| 28 | + |
class RedisCollector(Collector):
    """Collector for redis mode.

    Metric samples are stored in redis as string values under keys of the
    form ``value:<type>:<json mmap key>``; this collector scans those keys
    and rebuilds Metric objects for exposition.
    """

    def __init__(self, registry: CollectorRegistry) -> None:
        # Client is configured via PROMETHEUS_REDIS_URL (see redis_client()).
        self._client = redis_client()
        if registry:
            registry.register(self)

    def _iter_values(self) -> Iterable[tuple[bytes, bytes]]:
        """Yield ``(key, raw value)`` pairs for every ``value:*`` key.

        Iterates SCAN batches until the cursor wraps back to 0.  Values come
        back from redis as bytes.
        """
        cursor = 0
        while True:
            cursor, keys = self._client.scan(cursor=cursor, match="value:*")
            # SCAN may legitimately return an empty batch; MGET requires at
            # least one key, so skip the lookup in that case.
            if keys:
                values = self._client.mget(keys)
                # A key can be deleted/expired between SCAN and MGET, in
                # which case its value is None -- drop those pairs so
                # collect() never tries float(None).
                yield from (
                    (key, value)
                    for key, value in zip(keys, values)
                    if value is not None
                )
            if cursor == 0:
                break

    def collect(self) -> Iterable[Metric]:
        """Rebuild Metric objects from all sample values stored in redis."""
        metrics: dict[str, Metric] = {}
        # Names of histogram-typed metrics that need bucket fix-up below.
        histograms: set[str] = set()

        for key, value_s in self._iter_values():
            # Key layout: b"value:<typ>:<json-encoded mmap key>".  maxsplit=2
            # keeps any colons inside the JSON part intact.
            # FIXME: Catch ValueError here, just in case?
            prefix_b, typ_b, mmap_key = key.split(b":", 2)
            assert prefix_b == b"value"
            typ = typ_b.decode()
            value = float(value_s)

            metric_name, name, labels, help_text = json.loads(mmap_key)

            metric = metrics.get(metric_name)
            if metric is None:
                metric = Metric(metric_name, help_text, typ)
                metrics[metric_name] = metric
                if typ in ("histogram", "gaugehistogram"):
                    histograms.add(metric_name)

            metric.add_sample(name, labels, value)

        # Histogram buckets arrive unordered and non-cumulative; fix them up.
        for name in histograms:
            self._fix_histogram(metrics[name])

        return metrics.values()

    def _fix_histogram(self, metric: Metric) -> None:
        """
        Fix-up histogram samples.

        Sort the buckets as expected by a client, and accumulate the values.
        The Histogram class is optimized to only increment the bucket that a
        value first appears in, not larger ones that would also contain it.
        """
        # Group samples by (label values without "le", sample name) so each
        # labelled series' buckets can be sorted and accumulated together.
        by_label: dict[tuple[tuple[str, ...], str], list[Sample]] = {}

        # Organize into lists of samples by label
        for sample in metric.samples:
            if "le" in sample.labels:
                labels_without_le = sample.labels.copy()
                labels_without_le.pop("le")
                key = (tuple(labels_without_le.values()), sample.name)
            else:
                key = (tuple(sample.labels.values()), sample.name)
            by_label.setdefault(key, []).append(sample)

        metric.samples = []

        for (labels, name), samples in sorted(by_label.items()):
            if name.endswith("_bucket"):
                # Sort buckets within each label; float("+Inf") handles the
                # terminal bucket correctly.
                samples.sort(key=lambda sample: float(sample.labels["le"]))

                # Accumulate values into larger buckets
                value = 0.0
                for sample in samples:
                    value += sample.value
                    metric.samples.append(Sample(sample.name, sample.labels, value))

            else:
                # Non-bucket series (_sum, _count, ...) pass through unchanged.
                metric.samples.extend(samples)
0 commit comments