Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.annotation;

import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Optional;
import java.util.Set;

import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.ReflectionUtils.MethodFilter;

/**
* Find the appropriate handler method when the target class has a class-level {@link KafkaListener}
* annotation and contains a single public method without a {@link KafkaHandler} annotation.
*
* @author Sanghyeok An
*
* @since 4.0
*
* @see KafkaListenerAnnotationBeanPostProcessor
*/
public class KafkaHandlerParser {

/**
* Finds the appropriate handler method when the target class has a class-level {@link KafkaListener}
* annotation and contains a single public method without a {@link KafkaHandler} annotation.
* This method is used to determine which method should be invoked for Kafka message handling
* when no explicit {@link KafkaHandler} annotations are present but the class itself is annotated with {@link KafkaListener}.
*
* @param targetClass the class to inspect for handler methods
* @return the method to be used for kafka message handling, or {@code null} if no suitable method is found.
*/
public Optional<Method> parseSingleHandlerMethod(Class<?> targetClass) {

Set<Method> methodsWithAnnotations = MethodIntrospector.selectMethods(
targetClass, (MethodFilter) method ->
AnnotatedElementUtils.findMergedAnnotation(method, KafkaHandler.class) != null ||
AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class) != null
);

if (!methodsWithAnnotations.isEmpty()) {
return Optional.empty();
}

Set<Method> publicMethodsWithoutHandlerAnnotation = MethodIntrospector.selectMethods(
targetClass, (ReflectionUtils.MethodFilter) method ->
Modifier.isPublic(method.getModifiers()) &&
AnnotatedElementUtils.findMergedAnnotation(method, KafkaHandler.class) == null &&
AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class) == null
);

if (!hasSinglePublicMethod(publicMethodsWithoutHandlerAnnotation)) {
return Optional.empty();
}

Method publicMethod = publicMethodsWithoutHandlerAnnotation.iterator().next();
return Optional.of(publicMethod);
}

private boolean hasSinglePublicMethod(Set<Method> methods) {
return methods.size() == 1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -148,6 +149,7 @@
* @author Sanghyeok An
* @author Soby Chacko
* @author Omer Celik
* @author Sanghyeok An
*
* @see KafkaListener
* @see KafkaListenerErrorHandler
Expand All @@ -157,6 +159,7 @@
* @see KafkaListenerEndpointRegistry
* @see org.springframework.kafka.config.KafkaListenerEndpoint
* @see MethodKafkaListenerEndpoint
* @see KafkaHandlerParser
*/
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
implements BeanPostProcessor, Ordered, ApplicationContextAware, SmartInitializingSingleton {
Expand Down Expand Up @@ -213,6 +216,8 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>

private final Lock globalLock = new ReentrantLock();

private final KafkaHandlerParser kafkaHandlerParser = new KafkaHandlerParser();

@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
Expand Down Expand Up @@ -408,11 +413,19 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
+ beanName + "': " + annotatedMethods);
}
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
(ReflectionUtils.MethodFilter) method ->
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
List<Method> multiMethods = new ArrayList<>(methodsWithHandler);
processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName);
Optional<Method> methodWithoutAnnotation = this.kafkaHandlerParser.parseSingleHandlerMethod(targetClass);
if (methodWithoutAnnotation.isPresent() && classLevelListeners.size() == 1) {
// Case when target class has class-level @KafkaListener annotation and
// has only single public method without @KafkaHandler.
for (KafkaListener kafkaListener : classLevelListeners) {
processKafkaListener(kafkaListener, methodWithoutAnnotation.get(), bean, beanName);
}
}
else {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass, (ReflectionUtils.MethodFilter) method -> AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
List<Method> multiMethods = new ArrayList<>(methodsWithHandler);
processMultiMethodListeners(classLevelListeners, multiMethods, targetClass, bean, beanName);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AliasFor;
import org.springframework.kafka.annotation.AliasPropertiesTests.Config.ClassAndMethodLevelListener;
import org.springframework.kafka.annotation.AliasPropertiesTests.Config.ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
Expand All @@ -58,6 +59,7 @@
* @author Gary Russell
* @author Artem Bilan
* @author Soby Chacko
* @author Sanghyeok An
*
* @since 2.2
*
Expand All @@ -81,12 +83,16 @@ public class AliasPropertiesTests {
@Autowired
private ClassAndMethodLevelListener repeatable;

@Autowired
private ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation withPublicMethod;

@Test
public void testAliasFor() throws Exception {
this.template.send("alias.tests", "foo");
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.classLevel.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.repeatable.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.withPublicMethod.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(this.config.kafkaListenerEndpointRegistry()).isSameAs(this.kafkaListenerEndpointRegistry);
assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("onMethodInConfigClass").getGroupId())
.isEqualTo("onMethodInConfigClass.Config.listen1");
Expand All @@ -102,6 +108,8 @@ public void testAliasFor() throws Exception {
.isEqualTo("onMethodRepeatable2.RepeatableClassAndMethodLevelListener.listen1");
assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("onClassRepeatable2").getGroupId())
.isEqualTo("onClassRepeatable2.RepeatableClassAndMethodLevelListener");
assertThat(this.kafkaListenerEndpointRegistry.getListenerContainer("onSinglePublicMethod").getGroupId())
.isEqualTo("onSinglePublicMethod.ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation");
assertThat(Config.orderedCalledFirst.get()).isTrue();
}

Expand Down Expand Up @@ -220,6 +228,18 @@ public void listen1(String in) {

}

@Component
@MyListener(id = "onSinglePublicMethod", value = "alias.tests")
public static class ClassLevelListenerWithPublicMethodWithoutHandlerAnnotation {

private final CountDownLatch latch = new CountDownLatch(1);

public void listen(String in) {
this.latch.countDown();
}

}

public static class OrderedEnhancer implements AnnotationEnhancer, Ordered {

private final AtomicBoolean orderedCalledFirst;
Expand Down
Loading