@@ -130,6 +130,7 @@
 import org.springframework.util.Assert;
 import org.springframework.util.ClassUtils;
 import org.springframework.util.CollectionUtils;
+import org.springframework.util.ObjectUtils;
 import org.springframework.util.StringUtils;

 import io.micrometer.observation.Observation;
@@ -2085,14 +2086,14 @@ private void processAcks(ConsumerRecords<K, V> records) {
 private synchronized void ackInOrder(ConsumerRecord<K, V> cRecord) {
 	TopicPartition part = new TopicPartition(cRecord.topic(), cRecord.partition());
 	List<Long> offs = this.offsetsInThisBatch.get(part);
-	List<ConsumerRecord<K, V>> deferred = this.deferredOffsets.get(part);
-	if (!offs.isEmpty()) {
+	if (!ObjectUtils.isEmpty(offs)) {
+		List<ConsumerRecord<K, V>> deferred = this.deferredOffsets.get(part);
 		if (offs.get(0) == cRecord.offset()) {
 			offs.remove(0);
 			ConsumerRecord<K, V> recordToAck = cRecord;
 			if (!deferred.isEmpty()) {
 				Collections.sort(deferred, (a, b) -> Long.compare(a.offset(), b.offset()));
-				while (!deferred.isEmpty() && deferred.get(0).offset() == recordToAck.offset() + 1) {
+				while (!ObjectUtils.isEmpty(deferred) && deferred.get(0).offset() == recordToAck.offset() + 1) {
 					recordToAck = deferred.remove(0);
 					offs.remove(0);
 				}
@@ -3434,8 +3435,8 @@ private final class ConsumerBatchAcknowledgment implements Acknowledgment {
 @Override
 public void acknowledge() {
 	Map<TopicPartition, List<Long>> offs = ListenerConsumer.this.offsetsInThisBatch;
-	Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred = ListenerConsumer.this.deferredOffsets;
 	if (!this.acked) {
+		Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred = ListenerConsumer.this.deferredOffsets;
 		for (ConsumerRecord<K, V> cRecord : getHighestOffsetRecords(this.records)) {
 			if (offs != null) {
 				offs.remove(new TopicPartition(cRecord.topic(), cRecord.partition()));
@@ -3529,11 +3530,15 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 	ListenerConsumer.this.pausedForNack.removeAll(partitions);
 	partitions.forEach(ListenerConsumer.this.lastCommits::remove);
 	synchronized (ListenerConsumer.this) {
-		if (ListenerConsumer.this.offsetsInThisBatch != null) {
+		Map<TopicPartition, List<Long>> pendingOffsets = ListenerConsumer.this.offsetsInThisBatch;
+		if (pendingOffsets != null) {
 			partitions.forEach(tp -> {
-				ListenerConsumer.this.offsetsInThisBatch.remove(tp);
+				pendingOffsets.remove(tp);
 				ListenerConsumer.this.deferredOffsets.remove(tp);
 			});
+			if (pendingOffsets.isEmpty()) {
+				ListenerConsumer.this.consumerPaused = false;
+			}
 		}
 	}
 }
@@ -3586,7 +3591,16 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 }

 private void repauseIfNeeded(Collection<TopicPartition> partitions) {
-	if (isPaused() || ListenerConsumer.this.remainingRecords != null && !partitions.isEmpty()) {
+	boolean pending = false;
+	synchronized (ListenerConsumer.this) {
+		Map<TopicPartition, List<Long>> pendingOffsets = ListenerConsumer.this.offsetsInThisBatch;
+		if (pendingOffsets != null && !pendingOffsets.isEmpty()) {
+			pending = true;
+		}
+	}
+	if ((pending || isPaused() || ListenerConsumer.this.remainingRecords != null)
+			&& !partitions.isEmpty()) {
+
 		ListenerConsumer.this.consumer.pause(partitions);
 		ListenerConsumer.this.consumerPaused = true;
 		ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
@@ -66,7 +66,9 @@
 import org.springframework.kafka.event.ConsumerStartedEvent;
 import org.springframework.kafka.event.ConsumerStartingEvent;
 import org.springframework.kafka.event.ListenerContainerIdleEvent;
+import org.springframework.kafka.listener.ContainerProperties.AckMode;
 import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
+import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
 import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
 import org.springframework.lang.Nullable;
@@ -904,7 +906,7 @@ void removeFromPartitionPauseRequestedWhenNotAssigned() throws InterruptedException {

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLagacyAssignor() throws InterruptedException {
void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLegacyAssignor() throws InterruptedException {
TopicPartition tp0 = new TopicPartition("foo", 0);
TopicPartition tp1 = new TopicPartition("foo", 1);
TopicPartition tp2 = new TopicPartition("foo", 2);
@@ -1116,6 +1118,178 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
 		assertThat(recordsDelivered.get(2)).isEqualTo(record1);
 	}

+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws InterruptedException {
Member:
Gary, I don't see any Assignor settings in the test code. How does it work, please?

Contributor Author:
It's simulated.

With a legacy assignor, all current assignments are revoked and all (or a subset) are re-assigned. With a coop assignor, only a subset are revoked and (possibly) previously unassigned partitions might be assigned.

See the case 1: poll phases in both tests - that is where the simulation is. In the coop case, I test that consumer.pause() is called twice (once for the original assignments and once for the newly assigned partition).

Member:
That's cool. Thank you!
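
The two simulations reduce to the callback sequences below (rebal, tp0, allAssignments, and afterRevokeAssignments are the fixtures defined in the tests that follow):

	// Legacy (eager) assignor, as simulated in the first test: the rebalance
	// revokes every current assignment, then hands back the surviving set.
	rebal.get().onPartitionsRevoked(allAssignments);
	rebal.get().onPartitionsAssigned(afterRevokeAssignments);

	// Cooperative assignor, as simulated in the second test: only the migrating
	// partition is revoked, and a newly added partition arrives in its own
	// onPartitionsAssigned call, so the container must pause a second time.
	rebal.get().onPartitionsRevoked(List.of(tp0));
	rebal.get().onPartitionsAssigned(List.of(new TopicPartition("foo", 2)));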

TopicPartition tp0 = new TopicPartition("foo", 0);
TopicPartition tp1 = new TopicPartition("foo", 1);
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1);
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
allRecordMap.put(tp0,
List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar")));
allRecordMap.put(tp1,
List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar")));
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1);
AtomicInteger pollPhase = new AtomicInteger();

Consumer consumer = mock(Consumer.class);
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
CountDownLatch subscribeLatch = new CountDownLatch(1);
willAnswer(invocation -> {
rebal.set(invocation.getArgument(1));
subscribeLatch.countDown();
return null;
}).given(consumer).subscribe(any(Collection.class), any());
CountDownLatch pauseLatch = new CountDownLatch(1);
AtomicBoolean paused = new AtomicBoolean();
willAnswer(inv -> {
paused.set(true);
pauseLatch.countDown();
return null;
}).given(consumer).pause(any());
ConsumerFactory cf = mock(ConsumerFactory.class);
given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
given(cf.getConfigurationProperties())
.willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
ContainerProperties containerProperties = new ContainerProperties("foo");
containerProperties.setGroupId("grp");
containerProperties.setAckMode(AckMode.MANUAL);
containerProperties.setMessageListener(ackOffset1());
containerProperties.setAsyncAcks(true);
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf,
containerProperties);
CountDownLatch pollLatch = new CountDownLatch(2);
CountDownLatch rebalLatch = new CountDownLatch(1);
CountDownLatch continueLatch = new CountDownLatch(1);
willAnswer(inv -> {
Thread.sleep(50);
pollLatch.countDown();
switch (pollPhase.getAndIncrement()) {
case 0:
rebal.get().onPartitionsAssigned(allAssignments);
return allRecords;
case 1:
rebal.get().onPartitionsRevoked(allAssignments);
rebal.get().onPartitionsAssigned(afterRevokeAssignments);
rebalLatch.countDown();
continueLatch.await(10, TimeUnit.SECONDS);
default:
return ConsumerRecords.empty();
}
}).given(consumer).poll(any());
container.start();
assertThat(subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
KafkaMessageListenerContainer child = (KafkaMessageListenerContainer) KafkaTestUtils
.getPropertyValue(container, "containers", List.class).get(0);
assertThat(pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(rebalLatch.await(10, TimeUnit.SECONDS)).isTrue();
Map offsets = KafkaTestUtils.getPropertyValue(child, "listenerConsumer.offsetsInThisBatch", Map.class);
assertThat(offsets).hasSize(0);
assertThat(KafkaTestUtils.getPropertyValue(child, "listenerConsumer.consumerPaused", Boolean.class)).isFalse();
continueLatch.countDown();
// no pause when re-assigned because all revoked
verify(consumer).pause(any());
verify(consumer, never()).resume(any());
container.stop();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws InterruptedException {
TopicPartition tp0 = new TopicPartition("foo", 0);
TopicPartition tp1 = new TopicPartition("foo", 1);
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1);
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
allRecordMap.put(tp0,
List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar")));
allRecordMap.put(tp1,
List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar")));
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1);
AtomicInteger pollPhase = new AtomicInteger();

Consumer consumer = mock(Consumer.class);
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
CountDownLatch subscribeLatch = new CountDownLatch(1);
willAnswer(invocation -> {
rebal.set(invocation.getArgument(1));
subscribeLatch.countDown();
return null;
}).given(consumer).subscribe(any(Collection.class), any());
CountDownLatch pauseLatch = new CountDownLatch(1);
AtomicBoolean paused = new AtomicBoolean();
willAnswer(inv -> {
paused.set(true);
pauseLatch.countDown();
return null;
}).given(consumer).pause(any());
ConsumerFactory cf = mock(ConsumerFactory.class);
given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
given(cf.getConfigurationProperties())
.willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
ContainerProperties containerProperties = new ContainerProperties("foo");
containerProperties.setGroupId("grp");
containerProperties.setAckMode(AckMode.MANUAL);
containerProperties.setMessageListener(ackOffset1());
containerProperties.setAsyncAcks(true);
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf,
containerProperties);
CountDownLatch pollLatch = new CountDownLatch(2);
CountDownLatch rebalLatch = new CountDownLatch(1);
CountDownLatch continueLatch = new CountDownLatch(1);
willAnswer(inv -> {
Thread.sleep(50);
pollLatch.countDown();
switch (pollPhase.getAndIncrement()) {
case 0:
rebal.get().onPartitionsAssigned(allAssignments);
return allRecords;
case 1:
rebal.get().onPartitionsRevoked(List.of(tp0));
rebal.get().onPartitionsAssigned(List.of(new TopicPartition("foo", 2)));
rebalLatch.countDown();
continueLatch.await(10, TimeUnit.SECONDS);
default:
return ConsumerRecords.empty();
}
}).given(consumer).poll(any());
container.start();
assertThat(subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
KafkaMessageListenerContainer child = (KafkaMessageListenerContainer) KafkaTestUtils
.getPropertyValue(container, "containers", List.class).get(0);
assertThat(pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(rebalLatch.await(10, TimeUnit.SECONDS)).isTrue();
Map offsets = KafkaTestUtils.getPropertyValue(child, "listenerConsumer.offsetsInThisBatch", Map.class);
assertThat(offsets).hasSize(1);
assertThat(offsets.get(tp1)).isNotNull();
assertThat(KafkaTestUtils.getPropertyValue(child, "listenerConsumer.consumerPaused", Boolean.class)).isTrue();
continueLatch.countDown();
verify(consumer, times(2)).pause(any());
verify(consumer, never()).resume(any());
container.stop();
}

@SuppressWarnings("rawtypes")
private AcknowledgingMessageListener ackOffset1() {
return new AcknowledgingMessageListener() {

@Override
public void onMessage(ConsumerRecord rec, @Nullable Acknowledgment ack) {
if (rec.offset() == 1) {
ack.acknowledge();
}
}

@Override
public void onMessage(Object data) {
}

};
}

 	public static class TestMessageListener1 implements MessageListener<String, String>, ConsumerSeekAware {

 		private static ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();