Skip to content

Commit 255ead3

Browse files
GH-3807: Necessity of KafkaHandler on single method class
Signed-off-by: chickenchickenlove <[email protected]>
1 parent a52facc commit 255ead3

File tree

4 files changed

+359
-5
lines changed

4 files changed

+359
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2016-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.annotation;
18+
19+
import java.lang.reflect.Method;
20+
import java.lang.reflect.Modifier;
21+
import java.util.Optional;
22+
import java.util.Set;
23+
24+
import org.springframework.core.MethodIntrospector;
25+
import org.springframework.core.annotation.AnnotatedElementUtils;
26+
import org.springframework.core.annotation.AnnotationUtils;
27+
import org.springframework.util.ReflectionUtils;
28+
import org.springframework.util.ReflectionUtils.MethodFilter;
29+
30+
/**
31+
* Find the appropriate handler method when the target class has a class-level {@link KafkaListener}
32+
* annotation and contains a single public method without a {@link KafkaHandler} annotation.
33+
*
34+
* @author Sanghyeok An
35+
*
36+
* @since 4.0
37+
*
38+
* @see KafkaListenerAnnotationBeanPostProcessor
39+
*/
40+
public class KafkaHandlerParser {
41+
42+
/**
43+
* Finds the appropriate handler method when the target class has a class-level {@link KafkaListener}
44+
* annotation and contains a single public method without a {@link KafkaHandler} annotation.
45+
* This method is used to determine which method should be invoked for Kafka message handling
46+
* when no explicit {@link KafkaHandler} annotations are present but the class itself is annotated with {@link KafkaListener}.
47+
*
48+
* @param targetClass the class to inspect for handler methods
49+
* @return the method to be used for kafka message handling, or {@code null} if no suitable method is found.
50+
*/
51+
public Optional<Method> parseSingleHandlerMethod(Class<?> targetClass) {
52+
53+
Set<Method> methodsWithAnnotations = MethodIntrospector.selectMethods(
54+
targetClass, (MethodFilter) method ->
55+
AnnotatedElementUtils.findMergedAnnotation(method, KafkaHandler.class) != null ||
56+
AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class) != null
57+
);
58+
59+
if (!methodsWithAnnotations.isEmpty()) {
60+
return Optional.empty();
61+
}
62+
63+
Set<Method> publicMethodsWithoutHandlerAnnotation = MethodIntrospector.selectMethods(
64+
targetClass, (ReflectionUtils.MethodFilter) method ->
65+
Modifier.isPublic(method.getModifiers()) &&
66+
AnnotatedElementUtils.findMergedAnnotation(method, KafkaHandler.class) == null &&
67+
AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class) == null
68+
);
69+
70+
if (!hasSinglePublicMethod(publicMethodsWithoutHandlerAnnotation)) {
71+
return Optional.empty();
72+
}
73+
74+
Method publicMethod = publicMethodsWithoutHandlerAnnotation.iterator().next();
75+
return Optional.of(publicMethod);
76+
}
77+
78+
private boolean hasSinglePublicMethod(Set<Method> methods) {
79+
return methods.size() == 1;
80+
}
81+
82+
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

