Skip to content

INT-2277: Register ChResolver & ErrHandler beans #2805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,6 +31,7 @@
import org.springframework.core.ParameterNameDiscoverer;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.expression.ExpressionEvalMap;
import org.springframework.integration.expression.ExpressionUtils;
Expand Down Expand Up @@ -59,29 +60,31 @@ public class MessagePublishingInterceptor implements MethodInterceptor, BeanFact

private final MessagingTemplate messagingTemplate = new MessagingTemplate();

private volatile PublisherMetadataSource metadataSource;
private final ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();

private volatile DestinationResolver<MessageChannel> channelResolver;
private final PublisherMetadataSource metadataSource;

private volatile BeanFactory beanFactory;
private DestinationResolver<MessageChannel> channelResolver;

private final ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer();
private BeanFactory beanFactory;

private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();
private MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();

private volatile boolean messageBuilderFactorySet;
private boolean messageBuilderFactorySet;

private volatile String defaultChannelName;
private String defaultChannelName;

public MessagePublishingInterceptor(PublisherMetadataSource metadataSource) {
Assert.notNull(metadataSource, "metadataSource must not be null");
this.metadataSource = metadataSource;
}


/**
* @param metadataSource the {@link PublisherMetadataSource} to use.
* @deprecated since 5.2 in favor constructor argument.
*/
@Deprecated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See INT-954, not sure if it's still relevant.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I'm not sure what is that tooling, but I guess we have already much more components with similar ctor-based initialization and no one has complained so far.

Therefore I believe we are safe to remove it now.
Well, at least deprecate since it is a mandatory ctor option anyway, but someone may still would like to call a setter. Although that is already not so good to mutate this component at runtime.

public void setPublisherMetadataSource(PublisherMetadataSource metadataSource) {
Assert.notNull(metadataSource, "metadataSource must not be null");
this.metadataSource = metadataSource;
}

