Skip to content

Commit c92ec76

Browse files
committed
GH-2464: Fix Delivery Attempt Header (Rare)
Resolves #2464 `KafkaBackoffException` should never be classified as retryable; the design is such that those exceptions go straight to the recoverer, which will re-throw. If, however, a user configures a super class as retryable (e.g. `RuntimeException`) then its classification wins. This causes a new retry state (`FailedRecord`) to be started, causing an incorrect value of 2 instead of 1 on the first delivery from a retry topic. **I will backport - conflicts expected**
1 parent 921e6ae commit c92ec76

File tree

2 files changed

+140
-1
lines changed

2 files changed

+140
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public abstract class FailedRecordProcessor extends ExceptionClassifier implemen
4646

4747
private final BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> noRetriesForClassified =
4848
(rec, ex) -> {
49-
if (!getClassifier().classify(ex)) {
49+
if (!getClassifier().classify(ex) || SeekUtils.isBackoffException(ex)) {
5050
return NO_RETRIES_OR_DELAY_BACKOFF;
5151
}
5252
return this.userBackOffFunction.apply(rec, ex);
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.retrytopic;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.function.Consumer;
26+
27+
import org.junit.jupiter.api.Test;
28+
29+
import org.springframework.beans.factory.annotation.Autowired;
30+
import org.springframework.context.annotation.Bean;
31+
import org.springframework.context.annotation.Configuration;
32+
import org.springframework.kafka.annotation.EnableKafka;
33+
import org.springframework.kafka.annotation.KafkaListener;
34+
import org.springframework.kafka.annotation.RetryableTopic;
35+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
36+
import org.springframework.kafka.core.ConsumerFactory;
37+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
38+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
39+
import org.springframework.kafka.core.KafkaTemplate;
40+
import org.springframework.kafka.core.ProducerFactory;
41+
import org.springframework.kafka.support.KafkaHeaders;
42+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
43+
import org.springframework.kafka.test.context.EmbeddedKafka;
44+
import org.springframework.kafka.test.utils.KafkaTestUtils;
45+
import org.springframework.messaging.handler.annotation.Header;
46+
import org.springframework.retry.annotation.Backoff;
47+
import org.springframework.scheduling.TaskScheduler;
48+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
49+
import org.springframework.test.annotation.DirtiesContext;
50+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
51+
import org.springframework.util.backoff.FixedBackOff;
52+
53+
/**
54+
* @author Gary Russell
55+
* @since 2.8.11
56+
*
57+
*/
58+
@SpringJUnitConfig
59+
@DirtiesContext
60+
@EmbeddedKafka(topics = "dh1")
61+
public class DeliveryHeaderTests extends AbstractRetryTopicIntegrationTests {
62+
63+
@Test
64+
void deliveryAttempts(@Autowired Config config, @Autowired KafkaTemplate<Integer, String> template)
65+
throws InterruptedException {
66+
67+
template.send("dh1", "test");
68+
assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
69+
assertThat(config.attempts.toString())
70+
.isEqualTo("[[1, 1], [2, 1], [3, 1], [1, 2], [2, 2], [3, 2], [1, 3], [2, 3], [3, 3]]");
71+
}
72+
73+
@Configuration
74+
@EnableKafka
75+
public static class Config extends RetryTopicConfigurationSupport {
76+
77+
@Autowired
78+
EmbeddedKafkaBroker broker;
79+
80+
List<List<Integer>> attempts = new ArrayList<>();
81+
82+
CountDownLatch latch = new CountDownLatch(9);
83+
84+
@Override
85+
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
86+
blockingRetries.retryOn(RuntimeException.class)
87+
.backOff(new FixedBackOff(0, 2));
88+
}
89+
90+
@Override
91+
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
92+
return factory -> factory.neverLogListenerException();
93+
}
94+
95+
@RetryableTopic(backoff = @Backoff(maxDelay = 0))
96+
@KafkaListener(id = "dh1", topics = "dh1")
97+
void listen(String in, @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery,
98+
@Header(name = RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, required = false) Integer retryAttempts) {
99+
100+
this.attempts.add(List.of(delivery, retryAttempts == null ? 1 : retryAttempts));
101+
this.latch.countDown();
102+
throw new RuntimeException("test");
103+
}
104+
105+
@Bean
106+
KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf) {
107+
return new KafkaTemplate<>(pf);
108+
}
109+
110+
@Bean
111+
ProducerFactory<Integer, String> pf() {
112+
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(this.broker));
113+
}
114+
115+
@Bean
116+
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
117+
ConsumerFactory<Integer, String> cf) {
118+
119+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
120+
new ConcurrentKafkaListenerContainerFactory<>();
121+
factory.setConsumerFactory(cf);
122+
factory.getContainerProperties().setDeliveryAttemptHeader(true);
123+
return factory;
124+
}
125+
126+
@Bean
127+
ConsumerFactory<Integer, String> cf() {
128+
return new DefaultKafkaConsumerFactory<>(
129+
KafkaTestUtils.consumerProps("dh1", "false", this.broker));
130+
}
131+
132+
@Bean
133+
TaskScheduler sched() {
134+
return new ThreadPoolTaskScheduler();
135+
}
136+
137+
}
138+
139+
}

0 commit comments

Comments
 (0)