From e4827ec64088f6e539cda8979ab63f8ff65e3c0e Mon Sep 17 00:00:00 2001 From: cenkakin Date: Fri, 20 Jan 2023 10:51:49 +0100 Subject: [PATCH 1/3] GH-2554: Fix DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME value Fixes #2554 --- .../kafka/retrytopic/RetryTopicBeanNames.java | 4 +- ...TopicMultipleListenerIntegrationTests.java | 254 ++++++++++++++++++ 2 files changed, 256 insertions(+), 2 deletions(-) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicMultipleListenerIntegrationTests.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBeanNames.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBeanNames.java index f9c7fd6487..966c77548d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBeanNames.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBeanNames.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,6 +62,6 @@ private RetryTopicBeanNames() { * The bean name of the internally registered scheduler wrapper, if needed. */ public static final String DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME = - "defaultRetryTopicKafkaTemplate"; + "defaultRetryTopicSchedulerWrapper"; } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicMultipleListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicMultipleListenerIntegrationTests.java new file mode 100644 index 0000000000..5a16dafabf --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicMultipleListenerIntegrationTests.java @@ -0,0 +1,254 @@ +/* + * Copyright 2021-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.retrytopic; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.DltHandler; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.RetryableTopic; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.retry.annotation.Backoff; +import org.springframework.stereotype.Component; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.backoff.FixedBackOff; + +/** + * @author Cenk Akin + * @since 3.0.3 + */ +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(topics = {RetryTopicMultipleListenerIntegrationTests.FIRST_TOPIC, + RetryTopicMultipleListenerIntegrationTests.SECOND_TOPIC}, partitions = 1) +public class RetryTopicMultipleListenerIntegrationTests { + + private static final Logger logger = LoggerFactory.getLogger(RetryTopicMultipleListenerIntegrationTests.class); + + public final static String FIRST_TOPIC = "myRetryTopic1"; + + public final static String SECOND_TOPIC = "myRetryTopic2"; + + @Autowired + private KafkaTemplate sendKafkaTemplate; + + @Autowired + private CountDownLatchContainer latchContainer; + + @Test + void shouldRetryFirstAndSecondTopics(@Autowired RetryTopicComponentFactory componentFactory) { + logger.debug("Sending message to topic " + FIRST_TOPIC); + sendKafkaTemplate.send(FIRST_TOPIC, "Testing topic 1"); + logger.debug("Sending message to topic " + SECOND_TOPIC); + sendKafkaTemplate.send(SECOND_TOPIC, "Testing topic 2"); + assertThat(awaitLatch(latchContainer.firstCountDownLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.firstCountDownLatchDlt)).isTrue(); + assertThat(awaitLatch(latchContainer.secondCountDownLatch)).isTrue(); + assertThat(awaitLatch(latchContainer.customizerLatch)).isTrue(); + verify(componentFactory).destinationTopicResolver(); + } + + private boolean awaitLatch(CountDownLatch latch) { + try { + return latch.await(150, TimeUnit.SECONDS); + } + catch (Exception e) { + fail(e.getMessage()); + throw new RuntimeException(e); + } + } + + @Component + static class FirstKafkaListener { + + @Autowired + CountDownLatchContainer countDownLatchContainer; + + @RetryableTopic( + attempts = "4", + backoff = @Backoff(delay = 10, multiplier = 2.0), + autoCreateTopics = "false", + topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE) + @KafkaListener(topics = RetryTopicMultipleListenerIntegrationTests.FIRST_TOPIC) + public void firstListener(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + countDownLatchContainer.firstCountDownLatch.countDown(); + logger.warn(in + " from " + topic); + throw new RuntimeException("test"); + } + + @DltHandler + public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + countDownLatchContainer.firstCountDownLatchDlt.countDown(); + logger.warn(in + " from " + topic); + } + } + + @Component + static class SecondKafkaListener { + + @Autowired + CountDownLatchContainer countDownLatchContainer; + + @RetryableTopic + @KafkaListener(topics = RetryTopicMultipleListenerIntegrationTests.SECOND_TOPIC) + public void secondListener(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + countDownLatchContainer.secondCountDownLatch.countDown(); + logger.info(in + " from " + topic); + throw new RuntimeException("another test"); + } + + @DltHandler + public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + countDownLatchContainer.secondCountDownLatchDlt.countDown(); + logger.warn(in + " from " + topic); + } + } + + @Component + static class CountDownLatchContainer { + + CountDownLatch firstCountDownLatch = new CountDownLatch(4); + CountDownLatch secondCountDownLatch = new CountDownLatch(3); + CountDownLatch firstCountDownLatchDlt = new CountDownLatch(1); + + CountDownLatch secondCountDownLatchDlt = new CountDownLatch(1); + CountDownLatch customizerLatch = new CountDownLatch(6); + } + + @EnableKafka + @Configuration + static class Config { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + CountDownLatchContainer latchContainer() { + return new CountDownLatchContainer(); + } + + @Bean + FirstKafkaListener firstKafkaListener() { + return new FirstKafkaListener(); + } + + @Bean + SecondKafkaListener secondKafkaListener() { + return new SecondKafkaListener(); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory, CountDownLatchContainer latchContainer) { + + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + ContainerProperties props = factory.getContainerProperties(); + props.setIdleEventInterval(100L); + props.setPollTimeout(50L); + props.setIdlePartitionEventInterval(100L); + factory.setConsumerFactory(consumerFactory); + DefaultErrorHandler errorHandler = new DefaultErrorHandler( + (cr, ex) -> latchContainer.secondCountDownLatch.countDown(), + new FixedBackOff(0, 2)); + factory.setCommonErrorHandler(errorHandler); + factory.setConcurrency(1); + factory.setContainerCustomizer( + container -> latchContainer.customizerLatch.countDown()); + return factory; + } + + @Bean + public ProducerFactory producerFactory() { + Map configProps = new HashMap<>(); + configProps.put( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.broker.getBrokersAsString()); + configProps.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class); + configProps.put( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + StringSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } + + @Bean + public ConsumerFactory consumerFactory() { + Map props = new HashMap<>(); + props.put( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.broker.getBrokersAsString()); + props.put( + ConsumerConfig.GROUP_ID_CONFIG, + "groupId"); + props.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + props.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class); + props.put( + ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return new DefaultKafkaConsumerFactory<>(props); + } + + @Bean + RetryTopicComponentFactory componentFactory() { + return spy(new RetryTopicComponentFactory()); + } + } +} From e88fb4939f539eaa10a5e5097397634dad650b34 Mon Sep 17 00:00:00 2001 From: cenkakin Date: Mon, 23 Jan 2023 17:58:05 +0100 Subject: [PATCH 2/3] Remove new IT and modify RetryTopicSameContainerFactoryIntegrationTests instead --- ...TopicMultipleListenerIntegrationTests.java | 254 ------------------ ...cSameContainerFactoryIntegrationTests.java | 57 ++-- 2 files changed, 28 insertions(+), 283 deletions(-) delete mode 100644 spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicMultipleListenerIntegrationTests.java diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicMultipleListenerIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicMultipleListenerIntegrationTests.java deleted file mode 100644 index 5a16dafabf..0000000000 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicMultipleListenerIntegrationTests.java +++ /dev/null @@ -1,254 +0,0 @@ -/* - * Copyright 2021-2023 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.kafka.retrytopic; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.annotation.DltHandler; -import org.springframework.kafka.annotation.EnableKafka; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.annotation.RetryableTopic; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.DefaultErrorHandler; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.retry.annotation.Backoff; -import org.springframework.stereotype.Component; -import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import org.springframework.util.backoff.FixedBackOff; - -/** - * @author Cenk Akin - * @since 3.0.3 - */ -@SpringJUnitConfig -@DirtiesContext -@EmbeddedKafka(topics = {RetryTopicMultipleListenerIntegrationTests.FIRST_TOPIC, - RetryTopicMultipleListenerIntegrationTests.SECOND_TOPIC}, partitions = 1) -public class RetryTopicMultipleListenerIntegrationTests { - - private static final Logger logger = LoggerFactory.getLogger(RetryTopicMultipleListenerIntegrationTests.class); - - public final static String FIRST_TOPIC = "myRetryTopic1"; - - public final static String SECOND_TOPIC = "myRetryTopic2"; - - @Autowired - private KafkaTemplate sendKafkaTemplate; - - @Autowired - private CountDownLatchContainer latchContainer; - - @Test - void shouldRetryFirstAndSecondTopics(@Autowired RetryTopicComponentFactory componentFactory) { - logger.debug("Sending message to topic " + FIRST_TOPIC); - sendKafkaTemplate.send(FIRST_TOPIC, "Testing topic 1"); - logger.debug("Sending message to topic " + SECOND_TOPIC); - sendKafkaTemplate.send(SECOND_TOPIC, "Testing topic 2"); - assertThat(awaitLatch(latchContainer.firstCountDownLatch)).isTrue(); - assertThat(awaitLatch(latchContainer.firstCountDownLatchDlt)).isTrue(); - assertThat(awaitLatch(latchContainer.secondCountDownLatch)).isTrue(); - assertThat(awaitLatch(latchContainer.customizerLatch)).isTrue(); - verify(componentFactory).destinationTopicResolver(); - } - - private boolean awaitLatch(CountDownLatch latch) { - try { - return latch.await(150, TimeUnit.SECONDS); - } - catch (Exception e) { - fail(e.getMessage()); - throw new RuntimeException(e); - } - } - - @Component - static class FirstKafkaListener { - - @Autowired - CountDownLatchContainer countDownLatchContainer; - - @RetryableTopic( - attempts = "4", - backoff = @Backoff(delay = 10, multiplier = 2.0), - autoCreateTopics = "false", - topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE) - @KafkaListener(topics = RetryTopicMultipleListenerIntegrationTests.FIRST_TOPIC) - public void firstListener(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { - countDownLatchContainer.firstCountDownLatch.countDown(); - logger.warn(in + " from " + topic); - throw new RuntimeException("test"); - } - - @DltHandler - public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { - countDownLatchContainer.firstCountDownLatchDlt.countDown(); - logger.warn(in + " from " + topic); - } - } - - @Component - static class SecondKafkaListener { - - @Autowired - CountDownLatchContainer countDownLatchContainer; - - @RetryableTopic - @KafkaListener(topics = RetryTopicMultipleListenerIntegrationTests.SECOND_TOPIC) - public void secondListener(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { - countDownLatchContainer.secondCountDownLatch.countDown(); - logger.info(in + " from " + topic); - throw new RuntimeException("another test"); - } - - @DltHandler - public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { - countDownLatchContainer.secondCountDownLatchDlt.countDown(); - logger.warn(in + " from " + topic); - } - } - - @Component - static class CountDownLatchContainer { - - CountDownLatch firstCountDownLatch = new CountDownLatch(4); - CountDownLatch secondCountDownLatch = new CountDownLatch(3); - CountDownLatch firstCountDownLatchDlt = new CountDownLatch(1); - - CountDownLatch secondCountDownLatchDlt = new CountDownLatch(1); - CountDownLatch customizerLatch = new CountDownLatch(6); - } - - @EnableKafka - @Configuration - static class Config { - - @Autowired - EmbeddedKafkaBroker broker; - - @Bean - CountDownLatchContainer latchContainer() { - return new CountDownLatchContainer(); - } - - @Bean - FirstKafkaListener firstKafkaListener() { - return new FirstKafkaListener(); - } - - @Bean - SecondKafkaListener secondKafkaListener() { - return new SecondKafkaListener(); - } - - @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( - ConsumerFactory consumerFactory, CountDownLatchContainer latchContainer) { - - ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory); - ContainerProperties props = factory.getContainerProperties(); - props.setIdleEventInterval(100L); - props.setPollTimeout(50L); - props.setIdlePartitionEventInterval(100L); - factory.setConsumerFactory(consumerFactory); - DefaultErrorHandler errorHandler = new DefaultErrorHandler( - (cr, ex) -> latchContainer.secondCountDownLatch.countDown(), - new FixedBackOff(0, 2)); - factory.setCommonErrorHandler(errorHandler); - factory.setConcurrency(1); - factory.setContainerCustomizer( - container -> latchContainer.customizerLatch.countDown()); - return factory; - } - - @Bean - public ProducerFactory producerFactory() { - Map configProps = new HashMap<>(); - configProps.put( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.broker.getBrokersAsString()); - configProps.put( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - StringSerializer.class); - configProps.put( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - StringSerializer.class); - return new DefaultKafkaProducerFactory<>(configProps); - } - - @Bean - public KafkaTemplate kafkaTemplate() { - return new KafkaTemplate<>(producerFactory()); - } - - @Bean - public ConsumerFactory consumerFactory() { - Map props = new HashMap<>(); - props.put( - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, - this.broker.getBrokersAsString()); - props.put( - ConsumerConfig.GROUP_ID_CONFIG, - "groupId"); - props.put( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class); - props.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class); - props.put( - ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - return new DefaultKafkaConsumerFactory<>(props); - } - - @Bean - RetryTopicComponentFactory componentFactory() { - return spy(new RetryTopicComponentFactory()); - } - } -} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java index 2e7d442364..7197ed11fe 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2022 the original author or authors. + * Copyright 2021-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,18 +48,14 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ContainerProperties; -import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.messaging.handler.annotation.Header; import org.springframework.retry.annotation.Backoff; -import org.springframework.scheduling.TaskScheduler; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import org.springframework.util.backoff.FixedBackOff; /** * @author Tomaz Fernandes @@ -90,8 +86,9 @@ void shouldRetryFirstAndSecondTopics(@Autowired RetryTopicComponentFactory compo logger.debug("Sending message to topic " + SECOND_TOPIC); sendKafkaTemplate.send(SECOND_TOPIC, "Testing topic 2"); assertThat(awaitLatch(latchContainer.countDownLatch1)).isTrue(); - assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue(); assertThat(awaitLatch(latchContainer.countDownLatch2)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchDlt1)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchDlt2)).isTrue(); assertThat(awaitLatch(latchContainer.customizerLatch)).isTrue(); verify(componentFactory).destinationTopicResolver(); } @@ -107,7 +104,7 @@ private boolean awaitLatch(CountDownLatch latch) { } @Component - static class RetryableKafkaListener { + static class FirstRetryableKafkaListener { @Autowired CountDownLatchContainer countDownLatchContainer; @@ -121,26 +118,35 @@ static class RetryableKafkaListener { public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { countDownLatchContainer.countDownLatch1.countDown(); logger.warn(in + " from " + topic); - throw new RuntimeException("test"); + throw new RuntimeException("from RetryableKafkaListener"); } @DltHandler public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { - countDownLatchContainer.countDownLatchDltOne.countDown(); + countDownLatchContainer.countDownLatchDlt1.countDown(); logger.warn(in + " from " + topic); } } @Component - static class BasicKafkaListener { + static class SecondRetryableKafkaListener { @Autowired CountDownLatchContainer countDownLatchContainer; + @RetryableTopic @KafkaListener(topics = RetryTopicSameContainerFactoryIntegrationTests.SECOND_TOPIC) public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { - logger.info(in + " from " + topic); - throw new RuntimeException("another test"); + countDownLatchContainer.countDownLatch2.countDown(); + logger.warn(in + " from " + topic); + throw new RuntimeException("from SecondRetryableKafkaListener"); + } + + + @DltHandler + public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + countDownLatchContainer.countDownLatchDlt2.countDown(); + logger.warn(in + " from " + topic); } } @@ -148,14 +154,16 @@ public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) static class CountDownLatchContainer { CountDownLatch countDownLatch1 = new CountDownLatch(4); - CountDownLatch countDownLatch2 = new CountDownLatch(1); - CountDownLatch countDownLatchDltOne = new CountDownLatch(1); - CountDownLatch customizerLatch = new CountDownLatch(6); + CountDownLatch countDownLatch2 = new CountDownLatch(3); + + CountDownLatch countDownLatchDlt1 = new CountDownLatch(1); + CountDownLatch countDownLatchDlt2 = new CountDownLatch(1); + CountDownLatch customizerLatch = new CountDownLatch(9); } @EnableKafka @Configuration - static class Config extends RetryTopicConfigurationSupport { + static class Config { @Autowired EmbeddedKafkaBroker broker; @@ -166,13 +174,13 @@ CountDownLatchContainer latchContainer() { } @Bean - RetryableKafkaListener retryableKafkaListener() { - return new RetryableKafkaListener(); + FirstRetryableKafkaListener firstRetryableKafkaListener() { + return new FirstRetryableKafkaListener(); } @Bean - BasicKafkaListener basicKafkaListener() { - return new BasicKafkaListener(); + SecondRetryableKafkaListener secondRetryableKafkaListener() { + return new SecondRetryableKafkaListener(); } @Bean @@ -186,10 +194,6 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCont props.setPollTimeout(50L); props.setIdlePartitionEventInterval(100L); factory.setConsumerFactory(consumerFactory); - DefaultErrorHandler errorHandler = new DefaultErrorHandler( - (cr, ex) -> latchContainer.countDownLatch2.countDown(), - new FixedBackOff(0, 2)); - factory.setCommonErrorHandler(errorHandler); factory.setConcurrency(1); factory.setContainerCustomizer( container -> latchContainer.customizerLatch.countDown()); @@ -238,11 +242,6 @@ public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(props); } - @Bean - TaskScheduler sched() { - return new ThreadPoolTaskScheduler(); - } - @Bean RetryTopicComponentFactory componentFactory() { return spy(new RetryTopicComponentFactory()); From 8d38dfc9bd962f8f2f496db6761d03283304e6f5 Mon Sep 17 00:00:00 2001 From: cenkakin Date: Mon, 23 Jan 2023 18:14:23 +0100 Subject: [PATCH 3/3] Re-add BasicKafkaListener --- ...cSameContainerFactoryIntegrationTests.java | 62 ++++++++++++++----- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java index 7197ed11fe..3473c8a6c7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicSameContainerFactoryIntegrationTests.java @@ -48,6 +48,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ContainerProperties; +import org.springframework.kafka.listener.DefaultErrorHandler; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; @@ -56,15 +57,17 @@ import org.springframework.stereotype.Component; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import org.springframework.util.backoff.FixedBackOff; /** * @author Tomaz Fernandes + * @author Cenk Akin * @since 2.8.3 */ @SpringJUnitConfig @DirtiesContext @EmbeddedKafka(topics = { RetryTopicSameContainerFactoryIntegrationTests.FIRST_TOPIC, - RetryTopicSameContainerFactoryIntegrationTests.SECOND_TOPIC}, partitions = 1) + RetryTopicSameContainerFactoryIntegrationTests.SECOND_TOPIC, RetryTopicSameContainerFactoryIntegrationTests.THIRD_TOPIC}, partitions = 1) public class RetryTopicSameContainerFactoryIntegrationTests { private static final Logger logger = LoggerFactory.getLogger(RetryTopicSameContainerFactoryIntegrationTests.class); @@ -73,6 +76,8 @@ public class RetryTopicSameContainerFactoryIntegrationTests { public final static String SECOND_TOPIC = "myRetryTopic2"; + public final static String THIRD_TOPIC = "myRetryTopic3"; + @Autowired private KafkaTemplate sendKafkaTemplate; @@ -85,10 +90,13 @@ void shouldRetryFirstAndSecondTopics(@Autowired RetryTopicComponentFactory compo sendKafkaTemplate.send(FIRST_TOPIC, "Testing topic 1"); logger.debug("Sending message to topic " + SECOND_TOPIC); sendKafkaTemplate.send(SECOND_TOPIC, "Testing topic 2"); - assertThat(awaitLatch(latchContainer.countDownLatch1)).isTrue(); - assertThat(awaitLatch(latchContainer.countDownLatch2)).isTrue(); - assertThat(awaitLatch(latchContainer.countDownLatchDlt1)).isTrue(); - assertThat(awaitLatch(latchContainer.countDownLatchDlt2)).isTrue(); + logger.debug("Sending message to topic " + THIRD_TOPIC); + sendKafkaTemplate.send(THIRD_TOPIC, "Testing topic 3"); + assertThat(awaitLatch(latchContainer.countDownLatchFirstRetryable)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchSecondRetryable)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchDltSecond)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchBasic)).isTrue(); assertThat(awaitLatch(latchContainer.customizerLatch)).isTrue(); verify(componentFactory).destinationTopicResolver(); } @@ -116,14 +124,14 @@ static class FirstRetryableKafkaListener { topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE) @KafkaListener(topics = RetryTopicSameContainerFactoryIntegrationTests.FIRST_TOPIC) public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { - countDownLatchContainer.countDownLatch1.countDown(); + countDownLatchContainer.countDownLatchFirstRetryable.countDown(); logger.warn(in + " from " + topic); - throw new RuntimeException("from RetryableKafkaListener"); + throw new RuntimeException("from FirstRetryableKafkaListener"); } @DltHandler public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { - countDownLatchContainer.countDownLatchDlt1.countDown(); + countDownLatchContainer.countDownLatchDltOne.countDown(); logger.warn(in + " from " + topic); } } @@ -137,28 +145,39 @@ static class SecondRetryableKafkaListener { @RetryableTopic @KafkaListener(topics = RetryTopicSameContainerFactoryIntegrationTests.SECOND_TOPIC) public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { - countDownLatchContainer.countDownLatch2.countDown(); - logger.warn(in + " from " + topic); + countDownLatchContainer.countDownLatchSecondRetryable.countDown(); + logger.info(in + " from " + topic); throw new RuntimeException("from SecondRetryableKafkaListener"); } - @DltHandler public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { - countDownLatchContainer.countDownLatchDlt2.countDown(); + countDownLatchContainer.countDownLatchDltSecond.countDown(); logger.warn(in + " from " + topic); } } + + @Component + static class BasicKafkaListener { + + @KafkaListener(topics = RetryTopicSameContainerFactoryIntegrationTests.THIRD_TOPIC) + public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { + logger.info(in + " from " + topic); + throw new RuntimeException("from BasicKafkaListener"); + } + } + @Component static class CountDownLatchContainer { - CountDownLatch countDownLatch1 = new CountDownLatch(4); - CountDownLatch countDownLatch2 = new CountDownLatch(3); + CountDownLatch countDownLatchFirstRetryable = new CountDownLatch(4); + CountDownLatch countDownLatchSecondRetryable = new CountDownLatch(3); + CountDownLatch countDownLatchDltOne = new CountDownLatch(1); + CountDownLatch countDownLatchDltSecond = new CountDownLatch(1); - CountDownLatch countDownLatchDlt1 = new CountDownLatch(1); - CountDownLatch countDownLatchDlt2 = new CountDownLatch(1); - CountDownLatch customizerLatch = new CountDownLatch(9); + CountDownLatch countDownLatchBasic = new CountDownLatch(1); + CountDownLatch customizerLatch = new CountDownLatch(10); } @EnableKafka @@ -183,6 +202,11 @@ SecondRetryableKafkaListener secondRetryableKafkaListener() { return new SecondRetryableKafkaListener(); } + @Bean + BasicKafkaListener basicKafkaListener() { + return new BasicKafkaListener(); + } + @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory consumerFactory, CountDownLatchContainer latchContainer) { @@ -194,6 +218,10 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCont props.setPollTimeout(50L); props.setIdlePartitionEventInterval(100L); factory.setConsumerFactory(consumerFactory); + DefaultErrorHandler errorHandler = new DefaultErrorHandler( + (cr, ex) -> latchContainer.countDownLatchBasic.countDown(), + new FixedBackOff(0, 2)); + factory.setCommonErrorHandler(errorHandler); factory.setConcurrency(1); factory.setContainerCustomizer( container -> latchContainer.customizerLatch.countDown());