Skip to content

Commit be66455

Browse files
committed
Implement a Redis mode
Signed-off-by: Stefano Rivera <stefano@rivera.za.net>
1 parent c7e74f3 commit be66455

7 files changed

Lines changed: 627 additions & 4 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ jobs:
5959
uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 # v6.1.0
6060
with:
6161
python-version: ${{ matrix.python-version }}
62+
- name: Install Redis
63+
run: |
64+
apt-get -y install redis-server
6265
- name: Install dependencies
6366
run: |
6467
pip install --user tox "virtualenv<20.22.0"

prometheus_client/metrics.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ def remove(self, *labelvalues: Any) -> None:
207207
warnings.warn(
208208
"Removal of labels has not been implemented in multi-process mode yet.",
209209
UserWarning)
210+
if 'PROMETHEUS_REDIS_URL' in os.environ:
211+
warnings.warn(
212+
"Removal of labels has not been implemented in redis mode yet.",
213+
UserWarning)
210214

211215
if not self._labelnames:
212216
raise ValueError('No label names were set when constructing %s' % self)
@@ -226,6 +230,10 @@ def remove_by_labels(self, labels: dict[str, str]) -> None:
226230
"Removal of labels has not been implemented in multi-process mode yet.",
227231
UserWarning
228232
)
233+
if 'PROMETHEUS_REDIS_URL' in os.environ:
234+
warnings.warn(
235+
"Removal of labels has not been implemented in redis mode yet.",
236+
UserWarning)
229237

230238
if not self._labelnames:
231239
raise ValueError('No label names were set when constructing %s' % self)
@@ -258,6 +266,10 @@ def clear(self) -> None:
258266
warnings.warn(
259267
"Clearing labels has not been implemented in multi-process mode yet",
260268
UserWarning)
269+
if 'PROMETHEUS_REDIS_URL' in os.environ:
270+
warnings.warn(
271+
"Clearing of labels has not been implemented in redis mode yet.",
272+
UserWarning)
261273
with self._lock:
262274
self._metrics = {}
263275

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
from collections.abc import Iterable
2+
import json
3+
import os
4+
from urllib.parse import urlsplit
5+
6+
from .metrics_core import Metric
7+
from .registry import Collector, CollectorRegistry
8+
from .samples import Sample
9+
10+
11+
def redis_client():
12+
"""
13+
Create a redis client for PROMETHEUS_REDIS_URL.
14+
15+
Configure the redis database via a URL in PROMETHEUS_REDIS_URL of the form
16+
redis://localhost:6379/0
17+
"""
18+
from redis import Redis
19+
20+
parsed_url = urlsplit(os.environ["PROMETHEUS_REDIS_URL"])
21+
assert parsed_url.scheme == "redis"
22+
assert parsed_url.path.startswith("/")
23+
assert parsed_url.path[1:].isdigit()
24+
port = parsed_url.port or 6379
25+
db = int(parsed_url.path[1:])
26+
return Redis(host=parsed_url.hostname, port=port, db=db)
27+
28+
29+
class RedisCollector(Collector):
30+
"""Collector for redis mode."""
31+
32+
def __init__(self, registry: CollectorRegistry | None) -> None:
33+
self._client = redis_client()
34+
if registry:
35+
registry.register(self)
36+
37+
def _iter_values(self) -> Iterable[tuple[bytes, str]]:
38+
cursor = 0
39+
while True:
40+
cursor, keys = self._client.scan(cursor=cursor, match="value:*")
41+
values = self._client.mget(keys)
42+
yield from zip(keys, values)
43+
if cursor == 0:
44+
break
45+
46+
def collect(self) -> Iterable[Metric]:
47+
metrics: dict[str, Metric] = {}
48+
histograms: set[str] = set()
49+
50+
for key, value_s in self._iter_values():
51+
# FIXME: Catch ValueError here, just in case?
52+
prefix_b, typ_b, mmap_key = key.split(b":", 2)
53+
assert prefix_b == b"value"
54+
typ = typ_b.decode()
55+
value = float(value_s)
56+
57+
metric_name, name, labels, help_text = json.loads(mmap_key)
58+
59+
metric = metrics.get(metric_name)
60+
if metric is None:
61+
metric = Metric(metric_name, help_text, typ)
62+
metrics[metric_name] = metric
63+
if typ in ("histogram", "gaugehistogram"):
64+
histograms.add(metric_name)
65+
66+
metric.add_sample(name, labels, value)
67+
68+
for name in histograms:
69+
self._fix_histogram(metrics[name])
70+
71+
return metrics.values()
72+
73+
def _fix_histogram(self, metric: Metric) -> None:
74+
"""
75+
Fix-up histogram samples.
76+
77+
Sort the buckets as expected by a client, and accumulate the values.
78+
The Histogram class is optimized to only increment the bucket that a
79+
value first appears in, not larger ones that would also contain it.
80+
"""
81+
by_label: dict[tuple[tuple[str, ...], str], list[Sample]] = {}
82+
83+
# Organize into lists of samples by label
84+
for sample in metric.samples:
85+
if "le" in sample.labels:
86+
labels_without_le = sample.labels.copy()
87+
labels_without_le.pop("le")
88+
key = (tuple(labels_without_le.values()), sample.name)
89+
else:
90+
key = (tuple(sample.labels.values()), sample.name)
91+
by_label.setdefault(key, []).append(sample)
92+
93+
metric.samples = []
94+
95+
for (labels, name), samples in sorted(by_label.items()):
96+
if name.endswith("_bucket"):
97+
# Sort buckets within each label
98+
samples.sort(key=lambda sample: float(sample.labels["le"]))
99+
100+
# Accumulate values into larger buckets
101+
value = 0.0
102+
for sample in samples:
103+
value += sample.value
104+
metric.samples.append(Sample(sample.name, sample.labels, value))
105+
106+
else:
107+
metric.samples.extend(samples)

