Skip to content

Defer Messaging annotations process #2769

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -175,7 +175,7 @@ public List<Message<?>> clear() {
}

@Override
public List<Message<?>> purge(MessageSelector selector) {
public List<Message<?>> purge(@Nullable MessageSelector selector) {
if (selector == null) {
return this.clear();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,38 +19,41 @@
import java.util.List;

import org.springframework.integration.core.MessageSelector;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;

/**
* Operations available on a channel that has queuing semantics.
*
* @author Gary Russell
* @author Artem Bilan
*
* @since 3.0
*
*/
public interface QueueChannelOperations {

/**
* Remove all {@link Message Messages} from this channel.
*
* @return The messages that were removed.
*/
List<Message<?>> clear();

/**
* Remove any {@link Message Messages} that are not accepted by the provided selector.
*
* @param selector The message selector.
* @return The list of messages that were purged.
*/
List<Message<?>> purge(MessageSelector selector);
List<Message<?>> purge(@Nullable MessageSelector selector);

/**
* Obtain the current number of queued {@link Message Messages} in this channel.
* @return The current number of queued {@link Message Messages} in this channel.
*/
int getQueueSize();

/**
* Obtain the remaining capacity of this channel.
* @return The remaining capacity of this channel.
*/
int getRemainingCapacity();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,6 +70,8 @@ public abstract class AbstractSimpleMessageHandlerFactoryBean<H extends MessageH

private MessageChannel outputChannel;

private String outputChannelName;

private Integer order;

private List<Advice> adviceChain;
Expand Down Expand Up @@ -119,6 +121,15 @@ public void setOutputChannel(MessageChannel outputChannel) {
this.outputChannel = outputChannel;
}

/**
* Set the handler's output channel name.
* @param outputChannelName the output channel bean name to set.
* @since 5.1.4
*/
public void setOutputChannelName(String outputChannelName) {
this.outputChannelName = outputChannelName;
}

/**
* Set the order in which the handler will be subscribed to its channel
* (when subscribable).
Expand Down Expand Up @@ -197,9 +208,7 @@ protected final H createHandlerInternal() {
((ApplicationEventPublisherAware) this.handler)
.setApplicationEventPublisher(this.applicationEventPublisher);
}
if (this.handler instanceof MessageProducer && this.outputChannel != null) {
((MessageProducer) this.handler).setOutputChannel(this.outputChannel);
}
configureOutputChannelIfAny();
Object actualHandler = extractTarget(this.handler);
if (actualHandler == null) {
actualHandler = this.handler;
Expand Down Expand Up @@ -247,6 +256,18 @@ else if (this.logger.isDebugEnabled()) {
return this.handler;
}

private void configureOutputChannelIfAny() {
if (this.handler instanceof MessageProducer) {
MessageProducer messageProducer = (MessageProducer) this.handler;
if (this.outputChannel != null) {
messageProducer.setOutputChannel(this.outputChannel);
}
else if (this.outputChannelName != null) {
messageProducer.setOutputChannelName(this.outputChannelName);
}
}
}

protected abstract H createHandler();

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -36,6 +37,7 @@
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
Expand Down Expand Up @@ -71,7 +73,8 @@
* @author Gary Russell
* @author Rick Hogge
*/
public class MessagingAnnotationPostProcessor implements BeanPostProcessor, BeanFactoryAware, InitializingBean {
public class MessagingAnnotationPostProcessor implements BeanPostProcessor, BeanFactoryAware, InitializingBean,
SmartInitializingSingleton {

protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR

Expand All @@ -81,6 +84,10 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Bean

private final Set<Class<?>> noAnnotationsCache = Collections.newSetFromMap(new ConcurrentHashMap<>(256));

private final List<Runnable> methodsToPostProcessAfterContextInitialization = new ArrayList<>();

private volatile boolean initialized;

@Override
public void setBeanFactory(BeanFactory beanFactory) {
Assert.isAssignable(ConfigurableListableBeanFactory.class, beanFactory.getClass(),
Expand Down Expand Up @@ -122,9 +129,17 @@ protected Map<Class<? extends Annotation>, MethodAnnotationPostProcessor<?>> set

public <A extends Annotation> void addMessagingAnnotationPostProcessor(Class<A> annotation,
MethodAnnotationPostProcessor<A> postProcessor) {

this.postProcessors.put(annotation, postProcessor);
}

@Override
public void afterSingletonsInstantiated() {
this.initialized = true;
this.methodsToPostProcessAfterContextInitialization.forEach(Runnable::run);
this.methodsToPostProcessAfterContextInitialization.clear();
}

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
Expand All @@ -141,38 +156,43 @@ public Object postProcessAfterInitialization(final Object bean, final String bea
return bean;
}

ReflectionUtils.doWithMethods(beanClass, method -> {
Map<Class<? extends Annotation>, List<Annotation>> annotationChains = new HashMap<>();
for (Class<? extends Annotation> annotationType :
this.postProcessors.keySet()) {
if (AnnotatedElementUtils.isAnnotated(method, annotationType.getName())) {
List<Annotation> annotationChain = getAnnotationChain(method, annotationType);
if (annotationChain.size() > 0) {
annotationChains.put(annotationType, annotationChain);
}
}
}
if (StringUtils.hasText(MessagingAnnotationUtils.endpointIdValue(method))
&& annotationChains.keySet().size() > 1) {
throw new IllegalStateException("@EndpointId on " + method.toGenericString()
+ " can only have one EIP annotation, found: " + annotationChains.keySet().size());
}
for (Entry<Class<? extends Annotation>, List<Annotation>> entry : annotationChains.entrySet()) {
Class<? extends Annotation> annotationType = entry.getKey();
List<Annotation> annotations = entry.getValue();
processAnnotationTypeOnMethod(bean, beanName, method, annotationType, annotations);
}
ReflectionUtils.doWithMethods(beanClass,
method -> doWithMethod(method, bean, beanName, beanClass),
ReflectionUtils.USER_DECLARED_METHODS);

return bean;
}

if (annotationChains.size() == 0) {
this.noAnnotationsCache.add(beanClass);
private void doWithMethod(Method method, Object bean, String beanName, Class<?> beanClass) {
Map<Class<? extends Annotation>, List<Annotation>> annotationChains = new HashMap<>();
for (Class<? extends Annotation> annotationType :
this.postProcessors.keySet()) {
if (AnnotatedElementUtils.isAnnotated(method, annotationType.getName())) {
List<Annotation> annotationChain = getAnnotationChain(method, annotationType);
if (annotationChain.size() > 0) {
annotationChains.put(annotationType, annotationChain);
}
}
}, ReflectionUtils.USER_DECLARED_METHODS);
}
if (StringUtils.hasText(MessagingAnnotationUtils.endpointIdValue(method))
&& annotationChains.keySet().size() > 1) {
throw new IllegalStateException("@EndpointId on " + method.toGenericString()
+ " can only have one EIP annotation, found: " + annotationChains.keySet().size());
}
for (Entry<Class<? extends Annotation>, List<Annotation>> entry : annotationChains.entrySet()) {
Class<? extends Annotation> annotationType = entry.getKey();
List<Annotation> annotations = entry.getValue();
processAnnotationTypeOnMethod(bean, beanName, method, annotationType, annotations);
}

return bean;
if (annotationChains.size() == 0) {
this.noAnnotationsCache.add(beanClass);
}
}

protected void processAnnotationTypeOnMethod(Object bean, String beanName, Method method,
Class<? extends Annotation> annotationType, List<Annotation> annotations) {

MethodAnnotationPostProcessor<?> postProcessor =
MessagingAnnotationPostProcessor.this.postProcessors.get(annotationType);
if (postProcessor != null && postProcessor.shouldCreateEndpoint(method, annotations)) {
Expand All @@ -187,36 +207,53 @@ protected void processAnnotationTypeOnMethod(Object bean, String beanName, Metho
+ "and its method: '" + method + "'", e);
}
}
Object result = postProcessor.postProcess(bean, beanName, targetMethod, annotations);
if (result != null && result instanceof AbstractEndpoint) {
AbstractEndpoint endpoint = (AbstractEndpoint) result;
String autoStartup = MessagingAnnotationUtils.resolveAttribute(annotations, "autoStartup",
String.class);

if (this.initialized) {
postProcessMethodAndRegisterEndpointIfAny(bean, beanName, method, annotationType, annotations,
postProcessor, targetMethod);
}
else {
Method methodToPostProcess = targetMethod;
this.methodsToPostProcessAfterContextInitialization.add(() ->
postProcessMethodAndRegisterEndpointIfAny(bean, beanName, method, annotationType, annotations,
postProcessor, methodToPostProcess));
}
}
}

private void postProcessMethodAndRegisterEndpointIfAny(Object bean, String beanName, Method method,
Class<? extends Annotation> annotationType, List<Annotation> annotations,
MethodAnnotationPostProcessor<?> postProcessor, Method targetMethod) {

Object result = postProcessor.postProcess(bean, beanName, targetMethod, annotations);
if (result instanceof AbstractEndpoint) {
AbstractEndpoint endpoint = (AbstractEndpoint) result;
String autoStartup = MessagingAnnotationUtils.resolveAttribute(annotations, "autoStartup",
String.class);
if (StringUtils.hasText(autoStartup)) {
autoStartup = getBeanFactory().resolveEmbeddedValue(autoStartup);
if (StringUtils.hasText(autoStartup)) {
autoStartup = getBeanFactory().resolveEmbeddedValue(autoStartup);
if (StringUtils.hasText(autoStartup)) {
endpoint.setAutoStartup(Boolean.parseBoolean(autoStartup));
}
endpoint.setAutoStartup(Boolean.parseBoolean(autoStartup));
}
}

String phase = MessagingAnnotationUtils.resolveAttribute(annotations, "phase", String.class);
String phase = MessagingAnnotationUtils.resolveAttribute(annotations, "phase", String.class);
if (StringUtils.hasText(phase)) {
phase = getBeanFactory().resolveEmbeddedValue(phase);
if (StringUtils.hasText(phase)) {
phase = getBeanFactory().resolveEmbeddedValue(phase);
if (StringUtils.hasText(phase)) {
endpoint.setPhase(Integer.parseInt(phase));
}
}

Role role = AnnotationUtils.findAnnotation(method, Role.class);
if (role != null) {
endpoint.setRole(role.value());
endpoint.setPhase(Integer.parseInt(phase));
}
}

String endpointBeanName = generateBeanName(beanName, method, annotationType);
endpoint.setBeanName(endpointBeanName);
getBeanFactory().registerSingleton(endpointBeanName, endpoint);
getBeanFactory().initializeBean(endpoint, endpointBeanName);
Role role = AnnotationUtils.findAnnotation(method, Role.class);
if (role != null) {
endpoint.setRole(role.value());
}

String endpointBeanName = generateBeanName(beanName, method, annotationType);
endpoint.setBeanName(endpointBeanName);
getBeanFactory().registerSingleton(endpointBeanName, endpoint);
getBeanFactory().initializeBean(endpoint, endpointBeanName);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -84,8 +84,9 @@ protected String getInputChannelAttributeName() {

@Override
protected final AbstractBeanDefinition parseInternal(Element element, ParserContext parserContext) {
BeanDefinitionBuilder handlerBuilder = this.parseHandler(element, parserContext);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(handlerBuilder, element, "output-channel");
BeanDefinitionBuilder handlerBuilder = parseHandler(element, parserContext);
IntegrationNamespaceUtils.setValueIfAttributeDefined(handlerBuilder, element, "output-channel",
"outputChannelName");
IntegrationNamespaceUtils.setValueIfAttributeDefined(handlerBuilder, element, "order");

Element txElement = DomUtils.getChildElementByTagName(element, "transactional");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -78,8 +78,9 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad

IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, MESSAGE_STORE_ATTRIBUTE);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "scheduler", "taskScheduler");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "lock-registry");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE,
"discardChannelName");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE);
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_EXPIRY_ATTRIBUTE);
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "empty-group-min-timeout",
Expand Down
Loading