Skip to content

Commit 4651d60

Browse files
authored
Support otel metrics source with partition keys when persistent buffer is used (opensearch-project#6373)
* Support otel metrics source with partition keys when persistent buffer is used Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Addressed review comments Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> * Optimized splitExportMetricsServiceRequestByKeys Signed-off-by: Krishna Kondaka <krishkdk@amazon.com> --------- Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent a8ca515 commit 4651d60

File tree

10 files changed

+852
-30
lines changed

10 files changed

+852
-30
lines changed

data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcService.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import java.time.Instant;
2929
import java.util.Collection;
30+
import java.util.Map;
31+
import java.util.Set;
3032
import java.util.concurrent.atomic.AtomicInteger;
3133

3234
public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImplBase {
@@ -49,15 +51,18 @@ public class OTelMetricsGrpcService extends MetricsServiceGrpc.MetricsServiceImp
4951
private final Counter recordsDroppedCounter;
5052
private final DistributionSummary payloadSizeSummary;
5153
private final Timer requestProcessDuration;
54+
private final Set<String> bufferPartitionKeys;
5255

5356
public OTelMetricsGrpcService(int bufferWriteTimeoutInMillis,
5457
final OTelProtoCodec.OTelProtoDecoder oTelProtoDecoder,
5558
Buffer<Record<? extends Metric>> buffer,
59+
final Set<String> bufferPartitionKeys,
5660
final PluginMetrics pluginMetrics,
5761
final String metricsPrefix) {
5862
this.bufferWriteTimeoutInMillis = bufferWriteTimeoutInMillis;
5963
this.buffer = buffer;
6064

65+
this.bufferPartitionKeys = bufferPartitionKeys;
6166
if (metricsPrefix != null) {
6267
requestsReceivedCounter = pluginMetrics.counter(REQUESTS_RECEIVED, metricsPrefix);
6368
successRequestsCounter = pluginMetrics.counter(SUCCESS_REQUESTS, metricsPrefix);
@@ -96,7 +101,15 @@ public void export(final ExportMetricsServiceRequest request, final StreamObserv
96101
private void processRequest(final ExportMetricsServiceRequest request, final StreamObserver<ExportMetricsServiceResponse> responseObserver) {
97102
try {
98103
if (buffer.isByteBuffer()) {
99-
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
104+
if (bufferPartitionKeys != null && !bufferPartitionKeys.isEmpty()) {
105+
Map<String, ExportMetricsServiceRequest> requestsMap =
106+
oTelProtoDecoder.splitExportMetricsServiceRequestByKeys(request, bufferPartitionKeys);
107+
for (Map.Entry<String, ExportMetricsServiceRequest> entry: requestsMap.entrySet()) {
108+
buffer.writeBytes(entry.getValue().toByteArray(), entry.getKey(), bufferWriteTimeoutInMillis);
109+
}
110+
} else {
111+
buffer.writeBytes(request.toByteArray(), null, bufferWriteTimeoutInMillis);
112+
}
100113
} else {
101114
Collection<Record<? extends Metric>> metrics;
102115

data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public void start(Buffer<Record<? extends Metric>> buffer) {
8585
(int) (oTelMetricsSourceConfig.getRequestTimeoutInMillis() * 0.8),
8686
oTelMetricsSourceConfig.getOutputFormat() == OTelOutputFormat.OPENSEARCH ? new OTelProtoOpensearchCodec.OTelProtoDecoder() : new OTelProtoStandardCodec.OTelProtoDecoder(),
8787
buffer,
88+
oTelMetricsSourceConfig.getBufferPartitionKeys(),
8889
pluginMetrics,
8990
null
9091
);

data-prepper-plugins/otel-metrics-source/src/main/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceConfig.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.opensearch.dataprepper.model.configuration.PluginModel;
1616
import org.opensearch.dataprepper.plugins.otel.codec.OTelOutputFormat;
1717

18+
import java.util.Set;
19+
1820
public class OTelMetricsSourceConfig {
1921
static final String REQUEST_TIMEOUT = "request_timeout";
2022
static final String PORT = "port";
@@ -23,6 +25,8 @@ public class OTelMetricsSourceConfig {
2325
static final String USE_ACM_CERT_FOR_SSL = "useAcmCertForSSL";
2426
static final String ACM_CERT_ISSUE_TIME_OUT_MILLIS = "acmCertIssueTimeOutMillis";
2527
static final String HEALTH_CHECK_SERVICE = "health_check_service";
28+
static final String OUTPUT_FORMAT = "output_format";
29+
static final String BUFFER_PARTITION_KEYS = "partition_keys";
2630
static final String PROTO_REFLECTION_SERVICE = "proto_reflection_service";
2731
static final String SSL_KEY_CERT_FILE = "sslKeyCertChainFile";
2832
static final String SSL_KEY_FILE = "sslKeyFile";
@@ -46,6 +50,8 @@ public class OTelMetricsSourceConfig {
4650
static final int DEFAULT_ACM_CERT_ISSUE_TIME_OUT_MILLIS = 120000;
4751
private static final String S3_PREFIX = "s3://";
4852
static final String UNAUTHENTICATED_HEALTH_CHECK = "unauthenticated_health_check";
53+
private static final String NAME_KEY = "name";
54+
private static final String SERVICE_NAME_KEY = "service_name";
4955

5056
@JsonProperty(REQUEST_TIMEOUT)
5157
private int requestTimeoutInMillis = DEFAULT_REQUEST_TIMEOUT_MS;
@@ -69,9 +75,13 @@ public class OTelMetricsSourceConfig {
6975
@JsonProperty(SSL)
7076
private boolean ssl = DEFAULT_SSL;
7177

72-
@JsonProperty("output_format")
78+
@JsonProperty(OUTPUT_FORMAT)
7379
private OTelOutputFormat outputFormat = OTelOutputFormat.OPENSEARCH;
7480

81+
@JsonProperty(BUFFER_PARTITION_KEYS)
82+
@Size(min = 1, max = 2, message = "partition_keys must contain 1 or 2 elements")
83+
private Set<String> bufferPartitionKeys = null;
84+
7585
@JsonProperty(USE_ACM_CERT_FOR_SSL)
7686
private boolean useAcmCertForSSL = DEFAULT_USE_ACM_CERT_FOR_SSL;
7787

@@ -116,6 +126,17 @@ public class OTelMetricsSourceConfig {
116126
@JsonProperty(RETRY_INFO)
117127
private RetryInfoConfig retryInfo;
118128

129+
@AssertTrue(message = "buffer_partition_keys only supports 'name' and 'service_name'. 'name' is mandatory")
130+
boolean isBufferKeysValid() {
131+
if (bufferPartitionKeys == null) {
132+
return true;
133+
}
134+
135+
return bufferPartitionKeys.contains(NAME_KEY) &&
136+
(bufferPartitionKeys.size() == 1 || (bufferPartitionKeys.size() == 2
137+
&& bufferPartitionKeys.contains(SERVICE_NAME_KEY)));
138+
}
139+
119140
@AssertTrue(message = "path should start with /")
120141
boolean isPathValid() {
121142
return path == null || path.startsWith("/");
@@ -168,6 +189,10 @@ public boolean hasHealthCheck() {
168189
return healthCheck;
169190
}
170191

192+
public Set<String> getBufferPartitionKeys() {
193+
return bufferPartitionKeys;
194+
}
195+
171196
public boolean enableHttpHealthCheck() {
172197
return enableUnframedRequests() && hasHealthCheck();
173198
}

data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsGrpcServiceTest.java

Lines changed: 108 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.dataprepper.plugins.source.otelmetrics;
77

8+
import org.apache.commons.lang3.tuple.Pair;
89
import com.linecorp.armeria.server.ServiceRequestContext;
910
import io.grpc.stub.StreamObserver;
1011
import io.micrometer.core.instrument.Counter;
@@ -44,6 +45,7 @@
4445
import java.util.Collections;
4546
import java.util.List;
4647
import java.util.Map;
48+
import java.util.Set;
4749
import java.util.concurrent.TimeoutException;
4850
import java.util.stream.Stream;
4951

@@ -85,6 +87,10 @@ public class OTelMetricsGrpcServiceTest {
8587
private static final int TEST_RESOURCE_DROPPED_ATTRIBUTES_COUNT = 11;
8688
private static final int TEST_SCOPE_DROPPED_ATTRIBUTES_COUNT = 22;
8789

90+
private static final String SERVICE_NAME_KEY="service.name";
91+
private static final String TEST_SERVICE_NAME1="testService1";
92+
private static final String TEST_SERVICE_NAME2="testService2";
93+
8894
private static final String IS_SCOPE_NAME="testISScopeName";
8995
private static final String IS_SCOPE_VERSION="testISScopeVersion";
9096
private static final String IS_SCOPE_ATTR_KEY="scope.attr";
@@ -167,6 +173,9 @@ public class OTelMetricsGrpcServiceTest {
167173
@Captor
168174
private ArgumentCaptor<Collection<Record>> recordCaptor;
169175

176+
@Captor
177+
private ArgumentCaptor<byte[]> byteBufferRecordCaptor;
178+
170179
@Captor
171180
ArgumentCaptor<byte[]> bytesCaptor;
172181

@@ -195,8 +204,12 @@ public void setup() {
195204

196205
}
197206

207+
OTelMetricsGrpcService createObjectUnderTestWithKeys(final OTelProtoCodec.OTelProtoDecoder decoder, final Set<String> keys) {
208+
return new OTelMetricsGrpcService(bufferWriteTimeoutInMillis, decoder, buffer, keys, mockPluginMetrics, null);
209+
}
210+
198211
OTelMetricsGrpcService createObjectUnderTest(OTelProtoCodec.OTelProtoDecoder decoder) {
199-
return new OTelMetricsGrpcService(bufferWriteTimeoutInMillis, decoder, buffer, mockPluginMetrics, null);
212+
return new OTelMetricsGrpcService(bufferWriteTimeoutInMillis, decoder, buffer, null, mockPluginMetrics, null);
200213
}
201214

202215
@ParameterizedTest
@@ -273,6 +286,43 @@ public void export_BufferTimeout_responseObserverOnError(OTelProtoCodec.OTelProt
273286
verify(requestProcessDuration, times(1)).record(ArgumentMatchers.<Runnable>any());
274287
}
275288

289+
@Test
290+
public void test_MetricsSource_output_with_PartitionKeyName() throws Exception {
291+
final ExportMetricsServiceRequest METRIC_REQUEST = createMetricsRequest();
292+
final OTelProtoStandardCodec.OTelProtoDecoder decoder = new OTelProtoStandardCodec.OTelProtoDecoder();
293+
when(buffer.isByteBuffer()).thenReturn(true);
294+
295+
sut = createObjectUnderTestWithKeys(decoder, Set.of("name"));
296+
try (MockedStatic<ServiceRequestContext> mockedStatic = mockStatic(ServiceRequestContext.class)) {
297+
mockedStatic.when(ServiceRequestContext::current).thenReturn(serviceRequestContext);
298+
sut.export(METRIC_REQUEST, responseObserver);
299+
}
300+
verify(buffer, times(1)).writeBytes(byteBufferRecordCaptor.capture(), eq(TEST_GAUGE_METRIC_NAME), anyInt());
301+
verify(buffer, times(1)).writeBytes(byteBufferRecordCaptor.capture(), eq(TEST_SUM_METRIC_NAME), anyInt());
302+
verify(buffer, times(1)).writeBytes(byteBufferRecordCaptor.capture(), eq(TEST_SUMMARY_METRIC_NAME), anyInt());
303+
verify(buffer, times(1)).writeBytes(byteBufferRecordCaptor.capture(), eq(TEST_HISTOGRAM_METRIC_NAME), anyInt());
304+
verify(buffer, times(1)).writeBytes(byteBufferRecordCaptor.capture(), eq(TEST_EXPONENTIAL_HISTOGRAM_METRIC_NAME), anyInt());
305+
}
306+
307+
308+
@Test
309+
public void test_MetricsSource_output_with_PartitionKeysNameAndServiceName() throws Exception {
310+
final ExportMetricsServiceRequest METRIC_REQUEST = createMetricsRequestWithTwoServices();
311+
final OTelProtoStandardCodec.OTelProtoDecoder decoder = new OTelProtoStandardCodec.OTelProtoDecoder();
312+
when(buffer.isByteBuffer()).thenReturn(true);
313+
314+
sut = createObjectUnderTestWithKeys(decoder, Set.of("name", "service_name"));
315+
try (MockedStatic<ServiceRequestContext> mockedStatic = mockStatic(ServiceRequestContext.class)) {
316+
mockedStatic.when(ServiceRequestContext::current).thenReturn(serviceRequestContext);
317+
sut.export(METRIC_REQUEST, responseObserver);
318+
}
319+
verify(buffer, times(1)).writeBytes(byteBufferRecordCaptor.capture(), eq(TEST_SERVICE_NAME1+":"+TEST_GAUGE_METRIC_NAME), anyInt());
320+
verify(buffer, times(1)).writeBytes(byteBufferRecordCaptor.capture(), eq(TEST_SERVICE_NAME1+":"+TEST_SUM_METRIC_NAME), anyInt());
321+
verify(buffer, times(1)).writeBytes(byteBufferRecordCaptor.capture(), eq(TEST_SERVICE_NAME2+":"+TEST_SUMMARY_METRIC_NAME), anyInt());
322+
verify(buffer, times(1)).writeBytes(byteBufferRecordCaptor.capture(), eq(TEST_SERVICE_NAME2+":"+TEST_HISTOGRAM_METRIC_NAME), anyInt());
323+
verify(buffer, times(1)).writeBytes(byteBufferRecordCaptor.capture(), eq(TEST_SERVICE_NAME2+":"+TEST_EXPONENTIAL_HISTOGRAM_METRIC_NAME), anyInt());
324+
}
325+
276326
@Test
277327
public void test_MetricsSource_output_with_OpensearchFormat() throws Exception {
278328
final ExportMetricsServiceRequest METRIC_REQUEST = createMetricsRequest();
@@ -336,19 +386,12 @@ private static Stream<Arguments> getDecoderArguments() {
336386
);
337387
}
338388

339-
private ExportMetricsServiceRequest createMetricsRequest() {
389+
private Pair<ScopeMetrics, ScopeMetrics> createScopeMetrics() {
340390
Instant currentTime = Instant.now();
341391
startTime = currentTime.toString();
342392
final long currentUnixTimeNano = ((long)currentTime.getEpochSecond() * 1000_000_000L) + currentTime.getNano();
343393
final long endUnixTimeNano = currentUnixTimeNano + (TIME_DELTA*1000_000_000L);
344394
endTime = currentTime.plusSeconds(TIME_DELTA).toString();
345-
final Resource resource = Resource.newBuilder()
346-
.setDroppedAttributesCount(TEST_RESOURCE_DROPPED_ATTRIBUTES_COUNT)
347-
.addAttributes(KeyValue.newBuilder()
348-
.setKey(TEST_RESOURCE_ATTR_KEY)
349-
.setValue(AnyValue.newBuilder().setStringValue(TEST_RESOURCE_ATTR_VALUE).build())
350-
).build();
351-
352395
final InstrumentationScope instrumentationScope = InstrumentationScope.newBuilder()
353396
.setName(IS_SCOPE_NAME)
354397
.setVersion(IS_SCOPE_VERSION)
@@ -357,7 +400,7 @@ private ExportMetricsServiceRequest createMetricsRequest() {
357400
.setKey(IS_SCOPE_ATTR_KEY)
358401
.setValue(AnyValue.newBuilder().setStringValue(IS_SCOPE_ATTR_VALUE).build())
359402
).build();
360-
ScopeMetrics scopeMetrics = ScopeMetrics.newBuilder()
403+
ScopeMetrics scopeMetrics1 = ScopeMetrics.newBuilder()
361404
.setScope(instrumentationScope)
362405
.setSchemaUrl(TEST_SCHEMA_URL)
363406
.addMetrics(io.opentelemetry.proto.metrics.v1.Metric.newBuilder()
@@ -395,6 +438,11 @@ private ExportMetricsServiceRequest createMetricsRequest() {
395438
.setAggregationTemporality(io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_CUMULATIVE)
396439
.build())
397440
.build())
441+
.build();
442+
443+
ScopeMetrics scopeMetrics2 = ScopeMetrics.newBuilder()
444+
.setScope(instrumentationScope)
445+
.setSchemaUrl(TEST_SCHEMA_URL)
398446
.addMetrics(io.opentelemetry.proto.metrics.v1.Metric.newBuilder()
399447
.setName(TEST_EXPONENTIAL_HISTOGRAM_METRIC_NAME)
400448
.setDescription(TEST_EXPONENTIAL_HISTOGRAM_DESCRIPTION)
@@ -473,14 +521,63 @@ private ExportMetricsServiceRequest createMetricsRequest() {
473521
.build())
474522
.build())
475523
.build();
524+
return Pair.of(scopeMetrics1, scopeMetrics2);
525+
}
476526

527+
private ExportMetricsServiceRequest createMetricsRequest() {
528+
final Resource resource = Resource.newBuilder()
529+
.setDroppedAttributesCount(TEST_RESOURCE_DROPPED_ATTRIBUTES_COUNT)
530+
.addAttributes(KeyValue.newBuilder()
531+
.setKey(TEST_RESOURCE_ATTR_KEY)
532+
.setValue(AnyValue.newBuilder().setStringValue(TEST_RESOURCE_ATTR_VALUE).build())
533+
).build();
534+
535+
Pair<ScopeMetrics, ScopeMetrics> scopeMetrics = createScopeMetrics();
477536
ResourceMetrics resourceMetrics = ResourceMetrics.newBuilder()
478537
.setResource(resource)
479-
.addScopeMetrics(scopeMetrics)
538+
.addScopeMetrics(scopeMetrics.getLeft())
539+
.addScopeMetrics(scopeMetrics.getRight())
480540
.build();
481541

482542
return ExportMetricsServiceRequest.newBuilder()
483543
.addResourceMetrics(resourceMetrics)
484544
.build();
485545
}
546+
547+
private ExportMetricsServiceRequest createMetricsRequestWithTwoServices() {
548+
Instant currentTime = Instant.now();
549+
startTime = currentTime.toString();
550+
final long currentUnixTimeNano = ((long)currentTime.getEpochSecond() * 1000_000_000L) + currentTime.getNano();
551+
final long endUnixTimeNano = currentUnixTimeNano + (TIME_DELTA*1000_000_000L);
552+
endTime = currentTime.plusSeconds(TIME_DELTA).toString();
553+
final Resource resource1 = Resource.newBuilder()
554+
.setDroppedAttributesCount(TEST_RESOURCE_DROPPED_ATTRIBUTES_COUNT)
555+
.addAttributes(KeyValue.newBuilder()
556+
.setKey(SERVICE_NAME_KEY)
557+
.setValue(AnyValue.newBuilder().setStringValue(TEST_SERVICE_NAME1).build())
558+
).build();
559+
final Resource resource2 = Resource.newBuilder()
560+
.setDroppedAttributesCount(TEST_RESOURCE_DROPPED_ATTRIBUTES_COUNT)
561+
.addAttributes(KeyValue.newBuilder()
562+
.setKey(SERVICE_NAME_KEY)
563+
.setValue(AnyValue.newBuilder().setStringValue(TEST_SERVICE_NAME2).build())
564+
).build();
565+
566+
Pair<ScopeMetrics, ScopeMetrics> scopeMetrics = createScopeMetrics();
567+
568+
ResourceMetrics resourceMetrics1 = ResourceMetrics.newBuilder()
569+
.setResource(resource1)
570+
.addScopeMetrics(scopeMetrics.getLeft())
571+
.build();
572+
573+
ResourceMetrics resourceMetrics2 = ResourceMetrics.newBuilder()
574+
.setResource(resource2)
575+
.addScopeMetrics(scopeMetrics.getRight())
576+
.build();
577+
578+
return ExportMetricsServiceRequest.newBuilder()
579+
.addResourceMetrics(resourceMetrics1)
580+
.addResourceMetrics(resourceMetrics2)
581+
.build();
582+
}
486583
}

0 commit comments

Comments
 (0)