30 changes: 15 additions & 15 deletions build.gradle
@@ -1,5 +1,5 @@
buildscript {
ext.kotlinVersion = '1.7.20'
ext.kotlinVersion = '1.9.10'
ext.isCI = System.getenv('GITHUB_ACTION') || System.getenv('bamboo_buildKey')
repositories {
mavenCentral()
@@ -59,28 +59,28 @@ ext {
files(grgit.status().unstaged.modified).filter{ f -> f.name.endsWith('.java') || f.name.endsWith('.kt') }
}

assertjVersion = '3.23.1'
assertjVersion = '3.24.2'
awaitilityVersion = '4.2.0'
googleJsr305Version = '3.0.2'
hamcrestVersion = '2.2'
hibernateValidationVersion = '8.0.0.Final'
jacksonBomVersion = '2.14.3'
jaywayJsonPathVersion = '2.7.0'
hibernateValidationVersion = '8.0.1.Final'
jacksonBomVersion = '2.15.2'
jaywayJsonPathVersion = '2.8.0'
junit4Version = '4.13.2'
junitJupiterVersion = '5.9.3'
kafkaVersion = '3.3.2'
log4jVersion = '2.19.0'
junitJupiterVersion = '5.10.0'
kafkaVersion = '3.5.1'
log4jVersion = '2.20.0'
micrometerDocsVersion = "1.0.2"
micrometerVersion = '1.10.10'
micrometerTracingVersion = '1.0.9'
mockitoVersion = '4.8.1'
reactorVersion = '2022.0.10'
micrometerVersion = '1.12.0-SNAPSHOT'
micrometerTracingVersion = '1.2.0-SNAPSHOT'
mockitoVersion = '5.5.0'
reactorVersion = '2023.0.0-SNAPSHOT'
scalaVersion = '2.13'
springBootVersion = '3.0.9' // docs module
springDataVersion = '2022.0.9'
springDataVersion = '2023.1.0-SNAPSHOT'
springRetryVersion = '2.0.2'
springVersion = '6.0.11'
zookeeperVersion = '3.6.3'
springVersion = '6.1.0-SNAPSHOT'
zookeeperVersion = '3.6.4'

idPrefix = 'kafka'

2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,4 +1,4 @@
version=3.0.12-SNAPSHOT
version=3.1.0-SNAPSHOT
org.gradle.jvmargs=-Xmx1536M -Dfile.encoding=UTF-8
org.gradle.caching=true
org.gradle.parallel=true
Original file line number Diff line number Diff line change
@@ -330,8 +330,9 @@ public void manyTests() throws Exception {
.getPropertyValue(this.registry.getListenerContainer("qux"), "containers", List.class).get(0),
"listenerConsumer.consumer"));
assertThat(
KafkaTestUtils.getPropertyValue(this.listener.listen4Consumer, "fetcher.maxPollRecords", Integer.class))
.isEqualTo(100);
KafkaTestUtils.getPropertyValue(this.listener.listen4Consumer,
"fetcher.fetchConfig.maxPollRecords", Integer.class))
.isEqualTo(100);
assertThat(this.quxGroup).hasSize(1);
assertThat(this.quxGroup.get(0)).isSameAs(manualContainer);
List<?> containers = KafkaTestUtils.getPropertyValue(manualContainer, "containers", List.class);
@@ -365,9 +366,11 @@ public void manyTests() throws Exception {
.isEqualTo("fiz");
assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.clientId"))
.isEqualTo("clientIdViaAnnotation-0");
assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.fetcher.maxPollRecords"))
assertThat(KafkaTestUtils.getPropertyValue(fizContainer,
"listenerConsumer.consumer.fetcher.fetchConfig.maxPollRecords"))
.isEqualTo(10);
assertThat(KafkaTestUtils.getPropertyValue(fizContainer, "listenerConsumer.consumer.fetcher.minBytes"))
assertThat(KafkaTestUtils.getPropertyValue(fizContainer,
"listenerConsumer.consumer.fetcher.fetchConfig.minBytes"))
.isEqualTo(420000);

MessageListenerContainer rebalanceConcurrentContainer = registry.getListenerContainer("rebalanceListener");
@@ -523,9 +526,11 @@ public void testJson() throws Exception {
assertThat(KafkaTestUtils.getPropertyValue(buzContainer,
"listenerConsumer.consumer.groupId", Optional.class).get())
.isEqualTo("buz.explicitGroupId");
assertThat(KafkaTestUtils.getPropertyValue(buzContainer, "listenerConsumer.consumer.fetcher.maxPollRecords"))
assertThat(KafkaTestUtils.getPropertyValue(buzContainer,
"listenerConsumer.consumer.fetcher.fetchConfig.maxPollRecords"))
.isEqualTo(5);
assertThat(KafkaTestUtils.getPropertyValue(buzContainer, "listenerConsumer.consumer.fetcher.minBytes"))
assertThat(KafkaTestUtils.getPropertyValue(buzContainer,
"listenerConsumer.consumer.fetcher.fetchConfig.minBytes"))
.isEqualTo(123456);
}

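The hunks above only adjust the reflective property paths used by the assertions. A minimal sketch of the pattern, not part of the diff, with an illustrative class name, method name, and expected value:

import static org.assertj.core.api.Assertions.assertThat;

import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.test.utils.KafkaTestUtils;

// KafkaTestUtils.getPropertyValue() resolves a dotted path by reflection, so the path must
// mirror the consumer's internal field layout; with the upgraded kafka-clients the fetch
// settings sit in a nested FetchConfig, hence the extra "fetchConfig" segment.
class FetchConfigPathSketch {

    void assertMaxPollRecords(MessageListenerContainer container) {
        Integer maxPoll = KafkaTestUtils.getPropertyValue(container,
                "listenerConsumer.consumer.fetcher.fetchConfig.maxPollRecords", Integer.class);
        assertThat(maxPoll).isEqualTo(100); // expected value is illustrative
    }
}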
Original file line number Diff line number Diff line change
@@ -62,6 +62,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -675,7 +676,7 @@ public Producer producer2() {

@SuppressWarnings("rawtypes")
@Bean
public ProducerFactory pf(Producer producer1, Producer producer2) {
public ProducerFactory pf(@Qualifier("producer1") Producer producer1, @Qualifier("producer2") Producer producer2) {
return new MockProducerFactory((tx, id) -> id.equals("default") ? producer1 : producer2, "default");
}

Original file line number Diff line number Diff line change
@@ -2963,7 +2963,7 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consumer
new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
ArgumentCaptor<Collection<TopicPartition>> captor = ArgumentCaptor.forClass(List.class);
ArgumentCaptor<Collection<TopicPartition>> captor = ArgumentCaptor.forClass(Collection.class);
verify(consumer).seekToBeginning(captor.capture());
assertThat(captor.getValue())
.isEqualTo(new HashSet<>(Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("foo", 4))));
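For reference, a minimal sketch of the captor usage above, not part of the diff (the consumer is assumed to be a Mockito mock created by the caller). The captor is now created for the parameter's declared type, Collection, rather than a subtype such as List, presumably to satisfy Mockito 5's stricter generics on ArgumentCaptor.forClass():

import static org.mockito.Mockito.verify;

import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.mockito.ArgumentCaptor;

class SeekToBeginningCaptorSketch {

    @SuppressWarnings("unchecked")
    void verifySeek(Consumer<Integer, String> consumer) {
        // seekToBeginning() declares Collection<TopicPartition>, so the captor targets Collection.
        ArgumentCaptor<Collection<TopicPartition>> captor = ArgumentCaptor.forClass(Collection.class);
        verify(consumer).seekToBeginning(captor.capture());
    }
}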
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -64,7 +64,7 @@ void record() throws Exception {
return null;
});
HandlerAdapter handler = mock(HandlerAdapter.class);
willThrow(new RuntimeException("test")).given(handler).invoke(any(), any());
willThrow(new RuntimeException("test")).given(handler).invoke(any(), any(), any(), any());
adapter.setHandlerMethod(handler);
Acknowledgment ack = mock(Acknowledgment.class);
adapter.onMessage(mock(ConsumerRecord.class), ack, mock(Consumer.class));
@@ -80,7 +80,7 @@ void batch() throws Exception {
return null;
});
HandlerAdapter handler = mock(HandlerAdapter.class);
willThrow(new RuntimeException("test")).given(handler).invoke(any(), any());
willThrow(new RuntimeException("test")).given(handler).invoke(any(), any(), any());
adapter.setHandlerMethod(handler);
Acknowledgment ack = mock(Acknowledgment.class);
adapter.onMessage(Collections.emptyList(), ack, mock(Consumer.class));
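The extra any() matchers track the adapter's actual call arity. A minimal sketch, not part of the diff, under the assumption that HandlerAdapter.invoke(Message<?>, Object...) is varargs and that Mockito 5 matches varargs element by element, so the stub needs one matcher per argument actually passed:

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.mock;

import org.springframework.kafka.listener.adapter.HandlerAdapter;

class VarargsStubSketch {

    void stubRecordInvocation() throws Exception {
        HandlerAdapter handler = mock(HandlerAdapter.class);
        // One any() for the Message plus one per provided argument (in the record test above,
        // presumably the record, the acknowledgment, and the consumer); a count mismatch would
        // leave the stub unmatched at runtime.
        willThrow(new RuntimeException("test")).given(handler).invoke(any(), any(), any(), any());
    }
}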
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2022 the original author or authors.
* Copyright 2021-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -67,7 +67,7 @@ void testCreateConfigurer() {
KafkaConsumerBackoffManager backoffManager = mock(KafkaConsumerBackoffManager.class);
DestinationTopicResolver resolver = mock(DestinationTopicResolver.class);
DestinationTopicProcessor processor = mock(DestinationTopicProcessor.class);
ListenerContainerFactoryConfigurer lcfc = mock(ListenerContainerFactoryConfigurer.class);
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(null, null, null);
ListenerContainerFactoryResolver lcfr = mock(ListenerContainerFactoryResolver.class);
RetryTopicNamesProviderFactory namesProviderFactory = mock(RetryTopicNamesProviderFactory.class);
BeanFactory beanFactory = mock(BeanFactory.class);
@@ -146,11 +146,14 @@ protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetrie
then(componentFactory).should().retryTopicConfigurer(processor, lcfc, lcfr, namesProviderFactory);

then(dlprf).should().setDeadLetterPublishingRecovererCustomizer(dlprCustomizer);
then(lcfc).should().setContainerCustomizer(listenerContainerCustomizer);
then(lcfc).should().setErrorHandlerCustomizer(errorHandlerCustomizer);
assertThat(KafkaTestUtils.getPropertyValue(lcfc, "containerCustomizer"))
.isSameAs(listenerContainerCustomizer);
assertThat(KafkaTestUtils.getPropertyValue(lcfc, "errorHandlerCustomizer"))
.isSameAs(errorHandlerCustomizer);
assertThatThrownBy(lcfc::setBlockingRetryableExceptions).isInstanceOf(IllegalStateException.class);
then(lcfc).should().setBlockingRetriesBackOff(backoff);
then(lcfc).should().setRetainStandardFatal(true);
assertThat(KafkaTestUtils.getPropertyValue(lcfc, "providedBlockingBackOff"))
.isSameAs(backoff);
assertThat(KafkaTestUtils.getPropertyValue(lcfc, "retainStandardFatal", Boolean.class)).isTrue();
then(dlprfCustomizer).should().accept(dlprf);
then(rtconfigurer).should().accept(topicConfigurer);
then(lcfcConsumer).should().accept(lcfc);
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.BDDMockito.willThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

@@ -44,9 +43,7 @@
import org.mockito.junit.jupiter.MockitoExtension;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
@@ -76,8 +73,7 @@ class RetryTopicConfigurerTests {
@Mock
private BeanFactory beanFactory;

@Mock
private DefaultListableBeanFactory defaultListableBeanFactory;
private DefaultListableBeanFactory defaultListableBeanFactory = new DefaultListableBeanFactory();

@Mock
private RetryTopicConfigurer.EndpointProcessor endpointProcessor;
@@ -285,8 +281,7 @@ void shouldConfigureRetryEndpoints() {
Consumer<Collection<String>> topicsConsumer = topicsConsumerCaptor.getValue();
topicsConsumer.accept(topics);

then(defaultListableBeanFactory).should(times(2))
.registerSingleton(any(String.class), any(NewTopic.class));
assertThat(this.defaultListableBeanFactory.getBeansOfType(NewTopic.class)).hasSize(2);
}

private void assertTopicNames(String retrySuffix, DestinationTopic.Properties destinationProperties, DestinationTopicProcessor.Context context, int index) {
@@ -346,19 +341,15 @@ void shouldInstantiateIfNotInContainer() {

// setup
String beanName = NoOpsClass.class.getSimpleName() + "-handlerMethod";
given(defaultListableBeanFactory.getBean(beanName)).willReturn(new NoOpsClass());
willThrow(NoSuchBeanDefinitionException.class).given(defaultListableBeanFactory).getBean(NoOpsClass.class);
EndpointHandlerMethod handlerMethod =
RetryTopicConfigurer.createHandlerMethodWith(NoOpsClass.class, noOpsMethodName);

// given
Object resolvedBean = handlerMethod.resolveBean(this.defaultListableBeanFactory);

// then
then(defaultListableBeanFactory).should()
.registerBeanDefinition(eq(beanName), any(RootBeanDefinition.class));
assertThat(this.defaultListableBeanFactory.getBean(NoOpsClass.class)).isNotNull();
assertThat(NoOpsClass.class.isAssignableFrom(resolvedBean.getClass())).isTrue();

}

@LogLevels(classes = RetryTopicConfigurer.class, level = "info")
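Several changes in this file swap interaction verification on a mocked DefaultListableBeanFactory for state assertions against a real one. A minimal sketch of that style, not part of the diff, with illustrative bean names:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.kafka.clients.admin.NewTopic;

import org.springframework.beans.factory.support.DefaultListableBeanFactory;

class RealBeanFactorySketch {

    void registerAndAssert() {
        DefaultListableBeanFactory beanFactory = new DefaultListableBeanFactory();
        // The code under test would normally do the registration; here it is done inline.
        beanFactory.registerSingleton("myTopic-retry-0", new NewTopic("myTopic-retry-0", 1, (short) 1));
        beanFactory.registerSingleton("myTopic-retry-1", new NewTopic("myTopic-retry-1", 1, (short) 1));
        // Assert on the resulting state instead of verifying registerSingleton() calls on a mock.
        assertThat(beanFactory.getBeansOfType(NewTopic.class)).hasSize(2);
    }
}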
Original file line number Diff line number Diff line change
@@ -37,8 +37,10 @@
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@@ -92,7 +94,8 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, St
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
@Autowired MeterRegistry meterRegistry, @Autowired EmbeddedKafkaBroker broker,
@Autowired KafkaListenerEndpointRegistry endpointRegistry, @Autowired KafkaAdmin admin,
@Autowired KafkaTemplate<Integer, String> customTemplate, @Autowired Config config)
@Autowired @Qualifier("customTemplate") KafkaTemplate<Integer, String> customTemplate,
@Autowired Config config)
throws InterruptedException, ExecutionException, TimeoutException {

template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS);
@@ -246,6 +249,7 @@ ConsumerFactory<Integer, String> consumerFactory(EmbeddedKafkaBroker broker) {
}

@Bean
@Primary
KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf) {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setObservationEnabled(true);
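The configuration above now holds two KafkaTemplate beans of the same generic type, so the original template is marked @Primary and the second one is injected by name via @Qualifier. A minimal sketch of that wiring, not part of the diff (the consuming component and all bean names other than customTemplate are illustrative):

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
class TwoTemplatesConfig {

    @Bean
    @Primary
    KafkaTemplate<Integer, String> template(ProducerFactory<Integer, String> pf) {
        return new KafkaTemplate<>(pf); // unqualified injection points receive this bean
    }

    @Bean
    KafkaTemplate<Integer, String> customTemplate(ProducerFactory<Integer, String> pf) {
        return new KafkaTemplate<>(pf); // reachable only via @Qualifier("customTemplate")
    }

    @Bean
    TemplateHolder holder(KafkaTemplate<Integer, String> primaryTemplate,
            @Qualifier("customTemplate") KafkaTemplate<Integer, String> custom) {
        return new TemplateHolder(primaryTemplate, custom);
    }

    record TemplateHolder(KafkaTemplate<Integer, String> primary, KafkaTemplate<Integer, String> custom) {
    }
}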