Skip to content

Commit 0ba5cfd

Browse files
committed
Add support for invoking acknowledgmentSet callback on expiry
Signed-off-by: Kondaka <krishkdk@amazon.com>
1 parent 0660318 commit 0ba5cfd

File tree

16 files changed

+322
-16
lines changed

16 files changed

+322
-16
lines changed

data-prepper-api/src/main/java/org/opensearch/dataprepper/model/acknowledgements/AcknowledgementSetManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,16 @@ public interface AcknowledgementSetManager {
3030
* @since 2.2
3131
*/
3232
AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout);
33+
34+
/**
35+
* Creates an acknowledgement set
36+
*
37+
* @param callback callback function to be invoked
38+
* @param timeout expiry timeout
39+
* @param invokeCallbackOnExpiry flag indicating if the callback function should be invoked on expiry
40+
*
41+
* @return AcknowledgementSet returns a new acknowledgement set
42+
* @since 2.15
43+
*/
44+
AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout, final boolean invokeCallbackOnExpiry);
3345
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSet.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,27 @@ public class DefaultAcknowledgementSet implements AcknowledgementSet {
4343
private ScheduledFuture<?> progressCheckFuture;
4444
private boolean completed;
4545
private AtomicInteger totalEventsAdded;
46+
private final boolean invokeCallbackOnExpiry;
4647

4748
public DefaultAcknowledgementSet(final ScheduledExecutorService scheduledExecutor,
4849
final Consumer<Boolean> callback,
4950
final Duration expiryTime,
5051
final DefaultAcknowledgementSetMetrics metrics) {
52+
this(scheduledExecutor, callback, expiryTime, metrics, false);
53+
}
54+
public DefaultAcknowledgementSet(final ScheduledExecutorService scheduledExecutor,
55+
final Consumer<Boolean> callback,
56+
final Duration expiryTime,
57+
final DefaultAcknowledgementSetMetrics metrics,
58+
final boolean invokeCallbackOnExpiry) {
5159
this.callback = callback;
5260
this.result = true;
5361
this.totalEventsAdded = new AtomicInteger(0);
5462
this.scheduledExecutor = scheduledExecutor;
5563
this.expiryTime = Instant.now().plusMillis(expiryTime.toMillis());
5664
this.callbackFuture = null;
5765
this.metrics = metrics;
66+
this.invokeCallbackOnExpiry = invokeCallbackOnExpiry;
5867
this.completed = false;
5968
this.progressCheckCallback = null;
6069
pendingAcknowledgments = new HashMap<>();
@@ -134,7 +143,9 @@ public boolean isDone() {
134143
if (progressCheckFuture != null) {
135144
progressCheckFuture.cancel(false);
136145
}
137-
if (callbackFuture != null) {
146+
if (invokeCallbackOnExpiry) {
147+
callbackFuture = scheduledExecutor.submit(() -> callback.accept(false));
148+
} else if (callbackFuture != null) {
138149
callbackFuture.cancel(true);
139150
callbackFuture = null;
140151
LOG.warn("AcknowledgementSet expired");

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetManager.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ public AcknowledgementSet create(final Consumer<Boolean> callback, final Duratio
5151
return acknowledgementSet;
5252
}
5353

54+
public AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout, final boolean invokeCallbackOnExpiry) {
55+
AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutor, callback, timeout, metrics, invokeCallbackOnExpiry);
56+
acknowledgementSetMonitor.add(acknowledgementSet);
57+
metrics.increment(DefaultAcknowledgementSetMetrics.CREATED_METRIC_NAME);
58+
return acknowledgementSet;
59+
}
60+
5461
public void shutdown() {
5562
acknowledgementSetMonitorThread.stop();
5663
}

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/acknowledgements/InactiveAcknowledgementSetManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,7 @@ public AcknowledgementSet create(final Consumer<Boolean> callback, final Duratio
2929
throw new UnsupportedOperationException("create operation not supported");
3030
}
3131

32+
public AcknowledgementSet create(final Consumer<Boolean> callback, final Duration timeout, final boolean invokeCallbackOnExpiry) {
33+
throw new UnsupportedOperationException("create operation not supported");
34+
}
3235
}

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetManagerTests.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.awaitility.Awaitility.await;
2727
import static org.hamcrest.MatcherAssert.assertThat;
2828
import static org.hamcrest.Matchers.equalTo;
29+
import static org.hamcrest.Matchers.notNullValue;
2930
import static org.mockito.ArgumentMatchers.any;
3031
import static org.mockito.Mockito.doAnswer;
3132
import static org.mockito.Mockito.lenient;
@@ -205,7 +206,6 @@ void testWithProgressCheckCallbacks() {
205206
.untilAsserted(() -> {
206207
assertThat(result, equalTo(true));
207208
});
208-
209209
}
210210

211211
@Test
@@ -270,7 +270,40 @@ void testWithProgressCheckCallbacks_AcksExpire() {
270270
.untilAsserted(() -> {
271271
assertThat(result, equalTo(null));
272272
});
273-
273+
}
274+
275+
@Test
276+
void testCreateWithInvokeCallbackOnExpiryTrue() {
277+
AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT, true);
278+
assertThat(acknowledgementSet, notNullValue());
279+
}
280+
281+
@Test
282+
void testCreateWithInvokeCallbackOnExpiryFalse() {
283+
AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT, false);
284+
assertThat(acknowledgementSet, notNullValue());
285+
}
286+
287+
@Test
288+
void testExpirationWithInvokeCallbackOnExpiryTrue() throws InterruptedException {
289+
AcknowledgementSet acknowledgementSet = acknowledgementSetManager.create((flag) -> { result = flag; }, TEST_TIMEOUT, true);
290+
eventHandle3 = mock(DefaultEventHandle.class);
291+
lenient().doAnswer(a -> {
292+
Boolean res = (Boolean)a.getArgument(0);
293+
acknowledgementSet.release(eventHandle3, res);
294+
return null;
295+
}).when(eventHandle3).release(any(Boolean.class));
296+
event3 = mock(JacksonEvent.class);
297+
lenient().when(event3.getEventHandle()).thenReturn(eventHandle3);
298+
acknowledgementSet.add(event3);
299+
lenient().when(eventHandle3.getAcknowledgementSet()).thenReturn(acknowledgementSet);
300+
acknowledgementSet.complete();
301+
302+
Thread.sleep(TEST_TIMEOUT.multipliedBy(5).toMillis());
303+
await().atMost(TEST_TIMEOUT.multipliedBy(5))
304+
.untilAsserted(() -> {
305+
assertThat(result, equalTo(false));
306+
});
274307
}
275308

