Skip to content

Commit 47a35e6

Browse files
committed
addressed review comments
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent f45865d commit 47a35e6

File tree

19 files changed

+517
-195
lines changed

19 files changed

+517
-195
lines changed

data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectWorkerIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ private String getKeyString(final RecordsGenerator recordsGenerator, final int n
147147

148148
private void parseObject(final String key, final S3ObjectWorker objectUnderTest) throws IOException {
149149
final S3ObjectReference s3ObjectReference = S3ObjectReference.bucketAndKey(bucket, key).build();
150-
objectUnderTest.parseS3Object(s3ObjectReference, null, null, null);
150+
objectUnderTest.processS3Object(s3ObjectReference, null, null, null);
151151
}
152152

153153
static class IntegrationTestArguments implements ArgumentsProvider {

data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3ScanObjectWorkerIT.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
16
package org.opensearch.dataprepper.plugins.source.s3;
27

38
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -15,6 +20,7 @@
1520
import org.junit.jupiter.api.extension.ExtendWith;
1621
import org.junit.jupiter.api.extension.ExtensionContext;
1722
import org.junit.jupiter.params.ParameterizedTest;
23+
import org.junit.jupiter.api.Test;
1824
import org.junit.jupiter.params.provider.Arguments;
1925
import org.junit.jupiter.params.provider.ArgumentsProvider;
2026
import org.junit.jupiter.params.provider.ArgumentsSource;
@@ -367,6 +373,57 @@ void parseS3Object_correctly_with_bucket_scan_and_loads_data_into_Buffer_and_del
367373
executorService.shutdownNow();
368374
}
369375

376+
@Test
377+
public void processS3Object_test_metadata_only() throws Exception {
378+
String keyPrefix = "s3source/s3-scan/metadataTest/" + Instant.now().toEpochMilli();
379+
final String buketOptionYaml = "name: " + bucket + "\n" +
380+
"filter:\n" +
381+
" include_prefix:\n" +
382+
" - " + keyPrefix;
383+
final ScanOptions startTimeAndRangeScanOptions = new ScanOptions.Builder()
384+
.setBucketOption(objectMapper.readValue(buketOptionYaml, S3ScanBucketOption.class))
385+
.setStartDateTime(LocalDateTime.now().minusDays(1))
386+
.setEndDateTime(LocalDateTime.now().plus(Duration.ofMinutes(5)))
387+
.build();
388+
recordsReceived = 0;
389+
lenient().doAnswer(a -> {
390+
final Collection<Record<Event>> recordsCollection = a.getArgument(0);
391+
recordsReceived += recordsCollection.size();
392+
return null;
393+
}).when(buffer).writeAll(anyCollection(), anyInt());
394+
when(s3SourceConfig.getS3ScanScanOptions()).thenReturn(s3ScanScanOptions);
395+
when(s3ScanScanOptions.getSchedulingOptions()).thenReturn(s3ScanSchedulingOptions);
396+
lenient().when(s3ScanSchedulingOptions.getInterval()).thenReturn(Duration.ofHours(1));
397+
lenient().when(s3ScanSchedulingOptions.getCount()).thenReturn(1);
398+
399+
int numberOfObjects = 10;
400+
int numberOfObjectsToAccumulate = 1;
401+
final RecordsGenerator recordsGenerator = new NewlineDelimitedRecordsGenerator();
402+
for (int i = 0; i < numberOfObjects; i++) {
403+
final String key = keyPrefix +"/test"+i+"."+recordsGenerator.getFileExtension();
404+
s3ObjectGenerator.write(1, key, recordsGenerator, Boolean.FALSE);
405+
//stubBufferWriter(recordsGenerator::assertEventIsCorrect, key);
406+
}
407+
final S3ObjectRequest s3ObjectRequest = new S3ObjectRequest.Builder(buffer, numberOfObjectsToAccumulate,
408+
Duration.ofMillis(TIMEOUT_IN_MILLIS), s3ObjectPluginMetrics)
409+
.bucketOwnerProvider(bucketOwnerProvider)
410+
.s3Client(s3Client)
411+
.build();
412+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
413+
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);
414+
ScanObjectWorker objectUnderTest = new ScanObjectWorker(s3Client,List.of(startTimeAndRangeScanOptions), new S3ObjectMetadataWorker(s3ObjectRequest),
415+
bucketOwnerProvider, sourceCoordinator, s3SourceConfig, acknowledgementSetManager, null, 30000, pluginMetrics);
416+
final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
417+
executorService.submit(objectUnderTest::run);
418+
419+
await().atMost(Duration.ofSeconds(30)).until(() -> waitForAllRecordsToBeProcessed(numberOfObjects));
420+
final int expectedWrites = numberOfObjects / numberOfObjectsToAccumulate + (numberOfObjects % numberOfObjectsToAccumulate != 0 ? 1 : 0);
421+
verify(buffer, times(expectedWrites)).writeAll(anyCollection(), eq(TIMEOUT_IN_MILLIS));
422+
423+
assertThat(recordsReceived, equalTo(numberOfObjects));
424+
425+
}
426+
370427
private String getKeyString(final String keyPrefix ,
371428
final RecordsGenerator recordsGenerator,
372429
final boolean shouldCompress) {

data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/S3SelectObjectWorkerIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ private String getKeyString(final RecordsGenerator recordsGenerator, final int n
193193

194194
private void parseObject(final String key, final S3SelectObjectWorker objectUnderTest) throws IOException {
195195
final S3ObjectReference s3ObjectReference = S3ObjectReference.bucketAndKey(bucket, key).build();
196-
objectUnderTest.parseS3Object(s3ObjectReference,null, null, null);
196+
objectUnderTest.processS3Object(s3ObjectReference,null, null, null);
197197
}
198198

199199
static class IntegrationTestArguments implements ArgumentsProvider {

data-prepper-plugins/s3-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/s3/SqsWorkerIT.java

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88
import com.linecorp.armeria.client.retry.Backoff;
99
import io.micrometer.core.instrument.Counter;
1010
import io.micrometer.core.instrument.DistributionSummary;
11+
import io.micrometer.core.instrument.Meter;
12+
import io.micrometer.core.instrument.Tags;
1113
import io.micrometer.core.instrument.Timer;
14+
import io.micrometer.core.instrument.noop.NoopTimer;
1215
import org.junit.jupiter.api.BeforeEach;
1316
import org.junit.jupiter.api.Disabled;
1417
import org.junit.jupiter.api.Test;
@@ -18,30 +21,38 @@
1821
import org.mockito.ArgumentCaptor;
1922
import org.mockito.Mock;
2023
import org.mockito.junit.jupiter.MockitoExtension;
24+
import org.opensearch.dataprepper.model.buffer.Buffer;
2125
import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetManager;
2226
import org.opensearch.dataprepper.metrics.PluginMetrics;
2327
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
2428
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
2529
import org.opensearch.dataprepper.model.event.Event;
30+
import org.opensearch.dataprepper.model.record.Record;
2631
import org.opensearch.dataprepper.model.event.JacksonEvent;
2732
import org.opensearch.dataprepper.plugins.source.s3.configuration.NotificationSourceOption;
2833
import org.opensearch.dataprepper.plugins.source.s3.configuration.OnErrorOption;
2934
import org.opensearch.dataprepper.plugins.source.s3.configuration.SqsOptions;
35+
import org.opensearch.dataprepper.plugins.source.s3.ownership.BucketOwnerProvider;
3036
import org.opensearch.dataprepper.plugins.source.sqs.common.SqsBackoff;
37+
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
3138
import software.amazon.awssdk.regions.Region;
3239
import software.amazon.awssdk.services.s3.S3Client;
3340
import software.amazon.awssdk.services.sqs.SqsClient;
3441

3542
import java.io.IOException;
3643
import java.time.Duration;
3744
import java.time.Instant;
45+
import java.util.Collection;
3846
import java.util.List;
3947
import java.util.ArrayList;
48+
import java.util.Optional;
4049
import java.util.UUID;
4150
import java.util.concurrent.ScheduledExecutorService;
4251
import java.util.concurrent.Executors;
4352
import java.util.concurrent.atomic.AtomicBoolean;
4453

54+
import static org.mockito.ArgumentMatchers.anyCollection;
55+
import static org.mockito.ArgumentMatchers.anyInt;
4556
import static org.awaitility.Awaitility.await;
4657
import static org.hamcrest.CoreMatchers.equalTo;
4758
import static org.hamcrest.MatcherAssert.assertThat;
@@ -60,14 +71,19 @@
6071

6172
@ExtendWith(MockitoExtension.class)
6273
class SqsWorkerIT {
74+
private static final int TIMEOUT_IN_MILLIS = 200;
6375
private SqsClient sqsClient;
6476
@Mock
6577
private S3Service s3Service;
6678
@Mock
6779
private SqsOptions sqsOptions;
80+
@Mock
81+
DistributionSummary distributionSummary;
82+
6883
private S3SourceConfig s3SourceConfig;
6984
private PluginMetrics pluginMetrics;
7085
private S3ObjectGenerator s3ObjectGenerator;
86+
private S3ObjectPluginMetrics s3ObjectPluginMetrics;
7187
private String bucket;
7288
private Backoff backoff;
7389
private AcknowledgementSetManager acknowledgementSetManager;
@@ -79,11 +95,14 @@ class SqsWorkerIT {
7995
private AtomicBoolean ready = new AtomicBoolean(false);
8096
private int numEventsAdded;
8197
private List<Event> events;
98+
private int recordsReceived;
99+
S3Client s3Client;
82100

83101
@BeforeEach
84102
void setUp() {
103+
distributionSummary = mock(DistributionSummary.class);
85104
acknowledgementSetManager = mock(AcknowledgementSetManager.class);
86-
final S3Client s3Client = S3Client.builder()
105+
s3Client = S3Client.builder()
87106
.region(Region.of(System.getProperty("tests.s3source.region")))
88107
.build();
89108
bucket = System.getProperty("tests.s3source.bucket");
@@ -101,7 +120,6 @@ void setUp() {
101120

102121
pluginMetrics = mock(PluginMetrics.class);
103122
final Counter sharedCounter = mock(Counter.class);
104-
final DistributionSummary distributionSummary = mock(DistributionSummary.class);
105123
final Timer sqsMessageDelayTimer = mock(Timer.class);
106124

107125
lenient().when(pluginMetrics.counter(anyString())).thenReturn(sharedCounter);
@@ -148,6 +166,66 @@ void processSqsMessages_should_return_at_least_one_message(final int numberOfObj
148166
assertThat(sqsMessagesProcessed, lessThanOrEqualTo(numberOfObjectsToWrite));
149167
}
150168

169+
@ParameterizedTest
170+
@ValueSource(ints = {10})
171+
void processSqsMessages_with_metadataOnly_option(final int numberOfObjectsToWrite) throws Exception {
172+
final Counter counter = mock(Counter.class);
173+
s3ObjectPluginMetrics = mock(S3ObjectPluginMetrics.class);
174+
final Timer timer = new NoopTimer(new Meter.Id("test", Tags.empty(), null, null, Meter.Type.TIMER));
175+
when(sqsOptions.getMaxReceiveAttempts()).thenReturn(5);
176+
lenient().when(s3ObjectPluginMetrics.getS3ObjectsSucceededCounter()).thenReturn(counter);
177+
lenient().when(s3ObjectPluginMetrics.getS3ObjectSizeSummary()).thenReturn(distributionSummary);
178+
lenient().when(s3ObjectPluginMetrics.getS3ObjectEventsSummary()).thenReturn(distributionSummary);
179+
lenient().when(s3ObjectPluginMetrics.getS3ObjectReadTimer()).thenReturn(timer);
180+
Buffer<Record<Event>> buffer = mock(Buffer.class);
181+
when(s3SourceConfig.getAcknowledgements()).thenReturn(true);
182+
final Counter receivedCounter = mock(Counter.class);
183+
final Counter deletedCounter = mock(Counter.class);
184+
final Counter ackCallbackCounter = mock(Counter.class);
185+
lenient().doAnswer(a -> {
186+
final Collection<Record<Event>> recordsCollection = a.getArgument(0);
187+
recordsReceived += recordsCollection.size();
188+
return null;
189+
}).when(buffer).writeAll(anyCollection(), anyInt());
190+
when(pluginMetrics.counter(SqsWorker.SQS_MESSAGES_RECEIVED_METRIC_NAME)).thenReturn(receivedCounter);
191+
when(pluginMetrics.counter(SqsWorker.SQS_MESSAGES_DELETED_METRIC_NAME)).thenReturn(deletedCounter);
192+
when(pluginMetrics.counter(SqsWorker.ACKNOWLEDGEMENT_SET_CALLACK_METRIC_NAME)).thenReturn(ackCallbackCounter);
193+
lenient().doAnswer((val) -> {
194+
receivedCount += (double)val.getArgument(0);
195+
return null;
196+
}).when(receivedCounter).increment(any(Double.class));
197+
lenient().doAnswer((val) -> {
198+
ackCallbackCount += 1;
199+
return null;
200+
}).when(ackCallbackCounter).increment();
201+
202+
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
203+
acknowledgementSetManager = new DefaultAcknowledgementSetManager(executor);
204+
BucketOwnerProvider bucketOwnerProvider = b -> Optional.empty();
205+
final S3ObjectRequest s3ObjectRequest = new S3ObjectRequest.Builder(buffer, 1,
206+
Duration.ofMillis(TIMEOUT_IN_MILLIS), s3ObjectPluginMetrics)
207+
.bucketOwnerProvider(bucketOwnerProvider)
208+
.s3Client(s3Client)
209+
.build();
210+
S3Service s3MetadataService = new S3Service(new S3ObjectMetadataWorker(s3ObjectRequest));
211+
final SqsWorker objectUnderTest = new SqsWorker(acknowledgementSetManager, sqsClient, s3MetadataService, s3SourceConfig, pluginMetrics, backoff);
212+
List<String> keyList = writeToS3(numberOfObjectsToWrite);
213+
await().atMost(Duration.ofSeconds(60))
214+
.untilAsserted(() -> {
215+
final int sqsMessagesProcessed = objectUnderTest.processSqsMessages();
216+
assertThat(recordsReceived, equalTo(numberOfObjectsToWrite));
217+
});
218+
219+
assertThat(deletedCount, equalTo((double)0.0));
220+
// Delete the objects created
221+
for (String key: keyList) {
222+
final DeleteObjectRequest deleteObjectRequest = DeleteObjectRequest.builder()
223+
.bucket(bucket)
224+
.key(key).build();
225+
s3Client.deleteObject(deleteObjectRequest);
226+
}
227+
}
228+
151229
@ParameterizedTest
152230
@ValueSource(ints = {1})
153231
void processSqsMessages_should_return_at_least_one_message_with_acks_with_callback_invoked_after_processS3Object_finishes(final int numberOfObjectsToWrite) throws IOException, InterruptedException {
@@ -455,14 +533,17 @@ void processSqsMessages_should_return_zero_if_no_objects_are_written() {
455533
assertThat(sqsMessagesProcessed, equalTo(0));
456534
}
457535

458-
private void writeToS3(final int numberOfObjectsToWrite) throws IOException {
536+
private List<String> writeToS3(final int numberOfObjectsToWrite) throws IOException {
459537
final int numberOfRecords = 100;
460538
final NewlineDelimitedRecordsGenerator newlineDelimitedRecordsGenerator = new NewlineDelimitedRecordsGenerator();
539+
List<String> keyList = new ArrayList<>();
461540
for (int i = 0; i < numberOfObjectsToWrite; i++) {
462541
final String key = "s3 source/sqs/" + UUID.randomUUID() + "_" + Instant.now().toString() + newlineDelimitedRecordsGenerator.getFileExtension();
463542
// isCompressionEnabled is set to false since we test for compression in S3ObjectWorkerIT
464543
s3ObjectGenerator.write(numberOfRecords, key, newlineDelimitedRecordsGenerator, false);
544+
keyList.add(key);
465545
}
546+
return keyList;
466547
}
467548

468549
private void clearSqsQueue() {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.dataprepper.plugins.source.s3;
7+
8+
import com.fasterxml.jackson.annotation.JsonCreator;
9+
10+
import java.util.Arrays;
11+
import java.util.Map;
12+
import java.util.stream.Collectors;
13+
14+
public enum S3DataSelection {
15+
16+
DATA_ONLY("data_only"),
17+
METADATA_ONLY("metadata_only"),
18+
DATA_AND_METADATA("data_and_metadata");
19+
20+
private static final Map<String, S3DataSelection> S3_DATA_SELECTION_MAP = Arrays.stream(S3DataSelection.values())
21+
.collect(Collectors.toMap(
22+
value -> value.type,
23+
value -> value
24+
));
25+
26+
private final String type;
27+
28+
S3DataSelection(final String type) {
29+
this.type = type;
30+
}
31+
32+
@JsonCreator
33+
public static S3DataSelection getByMessageFormatByName(final String name) {
34+
return S3_DATA_SELECTION_MAP.get(name.toLowerCase());
35+
}
36+
}
37+

data-prepper-plugins/s3-source/src/main/java/org/opensearch/dataprepper/plugins/source/s3/S3ObjectHandler.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,27 @@
66

77
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
88
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
9+
import org.opensearch.dataprepper.model.record.Record;
10+
import org.opensearch.dataprepper.model.event.Event;
911

1012
import java.io.IOException;
13+
import java.util.function.Consumer;
1114

1215
/**
1316
* A S3ObjectHandler interface must be extended/implement for S3 Object parsing
1417
*
1518
*/
1619
public interface S3ObjectHandler {
1720
/**
18-
* Parse S3 object content using S3 object reference and pushing to buffer
21+
* Process S3 object content using S3 object reference and pushing to buffer
1922
* @param s3ObjectReference Contains bucket and s3 object details
2023
* @param acknowledgementSet acknowledgement set for the object
2124
* @param sourceCoordinator source coordinator
2225
* @param partitionKey partition key
2326
*
2427
* @throws IOException exception is thrown every time because this is not supported
2528
*/
26-
void parseS3Object(final S3ObjectReference s3ObjectReference,
29+
void processS3Object(final S3ObjectReference s3ObjectReference,
2730
final AcknowledgementSet acknowledgementSet,
2831
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
2932
final String partitionKey) throws IOException;
@@ -35,17 +38,15 @@ void parseS3Object(final S3ObjectReference s3ObjectReference,
3538
void deleteS3Object(final S3ObjectReference s3ObjectReference);
3639

3740
/**
38-
* process S3 object metadata using S3 object reference and pushing to buffer
41+
* consume S3 object content using S3 object reference and pushing to buffer
3942
* @param s3ObjectReference Contains bucket and s3 object details
40-
* @param acknowledgementSet acknowledgement set for the object
41-
* @param sourceCoordinator source coordinator
42-
* @param partitionKey partition key
43+
* @param s3InputFile S3 input file object corresponding to the s3 object
44+
* @param consumer consumer of each record created while processing the object
4345
*
44-
* @throws IOException exception is thrown every time because this is not supported
46+
* @throws IOException exception
4547
*/
46-
default void processS3ObjectMetadata(final S3ObjectReference s3ObjectReference,
47-
final AcknowledgementSet acknowledgementSet,
48-
final SourceCoordinator<S3SourceProgressState> sourceCoordinator,
49-
final String partitionKey) throws IOException {
50-
}
48+
long consumeS3Object(final S3ObjectReference s3ObjectReference,
49+
final S3InputFile inputFile,
50+
final Consumer<Record<Event>> consumer) throws Exception;
51+
5152
}

0 commit comments

Comments
 (0)