RetryTopicConfigurationBuilder#includeTopics does not work with the List #2636
-
I tried the following for non-blocking retry My listener
request-topic is the name of the topic. "${redacted-request-topic}" resolves to request-topic When the includeTopics is commented out, it is working as expected i.e the retry topic name, dlt name and exponential back off kicks in but when includeTopics is uncommented none of these work and it retries in blocking mode. I can include the logs but have to redact some information. I will try to share redacted logs if they are really required. Kindly let me know what I have missed here. I am using spring kafka 2.8.11 I see a similar issue - #1916 and I am sorry I commented in #1920 |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments 2 replies
-
This doesn't work for me with, or without, the @SpringBootApplication
public class Kgh2676Application {
public static void main(String[] args) {
SpringApplication.run(Kgh2676Application.class, args);
}
@KafkaListener(id = "kgh2676", topics = "${kgh2676}")
void listen(String in) {
System.out.println(in);
}
@Bean
RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, byte[]> kafkaTemplate,
@Value("${kgh2676}") String topic) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of(topic))
.exponentialBackoff(3000, 2, 100000)
.maxAttempts(5)
.retryTopicSuffix("-asl-nm.retry")
.dltSuffix("-asl-nm.dlt")
.create(kafkaTemplate);
}
} The problem being that the Moving the @SpringBootApplication
public class Kgh2676Application {
public static void main(String[] args) {
SpringApplication.run(Kgh2676Application.class, args);
}
@KafkaListener(id = "kgh2676", topics = "${kgh2676}")
void listen(String in) {
System.out.println(in);
}
}
@Configuration
class OtherConfiguration
{
@Bean
RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, byte[]> kafkaTemplate,
@Value("${kgh2676}") String topic) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of(topic))
.exponentialBackoff(3000, 2, 100000)
.maxAttempts(5)
.retryTopicSuffix("-asl-nm.retry")
.dltSuffix("-asl-nm.dlt")
.create(kafkaTemplate);
}
} kgh2676=myTopic
|
Beta Was this translation helpful? Give feedback.
-
Let me give more details MyListener.java
WorkerKafkaConsumerConfig.java annotated with @configuration in the class level has the following
Integration test with embedded kafka has the following
In the integration test I am producing the message. I am first testing this using my integration test. Based on your above answer I believe the issue might be somewhere here - classes = {AppEntry.class, WorkerKafkaConsumerConfig.class}) I tried making myRetryTopic method static as well but no joy. Any leads will be helpful. Thanks for the efforts so far. |
Beta Was this translation helpful? Give feedback.
-
@garyrussell It works now. Hence marking the answer. Thanks for your time on this. |
Beta Was this translation helpful? Give feedback.
This doesn't work for me with, or without, the
includeTopics()
...