prometheus_client/values.py

Lines changed: 96 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,48 @@
11
import os
22
from threading import Lock
3+
from typing import Any, Protocol
34
import warnings
45

56
from .mmap_dict import mmap_key, MmapedDict
7+
from .redis_collector import redis_client
8+
from .samples import Exemplar
69

710

8-
class MutexValue:
11+
class Value(Protocol):
12+
"""Prometheus Client Metric implementation."""
13+
14+
_multiprocess: bool
15+
16+
def __init__(
17+
self,
18+
typ: str,
19+
metric_name: str,
20+
name: str,
21+
labelnames: list[str],
22+
labelvalues: list[str],
23+
help_text: str,
24+
**kwargs: Any,
25+
) -> None:
26+
"""Initialize a metric."""
27+
28+
def inc(self, amount: float) -> None:
29+
"""Increment the metric by amount."""
30+
31+
def set(self, value: float, timestamp: float | None = None) -> None:
32+
"""Set the metric to value."""
33+
34+
def get(self) -> float:
35+
"""Get the current metric value."""
36+
37+
def set_exemplar(self, exemplar: Exemplar) -> None:
38+
"""Set an exemplar value."""
39+
exemplar # For vulture
40+
41+
def get_exemplar(self) -> Exemplar | None:
42+
"""Get any set exemplar value."""
43+
44+
45+
class MutexValue(Value):
946
"""A float protected by a mutex."""
1047

1148
_multiprocess = False
@@ -52,7 +89,7 @@ def MultiProcessValue(process_identifier=os.getpid):
5289
# This avoids the need to also have mutexes in __MmapDict.
5390
lock = Lock()
5491

55-
class MmapedValue:
92+
class MmapedValue(Value):
5693
"""A float protected by a mutex backed by a per-process mmaped file."""
5794

5895
_multiprocess = True
@@ -125,12 +162,67 @@ def get_exemplar(self):
125162
return MmapedValue
126163

127164

128-
def get_value_class():
165+
def RedisValue():
166+
"""
167+
A value implementation that stores data in a redis/valkey database.
168+
169+
Key scheme:
170+
* value:typ:MMAP_KEY
171+
"""
172+
client = redis_client()
173+
174+
class RedisValueImpl(Value):
175+
"""A float stored by redis."""
176+
177+
_multiprocess = False
178+
179+
def __init__(
180+
self,
181+
typ: str,
182+
metric_name: str,
183+
name: str,
184+
labelnames: list[str],
185+
labelvalues: list[str],
186+
help_text: str,
187+
**kwargs: Any,
188+
) -> None:
189+
key = mmap_key(metric_name, name, labelnames, labelvalues, help_text)
190+
self._key = f"value:{typ}:{key}"
191+
self._redis = client
192+
self._redis.setnx(self._key, 0.0)
193+
194+
def inc(self, amount: float) -> None:
195+
self._redis.incrbyfloat(self._key, amount)
196+
197+
def set(self, value: float, timestamp: float | None = None) -> None:
198+
# TODO: Implement timestamps
199+
self._redis.set(self._key, value)
200+
201+
def get(self) -> float:
202+
value = self._redis.get(self._key)
203+
if value is None:
204+
return 0.0
205+
return float(value)
206+
207+
def set_exemplar(self, exemplar: Exemplar) -> None:
208+
# TODO: Implement exemplars for redis.
209+
return
210+
211+
def get_exemplar(self) -> Exemplar | None:
212+
# TODO: Implement exemplars for redis.
213+
return None
214+
215+
return RedisValueImpl
216+
217+
218+
def get_value_class() -> type[Value]:
129219
# Should we enable multi-process mode?
130220
# This needs to be chosen before the first metric is constructed,
131221
# and as that may be in some arbitrary library the user/admin has
132222
# no control over we use an environment variable.
133-
if 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
223+
if "PROMETHEUS_REDIS_URL" in os.environ:
224+
return RedisValue()
225+
elif 'prometheus_multiproc_dir' in os.environ or 'PROMETHEUS_MULTIPROC_DIR' in os.environ:
134226
return MultiProcessValue()
135227
else:
136228
return MutexValue

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ aiohttp = [
5050
django = [
5151
"django",
5252
]
53+
redis = [
54+
"redis",
55+
]
5356

5457
[project.urls]
5558
Homepage = "https://github.com/prometheus/client_python"

0 commit comments

Comments
 (0)