Hello, I have a topic with multiple partitions and multiple app instances. I use AckMode.EXACTLY_ONCE with transactional sender.
Rebalance always waits until maxDelayRebalance.
I get a repeated message in the log: reactor.kafka.receiver.internals.ConsumerEventLoop - Rebalancing; waiting for N records in pipeline
Similar to the issue discussed here in the comments , but I have AckMode.EXACTLY_ONCE.
I think the problem is that the inPipeline variable changes in different objects.
isPipeline decreases in object created here:
|
CommittableBatch offsetBatch = new CommittableBatch(); |
|
for (ConsumerRecord<K, V> r : consumerRecords) { |
|
offsetBatch.updateOffset(new TopicPartition(r.topic(), r.partition()), r.offset()); |
but increases in another object here:
|
this.commitBatch.addUncommitted(records); |
So variable inPipeline is always positive and the condition here is always waits until maxDelayRebalance:
|
int inPipeline = commitEvent.commitBatch.getInPipeline(); |
|
if (inPipeline > 0 || this.awaitingTransaction.get()) { |
|
long end = maxDelayRebalance + System.currentTimeMillis(); |
|
do { |
|
try { |
|
log.debug("Rebalancing; waiting for {} records in pipeline", inPipeline); |
|
Thread.sleep(interval); |
|
commitEvent.runIfRequired(true); |
|
} catch (InterruptedException e) { |
|
Thread.currentThread().interrupt(); |
|
break; |
|
} |
|
inPipeline = commitEvent.commitBatch.getInPipeline(); |
|
} while (isActive.get() && (inPipeline > 0 || this.awaitingTransaction.get()) |
|
&& System.currentTimeMillis() < end); |
Expected Behavior
Rebalancing should complete after all read messages have been processed and the transaction has committed.
Actual Behavior
Rebalancing always waits until maxDelayRebalance.
Example to Reproduce
https://github.com/damn1kk/kafka-rebalance-issue/blob/master/src/test/java/KafkaRebalanceTest.java
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(
Map.of(
ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString(),
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
ConsumerConfig.GROUP_ID_CONFIG, "consumer-group",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
)
)
.addAssignListener(onAssign -> log.info("For {} assigned: {}", name, onAssign))
.addRevokeListener(onRevoke -> log.info("For {} revoked: {}", name, onRevoke))
.maxDeferredCommits(100)
.maxDelayRebalance(REBALANCE_LIMIT)
.subscription(List.of(TOPIC_NAME));
KafkaReceiver.create(receiverOptions)
.receiveExactlyOnce(innerSender.transactionManager())
.concatMap(r -> r.groupBy(ConsumerRecord::partition, 1)
.flatMap(c -> innerSender.send(
c.map(cr -> {
try {
log.info("Handle message by {} with key: {}", name, Integer.parseInt(cr.key()));
latch.countDown();
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (Integer.parseInt(cr.key()) % 2 == 0) {
return new ProducerRecord<>(TOPIC_RESULT1, cr.key(), cr.value());
} else {
return new ProducerRecord<>(TOPIC_RESULT2, cr.key(), cr.value());
}
})
.map(pr -> SenderRecord.create(pr, pr.key()))
))
.then(innerSender.transactionManager().commit()),
0
)
.subscribe();
Environment
- reactor-kafka 1.3.21
- kafka-clients 2.8.2
- reactor-core 3.5.7
Hello, I have a topic with multiple partitions and multiple app instances. I use AckMode.EXACTLY_ONCE with transactional sender.
Rebalance always waits until maxDelayRebalance.
I get a repeated message in the log:
reactor.kafka.receiver.internals.ConsumerEventLoop - Rebalancing; waiting for N records in pipelineSimilar to the issue discussed here in the comments , but I have AckMode.EXACTLY_ONCE.
I think the problem is that the
inPipelinevariable changes in different objects.isPipelinedecreases in object created here:reactor-kafka/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java
Lines 133 to 135 in 5436ae9
but increases in another object here:
reactor-kafka/src/main/java/reactor/kafka/receiver/internals/ConsumerEventLoop.java
Line 377 in 5436ae9
So variable
inPipelineis always positive and the condition here is always waits until maxDelayRebalance:reactor-kafka/src/main/java/reactor/kafka/receiver/internals/ConsumerEventLoop.java
Lines 170 to 184 in 5436ae9
Expected Behavior
Rebalancing should complete after all read messages have been processed and the transaction has committed.
Actual Behavior
Rebalancing always waits until maxDelayRebalance.
Example to Reproduce
https://github.com/damn1kk/kafka-rebalance-issue/blob/master/src/test/java/KafkaRebalanceTest.java
Environment