/**
Expand All @@ -100,6 +103,9 @@ public void setChannelResolver(DestinationResolver<MessageChannel> channelResolv
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
this.messagingTemplate.setBeanFactory(beanFactory);
if (this.channelResolver == null) {
this.channelResolver = IntegrationContextUtils.getChannelResolver(beanFactory);
}
}

protected MessageBuilderFactory getMessageBuilderFactory() {
Expand Down Expand Up @@ -191,7 +197,6 @@ private void publishMessage(Method method, StandardEvaluationContext context) {
}

private Map<String, Object> evaluateHeaders(Method method, StandardEvaluationContext context) {

Map<String, Expression> headerExpressionMap = this.metadataSource.getExpressionsForHeaders(method);
if (headerExpressionMap != null) {
return ExpressionEvalMap.from(headerExpressionMap)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,7 +36,6 @@
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.integration.annotation.Publisher;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.util.Assert;

/**
Expand All @@ -60,7 +59,8 @@ public class PublisherAnnotationAdvisor extends AbstractPointcutAdvisor implemen
@SuppressWarnings("unchecked")
public PublisherAnnotationAdvisor(Class<? extends Annotation>... publisherAnnotationTypes) {
this.publisherAnnotationTypes = new HashSet<>(Arrays.asList(publisherAnnotationTypes));
PublisherMetadataSource metadataSource = new MethodAnnotationPublisherMetadataSource(this.publisherAnnotationTypes);
PublisherMetadataSource metadataSource =
new MethodAnnotationPublisherMetadataSource(this.publisherAnnotationTypes);
this.interceptor = new MessagePublishingInterceptor(metadataSource);
}

Expand All @@ -81,7 +81,6 @@ public void setDefaultChannelName(String defaultChannelName) {

@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.interceptor.setChannelResolver(new BeanFactoryChannelResolver(beanFactory));
this.interceptor.setBeanFactory(beanFactory);
}

Expand Down Expand Up @@ -168,6 +167,7 @@ public ClassFilter getClassFilter() {
public MethodMatcher getMethodMatcher() {
return this.methodMatcher;
}

}


Expand Down Expand Up @@ -197,6 +197,7 @@ public boolean matches(Method method, Class targetClass) {
return (specificMethod != method &&
(AnnotationUtils.getAnnotation(specificMethod, this.annotationType) != null));
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,11 +18,11 @@

import java.util.concurrent.Executor;

import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
import org.springframework.integration.dispatcher.UnicastingDispatcher;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
Expand Down Expand Up @@ -103,8 +103,7 @@ public final void onInit() {
+ "bean is fully initialized by the framework. Do not subscribe in a @Bean definition");
super.onInit();
if (!(this.executor instanceof ErrorHandlingTaskExecutor)) {
ErrorHandler errorHandler = new MessagePublishingErrorHandler(
new BeanFactoryChannelResolver(this.getBeanFactory()));
ErrorHandler errorHandler = IntegrationContextUtils.getErrorHandler(getBeanFactory());
this.executor = new ErrorHandlingTaskExecutor(this.executor, errorHandler);
}
UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher(this.executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public MessagePublishingErrorHandler(DestinationResolver<MessageChannel> channel
}


public void setDefaultErrorChannel(MessageChannel defaultErrorChannel) {
public void setDefaultErrorChannel(@Nullable MessageChannel defaultErrorChannel) {
setChannel(defaultErrorChannel);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,9 +18,10 @@

import java.util.concurrent.Executor;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.IntegrationProperties;
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -130,21 +131,23 @@ public void setMinSubscribers(int minSubscribers) {
@Override
public final void onInit() {
super.onInit();
BeanFactory beanFactory = this.getBeanFactory();
BroadcastingDispatcher dispatcherToUse = getDispatcher();
if (this.executor != null) {
Assert.state(getDispatcher().getHandlerCount() == 0,
Assert.state(dispatcherToUse.getHandlerCount() == 0,
"When providing an Executor, you cannot subscribe() until the channel "
+ "bean is fully initialized by the framework. Do not subscribe in a @Bean definition");
if (!(this.executor instanceof ErrorHandlingTaskExecutor)) {
if (this.errorHandler == null) {
this.errorHandler = new MessagePublishingErrorHandler(
new BeanFactoryChannelResolver(this.getBeanFactory()));
this.errorHandler = IntegrationContextUtils.getErrorHandler(beanFactory);
}
this.executor = new ErrorHandlingTaskExecutor(this.executor, this.errorHandler);
}
this.dispatcher = new BroadcastingDispatcher(this.executor);
getDispatcher().setIgnoreFailures(this.ignoreFailures);
getDispatcher().setApplySequence(this.applySequence);
getDispatcher().setMinSubscribers(this.minSubscribers);
dispatcherToUse = new BroadcastingDispatcher(this.executor);
dispatcherToUse.setIgnoreFailures(this.ignoreFailures);
dispatcherToUse.setApplySequence(this.applySequence);
dispatcherToUse.setMinSubscribers(this.minSubscribers);
this.dispatcher = dispatcherToUse;
}
else if (this.errorHandler != null) {
if (this.logger.isWarnEnabled()) {
Expand All @@ -157,11 +160,11 @@ else if (this.errorHandler != null) {
if (this.maxSubscribers == null) {
Integer maxSubscribers =
getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS, Integer.class);
this.setMaxSubscribers(maxSubscribers);
setMaxSubscribers(maxSubscribers);
}
getDispatcher().setBeanFactory(this.getBeanFactory());
dispatcherToUse.setBeanFactory(beanFactory);

getDispatcher().setMessageHandlingTaskDecorator(task -> {
dispatcherToUse.setMessageHandlingTaskDecorator(task -> {
if (PublishSubscribeChannel.this.executorInterceptorsSize > 0) {
return new MessageHandlingTask(task);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.context.Lifecycle;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessageSelector;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
Expand Down Expand Up @@ -179,14 +179,12 @@ public boolean shouldIntercept(String beanName, InterceptableChannel channel) {
}

private MessageChannel getChannel() {
if (this.channelName != null) {
synchronized (this) {
if (this.channelName != null) {
this.channel = new BeanFactoryChannelResolver(this.beanFactory)
.resolveDestination(this.channelName);
this.channelName = null;
}
}
String channelNameToUse = this.channelName;
if (channelNameToUse != null) {
this.channel =
IntegrationContextUtils.getChannelResolver(this.beanFactory)
.resolveDestination(channelNameToUse);
this.channelName = null;
}
return this.channel;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.NullAwarePayloadArgumentResolver;
import org.springframework.integration.support.SmartLifecycleRoleController;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter;
import org.springframework.integration.support.converter.DefaultDatatypeChannelMessageConverter;
import org.springframework.integration.support.json.JacksonPresent;
Expand Down Expand Up @@ -110,6 +111,8 @@ public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory)
this.beanFactory = beanFactory;
this.registry = (BeanDefinitionRegistry) beanFactory;

registerBeanFactoryChannelResolver();
registerMessagePublishingErrorHandler();
registerNullChannel();
registerErrorChannel();
registerIntegrationEvaluationContext();
Expand Down Expand Up @@ -145,6 +148,21 @@ public void afterSingletonsInstantiated() {
}
}

private void registerBeanFactoryChannelResolver() {
if (!this.beanFactory.containsBeanDefinition(IntegrationContextUtils.CHANNEL_RESOLVER_BEAN_NAME)) {
this.registry.registerBeanDefinition(IntegrationContextUtils.CHANNEL_RESOLVER_BEAN_NAME,
new RootBeanDefinition(BeanFactoryChannelResolver.class));
}
}

private void registerMessagePublishingErrorHandler() {
if (!this.beanFactory.containsBeanDefinition(
IntegrationContextUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME)) {
this.registry.registerBeanDefinition(IntegrationContextUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME,
new RootBeanDefinition(MessagePublishingErrorHandler.class));
}
}

/**
* Register a null channel in the application context.
* The bean name is defined by the constant {@link IntegrationContextUtils#NULL_CHANNEL_BEAN_NAME}.
Expand Down Expand Up @@ -282,7 +300,8 @@ private void registerTaskScheduler() {
.getExpressionFor(IntegrationProperties.TASK_SCHEDULER_POOL_SIZE))
.addPropertyValue("threadNamePrefix", "task-scheduler-")
.addPropertyValue("rejectedExecutionHandler", new CallerRunsPolicy())
.addPropertyValue("errorHandler", new RootBeanDefinition(MessagePublishingErrorHandler.class))
.addPropertyReference("errorHandler",
IntegrationContextUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME)
.getBeanDefinition();

this.registry.registerBeanDefinition(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME, scheduler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.config.IntegrationConfigUtils;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.context.Orderable;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.AbstractPollingEndpoint;
Expand All @@ -64,7 +65,6 @@
import org.springframework.integration.handler.advice.HandleMessageAdvice;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.support.channel.BeanFactoryChannelResolver;
import org.springframework.integration.util.MessagingAnnotationUtils;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
Expand Down Expand Up @@ -120,9 +120,10 @@ public AbstractMethodAnnotationPostProcessor(ConfigurableListableBeanFactory bea
this.conversionService = this.beanFactory.getConversionService() != null
? this.beanFactory.getConversionService()
: DefaultConversionService.getSharedInstance();
this.channelResolver = new BeanFactoryChannelResolver(beanFactory);
this.annotationType = (Class<T>) GenericTypeResolver.resolveTypeArgument(this.getClass(),
MethodAnnotationPostProcessor.class);
this.channelResolver = IntegrationContextUtils.getChannelResolver(beanFactory);
this.annotationType =
(Class<T>) GenericTypeResolver.resolveTypeArgument(this.getClass(),
MethodAnnotationPostProcessor.class);
Disposables disposablesBean = null;
try {
disposablesBean = beanFactory.getBean(Disposables.class);
Expand Down
Loading