Skip to content

Commit ef51c95

Browse files
authored
Fix lambda plugin threshold and flaky test (opensearch-project#5523)
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
1 parent f32d12e commit ef51c95

File tree

4 files changed

+27
-6
lines changed

4 files changed

+27
-6
lines changed

data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/LambdaCommonHandler.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,18 @@ private static List<Buffer> createBufferBatches(Collection<Record<Event>> record
6363

6464
LOG.debug("Batch size received to lambda processor: {}", records.size());
6565
for (Record<Event> record : records) {
66-
//check size or time has exceeded threshold
67-
if (ThresholdCheck.checkSizeThresholdExceed(currentBufferPerBatch, maxBytes, record)) {
66+
//check size has exceeded threshold
67+
if (currentBufferPerBatch.getEventCount() > 0 &&
68+
ThresholdCheck.checkSizeThresholdExceed(currentBufferPerBatch, maxBytes, record)) {
6869
batchedBuffers.add(currentBufferPerBatch);
6970
currentBufferPerBatch = new InMemoryBuffer(keyName, outputCodecContext);
7071
}
7172

7273
currentBufferPerBatch.addRecord(record);
7374

7475
// After adding, check if the event count threshold is reached.
75-
if (ThresholdCheck.checkEventCountThresholdExceeded(currentBufferPerBatch, maxEvents)) {
76+
if (currentBufferPerBatch.getEventCount() > 0 &&
77+
ThresholdCheck.checkEventCountThresholdExceeded(currentBufferPerBatch, maxEvents)) {
7678
batchedBuffers.add(currentBufferPerBatch);
7779
currentBufferPerBatch = new InMemoryBuffer(keyName, outputCodecContext);
7880
}
@@ -102,8 +104,12 @@ public static Map<Buffer, CompletableFuture<InvokeResponse>> invokeLambdaAndGetF
102104
for (Buffer buffer : batchedBuffers) {
103105
InvokeRequest requestPayload = buffer.getRequestPayload(config.getFunctionName(),
104106
config.getInvocationType().getAwsLambdaValue());
105-
CompletableFuture<InvokeResponse> future = lambdaAsyncClient.invoke(requestPayload);
106-
bufferToFutureMap.put(buffer, future);
107+
if(requestPayload!=null) {
108+
CompletableFuture<InvokeResponse> future = lambdaAsyncClient.invoke(requestPayload);
109+
bufferToFutureMap.put(buffer, future);
110+
}else{
111+
LOG.warn("Request Payload is null, skipping lambda invocation");
112+
}
107113
}
108114
return bufferToFutureMap;
109115
}

data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/common/util/ThresholdCheck.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,24 @@
1919
public class ThresholdCheck {
2020

2121
public static boolean checkTimeoutExceeded(final Buffer currentBuffer, final Duration maxCollectionDuration) {
22+
if (currentBuffer.getEventCount() == 0) {
23+
return false;
24+
}
2225
return currentBuffer.getDuration().compareTo(maxCollectionDuration) > 0;
2326
}
2427

2528
public static boolean checkSizeThresholdExceed(final Buffer currentBuffer, final ByteCount maxBytes, Record<Event> nextRecord) {
29+
if (currentBuffer.getEventCount() == 0) {
30+
return false;
31+
}
2632
int estimatedRecordSize = estimateRecordSize(nextRecord);
2733
return (currentBuffer.getSize() + estimatedRecordSize) > maxBytes.getBytes();
2834
}
2935

3036
public static boolean checkEventCountThresholdExceeded(final Buffer currentBuffer, final int maxEvents) {
37+
if (currentBuffer.getEventCount() == 0) {
38+
return false;
39+
}
3140
return currentBuffer.getEventCount() >= maxEvents;
3241
}
3342

data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import software.amazon.awssdk.core.SdkBytes;
3737
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
3838
import software.amazon.awssdk.services.lambda.model.InvokeResponse;
39+
import software.amazon.awssdk.services.lambda.model.LambdaException;
3940

4041
import java.io.ByteArrayInputStream;
4142
import java.io.IOException;
@@ -201,7 +202,9 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
201202

202203
} catch (Exception e) {
203204
LOG.error(NOISY, e.getMessage(), e);
204-
if(e.getMessage().contains(EXCEEDING_PAYLOAD_LIMIT_EXCEPTION)){
205+
if (e instanceof LambdaException &&
206+
e.getMessage() != null &&
207+
e.getMessage().contains(EXCEEDING_PAYLOAD_LIMIT_EXCEPTION)) {
205208
batchExceedingThresholdCounter.increment();
206209
}
207210
/* fall through */

data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/ThresholdCheckTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ void setUp() {
5555
void testTimeoutExceededTrue() {
5656
// Simulate a buffer that has been open for 6 minutes (exceeding the 5-minute limit)
5757
when(buffer.getDuration()).thenReturn(Duration.ofMinutes(6));
58+
when(buffer.getEventCount()).thenReturn(1);
5859
assertTrue(ThresholdCheck.checkTimeoutExceeded(buffer, maxCollectionDuration),
5960
"Expected timeout threshold to be exceeded.");
6061
}
@@ -63,6 +64,7 @@ void testTimeoutExceededTrue() {
6364
void testTimeoutExceededFalse() {
6465
// Simulate a buffer that has been open for 4 minutes (within the limit)
6566
when(buffer.getDuration()).thenReturn(Duration.ofMinutes(4));
67+
when(buffer.getEventCount()).thenReturn(1);
6668
assertFalse(ThresholdCheck.checkTimeoutExceeded(buffer, maxCollectionDuration),
6769
"Expected timeout threshold to NOT be exceeded.");
6870
}
@@ -71,6 +73,7 @@ void testTimeoutExceededFalse() {
7173
@Test
7274
void testSizeThresholdExceedTrue() {
7375
long maxBytesValue = maxBytes.getBytes();
76+
when(buffer.getEventCount()).thenReturn(1);
7477
when(buffer.getSize()).thenReturn(maxBytesValue - 1);
7578
// The record's estimated size is 2 bytes (from "{}").
7679
// So, adding the record yields: (maxBytes - 1) + 2 = maxBytes + 1 (exceeds limit).

0 commit comments

Comments
 (0)