+18-5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Locale;
3434
import java.util.Map;
3535
import java.util.Objects;
36+
import java.util.Optional;
3637
import java.util.Properties;
3738
import java.util.Set;
3839
import java.util.concurrent.ConcurrentHashMap;
@@ -148,6 +149,7 @@
148149
* @author Sanghyeok An
149150
* @author Soby Chacko
150151
* @author Omer Celik
152+
* @author Sanghyeok An
151153
*
152154
* @see KafkaListener
153155
* @see KafkaListenerErrorHandler
@@ -157,6 +159,7 @@
157159
* @see KafkaListenerEndpointRegistry
158160
* @see org.springframework.kafka.config.KafkaListenerEndpoint
159161
* @see MethodKafkaListenerEndpoint
162+
* @see KafkaHandlerParser
160163
*/
161164
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
162165
implements BeanPostProcessor, Ordered, ApplicationContextAware, SmartInitializingSingleton {
@@ -213,6 +216,8 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
213216

214217
private final Lock globalLock = new ReentrantLock();
215218

219+
private final KafkaHandlerParser kafkaHandlerParser = new KafkaHandlerParser();
220+
216221
@Override
217222
public int getOrder() {
218223
return LOWEST_PRECEDENCE;
@@ -408,11 +413,19 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
408413
+ beanName + "': " + annotatedMethods);
409414
}
410415
if (hasClassLevelListeners) {
411-
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
412-
(ReflectionUtils.MethodFilter) method ->
413-
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
414-
List<Method> multiMethods = new ArrayList<>(methodsWithHandler);
415-
processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName);
416+
Optional<Method> methodWithoutAnnotation = this.kafkaHandlerParser.parseSingleHandlerMethod(targetClass);
417+
if (methodWithoutAnnotation.isPresent() && classLevelListeners.size() == 1) {
418+
// Case when target class has class-level @KafkaListener annotation and
419+
// has only single public method without @KafkaHandler.
420+
for (KafkaListener kafkaListener : classLevelListeners) {
421+
processKafkaListener(kafkaListener, methodWithoutAnnotation.get(), bean, beanName);
422+
}
423+
}
424+
else {
425+
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass, (ReflectionUtils.MethodFilter) method -> AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
426+
List<Method> multiMethods = new ArrayList<>(methodsWithHandler);
427+
processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName);
428+
}
416429
}
417430
}
418431
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/AliasPropertiesTests.java

+20
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.springframework.core.Ordered;
3737
import org.springframework.core.annotation.AliasFor;
3838
import org.springframework.kafka.annotation.AliasPropertiesTests.Config.ClassAndMethodLevelListener;
39+
import org.springframework.kafka.annotation.AliasPropertiesTests.Config.ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation;
3940
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer;
4041
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
4142
import org.springframework.kafka.config.KafkaListenerConfigUtils;
@@ -58,6 +59,7 @@
5859
* @author Gary Russell
5960
* @author Artem Bilan
6061
* @author Soby Chacko
62+
* @author Sanghyeok An
6163
*
6264
* @since 2.2
6365
*
@@ -81,12 +83,16 @@ public class AliasPropertiesTests {
8183
@Autowired
8284
private ClassAndMethodLevelListener repeatable;
8385

86+
@Autowired
87+
private ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation withPublicMethod;
88+
8489
@Test
8590
public void testAliasFor() throws Exception {
8691
this.template.send("alias.tests", "foo");
8792
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue();
8893
assertThat(this.classLevel.latch.await(10, TimeUnit.SECONDS)).isTrue();
8994
assertThat(this.repeatable.latch.await(10, TimeUnit.SECONDS)).isTrue();
95+
assertThat(this.withPublicMethod.latch.await(10, TimeUnit.SECONDS)).isTrue();
9096
assertThat(this.config.kafkaListenerEndpointRegistry()).isSameAs(this.kafkaListenerEndpointRegistry);
9197
assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("onMethodInConfigClass").getGroupId())
9298
.isEqualTo("onMethodInConfigClass.Config.listen1");
@@ -102,6 +108,8 @@ public void testAliasFor() throws Exception {
102108
.isEqualTo("onMethodRepeatable2.RepeatableClassAndMethodLevelListener.listen1");
103109
assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("onClassRepeatable2").getGroupId())
104110
.isEqualTo("onClassRepeatable2.RepeatableClassAndMethodLevelListener");
111+
assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("onSinglePublicMethod").getGroupId())
112+
.isEqualTo("onSinglePublicMethod.ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation");
105113
assertThat(Config.orderedCalledFirst.get()).isTrue();
106114
}
107115

@@ -220,6 +228,18 @@ public void listen1(String in) {
220228

221229
}
222230

231+
@Component
232+
@MyListener(id = "onSinglePublicMethod", value = "alias.tests")
233+
public static class ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation {
234+
235+
private final CountDownLatch latch = new CountDownLatch(1);
236+
237+
public void listen(String in) {
238+
this.latch.countDown();
239+
}
240+
241+
}
242+
223243
public static class OrderedEnhancer implements AnnotationEnhancer, Ordered {
224244

225245
private final AtomicBoolean orderedCalledFirst;

0 commit comments

Comments
 (0)