Skip to content

Commit b436ef6

Browse files
committed
feat(influxdb3_server): include node_id label on exported metrics
Add `with_default_attributes` to `PrometheusTextEncoder` so the same label set can be applied to every reported observation. `handle_metrics` uses it to attach `node_id` to each series, sparing operators from relabeling on hostname. `node_modes` is more relevant to Enterprise and is left for a follow-up.
1 parent db092e1 commit b436ef6

7 files changed

Lines changed: 83 additions & 2 deletions

File tree

core/metric_exporters/src/lib.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ pub struct PrometheusTextEncoder<'a, W: Write> {
2929

3030
encoder: TextEncoder,
3131
writer: &'a mut W,
32+
33+
/// Labels prepended to every reported observation. Per-metric attributes
34+
/// must not reuse these label names — Prometheus rejects duplicates within
35+
/// a series.
36+
default_attributes: Attributes,
3237
}
3338

3439
impl<'a, W: Write> PrometheusTextEncoder<'a, W> {
@@ -37,8 +42,15 @@ impl<'a, W: Write> PrometheusTextEncoder<'a, W> {
3742
metric: None,
3843
encoder: TextEncoder::new(),
3944
writer,
45+
default_attributes: Attributes::from(&[]),
4046
}
4147
}
48+
49+
/// Set labels that will be added to every reported observation.
50+
pub fn with_default_attributes(mut self, attributes: Attributes) -> Self {
51+
self.default_attributes = attributes;
52+
self
53+
}
4254
}
4355

4456
impl<W: Write> metric::Reporter for PrometheusTextEncoder<'_, W> {
@@ -79,8 +91,9 @@ impl<W: Write> metric::Reporter for PrometheusTextEncoder<'_, W> {
7991
let mut metric = Metric::default();
8092

8193
metric.set_label(
82-
attributes
94+
self.default_attributes
8395
.iter()
96+
.chain(attributes.iter())
8497
.map(|(name, value)| {
8598
let mut pair = LabelPair::default();
8699
pair.set_name(name.to_string());
@@ -355,6 +368,29 @@ foo_total{tag1="value",tag2="value2"} 7
355368
assert_not_contains!(tracing_capture.to_string(), "error");
356369
}
357370

371+
#[test]
372+
fn test_encode_with_default_attributes() {
373+
let registry = Registry::new();
374+
let counter: Metric<U64Counter> = registry.register_metric("foo", "a counter metric");
375+
counter.recorder(&[("tag1", "value")]).inc(3);
376+
377+
let mut buffer = Vec::new();
378+
let mut default_attrs = Attributes::from(&[]);
379+
default_attrs.insert("node_id", "node-1".to_string());
380+
let mut encoder =
381+
PrometheusTextEncoder::new(&mut buffer).with_default_attributes(default_attrs);
382+
registry.report(&mut encoder);
383+
384+
let buffer = String::from_utf8(buffer).unwrap();
385+
let expected = r#"
386+
# HELP foo_total a counter metric
387+
# TYPE foo_total counter
388+
foo_total{node_id="node-1",tag1="value"} 3
389+
"#
390+
.trim_start();
391+
assert_eq!(&buffer, expected, "{buffer}");
392+
}
393+
358394
#[test]
359395
fn pushgateway_path_behaviour() {
360396
let client = PushGatewayClient::new(

influxdb3/src/commands/serve.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,6 +1118,7 @@ pub async fn command(config: Config, user_params: HashMap<String, String>) -> Re
11181118
trace_exporter,
11191119
trace_header_parser,
11201120
Arc::clone(&telemetry_store),
1121+
Arc::from(node_id.as_str()),
11211122
);
11221123

11231124
if config.without_auth {

influxdb3/tests/server/metrics.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
use crate::server::TestServer;
2+
3+
#[tokio::test]
4+
async fn metrics_endpoint_includes_node_id_label() {
5+
let server = TestServer::spawn().await;
6+
let body = server
7+
.http_client()
8+
.get(format!("{}/metrics", server.client_addr()))
9+
.send()
10+
.await
11+
.unwrap()
12+
.text()
13+
.await
14+
.unwrap();
15+
16+
let series_lines: Vec<&str> = body
17+
.lines()
18+
.filter(|l| !l.is_empty() && !l.starts_with('#'))
19+
.collect();
20+
21+
assert!(
22+
!series_lines.is_empty(),
23+
"expected /metrics to expose at least one series, got:\n{body}"
24+
);
25+
26+
for line in &series_lines {
27+
assert!(
28+
line.contains(r#"node_id="test-server""#),
29+
"expected every series to carry node_id label, missing on:\n {line}\nfull body:\n{body}"
30+
);
31+
}
32+
}

influxdb3/tests/server/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ mod flight;
3737
mod gen1_lookback_guard;
3838
mod limits;
3939
mod logs;
40+
mod metrics;
4041
mod packages;
4142
mod ping;
4243
mod plugin_restriction;

influxdb3_server/src/http.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1204,7 +1204,10 @@ impl HttpApi {
12041204

12051205
fn handle_metrics(&self) -> Result<Response> {
12061206
let mut body: Vec<u8> = Default::default();
1207-
let mut reporter = metric_exporters::PrometheusTextEncoder::new(&mut body);
1207+
let mut default_attributes = metric::Attributes::from(&[]);
1208+
default_attributes.insert("node_id", self.common_state.node_id().to_string());
1209+
let mut reporter = metric_exporters::PrometheusTextEncoder::new(&mut body)
1210+
.with_default_attributes(default_attributes);
12081211
self.common_state.metrics.report(&mut reporter);
12091212

12101213
// Add required OpenMetrics EOF marker

influxdb3_server/src/lib.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ pub struct CommonServerState {
9696
trace_exporter: Option<Arc<trace_exporters::export::AsyncExporter>>,
9797
trace_header_parser: TraceHeaderParser,
9898
telemetry_store: Arc<TelemetryStore>,
99+
node_id: Arc<str>,
99100
}
100101

101102
impl CommonServerState {
@@ -105,16 +106,22 @@ impl CommonServerState {
105106
trace_exporter: Option<Arc<trace_exporters::export::AsyncExporter>>,
106107
trace_header_parser: TraceHeaderParser,
107108
telemetry_store: Arc<TelemetryStore>,
109+
node_id: Arc<str>,
108110
) -> Self {
109111
Self {
110112
catalog,
111113
metrics,
112114
trace_exporter,
113115
trace_header_parser,
114116
telemetry_store,
117+
node_id,
115118
}
116119
}
117120

121+
pub fn node_id(&self) -> &str {
122+
&self.node_id
123+
}
124+
118125
pub fn trace_exporter(&self) -> Option<Arc<trace_exporters::export::AsyncExporter>> {
119126
self.trace_exporter.clone()
120127
}

influxdb3_server/src/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1382,6 +1382,7 @@ async fn setup_server(start_time: i64) -> (String, CancellationToken, Arc<dyn Wr
13821382
None,
13831383
trace_header_parser,
13841384
Arc::clone(&sample_telem_store),
1385+
Arc::from("test-node"),
13851386
);
13861387
let query_executor = Arc::new(QueryExecutorImpl::new(CreateQueryExecutorArgs {
13871388
catalog: write_buffer.catalog(),

0 commit comments

Comments
 (0)