Skip to content

Commit f0e87e1

Browse files
committed
Add configurable metric_timestamp_source and shared HostContext for APM service map processor
- Add MetricTimestampSource enum (arrival_time / span_end_time) with @JsonCreator for config deserialization - Add metric_timestamp_source config field defaulting to arrival_time, which uses processing time at window evaluation to avoid late-span data loss in Prometheus/AMP - Split metric aggregation: Sum metrics use seconds-truncated timestamps, Histogram metrics use minutes-truncated timestamps - Create shared HostContext utility in data-prepper-api for consistent hostname resolution across components - Replace randomKey UUID with stable service_map_processor_host_id label derived from SHA-256 hash of hostname via HostContext - Add putCommonLabels() helper and HOST_ID_LABEL constant in ApmServiceMapMetricsUtil for consistent label construction - NodeOperationDetail events use minutes-truncated timestamps to preserve pre-existing behavior Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
1 parent b4304f5 commit f0e87e1

File tree

10 files changed

+445
-53
lines changed

10 files changed

+445
-53
lines changed
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.model.host;
11+
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import java.net.InetAddress;
16+
17+
/**
18+
* Provides the hostname of the current Data Prepper instance.
19+
* This is intended as a shared utility so that hostname resolution
20+
* is consistent across all components (processors, source coordinators, etc.).
21+
*/
22+
public class HostContext {
23+
24+
private static final Logger LOG = LoggerFactory.getLogger(HostContext.class);
25+
private static final String UNKNOWN_HOST = "unknown";
26+
private static final String HOSTNAME = resolveHostname();
27+
28+
static String resolveHostname() {
29+
try {
30+
return InetAddress.getLocalHost().getHostName();
31+
} catch (final Exception e) {
32+
LOG.warn("Failed to resolve hostname, using '{}': {}", UNKNOWN_HOST, e.getMessage());
33+
return UNKNOWN_HOST;
34+
}
35+
}
36+
37+
/**
38+
* Returns the hostname of the current Data Prepper host.
39+
*
40+
* @return the hostname
41+
*/
42+
public static String getHostname() {
43+
return HOSTNAME;
44+
}
45+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*/
9+
10+
package org.opensearch.dataprepper.model.host;
11+
12+
import org.junit.jupiter.api.Test;
13+
import org.mockito.MockedStatic;
14+
15+
import java.net.InetAddress;
16+
import java.net.UnknownHostException;
17+
18+
import static org.hamcrest.CoreMatchers.equalTo;
19+
import static org.hamcrest.CoreMatchers.notNullValue;
20+
import static org.hamcrest.MatcherAssert.assertThat;
21+
import static org.hamcrest.Matchers.not;
22+
import static org.hamcrest.Matchers.emptyString;
23+
import static org.mockito.Mockito.mockStatic;
24+
25+
class HostContextTest {
26+
27+
@Test
28+
void getHostname_returns_non_null_non_empty_value() {
29+
final String hostname = HostContext.getHostname();
30+
assertThat(hostname, notNullValue());
31+
assertThat(hostname, not(emptyString()));
32+
}
33+
34+
@Test
35+
void getHostname_returns_consistent_value() {
36+
final String first = HostContext.getHostname();
37+
final String second = HostContext.getHostname();
38+
assertThat(first, equalTo(second));
39+
}
40+
41+
@Test
42+
void getHostname_matches_InetAddress_hostname() throws UnknownHostException {
43+
final String expected = InetAddress.getLocalHost().getHostName();
44+
assertThat(HostContext.getHostname(), equalTo(expected));
45+
}
46+
47+
@Test
48+
void resolveHostname_returns_valid_hostname() throws UnknownHostException {
49+
final String hostname = HostContext.resolveHostname();
50+
assertThat(hostname, equalTo(InetAddress.getLocalHost().getHostName()));
51+
}
52+
53+
@Test
54+
void resolveHostname_returns_unknown_when_hostname_cannot_be_resolved() {
55+
try (final MockedStatic<InetAddress> inetAddressMock = mockStatic(InetAddress.class)) {
56+
inetAddressMock.when(InetAddress::getLocalHost)
57+
.thenThrow(new UnknownHostException("test exception"));
58+
59+
assertThat(HostContext.resolveHostname(), equalTo("unknown"));
60+
}
61+
}
62+
63+
@Test
64+
void constructor_can_be_created() {
65+
final HostContext hostContext = new HostContext();
66+
assertThat(hostContext, notNullValue());
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
package org.opensearch.dataprepper.plugins.processor.otel_apm_service_map;
12+
13+
import com.fasterxml.jackson.annotation.JsonCreator;
14+
15+
import java.util.Arrays;
16+
import java.util.Map;
17+
import java.util.stream.Collectors;
18+
19+
public enum MetricTimestampSource {
20+
ARRIVAL_TIME("arrival_time"),
21+
SPAN_END_TIME("span_end_time");
22+
23+
private static final Map<String, MetricTimestampSource> OPTIONS_MAP = Arrays.stream(MetricTimestampSource.values())
24+
.collect(Collectors.toMap(
25+
value -> value.option,
26+
value -> value
27+
));
28+
29+
private final String option;
30+
31+
MetricTimestampSource(final String option) {
32+
this.option = option;
33+
}
34+
35+
public String getOption() {
36+
return option;
37+
}
38+
39+
@JsonCreator
40+
public static MetricTimestampSource fromOptionValue(final String option) {
41+
return OPTIONS_MAP.get(option);
42+
}
43+
}

data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessor.java

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@
4545
import org.slf4j.LoggerFactory;
4646

4747
import java.io.File;
48-
import java.net.InetAddress;
48+
import org.opensearch.dataprepper.model.host.HostContext;
49+
50+
import java.nio.charset.StandardCharsets;
51+
import java.security.MessageDigest;
4952
import java.time.Clock;
5053
import java.time.Duration;
5154
import java.time.Instant;
@@ -73,7 +76,6 @@ public class OTelApmServiceMapProcessor extends AbstractProcessor<Record<Event>,
7376

7477
private static final String SPANS_DB_SIZE = "spansDbSize";
7578
private static final String SPANS_DB_COUNT = "spansDbCount";
76-
private static final String HOST_ID = resolveHostId();
7779

7880
private static final Logger LOG = LoggerFactory.getLogger(OTelApmServiceMapProcessor.class);
7981
private static final String EVENT_TYPE_OTEL_APM_SERVICE_MAP = "SERVICE_MAP";
@@ -95,7 +97,9 @@ public class OTelApmServiceMapProcessor extends AbstractProcessor<Record<Event>,
9597
private static Clock clock;
9698

9799
private final int thisProcessorId;
100+
private final String hostId;
98101
private final List<String> groupByAttributes;
102+
private final MetricTimestampSource metricTimestampSource;
99103
private final EventFactory eventFactory;
100104

101105
@DataPrepperPluginConstructor
@@ -110,7 +114,8 @@ public OTelApmServiceMapProcessor(
110114
pipelineDescription.getNumberOfProcessWorkers(),
111115
eventFactory,
112116
pluginMetrics,
113-
config.getGroupByAttributes());
117+
config.getGroupByAttributes(),
118+
config.getMetricTimestampSource());
114119
}
115120

116121
OTelApmServiceMapProcessor(final Duration windowDuration,
@@ -119,7 +124,8 @@ public OTelApmServiceMapProcessor(
119124
final int processWorkers,
120125
final EventFactory eventFactory,
121126
final PluginMetrics pluginMetrics) {
122-
this(windowDuration, databasePath, clock, processWorkers, eventFactory, pluginMetrics, Collections.emptyList());
127+
this(windowDuration, databasePath, clock, processWorkers, eventFactory, pluginMetrics,
128+
Collections.emptyList(), MetricTimestampSource.SPAN_END_TIME);
123129
}
124130

125131
OTelApmServiceMapProcessor(final Duration windowDuration,
@@ -129,9 +135,23 @@ public OTelApmServiceMapProcessor(
129135
final EventFactory eventFactory,
130136
final PluginMetrics pluginMetrics,
131137
final List<String> groupByAttributes) {
138+
this(windowDuration, databasePath, clock, processWorkers, eventFactory, pluginMetrics,
139+
groupByAttributes, MetricTimestampSource.SPAN_END_TIME);
140+
}
141+
142+
OTelApmServiceMapProcessor(final Duration windowDuration,
143+
final File databasePath,
144+
final Clock clock,
145+
final int processWorkers,
146+
final EventFactory eventFactory,
147+
final PluginMetrics pluginMetrics,
148+
final List<String> groupByAttributes,
149+
final MetricTimestampSource metricTimestampSource) {
132150
super(pluginMetrics);
133151

152+
this.hostId = resolveHostId();
134153
this.groupByAttributes = groupByAttributes != null ? Collections.unmodifiableList(groupByAttributes) : Collections.emptyList();
154+
this.metricTimestampSource = metricTimestampSource != null ? metricTimestampSource : MetricTimestampSource.ARRIVAL_TIME;
135155

136156
this.eventFactory = eventFactory;
137157
OTelApmServiceMapProcessor.clock = clock;
@@ -448,19 +468,23 @@ private Map<String, String> extractGroupByAttributes(final Span span) {
448468
}
449469

450470
/**
451-
* Get anchor timestamp from span's endTime, truncated to the specified unit.
452-
* Sum metrics use SECONDS to minimize collisions while keeping stable time series.
453-
* Histogram metrics use MINUTES to aggregate more samples for richer bucket distributions.
471+
* Get anchor timestamp for metrics, truncated to the specified unit.
472+
* When metric_timestamp_source is ARRIVAL_TIME, uses fallbackTime (clock.instant()).
473+
* When metric_timestamp_source is SPAN_END_TIME, uses the span's endTime field.
454474
*
455475
* @param spanStateData The span to extract timestamp from
456-
* @param fallbackTime Current system time to use if span endTime is null
476+
* @param fallbackTime Current system time to use as arrival time or if span endTime is null
457477
* @param truncationUnit The ChronoUnit to truncate the timestamp to
458478
* @return Instant truncated to the specified boundary
459479
*/
460480
private Instant getAnchorTimestampFromSpan(final SpanStateData spanStateData, final Instant fallbackTime,
461481
final ChronoUnit truncationUnit) {
462-
Instant timestamp = fallbackTime; // Default to current system time
482+
if (metricTimestampSource == MetricTimestampSource.ARRIVAL_TIME) {
483+
return fallbackTime.truncatedTo(truncationUnit);
484+
}
463485

486+
// SPAN_END_TIME mode: parse span's endTime, fall back to system time
487+
Instant timestamp = fallbackTime;
464488
final String endTime = spanStateData.getEndTime();
465489
try {
466490
if (endTime != null && !endTime.isEmpty()) {
@@ -476,19 +500,17 @@ private Instant getAnchorTimestampFromSpan(final SpanStateData spanStateData, fi
476500

477501
/**
478502
* Resolve a stable host identifier for this Data Prepper instance.
479-
* Uses a truncated SHA-256 hash of the hostname to ensure uniqueness
480-
* without revealing the actual hostname in emitted metrics.
481-
* Falls back to a random UUID if hostname resolution fails.
503+
* Uses a truncated SHA-256 hash of the hostname (from {@link HostContext})
504+
* to ensure uniqueness without revealing the actual hostname in emitted metrics.
482505
*/
483-
private static String resolveHostId() {
506+
private String resolveHostId() {
484507
try {
485-
final String hostname = InetAddress.getLocalHost().getHostName();
486-
final java.security.MessageDigest digest = java.security.MessageDigest.getInstance("SHA-256");
487-
final byte[] hash = digest.digest(hostname.getBytes(java.nio.charset.StandardCharsets.UTF_8));
508+
final String hostname = HostContext.getHostname();
509+
final MessageDigest digest = MessageDigest.getInstance("SHA-256");
510+
final byte[] hash = digest.digest(hostname.getBytes(StandardCharsets.UTF_8));
488511
return Hex.encodeHexString(hash).substring(0, 16);
489-
} catch (Exception e) {
490-
LOG.warn("Failed to resolve hostname for trace_processor_host_id, using random UUID: {}", e.getMessage());
491-
return java.util.UUID.randomUUID().toString();
512+
} catch (final java.security.NoSuchAlgorithmException e) {
513+
throw new RuntimeException("SHA-256 algorithm not available", e);
492514
}
493515
}
494516

@@ -777,7 +799,7 @@ private Collection<Record<Event>> generateNodeOperationDetailEvents(final ThreeW
777799
final Instant histAnchor = getAnchorTimestampFromSpan(clientSpan, currentTime, ChronoUnit.MINUTES);
778800

779801
final NodeOperationDetail nodeOperationDetail = new NodeOperationDetail(
780-
sourceNode, targetNode, sourceOp, targetOp, sumAnchor);
802+
sourceNode, targetNode, sourceOp, targetOp, histAnchor);
781803

782804
final EventMetadata eventMetadata = new DefaultEventMetadata.Builder()
783805
.withEventType(EVENT_TYPE_OTEL_APM_SERVICE_MAP).build();
@@ -792,7 +814,7 @@ private Collection<Record<Event>> generateNodeOperationDetailEvents(final ThreeW
792814
if (decoration.getParentServerOperationName() != null) {
793815
ApmServiceMapMetricsUtil.generateMetricsForClientSpan(
794816
clientSpan, decoration, currentTime, sumStateByKey, histogramStateByKey,
795-
sumAnchor, histAnchor, HOST_ID);
817+
sumAnchor, histAnchor, hostId);
796818
}
797819
}
798820
}
@@ -805,7 +827,7 @@ private Collection<Record<Event>> generateNodeOperationDetailEvents(final ThreeW
805827
final Instant histAnchor = getAnchorTimestampFromSpan(serverSpan, currentTime, ChronoUnit.MINUTES);
806828
ApmServiceMapMetricsUtil.generateMetricsForServerSpan(
807829
serverSpan, currentTime, sumStateByKey, histogramStateByKey,
808-
sumAnchor, histAnchor, HOST_ID);
830+
sumAnchor, histAnchor, hostId);
809831

810832
final ServerSpanDecoration decoration = traceData.getDecorations().getServerDecoration(serverSpan.getSpanId());
811833

@@ -819,7 +841,7 @@ private Collection<Record<Event>> generateNodeOperationDetailEvents(final ThreeW
819841
final Operation sourceOp = new Operation(serverSpan.getOperationName());
820842

821843
final NodeOperationDetail nodeOperationDetail = new NodeOperationDetail(
822-
sourceNode, null, sourceOp, null, sumAnchor);
844+
sourceNode, null, sourceOp, null, histAnchor);
823845

824846
final EventMetadata eventMetadata = new DefaultEventMetadata.Builder()
825847
.withEventType(EVENT_TYPE_OTEL_APM_SERVICE_MAP).build();

data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessorConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ public class OTelApmServiceMapProcessorConfig {
4343
"when present on the span's resource attributes. Only applied to primary Service objects, not dependency services.")
4444
private List<String> groupByAttributes = Collections.emptyList();
4545

46+
@JsonProperty("metric_timestamp_source")
47+
@JsonPropertyDescription("The timestamp source for emitted metrics. 'arrival_time' (default) uses processing time " +
48+
"at window evaluation, avoiding late-span data loss in Prometheus/AMP. 'span_end_time' uses the span's endTime field.")
49+
private MetricTimestampSource metricTimestampSource = MetricTimestampSource.ARRIVAL_TIME;
50+
4651
public Duration getWindowDuration() {
4752
return windowDuration;
4853
}
@@ -54,4 +59,8 @@ public String getDbPath() {
5459
public List<String> getGroupByAttributes() {
5560
return groupByAttributes != null ? Collections.unmodifiableList(groupByAttributes) : Collections.emptyList();
5661
}
62+
63+
public MetricTimestampSource getMetricTimestampSource() {
64+
return metricTimestampSource;
65+
}
5766
}

0 commit comments

Comments
 (0)