Skip to content

Commit b4304f5

Browse files
committed
Replace randomKey with stable trace_processor_host_id and split metric timestamps (#6672)
- Remove randomKey UUID label that caused cardinality explosion (new throwaway time series per metric emission) - Add stable trace_processor_host_id label (hostname) to prevent cross-host collisions in AMP without creating new series each window - Split timestamp granularity by metric type: - Sum metrics (request/error/fault): seconds-truncated timestamps for stable time series with minimal late-span collision risk - Histogram metrics (latency): minutes-truncated timestamps to aggregate more samples for richer bucket distributions Signed-off-by: Vamsi Manohar <reddyvam@amazon.com>
1 parent a4f0c9b commit b4304f5

File tree

3 files changed

+346
-279
lines changed

3 files changed

+346
-279
lines changed

data-prepper-plugins/otel-apm-service-map-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/otel_apm_service_map/OTelApmServiceMapProcessor.java

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,11 @@
4545
import org.slf4j.LoggerFactory;
4646

4747
import java.io.File;
48+
import java.net.InetAddress;
4849
import java.time.Clock;
4950
import java.time.Duration;
5051
import java.time.Instant;
52+
import java.time.temporal.ChronoUnit;
5153
import java.util.ArrayList;
5254
import java.util.Collection;
5355
import java.util.Collections;
@@ -71,6 +73,7 @@ public class OTelApmServiceMapProcessor extends AbstractProcessor<Record<Event>,
7173

7274
private static final String SPANS_DB_SIZE = "spansDbSize";
7375
private static final String SPANS_DB_COUNT = "spansDbCount";
76+
private static final String HOST_ID = resolveHostId();
7477

7578
private static final Logger LOG = LoggerFactory.getLogger(OTelApmServiceMapProcessor.class);
7679
private static final String EVENT_TYPE_OTEL_APM_SERVICE_MAP = "SERVICE_MAP";
@@ -375,7 +378,8 @@ private Collection<Record<Event>> processCurrentWindowSpans() {
375378

376379
final EphemeralSpanDecorations ephemeralDecorations = new EphemeralSpanDecorations();
377380

378-
final Map<MetricKey, MetricAggregationState> metricsStateByKey = new HashMap<>();
381+
final Map<MetricKey, MetricAggregationState> sumStateByKey = new HashMap<>();
382+
final Map<MetricKey, MetricAggregationState> histogramStateByKey = new HashMap<>();
379383

380384
final Map<String, Collection<SpanStateData>> previousSpansByTraceId = buildSpansByTraceIdMap(previousWindow);
381385
final Map<String, Collection<SpanStateData>> currentSpansByTraceId = buildSpansByTraceIdMap(currentWindow);
@@ -388,11 +392,11 @@ private Collection<Record<Event>> processCurrentWindowSpans() {
388392
if (!traceData.getProcessingSpans().isEmpty()) {
389393
decorateSpansInTraceWithEphemeralStorage(traceData);
390394

391-
apmEvents.addAll(generateNodeOperationDetailEvents(traceData, currentTime, metricsStateByKey));
395+
apmEvents.addAll(generateNodeOperationDetailEvents(traceData, currentTime, sumStateByKey, histogramStateByKey));
392396
}
393397
}
394398

395-
final List<JacksonMetric> metrics = ApmServiceMapMetricsUtil.createMetricsFromAggregatedState(metricsStateByKey);
399+
final List<JacksonMetric> metrics = ApmServiceMapMetricsUtil.createMetricsFromAggregatedState(sumStateByKey, histogramStateByKey);
396400
metrics.sort(Comparator.comparing(JacksonMetric::getTime));
397401

398402
final List<Record<Event>> apmEventsSorted = new ArrayList<>();
@@ -444,13 +448,17 @@ private Map<String, String> extractGroupByAttributes(final Span span) {
444448
}
445449

446450
/**
447-
* Get anchor timestamp from span's endTime, truncated to minute boundary
451+
* Get anchor timestamp from span's endTime, truncated to the specified unit.
452+
* Sum metrics use SECONDS to minimize collisions while keeping stable time series.
453+
* Histogram metrics use MINUTES to aggregate more samples for richer bucket distributions.
448454
*
449455
* @param spanStateData The span to extract timestamp from
450456
* @param fallbackTime Current system time to use if span endTime is null
451-
* @return Instant truncated to the lower 1-minute boundary
457+
* @param truncationUnit The ChronoUnit to truncate the timestamp to
458+
* @return Instant truncated to the specified boundary
452459
*/
453-
private Instant getAnchorTimestampFromSpan(final SpanStateData spanStateData, final Instant fallbackTime) {
460+
private Instant getAnchorTimestampFromSpan(final SpanStateData spanStateData, final Instant fallbackTime,
461+
final ChronoUnit truncationUnit) {
454462
Instant timestamp = fallbackTime; // Default to current system time
455463

456464
final String endTime = spanStateData.getEndTime();
@@ -463,7 +471,25 @@ private Instant getAnchorTimestampFromSpan(final SpanStateData spanStateData, fi
463471
endTime, e.getMessage());
464472
}
465473

466-
return timestamp.truncatedTo(java.time.temporal.ChronoUnit.MINUTES);
474+
return timestamp.truncatedTo(truncationUnit);
475+
}
476+
477+
/**
478+
* Resolve a stable host identifier for this Data Prepper instance.
479+
* Uses a truncated SHA-256 hash of the hostname to ensure uniqueness
480+
* without revealing the actual hostname in emitted metrics.
481+
* Falls back to a random UUID if hostname resolution fails.
482+
*/
483+
private static String resolveHostId() {
484+
try {
485+
final String hostname = InetAddress.getLocalHost().getHostName();
486+
final java.security.MessageDigest digest = java.security.MessageDigest.getInstance("SHA-256");
487+
final byte[] hash = digest.digest(hostname.getBytes(java.nio.charset.StandardCharsets.UTF_8));
488+
return Hex.encodeHexString(hash).substring(0, 16);
489+
} catch (Exception e) {
490+
LOG.warn("Failed to resolve hostname for trace_processor_host_id, using random UUID: {}", e.getMessage());
491+
return java.util.UUID.randomUUID().toString();
492+
}
467493
}
468494

469495
/**
@@ -720,7 +746,8 @@ private void decorateServerSpansSecondPassWithEphemeralStorage(final ThreeWindow
720746
*/
721747
private Collection<Record<Event>> generateNodeOperationDetailEvents(final ThreeWindowTraceDataWithDecorations traceData,
722748
final Instant currentTime,
723-
final Map<MetricKey, MetricAggregationState> metricsStateByKey) {
749+
final Map<MetricKey, MetricAggregationState> sumStateByKey,
750+
final Map<MetricKey, MetricAggregationState> histogramStateByKey) {
724751
final Collection<Record<Event>> events = new HashSet<>();
725752

726753
// Step 1: CLIENT spans — primary emission path
@@ -746,10 +773,11 @@ private Collection<Record<Event>> generateNodeOperationDetailEvents(final ThreeW
746773
: null;
747774
final Operation targetOp = new Operation(decoration.getRemoteOperation());
748775

749-
final Instant anchorTimestamp = getAnchorTimestampFromSpan(clientSpan, currentTime);
776+
final Instant sumAnchor = getAnchorTimestampFromSpan(clientSpan, currentTime, ChronoUnit.SECONDS);
777+
final Instant histAnchor = getAnchorTimestampFromSpan(clientSpan, currentTime, ChronoUnit.MINUTES);
750778

751779
final NodeOperationDetail nodeOperationDetail = new NodeOperationDetail(
752-
sourceNode, targetNode, sourceOp, targetOp, anchorTimestamp);
780+
sourceNode, targetNode, sourceOp, targetOp, sumAnchor);
753781

754782
final EventMetadata eventMetadata = new DefaultEventMetadata.Builder()
755783
.withEventType(EVENT_TYPE_OTEL_APM_SERVICE_MAP).build();
@@ -763,7 +791,8 @@ private Collection<Record<Event>> generateNodeOperationDetailEvents(final ThreeW
763791

764792
if (decoration.getParentServerOperationName() != null) {
765793
ApmServiceMapMetricsUtil.generateMetricsForClientSpan(
766-
clientSpan, decoration, currentTime, metricsStateByKey, anchorTimestamp);
794+
clientSpan, decoration, currentTime, sumStateByKey, histogramStateByKey,
795+
sumAnchor, histAnchor, HOST_ID);
767796
}
768797
}
769798
}
@@ -772,9 +801,11 @@ private Collection<Record<Event>> generateNodeOperationDetailEvents(final ThreeW
772801
// Step 2: SERVER spans — metrics for all, leaf NodeOperationDetail for those with no CLIENT descendants
773802
for (SpanStateData serverSpan : traceData.getProcessingSpans()) {
774803
if (SPAN_KIND_SERVER.equals(serverSpan.getSpanKind())) {
775-
final Instant anchorTimestamp = getAnchorTimestampFromSpan(serverSpan, currentTime);
804+
final Instant sumAnchor = getAnchorTimestampFromSpan(serverSpan, currentTime, ChronoUnit.SECONDS);
805+
final Instant histAnchor = getAnchorTimestampFromSpan(serverSpan, currentTime, ChronoUnit.MINUTES);
776806
ApmServiceMapMetricsUtil.generateMetricsForServerSpan(
777-
serverSpan, currentTime, metricsStateByKey, anchorTimestamp);
807+
serverSpan, currentTime, sumStateByKey, histogramStateByKey,
808+
sumAnchor, histAnchor, HOST_ID);
778809

779810
final ServerSpanDecoration decoration = traceData.getDecorations().getServerDecoration(serverSpan.getSpanId());
780811

@@ -788,7 +819,7 @@ private Collection<Record<Event>> generateNodeOperationDetailEvents(final ThreeW
788819
final Operation sourceOp = new Operation(serverSpan.getOperationName());
789820

790821
final NodeOperationDetail nodeOperationDetail = new NodeOperationDetail(
791-
sourceNode, null, sourceOp, null, anchorTimestamp);
822+
sourceNode, null, sourceOp, null, sumAnchor);
792823

793824
final EventMetadata eventMetadata = new DefaultEventMetadata.Builder()
794825
.withEventType(EVENT_TYPE_OTEL_APM_SERVICE_MAP).build();

0 commit comments

Comments
 (0)