Skip to content

GH-2554: Fix DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME value #2555

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -62,6 +62,6 @@ private RetryTopicBeanNames() {
* The bean name of the internally registered scheduler wrapper, if needed.
*/
public static final String DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME =
"defaultRetryTopicKafkaTemplate";
"defaultRetryTopicSchedulerWrapper";

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 the original author or authors.
* Copyright 2021-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,21 +54,20 @@
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.annotation.Backoff;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.backoff.FixedBackOff;

/**
* @author Tomaz Fernandes
* @author Cenk Akin
* @since 2.8.3
*/
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = { RetryTopicSameContainerFactoryIntegrationTests.FIRST_TOPIC,
RetryTopicSameContainerFactoryIntegrationTests.SECOND_TOPIC}, partitions = 1)
RetryTopicSameContainerFactoryIntegrationTests.SECOND_TOPIC, RetryTopicSameContainerFactoryIntegrationTests.THIRD_TOPIC}, partitions = 1)
public class RetryTopicSameContainerFactoryIntegrationTests {

private static final Logger logger = LoggerFactory.getLogger(RetryTopicSameContainerFactoryIntegrationTests.class);
Expand All @@ -77,6 +76,8 @@ public class RetryTopicSameContainerFactoryIntegrationTests {

public final static String SECOND_TOPIC = "myRetryTopic2";

public final static String THIRD_TOPIC = "myRetryTopic3";

@Autowired
private KafkaTemplate<String, String> sendKafkaTemplate;

Expand All @@ -89,9 +90,13 @@ void shouldRetryFirstAndSecondTopics(@Autowired RetryTopicComponentFactory compo
sendKafkaTemplate.send(FIRST_TOPIC, "Testing topic 1");
logger.debug("Sending message to topic " + SECOND_TOPIC);
sendKafkaTemplate.send(SECOND_TOPIC, "Testing topic 2");
assertThat(awaitLatch(latchContainer.countDownLatch1)).isTrue();
logger.debug("Sending message to topic " + THIRD_TOPIC);
sendKafkaTemplate.send(THIRD_TOPIC, "Testing topic 3");
assertThat(awaitLatch(latchContainer.countDownLatchFirstRetryable)).isTrue();
assertThat(awaitLatch(latchContainer.countDownLatchDltOne)).isTrue();
assertThat(awaitLatch(latchContainer.countDownLatch2)).isTrue();
assertThat(awaitLatch(latchContainer.countDownLatchSecondRetryable)).isTrue();
assertThat(awaitLatch(latchContainer.countDownLatchDltSecond)).isTrue();
assertThat(awaitLatch(latchContainer.countDownLatchBasic)).isTrue();
assertThat(awaitLatch(latchContainer.customizerLatch)).isTrue();
verify(componentFactory).destinationTopicResolver();
}
Expand All @@ -107,7 +112,7 @@ private boolean awaitLatch(CountDownLatch latch) {
}

@Component
static class RetryableKafkaListener {
static class FirstRetryableKafkaListener {

@Autowired
CountDownLatchContainer countDownLatchContainer;
Expand All @@ -119,9 +124,9 @@ static class RetryableKafkaListener {
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = RetryTopicSameContainerFactoryIntegrationTests.FIRST_TOPIC)
public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
countDownLatchContainer.countDownLatch1.countDown();
countDownLatchContainer.countDownLatchFirstRetryable.countDown();
logger.warn(in + " from " + topic);
throw new RuntimeException("test");
throw new RuntimeException("from FirstRetryableKafkaListener");
}

@DltHandler
Expand All @@ -132,30 +137,52 @@ public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
}

@Component
static class BasicKafkaListener {
static class SecondRetryableKafkaListener {

@Autowired
CountDownLatchContainer countDownLatchContainer;

@RetryableTopic
@KafkaListener(topics = RetryTopicSameContainerFactoryIntegrationTests.SECOND_TOPIC)
public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
countDownLatchContainer.countDownLatchSecondRetryable.countDown();
logger.info(in + " from " + topic);
throw new RuntimeException("another test");
throw new RuntimeException("from SecondRetryableKafkaListener");
}

@DltHandler
public void dlt(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
countDownLatchContainer.countDownLatchDltSecond.countDown();
logger.warn(in + " from " + topic);
}
}


@Component
static class BasicKafkaListener {

@KafkaListener(topics = RetryTopicSameContainerFactoryIntegrationTests.THIRD_TOPIC)
public void listen(String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
logger.info(in + " from " + topic);
throw new RuntimeException("from BasicKafkaListener");
}
}

@Component
static class CountDownLatchContainer {

CountDownLatch countDownLatch1 = new CountDownLatch(4);
CountDownLatch countDownLatch2 = new CountDownLatch(1);
CountDownLatch countDownLatchFirstRetryable = new CountDownLatch(4);
CountDownLatch countDownLatchSecondRetryable = new CountDownLatch(3);
CountDownLatch countDownLatchDltOne = new CountDownLatch(1);
CountDownLatch customizerLatch = new CountDownLatch(6);
CountDownLatch countDownLatchDltSecond = new CountDownLatch(1);

CountDownLatch countDownLatchBasic = new CountDownLatch(1);
CountDownLatch customizerLatch = new CountDownLatch(10);
}

@EnableKafka
@Configuration
static class Config extends RetryTopicConfigurationSupport {
static class Config {
Copy link
Contributor Author

@cenkakin cenkakin Jan 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't extend RetryTopicConfigurationSupport and shouldn't inject our own ThreadPoolTaskScheduler ->
https://github.com/spring-projects/spring-kafka/pull/2555/files#diff-80d8fe2567e6640c6d1edada33ec0bf783012606c2264cf4fc6b3aed4cca9742L242


@Autowired
EmbeddedKafkaBroker broker;
Expand All @@ -166,8 +193,13 @@ CountDownLatchContainer latchContainer() {
}

@Bean
RetryableKafkaListener retryableKafkaListener() {
return new RetryableKafkaListener();
FirstRetryableKafkaListener firstRetryableKafkaListener() {
return new FirstRetryableKafkaListener();
}

@Bean
SecondRetryableKafkaListener secondRetryableKafkaListener() {
return new SecondRetryableKafkaListener();
}

@Bean
Expand All @@ -187,7 +219,7 @@ public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerCont
props.setIdlePartitionEventInterval(100L);
factory.setConsumerFactory(consumerFactory);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
(cr, ex) -> latchContainer.countDownLatch2.countDown(),
(cr, ex) -> latchContainer.countDownLatchBasic.countDown(),
new FixedBackOff(0, 2));
factory.setCommonErrorHandler(errorHandler);
factory.setConcurrency(1);
Expand Down Expand Up @@ -238,11 +270,6 @@ public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
TaskScheduler sched() {
return new ThreadPoolTaskScheduler();
}

@Bean
RetryTopicComponentFactory componentFactory() {
return spy(new RetryTopicComponentFactory());
Expand Down