Skip to content

Commit 0ddc84b

Browse files
authored
Fix Data Prepper router to send records through routing strategy before sending to the sinks (opensearch-project#6370)
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
1 parent 92aa5fd commit 0ddc84b

File tree

3 files changed

+72
-7
lines changed

3 files changed

+72
-7
lines changed

data-prepper-core/src/integrationTest/java/org/opensearch/dataprepper/integration/Router_ThreeRoutesDefaultIT.java

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,31 +9,44 @@
99
import org.junit.jupiter.api.BeforeEach;
1010
import org.junit.jupiter.api.Test;
1111
import org.opensearch.dataprepper.model.event.Event;
12+
import org.opensearch.dataprepper.model.event.DefaultEventHandle;
1213
import org.opensearch.dataprepper.model.event.JacksonEvent;
1314
import org.opensearch.dataprepper.model.record.Record;
1415
import org.opensearch.dataprepper.plugins.InMemorySinkAccessor;
1516
import org.opensearch.dataprepper.plugins.InMemorySourceAccessor;
1617
import org.opensearch.dataprepper.test.framework.DataPrepperTestRunner;
1718

19+
import java.time.Instant;
1820
import java.util.ArrayList;
1921
import java.util.Collections;
22+
import java.util.HashMap;
2023
import java.util.List;
2124
import java.util.Map;
2225
import java.util.UUID;
26+
import java.util.concurrent.atomic.AtomicInteger;
2327
import java.util.concurrent.TimeUnit;
2428
import java.util.stream.Collectors;
2529
import java.util.stream.IntStream;
2630

31+
import static org.mockito.Mockito.mock;
32+
import static org.mockito.Mockito.when;
33+
import static org.mockito.Mockito.doAnswer;
34+
import static org.mockito.ArgumentMatchers.any;
2735
import static org.awaitility.Awaitility.await;
2836
import static org.hamcrest.CoreMatchers.equalTo;
2937
import static org.hamcrest.CoreMatchers.not;
3038
import static org.hamcrest.MatcherAssert.assertThat;
3139
import static org.hamcrest.Matchers.containsInAnyOrder;
3240
import static org.hamcrest.Matchers.empty;
41+
import static org.junit.jupiter.api.Assertions.assertNotNull;
3342

3443
class Router_ThreeRoutesDefaultIT {
3544
private static final String TESTING_KEY = "ConditionalRoutingIT";
3645
private static final String ALL_SOURCE_KEY = TESTING_KEY + "_all";
46+
private static final int NUM_ALPHA_EVENTS = 10;
47+
private static final int NUM_BETA_EVENTS = 20;
48+
private static final int NUM_GAMMA_EVENTS = 20;
49+
private static final int NUM_DEFAULT_EVENTS = 20;
3750
private static final String ALPHA_SOURCE_KEY = TESTING_KEY + "_alpha";
3851
private static final String BETA_SOURCE_KEY = TESTING_KEY + "_beta";
3952
private static final String ALPHA_DEFAULT_SOURCE_KEY = TESTING_KEY + "_alpha_default";
@@ -47,9 +60,43 @@ class Router_ThreeRoutesDefaultIT {
4760
private DataPrepperTestRunner dataPrepperTestRunner;
4861
private InMemorySourceAccessor inMemorySourceAccessor;
4962
private InMemorySinkAccessor inMemorySinkAccessor;
63+
private Map<Object, AtomicInteger> numReleases;
64+
private Map<String, DefaultEventHandle> eventHandles;
5065

5166
@BeforeEach
5267
void setUp() {
68+
eventHandles = new HashMap<>();
69+
Map<String, Integer> testEventTypesMap = Map.of(ALPHA_VALUE, NUM_ALPHA_EVENTS, BETA_VALUE, NUM_BETA_EVENTS, GAMMA_VALUE, NUM_GAMMA_EVENTS, DEFAULT_VALUE, NUM_DEFAULT_EVENTS);
70+
// Setup check on event handles such a way that all acquires happen before release
71+
for (Map.Entry<String, Integer> testEventType: testEventTypesMap.entrySet()) {
72+
for (int i = 0; i < testEventType.getValue(); i++) {
73+
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
74+
eventHandles.put(testEventType.getKey()+i, eventHandle);
75+
when(eventHandle.getInternalOriginationTime()).thenReturn(Instant.now());
76+
when(eventHandle.getExternalOriginationTime()).thenReturn(Instant.now());
77+
doAnswer(invocation -> {
78+
Object mock = invocation.getMock();
79+
synchronized (numReleases) {
80+
AtomicInteger releases = numReleases.computeIfAbsent(mock, k -> new AtomicInteger(0));
81+
assertThat(releases.get(), equalTo(0));
82+
}
83+
return null;
84+
}).when(eventHandle).acquireReference();
85+
86+
doAnswer(invocation -> {
87+
Object mock = invocation.getMock();
88+
synchronized (numReleases) {
89+
AtomicInteger releases = numReleases.get(mock);
90+
if (releases != null) {
91+
releases.incrementAndGet();
92+
}
93+
}
94+
return null;
95+
}).when(eventHandle).release(any(Boolean.class));
96+
}
97+
}
98+
numReleases = new HashMap<>();
99+
53100
dataPrepperTestRunner = DataPrepperTestRunner.builder()
54101
.withPipelinesDirectoryOrFile("route/three-route-with-default-route.yaml")
55102
.build();
@@ -66,10 +113,10 @@ void tearDown() {
66113

67114
@Test
68115
void test_default_route() {
69-
final List<Record<Event>> alphaEvents = createEvents(ALPHA_VALUE, 10);
70-
final List<Record<Event>> betaEvents = createEvents(BETA_VALUE, 20);
71-
final List<Record<Event>> gammaEvents = createEvents(GAMMA_VALUE, 20);
72-
final List<Record<Event>> defaultEvents = createEvents(DEFAULT_VALUE, 20);
116+
final List<Record<Event>> alphaEvents = createEvents(ALPHA_VALUE, NUM_ALPHA_EVENTS);
117+
final List<Record<Event>> betaEvents = createEvents(BETA_VALUE, NUM_BETA_EVENTS);
118+
final List<Record<Event>> gammaEvents = createEvents(GAMMA_VALUE, NUM_GAMMA_EVENTS);
119+
final List<Record<Event>> defaultEvents = createEvents(DEFAULT_VALUE, NUM_DEFAULT_EVENTS);
73120

74121
final List<Record<Event>> allEvents = new ArrayList<>(alphaEvents);
75122
allEvents.addAll(betaEvents);
@@ -120,8 +167,12 @@ void test_default_route() {
120167

121168
private List<Record<Event>> createEvents(final String value, final int numberToCreate) {
122169
return IntStream.range(0, numberToCreate)
123-
.mapToObj(i -> Map.of(KNOWN_CONDITIONAL_KEY, value, "arbitrary_field", UUID.randomUUID().toString()))
124-
.map(map -> JacksonEvent.builder().withData(map).withEventType("TEST").build())
170+
.mapToObj(i -> Map.of(KNOWN_CONDITIONAL_KEY, value, "id", i, "arbitrary_field", UUID.randomUUID().toString()))
171+
.map(map -> {
172+
DefaultEventHandle eventHandle = eventHandles.get(value+map.get("id"));
173+
assertNotNull(eventHandle);
174+
return JacksonEvent.builder().withData(map).withEventType("TEST").withEventHandle(eventHandle).build();
175+
})
125176
.map(jacksonEvent -> (Event) jacksonEvent)
126177
.map(Record::new)
127178
.collect(Collectors.toList());

data-prepper-core/src/integrationTest/resources/org/opensearch/dataprepper/pipeline/route/three-route-with-default-route.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ routing-pipeline:
44
source:
55
in_memory:
66
testing_key: ConditionalRoutingIT
7+
acknowledgments: true
78
buffer:
89
bounded_blocking:
910
# Use a small batch size to help ensure that multiple threads
@@ -15,27 +16,33 @@ routing-pipeline:
1516
- gamma: '/value == "g"'
1617
sink:
1718
- in_memory:
19+
acknowledgments: true
1820
testing_key: ConditionalRoutingIT_alpha
1921
routes:
2022
- alpha
2123
- in_memory:
24+
acknowledgments: true
2225
testing_key: ConditionalRoutingIT_beta
2326
routes:
2427
- beta
2528
- in_memory:
29+
acknowledgments: true
2630
testing_key: ConditionalRoutingIT_alpha_default
2731
routes:
2832
- alpha
2933
- _default
3034
- in_memory:
35+
acknowledgments: true
3136
testing_key: ConditionalRoutingIT_alpha_beta_gamma
3237
routes:
3338
- alpha
3439
- beta
3540
- gamma
3641
- in_memory:
42+
acknowledgments: true
3743
testing_key: ConditionalRoutingIT_default
3844
routes:
3945
- _default
4046
- in_memory:
47+
acknowledgments: true
4148
testing_key: ConditionalRoutingIT_all

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/router/Router.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.opensearch.dataprepper.model.record.Record;
1111

1212
import java.util.Collection;
13+
import java.util.HashMap;
1314
import java.util.HashSet;
1415
import java.util.Map;
1516
import java.util.Objects;
@@ -41,6 +42,8 @@ public <C> void route(
4142
Objects.requireNonNull(dataFlowComponents);
4243
Objects.requireNonNull(componentRecordsConsumer);
4344

45+
final Map<Object, Collection<Record>> componentRecords = new HashMap<>();
46+
4447
final Map<Record, Set<String>> recordsToRoutes = routeEventEvaluator.evaluateEventRoutes(allRecords);
4548

4649
boolean allRecordsRouted = false;
@@ -61,10 +64,14 @@ public <C> void route(
6164
recordsUnRouted.remove(record);
6265
}
6366
}
64-
componentRecordsConsumer.accept(component, records);
67+
componentRecords.put((C)component, records);
6568
});
6669
}
6770

71+
for (Map.Entry<Object, Collection<Record>> entry : componentRecords.entrySet()) {
72+
componentRecordsConsumer.accept((C)entry.getKey(), entry.getValue());
73+
}
74+
6875
if (recordsUnRouted != null) {
6976
for (Record record: recordsUnRouted) {
7077
if (record.getData() instanceof Event) {

0 commit comments

Comments
 (0)