276309
}

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/DefaultAcknowledgementSetTests.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class DefaultAcknowledgementSetTests {
6262
private DefaultAcknowledgementSetMetrics metrics;
6363
private int invalidAcquiresCounter;
6464
private int invalidReleasesCounter;
65-
65+
6666
private void setupMetrics() {
6767
metrics = mock(DefaultAcknowledgementSetMetrics.class);
6868
lenient().doAnswer(a -> {
@@ -168,7 +168,7 @@ void testDefaultAcknowledgementSetWithCustomCallback() throws Exception {
168168
defaultAcknowledgementSet = createObjectUnderTestWithCallback(
169169
(flag) -> {
170170
acknowledgementSetResult = flag;
171-
}
171+
}
172172
);
173173
defaultAcknowledgementSet.add(event);
174174
defaultAcknowledgementSet.complete();
@@ -187,7 +187,7 @@ void testDefaultAcknowledgementSetNegativeAcknowledgements() throws Exception {
187187
defaultAcknowledgementSet = createObjectUnderTestWithCallback(
188188
(flag) -> {
189189
acknowledgementSetResult = flag;
190-
}
190+
}
191191
);
192192
defaultAcknowledgementSet.add(event);
193193
defaultAcknowledgementSet.complete();
@@ -221,7 +221,7 @@ void testDefaultAcknowledgementSetExpirations() throws Exception {
221221
} catch (Exception e) {
222222
callbackInterrupted.set(true);
223223
}
224-
}
224+
}
225225
);
226226
defaultAcknowledgementSet.add(event);
227227
defaultAcknowledgementSet.complete();
@@ -248,7 +248,7 @@ void testDefaultAcknowledgementSetWithProgressCheck() throws Exception {
248248
defaultAcknowledgementSet = createObjectUnderTestWithCallback(
249249
(flag) -> {
250250
acknowledgementSetResult = flag;
251-
}
251+
}
252252
);
253253
defaultAcknowledgementSet.addProgressCheck(
254254
(progressCheck) -> {
@@ -313,4 +313,40 @@ void shutdown_cancels_progress_check_and_callback_future() throws NoSuchFieldExc
313313
verify(callbackFuture).cancel(false);
314314
verify(progressCheck).cancel(true);
315315
}
316+
317+
@Test
318+
void testDefaultAcknowledgementSetWithCallbackOnExpiryTrue() throws Exception {
319+
setupMetrics();
320+
defaultAcknowledgementSet = new DefaultAcknowledgementSet(executor, (flag) -> {
321+
acknowledgementSetResult = flag;
322+
}, TEST_TIMEOUT, metrics, true);
323+
324+
defaultAcknowledgementSet.add(event);
325+
defaultAcknowledgementSet.complete();
326+
327+
Thread.sleep(TEST_TIMEOUT.multipliedBy(2).toMillis());
328+
329+
Awaitility.waitAtMost(Duration.ofSeconds(15))
330+
.pollDelay(Duration.ofMillis(500))
331+
.until(() -> defaultAcknowledgementSet.isDone());
332+
assertThat(acknowledgementSetResult, equalTo(false));
333+
}
334+
335+
@Test
336+
void testDefaultAcknowledgementSetWithCallbackOnExpiryFalse() throws Exception {
337+
setupMetrics();
338+
defaultAcknowledgementSet = new DefaultAcknowledgementSet(executor, (flag) -> {
339+
acknowledgementSetResult = flag;
340+
}, TEST_TIMEOUT, metrics, false);
341+
342+
defaultAcknowledgementSet.add(event);
343+
defaultAcknowledgementSet.complete();
344+
345+
Thread.sleep(TEST_TIMEOUT.multipliedBy(2).toMillis());
346+
347+
Awaitility.waitAtMost(Duration.ofSeconds(10))
348+
.pollDelay(Duration.ofMillis(500))
349+
.until(() -> defaultAcknowledgementSet.isDone());
350+
assertThat(acknowledgementSetResult, equalTo(null));
351+
}
316352
}

data-prepper-core/src/test/java/org/opensearch/dataprepper/core/acknowledgements/InactiveAcknowledgementSetManagerTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,11 @@ void testCreateAPI() {
3232
assertThrows(UnsupportedOperationException.class, () -> acknowledgementSetManager.create((a)->{}, Duration.ofMillis(10)));
3333
}
3434

35+
@Test
36+
void testCreateAPIWithInvokeCallbackOnExpiry() {
37+
assertThat(acknowledgementSetManager, notNullValue());
38+
assertThrows(UnsupportedOperationException.class, () -> acknowledgementSetManager.create((a)->{}, Duration.ofMillis(10), true));
39+
assertThrows(UnsupportedOperationException.class, () -> acknowledgementSetManager.create((a)->{}, Duration.ofMillis(10), false));
40+
}
41+
3542
}

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
102102
this.shutdownInProgress = new AtomicBoolean(false);
103103
final PluginMetrics consumerMetrics = PluginMetrics.fromNames(metricPrefixName + READ, pluginSetting.getPipelineName());
104104
this.consumers = kafkaCustomConsumerFactory.createConsumersForTopic(kafkaBufferConfig, kafkaBufferConfig.getTopic(),
105-
innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker, customCompressionOption);
105+
innerBuffer, consumerMetrics, acknowledgementSetManager, byteDecoder, shutdownInProgress, false, circuitBreaker, customCompressionOption, true);
106106
this.kafkaAdminAccessor = new KafkaAdminAccessor(kafkaBufferConfig, List.of(kafkaBufferConfig.getTopic().getGroupId()));
107107
this.executorService = Executors.newFixedThreadPool(consumers.size(), KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE));
108108
consumers.forEach(this.executorService::submit);

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConfig {
3131
private static final Duration DEFAULT_DRAIN_TIMEOUT = Duration.ofSeconds(30);
32+
static final Duration DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT = Duration.ofHours(2);
3233

3334
@JsonProperty("bootstrap_servers")
3435
private List<String> bootstrapServers;
@@ -55,6 +56,9 @@ class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConfig {
5556
@JsonProperty("drain_timeout")
5657
private Duration drainTimeout = DEFAULT_DRAIN_TIMEOUT;
5758

59+
@JsonProperty("acknowledgements_timeout")
60+
private Duration acknowledgementsTimeout = DEFAULT_ACKNOWLEDGEMENTS_TIMEOUT;
61+
5862
@JsonProperty("custom_metric_prefix")
5963
private String customMetricPrefix;
6064

@@ -144,6 +148,12 @@ public Duration getDrainTimeout() {
144148
return drainTimeout;
145149
}
146150

151+
@Override
152+
@JsonIgnore
153+
public Duration getAcknowledgementsTimeout() {
154+
return acknowledgementsTimeout;
155+
}
156+
147157
@JsonIgnore
148158
public Optional<String> getCustomMetricPrefix() {
149159
return Optional.ofNullable(customMetricPrefix);

data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/configuration/KafkaConsumerConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,15 @@
55

66
package org.opensearch.dataprepper.plugins.kafka.configuration;
77

8+
import java.time.Duration;
89
import java.util.List;
910

1011
public interface KafkaConsumerConfig extends KafkaConnectionConfig {
1112
String getClientDnsLookup();
1213

1314
boolean getAcknowledgementsEnabled();
15+
16+
Duration getAcknowledgementsTimeout();
1417

1518
SchemaConfig getSchemaConfig();
1619

0 commit comments

Comments
 (0)