Skip to content

Commit ae7a406

Browse files
Addressing PR review
1 parent 9511818 commit ae7a406

File tree

3 files changed

+22
-105
lines changed

3 files changed

+22
-105
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandlerParser.java

-81
This file was deleted.

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

+13-10
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.StringReader;
2121
import java.lang.reflect.AnnotatedElement;
2222
import java.lang.reflect.Method;
23+
import java.lang.reflect.Modifier;
2324
import java.nio.ByteBuffer;
2425
import java.nio.charset.Charset;
2526
import java.nio.charset.StandardCharsets;
@@ -33,7 +34,6 @@
3334
import java.util.Locale;
3435
import java.util.Map;
3536
import java.util.Objects;
36-
import java.util.Optional;
3737
import java.util.Properties;
3838
import java.util.Set;
3939
import java.util.concurrent.ConcurrentHashMap;
@@ -149,7 +149,6 @@
149149
* @author Sanghyeok An
150150
* @author Soby Chacko
151151
* @author Omer Celik
152-
* @author Sanghyeok An
153152
*
154153
* @see KafkaListener
155154
* @see KafkaListenerErrorHandler
@@ -159,7 +158,6 @@
159158
* @see KafkaListenerEndpointRegistry
160159
* @see org.springframework.kafka.config.KafkaListenerEndpoint
161160
* @see MethodKafkaListenerEndpoint
162-
* @see KafkaHandlerParser
163161
*/
164162
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
165163
implements BeanPostProcessor, Ordered, ApplicationContextAware, SmartInitializingSingleton {
@@ -216,8 +214,6 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
216214

217215
private final Lock globalLock = new ReentrantLock();
218216

219-
private final KafkaHandlerParser kafkaHandlerParser = new KafkaHandlerParser();
220-
221217
@Override
222218
public int getOrder() {
223219
return LOWEST_PRECEDENCE;
@@ -413,16 +409,23 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
413409
+ beanName + "': " + annotatedMethods);
414410
}
415411
if (hasClassLevelListeners) {
416-
Optional<Method> methodWithoutAnnotation = this.kafkaHandlerParser.parseSingleHandlerMethod(targetClass);
417-
if (methodWithoutAnnotation.isPresent() && classLevelListeners.size() == 1) {
412+
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
413+
(ReflectionUtils.MethodFilter) method ->
414+
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
415+
Set<Method> publicMethods = MethodIntrospector.selectMethods(targetClass,
416+
(ReflectionUtils.MethodFilter) method ->
417+
Modifier.isPublic(method.getModifiers()));
418+
419+
if (methodsWithHandler.isEmpty() && publicMethods.size() == 1 && !hasMethodLevelListeners) {
418420
// Case when target class has class-level @KafkaListener annotation and
419421
// has only single public method without @KafkaHandler.
420-
for (KafkaListener kafkaListener : classLevelListeners) {
421-
processKafkaListener(kafkaListener, methodWithoutAnnotation.get(), bean, beanName);
422+
// See, GH-3807 for more details.
423+
Method method = publicMethods.iterator().next();
424+
for (KafkaListener classLevelListener : classLevelListeners) {
425+
processKafkaListener(classLevelListener, method, bean, beanName);
422426
}
423427
}
424428
else {
425-
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass, (ReflectionUtils.MethodFilter) method -> AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
426429
List<Method> multiMethods = new ArrayList<>(methodsWithHandler);
427430
processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName);
428431
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/KafkaListenerTests.java

+9-14
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,14 @@
3131
import org.springframework.kafka.annotation.KafkaListenerTests.Config.ClassLevelListenerWithSinglePublicMethodAndPrivateMethod;
3232
import org.springframework.kafka.annotation.KafkaListenerTests.Config.OtherClassLevelListenerWithSinglePublicMethod;
3333
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
34-
import org.springframework.kafka.config.KafkaListenerConfigUtils;
3534
import org.springframework.kafka.config.KafkaListenerContainerFactory;
3635
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
3736
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3837
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3938
import org.springframework.kafka.core.KafkaTemplate;
4039
import org.springframework.kafka.core.ProducerFactory;
4140
import org.springframework.kafka.test.EmbeddedKafkaBroker;
42-
import org.springframework.kafka.test.EmbeddedKafkaKraftBroker;
41+
import org.springframework.kafka.test.context.EmbeddedKafka;
4342
import org.springframework.kafka.test.utils.KafkaTestUtils;
4443
import org.springframework.stereotype.Component;
4544
import org.springframework.test.annotation.DirtiesContext;
@@ -56,6 +55,7 @@
5655

5756
@SpringJUnitConfig
5857
@DirtiesContext
58+
@EmbeddedKafka(partitions = 1, topics = "default-listener.tests")
5959
public class KafkaListenerTests {
6060

6161
private static final String TEST_TOPIC = "default-listener.tests";
@@ -118,15 +118,8 @@ public void testImplicitKafkaHandlerAnnotation() throws Exception {
118118
@EnableKafka
119119
public static class Config {
120120

121-
@Bean(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
122-
public KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry() {
123-
return new KafkaListenerEndpointRegistry();
124-
}
125-
126-
@Bean
127-
public EmbeddedKafkaBroker embeddedKafka() {
128-
return new EmbeddedKafkaKraftBroker(1, 1, TEST_TOPIC);
129-
}
121+
@Autowired
122+
EmbeddedKafkaBroker broker;
130123

131124
@Bean
132125
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
@@ -143,8 +136,10 @@ public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
143136

144137
@Bean
145138
public Map<String, Object> consumerConfigs() {
146-
Map<String, Object> consumerProps =
147-
KafkaTestUtils.consumerProps("myDefaultListenerGroup", "false", embeddedKafka());
139+
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
140+
this.broker.getBrokersAsString(),
141+
"myDefaultListenerGroup",
142+
"false");
148143
return consumerProps;
149144
}
150145

@@ -160,7 +155,7 @@ public ProducerFactory<Integer, String> producerFactory() {
160155

161156
@Bean
162157
public Map<String, Object> producerConfigs() {
163-
return KafkaTestUtils.producerProps(embeddedKafka());
158+
return KafkaTestUtils.producerProps(this.broker.getBrokersAsString());
164159
}
165160

166161
@Component

0 commit comments

Comments
 (0)