diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aop/MessagePublishingInterceptor.java b/spring-integration-core/src/main/java/org/springframework/integration/aop/MessagePublishingInterceptor.java index 172d90ee236..eaf69b9a71d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aop/MessagePublishingInterceptor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aop/MessagePublishingInterceptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ import org.springframework.core.ParameterNameDiscoverer; import org.springframework.expression.Expression; import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.MessagingTemplate; import org.springframework.integration.expression.ExpressionEvalMap; import org.springframework.integration.expression.ExpressionUtils; @@ -59,29 +60,31 @@ public class MessagePublishingInterceptor implements MethodInterceptor, BeanFact private final MessagingTemplate messagingTemplate = new MessagingTemplate(); - private volatile PublisherMetadataSource metadataSource; + private final ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer(); - private volatile DestinationResolver channelResolver; + private final PublisherMetadataSource metadataSource; - private volatile BeanFactory beanFactory; + private DestinationResolver channelResolver; - private final ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer(); + private BeanFactory beanFactory; - private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory(); + private MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory(); - private volatile boolean messageBuilderFactorySet; + private boolean messageBuilderFactorySet; - private volatile String defaultChannelName; + private String defaultChannelName; public MessagePublishingInterceptor(PublisherMetadataSource metadataSource) { Assert.notNull(metadataSource, "metadataSource must not be null"); this.metadataSource = metadataSource; } - + /** + * @param metadataSource the {@link PublisherMetadataSource} to use. + * @deprecated since 5.2 in favor constructor argument. + */ + @Deprecated public void setPublisherMetadataSource(PublisherMetadataSource metadataSource) { - Assert.notNull(metadataSource, "metadataSource must not be null"); - this.metadataSource = metadataSource; } /** @@ -100,6 +103,9 @@ public void setChannelResolver(DestinationResolver channelResolv public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; this.messagingTemplate.setBeanFactory(beanFactory); + if (this.channelResolver == null) { + this.channelResolver = IntegrationContextUtils.getChannelResolver(beanFactory); + } } protected MessageBuilderFactory getMessageBuilderFactory() { @@ -191,7 +197,6 @@ private void publishMessage(Method method, StandardEvaluationContext context) { } private Map evaluateHeaders(Method method, StandardEvaluationContext context) { - Map headerExpressionMap = this.metadataSource.getExpressionsForHeaders(method); if (headerExpressionMap != null) { return ExpressionEvalMap.from(headerExpressionMap) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aop/PublisherAnnotationAdvisor.java b/spring-integration-core/src/main/java/org/springframework/integration/aop/PublisherAnnotationAdvisor.java index c67f1c76760..8a76726d402 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aop/PublisherAnnotationAdvisor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aop/PublisherAnnotationAdvisor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,7 +36,6 @@ import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.integration.annotation.Publisher; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.util.Assert; /** @@ -60,7 +59,8 @@ public class PublisherAnnotationAdvisor extends AbstractPointcutAdvisor implemen @SuppressWarnings("unchecked") public PublisherAnnotationAdvisor(Class... publisherAnnotationTypes) { this.publisherAnnotationTypes = new HashSet<>(Arrays.asList(publisherAnnotationTypes)); - PublisherMetadataSource metadataSource = new MethodAnnotationPublisherMetadataSource(this.publisherAnnotationTypes); + PublisherMetadataSource metadataSource = + new MethodAnnotationPublisherMetadataSource(this.publisherAnnotationTypes); this.interceptor = new MessagePublishingInterceptor(metadataSource); } @@ -81,7 +81,6 @@ public void setDefaultChannelName(String defaultChannelName) { @Override public void setBeanFactory(BeanFactory beanFactory) { - this.interceptor.setChannelResolver(new BeanFactoryChannelResolver(beanFactory)); this.interceptor.setBeanFactory(beanFactory); } @@ -168,6 +167,7 @@ public ClassFilter getClassFilter() { public MethodMatcher getMethodMatcher() { return this.methodMatcher; } + } @@ -197,6 +197,7 @@ public boolean matches(Method method, Class targetClass) { return (specificMethod != method && (AnnotationUtils.getAnnotation(specificMethod, this.annotationType) != null)); } + } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java index a21b89d07bc..476a458af03 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,11 +18,11 @@ import java.util.concurrent.Executor; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.context.IntegrationProperties; import org.springframework.integration.dispatcher.LoadBalancingStrategy; import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy; import org.springframework.integration.dispatcher.UnicastingDispatcher; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.integration.util.ErrorHandlingTaskExecutor; import org.springframework.util.Assert; import org.springframework.util.ErrorHandler; @@ -103,8 +103,7 @@ public final void onInit() { + "bean is fully initialized by the framework. Do not subscribe in a @Bean definition"); super.onInit(); if (!(this.executor instanceof ErrorHandlingTaskExecutor)) { - ErrorHandler errorHandler = new MessagePublishingErrorHandler( - new BeanFactoryChannelResolver(this.getBeanFactory())); + ErrorHandler errorHandler = IntegrationContextUtils.getErrorHandler(getBeanFactory()); this.executor = new ErrorHandlingTaskExecutor(this.executor, errorHandler); } UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher(this.executor); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java index 9ab3644255e..8f133f9a5bc 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java @@ -64,7 +64,7 @@ public MessagePublishingErrorHandler(DestinationResolver channel } - public void setDefaultErrorChannel(MessageChannel defaultErrorChannel) { + public void setDefaultErrorChannel(@Nullable MessageChannel defaultErrorChannel) { setChannel(defaultErrorChannel); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java index 4e238eaf074..bd4389208ba 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,9 +18,10 @@ import java.util.concurrent.Executor; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.context.IntegrationProperties; import org.springframework.integration.dispatcher.BroadcastingDispatcher; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.integration.util.ErrorHandlingTaskExecutor; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -130,21 +131,23 @@ public void setMinSubscribers(int minSubscribers) { @Override public final void onInit() { super.onInit(); + BeanFactory beanFactory = this.getBeanFactory(); + BroadcastingDispatcher dispatcherToUse = getDispatcher(); if (this.executor != null) { - Assert.state(getDispatcher().getHandlerCount() == 0, + Assert.state(dispatcherToUse.getHandlerCount() == 0, "When providing an Executor, you cannot subscribe() until the channel " + "bean is fully initialized by the framework. Do not subscribe in a @Bean definition"); if (!(this.executor instanceof ErrorHandlingTaskExecutor)) { if (this.errorHandler == null) { - this.errorHandler = new MessagePublishingErrorHandler( - new BeanFactoryChannelResolver(this.getBeanFactory())); + this.errorHandler = IntegrationContextUtils.getErrorHandler(beanFactory); } this.executor = new ErrorHandlingTaskExecutor(this.executor, this.errorHandler); } - this.dispatcher = new BroadcastingDispatcher(this.executor); - getDispatcher().setIgnoreFailures(this.ignoreFailures); - getDispatcher().setApplySequence(this.applySequence); - getDispatcher().setMinSubscribers(this.minSubscribers); + dispatcherToUse = new BroadcastingDispatcher(this.executor); + dispatcherToUse.setIgnoreFailures(this.ignoreFailures); + dispatcherToUse.setApplySequence(this.applySequence); + dispatcherToUse.setMinSubscribers(this.minSubscribers); + this.dispatcher = dispatcherToUse; } else if (this.errorHandler != null) { if (this.logger.isWarnEnabled()) { @@ -157,11 +160,11 @@ else if (this.errorHandler != null) { if (this.maxSubscribers == null) { Integer maxSubscribers = getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS, Integer.class); - this.setMaxSubscribers(maxSubscribers); + setMaxSubscribers(maxSubscribers); } - getDispatcher().setBeanFactory(this.getBeanFactory()); + dispatcherToUse.setBeanFactory(beanFactory); - getDispatcher().setMessageHandlingTaskDecorator(task -> { + dispatcherToUse.setMessageHandlingTaskDecorator(task -> { if (PublishSubscribeChannel.this.executorInterceptorsSize > 0) { return new MessageHandlingTask(task); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java index dcd0c5ddc9f..f93209b7e5e 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java @@ -23,8 +23,8 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.context.Lifecycle; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.MessageSelector; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.jmx.export.annotation.ManagedOperation; import org.springframework.jmx.export.annotation.ManagedResource; @@ -179,14 +179,12 @@ public boolean shouldIntercept(String beanName, InterceptableChannel channel) { } private MessageChannel getChannel() { - if (this.channelName != null) { - synchronized (this) { - if (this.channelName != null) { - this.channel = new BeanFactoryChannelResolver(this.beanFactory) - .resolveDestination(this.channelName); - this.channelName = null; - } - } + String channelNameToUse = this.channelName; + if (channelNameToUse != null) { + this.channel = + IntegrationContextUtils.getChannelResolver(this.beanFactory) + .resolveDestination(channelNameToUse); + this.channelName = null; } return this.channel; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java index 7577482f2e9..31ad75e966c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/DefaultConfiguringBeanFactoryPostProcessor.java @@ -63,6 +63,7 @@ import org.springframework.integration.support.DefaultMessageBuilderFactory; import org.springframework.integration.support.NullAwarePayloadArgumentResolver; import org.springframework.integration.support.SmartLifecycleRoleController; +import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.integration.support.converter.ConfigurableCompositeMessageConverter; import org.springframework.integration.support.converter.DefaultDatatypeChannelMessageConverter; import org.springframework.integration.support.json.JacksonPresent; @@ -110,6 +111,8 @@ public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) this.beanFactory = beanFactory; this.registry = (BeanDefinitionRegistry) beanFactory; + registerBeanFactoryChannelResolver(); + registerMessagePublishingErrorHandler(); registerNullChannel(); registerErrorChannel(); registerIntegrationEvaluationContext(); @@ -145,6 +148,21 @@ public void afterSingletonsInstantiated() { } } + private void registerBeanFactoryChannelResolver() { + if (!this.beanFactory.containsBeanDefinition(IntegrationContextUtils.CHANNEL_RESOLVER_BEAN_NAME)) { + this.registry.registerBeanDefinition(IntegrationContextUtils.CHANNEL_RESOLVER_BEAN_NAME, + new RootBeanDefinition(BeanFactoryChannelResolver.class)); + } + } + + private void registerMessagePublishingErrorHandler() { + if (!this.beanFactory.containsBeanDefinition( + IntegrationContextUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME)) { + this.registry.registerBeanDefinition(IntegrationContextUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME, + new RootBeanDefinition(MessagePublishingErrorHandler.class)); + } + } + /** * Register a null channel in the application context. * The bean name is defined by the constant {@link IntegrationContextUtils#NULL_CHANNEL_BEAN_NAME}. @@ -282,7 +300,8 @@ private void registerTaskScheduler() { .getExpressionFor(IntegrationProperties.TASK_SCHEDULER_POOL_SIZE)) .addPropertyValue("threadNamePrefix", "task-scheduler-") .addPropertyValue("rejectedExecutionHandler", new CallerRunsPolicy()) - .addPropertyValue("errorHandler", new RootBeanDefinition(MessagePublishingErrorHandler.class)) + .addPropertyReference("errorHandler", + IntegrationContextUtils.MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME) .getBeanDefinition(); this.registry.registerBeanDefinition(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME, scheduler); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java index 7868ec550a5..bea31533aea 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java @@ -51,6 +51,7 @@ import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.MessagePublishingErrorHandler; import org.springframework.integration.config.IntegrationConfigUtils; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.context.Orderable; import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.endpoint.AbstractPollingEndpoint; @@ -64,7 +65,6 @@ import org.springframework.integration.handler.advice.HandleMessageAdvice; import org.springframework.integration.router.AbstractMessageRouter; import org.springframework.integration.scheduling.PollerMetadata; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.integration.util.MessagingAnnotationUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.MessageChannel; @@ -120,9 +120,10 @@ public AbstractMethodAnnotationPostProcessor(ConfigurableListableBeanFactory bea this.conversionService = this.beanFactory.getConversionService() != null ? this.beanFactory.getConversionService() : DefaultConversionService.getSharedInstance(); - this.channelResolver = new BeanFactoryChannelResolver(beanFactory); - this.annotationType = (Class) GenericTypeResolver.resolveTypeArgument(this.getClass(), - MethodAnnotationPostProcessor.class); + this.channelResolver = IntegrationContextUtils.getChannelResolver(beanFactory); + this.annotationType = + (Class) GenericTypeResolver.resolveTypeArgument(this.getClass(), + MethodAnnotationPostProcessor.class); Disposables disposablesBean = null; try { disposablesBean = beanFactory.getBean(Disposables.class); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/PublishingInterceptorParser.java b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/PublishingInterceptorParser.java index b72b490ce97..fdc786bfaf2 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/PublishingInterceptorParser.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/PublishingInterceptorParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,14 +25,13 @@ import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.support.ManagedMap; import org.springframework.beans.factory.xml.AbstractBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.aop.MessagePublishingInterceptor; import org.springframework.integration.aop.MethodNameMappingPublisherMetadataSource; import org.springframework.integration.context.IntegrationContextUtils; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import org.springframework.util.xml.DomUtils; @@ -53,36 +52,31 @@ protected AbstractBeanDefinition parseInternal(Element element, ParserContext pa MessagePublishingInterceptor.class); BeanDefinitionBuilder spelSourceBuilder = BeanDefinitionBuilder .genericBeanDefinition(MethodNameMappingPublisherMetadataSource.class); - Map> mappings = this.getMappings(element, element.getAttribute("default-channel"), parserContext); + Map> mappings = this + .getMappings(element, element.getAttribute("default-channel"), parserContext); spelSourceBuilder.addConstructorArgValue(mappings.get("payload")); if (mappings.get("headers") != null) { spelSourceBuilder.addPropertyValue("headerExpressionMap", mappings.get("headers")); } - BeanDefinitionBuilder chResolverBuilder = BeanDefinitionBuilder.genericBeanDefinition( - BeanFactoryChannelResolver.class); - if (mappings.get("channels") != null) { spelSourceBuilder.addPropertyValue("channelMap", mappings.get("channels")); } - String chResolverName = - BeanDefinitionReaderUtils.registerWithGeneratedName(chResolverBuilder.getBeanDefinition(), parserContext.getRegistry()); String defaultChannel = StringUtils.hasText(element.getAttribute("default-channel")) ? element.getAttribute("default-channel") : IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME; rootBuilder.addConstructorArgValue(spelSourceBuilder.getBeanDefinition()); - rootBuilder.addPropertyReference("channelResolver", chResolverName); rootBuilder.addPropertyValue("defaultChannelName", defaultChannel); return rootBuilder.getBeanDefinition(); } private Map> getMappings(Element element, String defaultChannel, ParserContext parserContext) { List mappings = DomUtils.getChildElementsByTagName(element, "method"); - Map> interceptorMappings = new HashMap>(); - Map payloadExpressionMap = new HashMap(); - Map> headersExpressionMap = new HashMap>(); - Map channelMap = new HashMap(); - ManagedMap resolvableChannelMap = new ManagedMap(); - if (mappings != null && mappings.size() > 0) { + Map> interceptorMappings = new HashMap<>(); + Map payloadExpressionMap = new HashMap<>(); + Map> headersExpressionMap = new HashMap<>(); + Map channelMap = new HashMap<>(); + ManagedMap resolvableChannelMap = new ManagedMap<>(); + if (!CollectionUtils.isEmpty(mappings)) { for (Element mapping : mappings) { // set payloadMap String methodPattern = StringUtils.hasText(mapping.getAttribute("pattern")) ? @@ -93,12 +87,13 @@ protected AbstractBeanDefinition parseInternal(Element element, ParserContext pa // set headersMap List headerElements = DomUtils.getChildElementsByTagName(mapping, "header"); - Map headerExpressions = new HashMap(); + Map headerExpressions = new HashMap<>(); for (Element headerElement : headerElements) { String name = headerElement.getAttribute("name"); if (!StringUtils.hasText(name)) { - parserContext.getReaderContext().error("the 'name' attribute is required on the
element", - parserContext.extractSource(headerElement)); + parserContext.getReaderContext() + .error("the 'name' attribute is required on the
element", + parserContext.extractSource(headerElement)); continue; } String value = headerElement.getAttribute("value"); @@ -106,8 +101,9 @@ protected AbstractBeanDefinition parseInternal(Element element, ParserContext pa boolean hasValue = StringUtils.hasText(value); boolean hasExpression = StringUtils.hasText(expression); if (hasValue == hasExpression) { - parserContext.getReaderContext().error("exactly one of 'value' or 'expression' is required on the
element", - parserContext.extractSource(headerElement)); + parserContext.getReaderContext() + .error("exactly one of 'value' or 'expression' is required on the
element", + parserContext.extractSource(headerElement)); continue; } if (hasValue) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java index 3b8c8b93437..19c0bbcbe25 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java @@ -23,11 +23,15 @@ import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.expression.spel.support.SimpleEvaluationContext; import org.springframework.expression.spel.support.StandardEvaluationContext; +import org.springframework.integration.channel.MessagePublishingErrorHandler; import org.springframework.integration.config.IntegrationConfigUtils; import org.springframework.integration.metadata.MetadataStore; +import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.core.DestinationResolver; import org.springframework.scheduling.TaskScheduler; import org.springframework.util.Assert; +import org.springframework.util.ErrorHandler; /** * Utility methods for accessing common integration components from the BeanFactory. @@ -44,10 +48,10 @@ public abstract class IntegrationContextUtils { public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel"; - public static final String ERROR_LOGGER_BEAN_NAME = "_org.springframework.integration.errorLogger"; - public static final String NULL_CHANNEL_BEAN_NAME = "nullChannel"; + public static final String ERROR_LOGGER_BEAN_NAME = "_org.springframework.integration.errorLogger"; + public static final String METADATA_STORE_BEAN_NAME = "metadataStore"; public static final String CONVERTER_REGISTRAR_BEAN_NAME = "converterRegistrar"; @@ -107,6 +111,10 @@ public abstract class IntegrationContextUtils { public static final String LIST_MESSAGE_HANDLER_FACTORY_BEAN_NAME = "integrationListMessageHandlerMethodFactory"; + public static final String CHANNEL_RESOLVER_BEAN_NAME = "integrationChannelResolver"; + + public static final String MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME = "integrationMessagePublishingErrorHandler"; + /** * @param beanFactory BeanFactory for lookup, must not be null. * @return The {@link MetadataStore} bean whose name is "metadataStore". @@ -212,4 +220,39 @@ public static Properties getIntegrationProperties(BeanFactory beanFactory) { return properties; } + /** + * Obtain a {@link DestinationResolver} registered with the + * {@value #CHANNEL_RESOLVER_BEAN_NAME} bean name. + * @param beanFactory BeanFactory for lookup, must not be null. + * @return the instance of {@link DestinationResolver} bean whose name is + * {@value #CHANNEL_RESOLVER_BEAN_NAME}. + * @since 5.2 + */ + @SuppressWarnings("unchecked") + public static DestinationResolver getChannelResolver(BeanFactory beanFactory) { + Assert.notNull(beanFactory, "'beanFactory' must not be null"); + if (!beanFactory.containsBean(CHANNEL_RESOLVER_BEAN_NAME)) { + return new BeanFactoryChannelResolver(beanFactory); + } + return beanFactory.getBean(CHANNEL_RESOLVER_BEAN_NAME, DestinationResolver.class); + } + + /** + * Obtain an {@link ErrorHandler} registered with the + * {@value #MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME} bean name. + * By default resolves to the {@link org.springframework.integration.channel.MessagePublishingErrorHandler} + * with the {@value #CHANNEL_RESOLVER_BEAN_NAME} {@link DestinationResolver} bean. + * @param beanFactory BeanFactory for lookup, must not be null. + * @return the instance of {@link ErrorHandler} bean whose name is + * {@value #MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME}. + * @since 5.2 + */ + public static ErrorHandler getErrorHandler(BeanFactory beanFactory) { + Assert.notNull(beanFactory, "'beanFactory' must not be null"); + if (!beanFactory.containsBean(MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME)) { + return new MessagePublishingErrorHandler(getChannelResolver(beanFactory)); + } + return beanFactory.getBean(MESSAGE_PUBLISHING_ERROR_HANDLER_BEAN_NAME, ErrorHandler.class); + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java index 8a21d37b7bc..a7558904915 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java @@ -37,7 +37,6 @@ import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.support.DefaultMessageBuilderFactory; import org.springframework.integration.support.MessageBuilderFactory; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.integration.support.context.NamedComponent; import org.springframework.integration.support.utils.IntegrationUtils; import org.springframework.lang.Nullable; @@ -228,7 +227,7 @@ protected TaskScheduler getTaskScheduler() { protected DestinationResolver getChannelResolver() { if (this.channelResolver == null) { - this.channelResolver = new BeanFactoryChannelResolver(this.beanFactory); + this.channelResolver = IntegrationContextUtils.getChannelResolver(this.beanFactory); } return this.channelResolver; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java b/spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java index a8a38249933..2ac73d4d99d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/core/ErrorMessagePublisher.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2018 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,6 @@ import org.springframework.integration.support.DefaultErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageStrategy; import org.springframework.integration.support.ErrorMessageUtils; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -97,7 +96,7 @@ public final void setChannelResolver(DestinationResolver channel public void setBeanFactory(BeanFactory beanFactory) { Assert.notNull(beanFactory, "beanFactory must not be null"); if (this.channelResolver == null) { - this.channelResolver = new BeanFactoryChannelResolver(beanFactory); + this.channelResolver = IntegrationContextUtils.getChannelResolver(beanFactory); } } @@ -183,8 +182,8 @@ protected Throwable determinePayload(Throwable throwable, AttributeAccessor cont else if (!(lastThrowable instanceof MessagingException)) { Message message = (Message) context.getAttribute(ErrorMessageUtils.FAILED_MESSAGE_CONTEXT_KEY); lastThrowable = message == null - ? new MessagingException(lastThrowable.getMessage(), lastThrowable) - : new MessagingException(message, lastThrowable.getMessage(), lastThrowable); + ? new MessagingException(lastThrowable.getMessage(), lastThrowable) + : new MessagingException(message, lastThrowable.getMessage(), lastThrowable); } return lastThrowable; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/core/MessagingTemplate.java b/spring-integration-core/src/main/java/org/springframework/integration/core/MessagingTemplate.java index 8a374f18ee7..7478bbd8bdd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/core/MessagingTemplate.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/core/MessagingTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.context.IntegrationProperties; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.core.GenericMessagingTemplate; @@ -57,12 +56,12 @@ public MessagingTemplate(MessageChannel defaultChannel) { } /** - * Overridden to set the destination resolver to a {@link BeanFactoryChannelResolver}. + * Overridden to set the destination resolver to a {@code integrationChannelResolver} bean. */ @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; //NOSONAR - non-sync is ok here - super.setDestinationResolver(new BeanFactoryChannelResolver(beanFactory)); + setDestinationResolver(IntegrationContextUtils.getChannelResolver(beanFactory)); } @Override diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java index 32118ee635b..d301c4adbd4 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java @@ -33,8 +33,8 @@ import org.springframework.beans.factory.BeanInitializationException; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.integration.channel.MessagePublishingErrorHandler; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.support.MessagingExceptionWrapper; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.integration.transaction.IntegrationResourceHolder; import org.springframework.integration.transaction.IntegrationResourceHolderSynchronization; import org.springframework.integration.transaction.PassThroughTransactionSynchronizationFactory; @@ -105,7 +105,7 @@ public void setTaskExecutor(Executor taskExecutor) { this.taskExecutor = (taskExecutor != null ? taskExecutor : new SyncTaskExecutor()); this.syncExecutor = this.taskExecutor instanceof SyncTaskExecutor || (this.taskExecutor instanceof ErrorHandlingTaskExecutor - && ((ErrorHandlingTaskExecutor) this.taskExecutor).isSyncExecutor()); + && ((ErrorHandlingTaskExecutor) this.taskExecutor).isSyncExecutor()); } protected Executor getTaskExecutor() { @@ -196,9 +196,7 @@ protected void onInit() { if (this.taskExecutor != null) { if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) { if (this.errorHandler == null) { - Assert.notNull(this.getBeanFactory(), "BeanFactory is required"); - this.errorHandler = new MessagePublishingErrorHandler( - new BeanFactoryChannelResolver(getBeanFactory())); + this.errorHandler = IntegrationContextUtils.getErrorHandler(getBeanFactory()); this.errorHandlerIsDefault = true; } this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, this.errorHandler); @@ -316,9 +314,10 @@ private Flux> createFluxGenerator() { .take(this.maxMessagesPerPoll) .subscribeOn(Schedulers.fromExecutor(this.taskExecutor)) .doOnComplete(() -> - triggerContext.update(triggerContext.lastScheduledExecutionTime(), - triggerContext.lastActualExecutionTime(), - new Date()) + triggerContext + .update(triggerContext.lastScheduledExecutionTime(), + triggerContext.lastActualExecutionTime(), + new Date()) )), 1) .repeat(this::isRunning) .doOnSubscribe(subscription -> this.subscription = subscription); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java index ce24f5a7024..a4e34686957 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/ReactiveStreamsConsumer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,11 +24,10 @@ import org.springframework.context.Lifecycle; import org.springframework.integration.channel.MessageChannelReactiveUtils; -import org.springframework.integration.channel.MessagePublishingErrorHandler; import org.springframework.integration.channel.NullChannel; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.router.MessageRouter; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -124,8 +123,7 @@ public MessageHandler getHandler() { protected void onInit() { super.onInit(); if (this.errorHandler == null) { - Assert.notNull(getBeanFactory(), "BeanFactory is required"); - this.errorHandler = new MessagePublishingErrorHandler(new BeanFactoryChannelResolver(getBeanFactory())); + this.errorHandler = IntegrationContextUtils.getErrorHandler(getBeanFactory()); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java index d25915ac174..0b0c8553761 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java @@ -55,11 +55,11 @@ import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.GatewayHeader; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.expression.ExpressionUtils; import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.support.DefaultMessageBuilderFactory; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.integration.support.management.TrackableComponent; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -384,12 +384,12 @@ protected void onInit() { } BeanFactory beanFactory = this.getBeanFactory(); if (this.channelResolver == null && beanFactory != null) { - this.channelResolver = new BeanFactoryChannelResolver(beanFactory); + this.channelResolver = IntegrationContextUtils.getChannelResolver(beanFactory); } - Class proxyInterface = this.determineServiceInterface(); + Class proxyInterface = determineServiceInterface(); Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(proxyInterface); for (Method method : methods) { - MethodInvocationGateway gateway = this.createGatewayForMethod(method); + MethodInvocationGateway gateway = createGatewayForMethod(method); this.gatewayMap.put(method, gateway); } this.serviceProxy = new ProxyFactory(proxyInterface, this) @@ -451,7 +451,8 @@ else if (CompletableFuture.class.equals(returnType)) { // exact } else if (Future.class.isAssignableFrom(returnType)) { if (logger.isDebugEnabled()) { - logger.debug("AsyncTaskExecutor submit*() return types are incompatible with the method return type; " + logger.debug("AsyncTaskExecutor submit*() return types are incompatible with the method return " + + "type; " + "running on calling thread; the downstream flow must return the required Future: " + returnType.getSimpleName()); } diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/channel/SubscribableRedisChannel.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/channel/SubscribableRedisChannel.java index 72742c82391..bab209596c7 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/channel/SubscribableRedisChannel.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/channel/SubscribableRedisChannel.java @@ -18,6 +18,7 @@ import java.util.concurrent.Executor; +import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.DisposableBean; import org.springframework.context.SmartLifecycle; @@ -32,10 +33,9 @@ import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.integration.MessageDispatchingException; import org.springframework.integration.channel.AbstractMessageChannel; -import org.springframework.integration.channel.MessagePublishingErrorHandler; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.context.IntegrationProperties; import org.springframework.integration.dispatcher.BroadcastingDispatcher; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.integration.support.converter.SimpleMessageConverter; import org.springframework.integration.util.ErrorHandlingTaskExecutor; import org.springframework.messaging.Message; @@ -143,13 +143,13 @@ public void onInit() { if (this.messageConverter == null) { this.messageConverter = new SimpleMessageConverter(); } + BeanFactory beanFactory = getBeanFactory(); if (this.messageConverter instanceof BeanFactoryAware) { - ((BeanFactoryAware) this.messageConverter).setBeanFactory(this.getBeanFactory()); + ((BeanFactoryAware) this.messageConverter).setBeanFactory(beanFactory); } this.container.setConnectionFactory(this.connectionFactory); if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) { - ErrorHandler errorHandler = new MessagePublishingErrorHandler( - new BeanFactoryChannelResolver(this.getBeanFactory())); + ErrorHandler errorHandler = IntegrationContextUtils.getErrorHandler(beanFactory); this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler); } this.container.setTaskExecutor(this.taskExecutor); @@ -158,7 +158,7 @@ public void onInit() { adapter.afterPropertiesSet(); this.container.addMessageListener(adapter, new ChannelTopic(this.topicName)); this.container.afterPropertiesSet(); - this.dispatcher.setBeanFactory(this.getBeanFactory()); + this.dispatcher.setBeanFactory(beanFactory); this.initialized = true; } diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueInboundGateway.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueInboundGateway.java index 10d5bdebdb7..9bd103ba390 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueInboundGateway.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueInboundGateway.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2018 the original author or authors. + * Copyright 2014-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,7 +31,6 @@ import org.springframework.integration.channel.MessagePublishingErrorHandler; import org.springframework.integration.gateway.MessagingGatewaySupport; import org.springframework.integration.redis.event.RedisExceptionEvent; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.integration.support.management.IntegrationManagedResource; import org.springframework.integration.util.ErrorHandlingTaskExecutor; import org.springframework.jmx.export.annotation.ManagedMetric; @@ -152,8 +151,8 @@ protected void onInit() { + this.getComponentType()); } if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && this.getBeanFactory() != null) { - MessagePublishingErrorHandler errorHandler = - new MessagePublishingErrorHandler(new BeanFactoryChannelResolver(getBeanFactory())); + MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler(); + errorHandler.setBeanFactory(getBeanFactory()); errorHandler.setDefaultErrorChannel(getErrorChannel()); this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler); } diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java index 6619e30373e..53550eec6a2 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/inbound/RedisQueueMessageDrivenEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2017 the original author or authors. + * Copyright 2013-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import org.springframework.beans.factory.BeanFactory; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.core.task.SimpleAsyncTaskExecutor; @@ -30,9 +31,9 @@ import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.integration.channel.MessagePublishingErrorHandler; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.redis.event.RedisExceptionEvent; -import org.springframework.integration.support.channel.BeanFactoryChannelResolver; import org.springframework.integration.support.management.IntegrationManagedResource; import org.springframework.integration.util.ErrorHandlingTaskExecutor; import org.springframework.jmx.export.annotation.ManagedMetric; @@ -169,13 +170,14 @@ protected void onInit() { Assert.notNull(this.serializer, "'serializer' has to be provided where 'expectMessage == true'."); } if (this.taskExecutor == null) { - String beanName = this.getComponentName(); + String beanName = getComponentName(); this.taskExecutor = new SimpleAsyncTaskExecutor((beanName == null ? "" : beanName + "-") - + this.getComponentType()); + + getComponentType()); } - if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && this.getBeanFactory() != null) { + BeanFactory beanFactory = getBeanFactory(); + if (!(this.taskExecutor instanceof ErrorHandlingTaskExecutor) && beanFactory != null) { MessagePublishingErrorHandler errorHandler = - new MessagePublishingErrorHandler(new BeanFactoryChannelResolver(this.getBeanFactory())); + new MessagePublishingErrorHandler(IntegrationContextUtils.getChannelResolver(beanFactory)); errorHandler.setDefaultErrorChannel(this.errorChannel); this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, errorHandler); }