Skip to content

Commit 46372df

Browse files
mikael-carlstedtgaryrussell
authored andcommitted
GH-2489: Retry Commits If Necessary on Revoke
Closes #2489 * Retry commits that have failed temporarily due to rebalance in progress when onPartitionsRevoked is called. * Adjust expectations for unit tests where commits are retried (previous expectation accepted the defect that failed commits for subsequently revoked partitions were not retried)
1 parent 9de314f commit 46372df

File tree

2 files changed

+3
-2
lines changed

2 files changed

+3
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3490,6 +3490,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
34903490
}
34913491
try {
34923492
// Wait until now to commit, in case the user listener added acks
3493+
checkRebalanceCommits();
34933494
commitPendingAcks();
34943495
fixTxOffsetsIfNeeded();
34953496
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3366,7 +3366,7 @@ void testCommitRebalanceInProgressBatch() throws Exception {
33663366
assertThat(commits).hasSize(3);
33673367
assertThat(commits.get(0)).hasSize(2); // assignment
33683368
assertThat(commits.get(1)).hasSize(2); // batch commit
3369-
assertThat(commits.get(2)).hasSize(1); // re-commit
3369+
assertThat(commits.get(2)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
33703370
});
33713371
}
33723372

@@ -3379,7 +3379,7 @@ void testCommitRebalanceInProgressRecord() throws Exception {
33793379
assertThat(commits.get(2)).hasSize(1);
33803380
assertThat(commits.get(3)).hasSize(1);
33813381
assertThat(commits.get(4)).hasSize(1);
3382-
assertThat(commits.get(5)).hasSize(1); // re-commit
3382+
assertThat(commits.get(5)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
33833383
assertThat(commits.get(5).get(new TopicPartition("foo", 1)))
33843384
.isNotNull()
33853385
.extracting(om -> om.offset())

0 commit comments

Comments
 (0)