Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
2.9.5
Describe the bug
Consumption from a partition is not resumed if the partition:
- was paused by a retry delay on a given consumer,
- was then unassigned from that consumer,
- and was reassigned back to it after the backoff delay.
After upgrading to 2.9.5 to pick up the bugfix for #2525, I still sometimes see partitions being paused and not resumed after a Kubernetes rolling update.
To Reproduce
Cause a retry to pause partition P0 on consumer C0.
Cause a rebalance so that P0 is unassigned from C0.
Wait for the backoff delay to elapse so that the resume logic runs on C0.
Assign P0 back to C0.
Consumption from P0 is not resumed after this.
One thing that may be relevant: I am using ConcurrentKafkaListenerContainerFactory.
In the resume path of ConcurrentMessageListenerContainer, I see that it checks that the partition is assigned before resuming it:
@Override
public void resumePartition(TopicPartition topicPartition) {
    synchronized (this.lifecycleMonitor) {
        this.containers
                .stream()
                .filter(container -> containsPartition(topicPartition, container))
                .forEach(container -> container.resumePartition(topicPartition));
    }
}
What I think is happening is that the scheduled resume task is ignored because the partition is not assigned at that moment, which leaves the partition in
AbstractMessageListenerContainer.pauseRequestedPartitions.
Then, when the partition is assigned back to C0, it is paused again in the KafkaMessageListenerContainer.onPartitionsAssigned listener, and there is no longer any trigger to resume it.
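The suspected sequence can be illustrated with a small, self-contained model. This is not Spring Kafka code: the class PausedPartitionModel and all of its methods are hypothetical, and the behavior it encodes (resume is skipped for unassigned partitions, and a pending pause request is re-applied on assignment) is only my reading of the code described above, not something confirmed.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the suspected behavior; hypothetical, not the library implementation. */
public class PausedPartitionModel {

    // Stand-in for AbstractMessageListenerContainer.pauseRequestedPartitions.
    private final Set<String> pauseRequested = new HashSet<>();
    // Partitions currently assigned to this consumer.
    private final Set<String> assigned = new HashSet<>();
    // Partitions actually paused on the consumer.
    private final Set<String> paused = new HashSet<>();

    /** Assumed behavior on assignment: a pending pause request is re-applied. */
    public void assign(String partition) {
        assigned.add(partition);
        if (pauseRequested.contains(partition)) {
            paused.add(partition);
        }
    }

    /** A revoked partition is no longer assigned or paused on this consumer. */
    public void revoke(String partition) {
        assigned.remove(partition);
        paused.remove(partition);
    }

    /** Retry back-off requests a pause and applies it if the partition is assigned. */
    public void pause(String partition) {
        pauseRequested.add(partition);
        if (assigned.contains(partition)) {
            paused.add(partition);
        }
    }

    /** Assumed behavior on resume: an unassigned partition is skipped entirely. */
    public void resume(String partition) {
        if (!assigned.contains(partition)) {
            return; // the pause request is never cleared
        }
        pauseRequested.remove(partition);
        paused.remove(partition);
    }

    public boolean isPaused(String partition) {
        return paused.contains(partition);
    }

    public static void main(String[] args) {
        PausedPartitionModel c0 = new PausedPartitionModel();
        c0.assign("P0");
        c0.pause("P0");   // retry pauses P0 on C0
        c0.revoke("P0");  // rebalance takes P0 away
        c0.resume("P0");  // back-off elapses, resume is skipped
        c0.assign("P0");  // P0 comes back and is paused again
        System.out.println("P0 paused after reassignment: " + c0.isPaused("P0"));
    }
}
```

In this model nothing ever clears the stale pause request once the resume call has been skipped, which matches the stuck partition I observe. If the resume instead ran while P0 was still assigned, the model would leave it unpaused.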
Expected behavior
Consumption should be resumed for the given partition.