Skip to content

Commit bb5fc82

Browse files
committed
otel_apm_service_map: Added support for deriving remote service and remote operation
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent 08e2f1d commit bb5fc82

File tree

2 files changed

+368
-3
lines changed

2 files changed

+368
-3
lines changed

data-prepper-plugins/otel-proto-common/src/main/java/org/opensearch/dataprepper/plugins/otel/common/OTelSpanDerivationUtil.java

Lines changed: 172 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@
1010

1111
package org.opensearch.dataprepper.plugins.otel.common;
1212

13+
import org.apache.commons.lang3.tuple.Pair;
1314
import org.opensearch.dataprepper.model.trace.Span;
1415
import org.slf4j.Logger;
1516
import org.slf4j.LoggerFactory;
1617

18+
import java.net.URL;
19+
import java.net.MalformedURLException;
1720
import java.util.HashMap;
1821
import java.util.List;
1922
import java.util.Map;
@@ -33,8 +36,11 @@ public class OTelSpanDerivationUtil {
3336
public static final String DERIVED_ERROR_ATTRIBUTE = "derived.error";
3437
public static final String DERIVED_OPERATION_ATTRIBUTE = "derived.operation";
3538
public static final String DERIVED_ENVIRONMENT_ATTRIBUTE = "derived.environment";
39+
public static final String DERIVED_REMOTE_SERVICE_ATTRIBUTE = "derived.remote_service";
3640
private static final Logger LOG = LoggerFactory.getLogger(OTelSpanDerivationUtil.class);
3741
private static final String SPAN_KIND_SERVER = "SPAN_KIND_SERVER";
42+
private static final String SPAN_KIND_CLIENT = "SPAN_KIND_CLIENT";
43+
private static final String SPAN_KIND_PRODUCER = "SPAN_KIND_PRODUCER";
3844

3945
/**
4046
* Derives fault, error, operation, and environment attributes for SERVER spans in the provided list.
@@ -48,12 +54,23 @@ public static void deriveServerSpanAttributes(final List<Span> spans) {
4854
}
4955

5056
for (final Span span : spans) {
51-
if (span != null && SPAN_KIND_SERVER.equals(span.getKind())) {
57+
if (span != null && isServerSpan(span)) {
5258
deriveAttributesForSpan(span);
5359
}
5460
}
5561
}
5662

63+
private static boolean isServerSpan(Span span) {
64+
return SPAN_KIND_SERVER.equals(span.getKind());
65+
}
66+
67+
private static boolean isClientSpan(Span span) {
68+
return SPAN_KIND_CLIENT.equals(span.getKind());
69+
}
70+
71+
private static boolean isProducerSpan(Span span) {
72+
return SPAN_KIND_PRODUCER.equals(span.getKind());
73+
}
5774

5875
/**
5976
* Adds an attribute to the span. This method just delegates to span.putAttribute()
@@ -85,14 +102,23 @@ public static void deriveAttributesForSpan(final Span span) {
85102

86103
final ErrorFaultResult errorFault = computeErrorAndFault(span.getStatus(), spanAttributes);
87104

88-
final String operationName = computeOperationName(span.getName(), spanAttributes);
105+
String operationName = null;
106+
if (isServerSpan(span)) {
107+
operationName = computeOperationName(span.getName(), spanAttributes);
108+
} else if (isClientSpan(span) || isProducerSpan(span)) {
109+
final Pair<String, String> remoteOperationAndService = computeRemoteOperationAndService(spanAttributes);
110+
operationName = remoteOperationAndService.getLeft();
111+
putAttribute(span, DERIVED_REMOTE_SERVICE_ATTRIBUTE, remoteOperationAndService.getRight());
112+
}
89113

90114
final String environment = computeEnvironment(spanAttributes);
91115

92116
// Add derived attributes using our safe attribute setting method
93117
putAttribute(span, DERIVED_FAULT_ATTRIBUTE, String.valueOf(errorFault.fault));
94118
putAttribute(span, DERIVED_ERROR_ATTRIBUTE, String.valueOf(errorFault.error));
95-
putAttribute(span, DERIVED_OPERATION_ATTRIBUTE, operationName);
119+
if (operationName != null) {
120+
putAttribute(span, DERIVED_OPERATION_ATTRIBUTE, operationName);
121+
}
96122
putAttribute(span, DERIVED_ENVIRONMENT_ATTRIBUTE, environment);
97123

98124
LOG.debug("Derived attributes for SERVER span {}: fault={}, error={}, operation={}, environment={}",
@@ -186,6 +212,15 @@ private static boolean isSpanStatusError(final Object spanStatus) {
186212
return "ERROR".equalsIgnoreCase(statusString) ||
187213
"2".equals(statusString) ||
188214
statusString.toLowerCase().contains("error");
215+
216+
}
217+
218+
private static String extractFirstPathFromUrl(final String url) {
219+
int colonDoubleSlash = url.indexOf("://");
220+
int firstSlash = url.indexOf("/", colonDoubleSlash+3);
221+
int secondSlash = url.indexOf("/", firstSlash+1);
222+
String result= (secondSlash > 0) ? url.substring(firstSlash, secondSlash) : url.substring(firstSlash);
223+
return result;
189224
}
190225

191226
/**
@@ -231,6 +266,140 @@ public static String computeOperationName(final String spanName, final Map<Strin
231266
return spanName != null ? spanName : "UnknownOperation";
232267
}
233268

269+
private static final Map<String, String> AWS_SERVICE_MAPPINGS = Map.of(
270+
"AmazonDynamoDBv2", "AWS::DynamoDB",
271+
"DynamoDb", "AWS::DynamoDB",
272+
"dynamodb", "AWS::DynamoDB",
273+
"DynamoDBv2", "AWS::DynamoDB",
274+
"sns", "AWS::SNS",
275+
"AmazonSNS", "AWS::SNS",
276+
"SimpleNotificationService", "AWS::SNS",
277+
"AmazonKinesis", "AWS::Kinesis",
278+
"Amazon S3", "AWS::S3",
279+
"s3", "AWS::S3");
280+
281+
public static Pair<String,String> computeRemoteOperationAndService(final Map<String, Object> spanAttributes) {
282+
String remoteOperation = null;
283+
String remoteService = null;
284+
285+
// RPC attributes
286+
final String rpcService = getStringAttribute(spanAttributes, "rpc.service");
287+
final String rpcMethod = getStringAttribute(spanAttributes, "rpc.method");
288+
if (rpcService != null && "aws-api".equals(rpcMethod)) {
289+
remoteService = AWS_SERVICE_MAPPINGS.getOrDefault(rpcService, "AWS::" + rpcService);
290+
}
291+
if (rpcMethod != null) {
292+
remoteOperation = rpcMethod;
293+
}
294+
295+
// Database attributes
296+
if (remoteService == null || remoteOperation == null) {
297+
final String dbSystem = getStringAttribute(spanAttributes, "db.system");
298+
final String dbOperation = getStringAttribute(spanAttributes, "db.operation");
299+
final String dbStatement = getStringAttribute(spanAttributes, "db.statement");
300+
if (dbSystem != null) remoteService = dbSystem;
301+
if (dbOperation != null) {
302+
remoteOperation = dbOperation;
303+
} else if (dbStatement != null) {
304+
remoteOperation = dbStatement.trim().split("\\s+")[0].toUpperCase();
305+
}
306+
}
307+
308+
// Database attributes with name
309+
if (remoteService == null || remoteOperation == null) {
310+
final String dbSystem = getStringAttribute(spanAttributes, "db.system.name");
311+
final String dbOperation = getStringAttribute(spanAttributes, "db.operation.name");
312+
final String dbStatement = getStringAttribute(spanAttributes, "db.statement.name");
313+
if (dbSystem != null) remoteService = dbSystem;
314+
if (dbOperation != null) {
315+
remoteOperation = dbOperation;
316+
} else if (dbStatement != null) {
317+
remoteOperation = dbStatement.trim().split("\\s+")[0].toUpperCase();
318+
}
319+
}
320+
321+
// FaaS attributes
322+
if (remoteService == null || remoteOperation == null) {
323+
final String faasInvokedName = getStringAttribute(spanAttributes, "faas.invoked_name");
324+
final String faasTrigger = getStringAttribute(spanAttributes, "faas.trigger");
325+
if (faasInvokedName != null) remoteService = faasInvokedName;
326+
if (faasTrigger != null) remoteOperation = faasTrigger;
327+
}
328+
329+
// Messaging attributes
330+
if (remoteService == null || remoteOperation == null) {
331+
final String messagingSystem = getStringAttribute(spanAttributes, "messaging.system");
332+
final String messagingOperation = getStringAttribute(spanAttributes, "messaging.operation");
333+
if (messagingSystem != null) remoteService = messagingSystem;
334+
if (messagingOperation != null) remoteOperation = messagingOperation;
335+
}
336+
337+
// GraphQL attributes
338+
if (remoteService == null || remoteOperation == null) {
339+
final String graphQLOperation = getStringAttribute(spanAttributes, "graphql.operation.type");
340+
if (graphQLOperation != null) {
341+
remoteService = "graphql";
342+
remoteOperation = graphQLOperation;
343+
}
344+
}
345+
346+
// Peer service
347+
if (remoteService == null) {
348+
remoteService = getStringAttribute(spanAttributes, "peer.service");
349+
}
350+
351+
if (remoteService != null && remoteOperation != null) {
352+
return Pair.of(remoteOperation, remoteService);
353+
}
354+
355+
// Fallback: derive from URL or network attributes
356+
final String urlString = getStringAttribute(spanAttributes, "url.full") != null
357+
? getStringAttribute(spanAttributes, "url.full")
358+
: getStringAttribute(spanAttributes, "http.url");
359+
360+
if (remoteService == null) {
361+
remoteService = deriveServiceFromNetwork(spanAttributes, urlString);
362+
}
363+
364+
if (remoteOperation == null && urlString != null) {
365+
final String httpMethod = getStringAttribute(spanAttributes, "http.request.method") != null
366+
? getStringAttribute(spanAttributes, "http.request.method")
367+
: getStringAttribute(spanAttributes, "http.method");
368+
remoteOperation = httpMethod != null ? httpMethod + " " + extractFirstPathFromUrl(urlString) : urlString;
369+
}
370+
371+
return Pair.of(
372+
remoteOperation != null ? remoteOperation : "UnknownRemoteOperation",
373+
remoteService != null ? remoteService : "UnknownRemoteService");
374+
}
375+
376+
private static String deriveServiceFromNetwork(final Map<String, Object> spanAttributes, final String urlString) {
377+
final String[][] addressPortPairs = {
378+
{"server.address", "server.port"},
379+
{"net.peer.name", "net.peer.port"},
380+
{"network.peer.address", "network.peer.port"},
381+
{"net.sock.peer.addr", "net.sock.peer.port"}
382+
};
383+
384+
for (String[] pair : addressPortPairs) {
385+
final String address = getStringAttribute(spanAttributes, pair[0]);
386+
if (address != null) {
387+
final String port = getStringAttribute(spanAttributes, pair[1]);
388+
return port != null ? address + ":" + port : address;
389+
}
390+
}
391+
392+
if (urlString != null) {
393+
try {
394+
final URL url = new URL(urlString);
395+
final int port = url.getPort() == -1 ? url.getDefaultPort() : url.getPort();
396+
return url.getHost() + ":" + port;
397+
} catch (MalformedURLException ignored) {}
398+
}
399+
400+
return null;
401+
}
402+
234403
/**
235404
* Compute environment from resource attributes.
236405
* Package-private for testing purposes only.

0 commit comments

Comments
 (0)