From cfb6cd6bd6560e8130fccb56a0872c015705f3aa Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 27 Feb 2019 17:08:54 -0500 Subject: [PATCH 1/3] Defer Messaging annotations process The `AbstractMethodAnnotationPostProcessor` and its implementations have a `beanFactory.getBean()` call for the `@Bean` methods with Messaging annotations. This is done, actually, from the `MessagingAnnotationPostProcessor.postProcessAfterInitialization()` which might be still too early in some scenarios, like Spring Cloud Feign with its child application contexts being initialized from the `FeignClientFactoryBean`, causing a `BeanCurrentlyInCreationException` See https://stackoverflow.com/questions/54887963/beancurrentlyincreationexception-when-using-spring-integration-with-spring-cloud * Implement a `SmartInitializingSingleton` for the `MessagingAnnotationPostProcessor` and gather `Runnable` wrappers for newly introduced `postProcessMethodAndRegisterEndpointIfAny()` to be called later in the `afterSingletonsInstantiated()` when context is still in the initialization phase. All runtime-registered beans are going to be processed normally from the regular `postProcessAfterInitialization()` **Cherry-pick to 5.1.x** --- .../MessagingAnnotationPostProcessor.java | 137 +++++++++++------- .../bus/DirectChannelSubscriptionTests.java | 8 +- .../FilterAnnotationPostProcessorTests.java | 20 ++- ...tionPostProcessorChannelCreationTests.java | 18 +-- ...MessagingAnnotationPostProcessorTests.java | 67 ++++----- ...ingAnnotationsWithBeanAnnotationTests.java | 16 +- .../RouterAnnotationPostProcessorTests.java | 2 + .../SplitterAnnotationPostProcessorTests.java | 1 + 8 files changed, 151 insertions(+), 118 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java index ab03363f1c7..4bb17128aff 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Method; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -36,6 +37,7 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.beans.factory.support.BeanDefinitionBuilder; @@ -71,7 +73,8 @@ * @author Gary Russell * @author Rick Hogge */ -public class MessagingAnnotationPostProcessor implements BeanPostProcessor, BeanFactoryAware, InitializingBean { +public class MessagingAnnotationPostProcessor implements BeanPostProcessor, BeanFactoryAware, InitializingBean, + SmartInitializingSingleton { protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR @@ -81,6 +84,10 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Bean private final Set> noAnnotationsCache = Collections.newSetFromMap(new ConcurrentHashMap<>(256)); + private final List methodsToPostProcessAfterContextInitialization = new ArrayList<>(); + + private volatile boolean initialized; + @Override public void setBeanFactory(BeanFactory beanFactory) { Assert.isAssignable(ConfigurableListableBeanFactory.class, beanFactory.getClass(), @@ -122,9 +129,17 @@ protected Map, MethodAnnotationPostProcessor> set public void addMessagingAnnotationPostProcessor(Class annotation, MethodAnnotationPostProcessor postProcessor) { + this.postProcessors.put(annotation, postProcessor); } + @Override + public void afterSingletonsInstantiated() { + this.initialized = true; + this.methodsToPostProcessAfterContextInitialization.forEach(Runnable::run); + this.methodsToPostProcessAfterContextInitialization.clear(); + } + @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; @@ -141,38 +156,43 @@ public Object postProcessAfterInitialization(final Object bean, final String bea return bean; } - ReflectionUtils.doWithMethods(beanClass, method -> { - Map, List> annotationChains = new HashMap<>(); - for (Class annotationType : - this.postProcessors.keySet()) { - if (AnnotatedElementUtils.isAnnotated(method, annotationType.getName())) { - List annotationChain = getAnnotationChain(method, annotationType); - if (annotationChain.size() > 0) { - annotationChains.put(annotationType, annotationChain); - } - } - } - if (StringUtils.hasText(MessagingAnnotationUtils.endpointIdValue(method)) - && annotationChains.keySet().size() > 1) { - throw new IllegalStateException("@EndpointId on " + method.toGenericString() - + " can only have one EIP annotation, found: " + annotationChains.keySet().size()); - } - for (Entry, List> entry : annotationChains.entrySet()) { - Class annotationType = entry.getKey(); - List annotations = entry.getValue(); - processAnnotationTypeOnMethod(bean, beanName, method, annotationType, annotations); - } + ReflectionUtils.doWithMethods(beanClass, + method -> doWithMethod(method, bean, beanName, beanClass), + ReflectionUtils.USER_DECLARED_METHODS); + + return bean; + } - if (annotationChains.size() == 0) { - this.noAnnotationsCache.add(beanClass); + private void doWithMethod(Method method, Object bean, String beanName, Class beanClass) { + Map, List> annotationChains = new HashMap<>(); + for (Class annotationType : + this.postProcessors.keySet()) { + if (AnnotatedElementUtils.isAnnotated(method, annotationType.getName())) { + List annotationChain = getAnnotationChain(method, annotationType); + if (annotationChain.size() > 0) { + annotationChains.put(annotationType, annotationChain); + } } - }, ReflectionUtils.USER_DECLARED_METHODS); + } + if (StringUtils.hasText(MessagingAnnotationUtils.endpointIdValue(method)) + && annotationChains.keySet().size() > 1) { + throw new IllegalStateException("@EndpointId on " + method.toGenericString() + + " can only have one EIP annotation, found: " + annotationChains.keySet().size()); + } + for (Entry, List> entry : annotationChains.entrySet()) { + Class annotationType = entry.getKey(); + List annotations = entry.getValue(); + processAnnotationTypeOnMethod(bean, beanName, method, annotationType, annotations); + } - return bean; + if (annotationChains.size() == 0) { + this.noAnnotationsCache.add(beanClass); + } } protected void processAnnotationTypeOnMethod(Object bean, String beanName, Method method, Class annotationType, List annotations) { + MethodAnnotationPostProcessor postProcessor = MessagingAnnotationPostProcessor.this.postProcessors.get(annotationType); if (postProcessor != null && postProcessor.shouldCreateEndpoint(method, annotations)) { @@ -187,36 +207,53 @@ protected void processAnnotationTypeOnMethod(Object bean, String beanName, Metho + "and its method: '" + method + "'", e); } } - Object result = postProcessor.postProcess(bean, beanName, targetMethod, annotations); - if (result != null && result instanceof AbstractEndpoint) { - AbstractEndpoint endpoint = (AbstractEndpoint) result; - String autoStartup = MessagingAnnotationUtils.resolveAttribute(annotations, "autoStartup", - String.class); + + if (this.initialized) { + postProcessMethodAndRegisterEndpointIfAny(bean, beanName, method, annotationType, annotations, + postProcessor, targetMethod); + } + else { + Method methodToPostProcess = targetMethod; + this.methodsToPostProcessAfterContextInitialization.add(() -> + postProcessMethodAndRegisterEndpointIfAny(bean, beanName, method, annotationType, annotations, + postProcessor, methodToPostProcess)); + } + } + } + + private void postProcessMethodAndRegisterEndpointIfAny(Object bean, String beanName, Method method, + Class annotationType, List annotations, + MethodAnnotationPostProcessor postProcessor, Method targetMethod) { + + Object result = postProcessor.postProcess(bean, beanName, targetMethod, annotations); + if (result instanceof AbstractEndpoint) { + AbstractEndpoint endpoint = (AbstractEndpoint) result; + String autoStartup = MessagingAnnotationUtils.resolveAttribute(annotations, "autoStartup", + String.class); + if (StringUtils.hasText(autoStartup)) { + autoStartup = getBeanFactory().resolveEmbeddedValue(autoStartup); if (StringUtils.hasText(autoStartup)) { - autoStartup = getBeanFactory().resolveEmbeddedValue(autoStartup); - if (StringUtils.hasText(autoStartup)) { - endpoint.setAutoStartup(Boolean.parseBoolean(autoStartup)); - } + endpoint.setAutoStartup(Boolean.parseBoolean(autoStartup)); } + } - String phase = MessagingAnnotationUtils.resolveAttribute(annotations, "phase", String.class); + String phase = MessagingAnnotationUtils.resolveAttribute(annotations, "phase", String.class); + if (StringUtils.hasText(phase)) { + phase = getBeanFactory().resolveEmbeddedValue(phase); if (StringUtils.hasText(phase)) { - phase = getBeanFactory().resolveEmbeddedValue(phase); - if (StringUtils.hasText(phase)) { - endpoint.setPhase(Integer.parseInt(phase)); - } - } - - Role role = AnnotationUtils.findAnnotation(method, Role.class); - if (role != null) { - endpoint.setRole(role.value()); + endpoint.setPhase(Integer.parseInt(phase)); } + } - String endpointBeanName = generateBeanName(beanName, method, annotationType); - endpoint.setBeanName(endpointBeanName); - getBeanFactory().registerSingleton(endpointBeanName, endpoint); - getBeanFactory().initializeBean(endpoint, endpointBeanName); + Role role = AnnotationUtils.findAnnotation(method, Role.class); + if (role != null) { + endpoint.setRole(role.value()); } + + String endpointBeanName = generateBeanName(beanName, method, annotationType); + endpoint.setBeanName(endpointBeanName); + getBeanFactory().registerSingleton(endpointBeanName, endpoint); + getBeanFactory().initializeBean(endpoint, endpointBeanName); } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java b/spring-integration-core/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java index 271fe76f2dc..b1f7a6a601d 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java @@ -17,6 +17,7 @@ package org.springframework.integration.bus; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import org.junit.After; import org.junit.Before; @@ -81,6 +82,7 @@ public void sendAndReceiveForAnnotatedEndpoint() { MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(this.context.getBeanFactory()); postProcessor.afterPropertiesSet(); + postProcessor.afterSingletonsInstantiated(); TestEndpoint endpoint = new TestEndpoint(); postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint"); this.context.refresh(); @@ -105,17 +107,19 @@ public Object handleRequestMessage(Message message) { this.sourceChannel.send(new GenericMessage<>("foo")); } - @Test(expected = MessagingException.class) + @Test public void exceptionThrownFromAnnotatedEndpoint() { QueueChannel errorChannel = new QueueChannel(); this.context.registerChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, errorChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(this.context.getBeanFactory()); postProcessor.afterPropertiesSet(); + postProcessor.afterSingletonsInstantiated(); FailingTestEndpoint endpoint = new FailingTestEndpoint(); postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint"); this.context.refresh(); - this.sourceChannel.send(new GenericMessage<>("foo")); + assertThatExceptionOfType(MessagingException.class) + .isThrownBy(() -> this.sourceChannel.send(new GenericMessage<>("foo"))); } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/FilterAnnotationPostProcessorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/FilterAnnotationPostProcessorTests.java index 60efe670d6a..86b69371a0a 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/FilterAnnotationPostProcessorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/FilterAnnotationPostProcessorTests.java @@ -17,6 +17,7 @@ package org.springframework.integration.config.annotation; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import java.util.Collections; import java.util.List; @@ -55,10 +56,11 @@ public class FilterAnnotationPostProcessorTests { @Before public void init() { - context.registerChannel("input", inputChannel); - context.registerChannel("output", outputChannel); - postProcessor.setBeanFactory(context.getBeanFactory()); - postProcessor.afterPropertiesSet(); + this.context.registerChannel("input", this.inputChannel); + this.context.registerChannel("output", this.outputChannel); + this.postProcessor.setBeanFactory(this.context.getBeanFactory()); + this.postProcessor.afterPropertiesSet(); + this.postProcessor.afterSingletonsInstantiated(); } @After @@ -169,16 +171,18 @@ public void filterAnnotationWithBooleanWrapperClass() { testValidFilter(new TestFilterWithBooleanWrapperClass()); } - @Test(expected = IllegalArgumentException.class) + @Test public void invalidMethodWithStringReturnType() { Object filter = new TestFilterWithStringReturnType(); - postProcessor.postProcessAfterInitialization(filter, "testFilter"); + assertThatIllegalArgumentException() + .isThrownBy(() -> this.postProcessor.postProcessAfterInitialization(filter, "testFilter")); } - @Test(expected = IllegalArgumentException.class) + @Test public void invalidMethodWithVoidReturnType() { Object filter = new TestFilterWithVoidReturnType(); - postProcessor.postProcessAfterInitialization(filter, "testFilter"); + assertThatIllegalArgumentException() + .isThrownBy(() -> postProcessor.postProcessAfterInitialization(filter, "testFilter")); } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorChannelCreationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorChannelCreationTests.java index 46ee14376d3..0833042cb85 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorChannelCreationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorChannelCreationTests.java @@ -16,8 +16,7 @@ package org.springframework.integration.config.annotation; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; @@ -40,6 +39,8 @@ /** * @author Gary Russell + * @author Artem Bilan + * * @since 4.3.8 * */ @@ -57,6 +58,7 @@ public void testAutoCreateChannel() { MessagingAnnotationPostProcessor mapp = new MessagingAnnotationPostProcessor(); mapp.setBeanFactory(beanFactory); mapp.afterPropertiesSet(); + mapp.afterSingletonsInstantiated(); mapp.postProcessAfterInitialization(new Foo(), "foo"); verify(beanFactory).registerSingleton(eq("channel"), any(DirectChannel.class)); } @@ -73,14 +75,10 @@ public void testDontCreateChannelWhenChannelHasBadDefinition() { MessagingAnnotationPostProcessor mapp = new MessagingAnnotationPostProcessor(); mapp.setBeanFactory(beanFactory); mapp.afterPropertiesSet(); - try { - mapp.postProcessAfterInitialization(new Foo(), "foo"); - fail("Expected a DestinationResolutionException"); - } - catch (DestinationResolutionException e) { - assertThat(e.getMessage()) - .contains("A bean definition with name 'channel' exists, but failed to be created"); - } + mapp.afterSingletonsInstantiated(); + assertThatExceptionOfType(DestinationResolutionException.class) + .isThrownBy(() -> mapp.postProcessAfterInitialization(new Foo(), "foo")) + .withMessageContaining("A bean definition with name 'channel' exists, but failed to be created"); } public static class Foo { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java index f9233659aba..61bdbaf8dc4 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java @@ -17,6 +17,7 @@ package org.springframework.integration.config.annotation; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -29,6 +30,7 @@ import org.junit.Test; import org.springframework.aop.framework.ProxyFactory; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.annotation.MessageEndpoint; @@ -62,9 +64,7 @@ public void serviceActivatorAnnotation() { context.registerChannel("inputChannel", inputChannel); context.refresh(); - MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); - postProcessor.setBeanFactory(context.getBeanFactory()); - postProcessor.afterPropertiesSet(); + MessagingAnnotationPostProcessor postProcessor = prepareMessagingAnnotationPostProcessor(context); ServiceActivatorAnnotatedBean bean = new ServiceActivatorAnnotatedBean(); postProcessor.postProcessAfterInitialization(bean, "testBean"); assertThat(context.containsBean("testBean.test.serviceActivator")).isTrue(); @@ -79,7 +79,7 @@ public void serviceActivatorInApplicationContext() { "serviceActivatorAnnotationPostProcessorTests.xml", this.getClass()); MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel"); PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); - inputChannel.send(new GenericMessage("world")); + inputChannel.send(new GenericMessage<>("world")); Message reply = outputChannel.receive(0); assertThat(reply.getPayload()).isEqualTo("hello world"); context.close(); @@ -87,13 +87,12 @@ public void serviceActivatorInApplicationContext() { @Test public void testSimpleHandler() { - AbstractApplicationContext context = new ClassPathXmlApplicationContext("simpleAnnotatedEndpointTests.xml", - this - .getClass()); + AbstractApplicationContext context = + new ClassPathXmlApplicationContext("simpleAnnotatedEndpointTests.xml", getClass()); context.start(); MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel"); PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); - GenericMessage messageToSend = new GenericMessage("world"); + GenericMessage messageToSend = new GenericMessage<>("world"); inputChannel.send(messageToSend); Message message = outputChannel.receive(1000); assertThat(message.getPayload()).isEqualTo("hello world"); @@ -136,27 +135,26 @@ public void typeConvertingHandler() { public void outboundOnlyServiceActivator() throws InterruptedException { TestApplicationContext context = TestUtils.createTestApplicationContext(); context.registerChannel("testChannel", new DirectChannel()); - MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); - postProcessor.setBeanFactory(context.getBeanFactory()); - postProcessor.afterPropertiesSet(); + MessagingAnnotationPostProcessor postProcessor = prepareMessagingAnnotationPostProcessor(context); CountDownLatch latch = new CountDownLatch(1); OutboundOnlyTestBean testBean = new OutboundOnlyTestBean(latch); postProcessor.postProcessAfterInitialization(testBean, "testBean"); context.refresh(); DestinationResolver channelResolver = new BeanFactoryChannelResolver(context); MessageChannel testChannel = channelResolver.resolveDestination("testChannel"); - testChannel.send(new GenericMessage("foo")); + testChannel.send(new GenericMessage<>("foo")); latch.await(1000, TimeUnit.MILLISECONDS); assertThat(latch.getCount()).isEqualTo(0); assertThat(testBean.getMessageText()).isEqualTo("foo"); context.close(); } - @Test(expected = IllegalArgumentException.class) + @Test public void testPostProcessorWithoutBeanFactory() { MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); - postProcessor.afterPropertiesSet(); + assertThatIllegalArgumentException() + .isThrownBy(postProcessor::afterPropertiesSet); } @Test @@ -168,9 +166,7 @@ public void testChannelResolution() { context.registerChannel("inputChannel", inputChannel); context.registerChannel("outputChannel", outputChannel); context.registerChannel("eventBus", eventBus); - MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); - postProcessor.setBeanFactory(context.getBeanFactory()); - postProcessor.afterPropertiesSet(); + MessagingAnnotationPostProcessor postProcessor = prepareMessagingAnnotationPostProcessor(context); ServiceActivatorAnnotatedBean bean = new ServiceActivatorAnnotatedBean(); postProcessor.postProcessAfterInitialization(bean, "testBean"); context.refresh(); @@ -193,9 +189,7 @@ public void testProxiedMessageEndpointAnnotation() { QueueChannel outputChannel = new QueueChannel(); context.registerChannel("inputChannel", inputChannel); context.registerChannel("outputChannel", outputChannel); - MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); - postProcessor.setBeanFactory(context.getBeanFactory()); - postProcessor.afterPropertiesSet(); + MessagingAnnotationPostProcessor postProcessor = prepareMessagingAnnotationPostProcessor(context); ProxyFactory proxyFactory = new ProxyFactory(new AnnotatedTestService()); Object proxy = proxyFactory.getProxy(); postProcessor.postProcessAfterInitialization(proxy, "proxy"); @@ -213,9 +207,7 @@ public void testMessageEndpointAnnotationInherited() { QueueChannel outputChannel = new QueueChannel(); context.registerChannel("inputChannel", inputChannel); context.registerChannel("outputChannel", outputChannel); - MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); - postProcessor.setBeanFactory(context.getBeanFactory()); - postProcessor.afterPropertiesSet(); + MessagingAnnotationPostProcessor postProcessor = prepareMessagingAnnotationPostProcessor(context); postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointSubclass(), "subclass"); context.refresh(); inputChannel.send(new GenericMessage<>("world")); @@ -231,9 +223,7 @@ public void testMessageEndpointAnnotationInheritedWithProxy() { QueueChannel outputChannel = new QueueChannel(); context.registerChannel("inputChannel", inputChannel); context.registerChannel("outputChannel", outputChannel); - MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); - postProcessor.setBeanFactory(context.getBeanFactory()); - postProcessor.afterPropertiesSet(); + MessagingAnnotationPostProcessor postProcessor = prepareMessagingAnnotationPostProcessor(context); ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpointSubclass()); Object proxy = proxyFactory.getProxy(); postProcessor.postProcessAfterInitialization(proxy, "proxy"); @@ -251,9 +241,7 @@ public void testMessageEndpointAnnotationInheritedFromInterface() { QueueChannel outputChannel = new QueueChannel(); context.registerChannel("inputChannel", inputChannel); context.registerChannel("outputChannel", outputChannel); - MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); - postProcessor.setBeanFactory(context.getBeanFactory()); - postProcessor.afterPropertiesSet(); + MessagingAnnotationPostProcessor postProcessor = prepareMessagingAnnotationPostProcessor(context); postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl"); context.refresh(); inputChannel.send(new GenericMessage<>("ABC")); @@ -269,9 +257,7 @@ public void testMessageEndpointAnnotationInheritedFromInterfaceWithAutoCreatedCh QueueChannel outputChannel = new QueueChannel(); context.registerChannel("inputChannel", inputChannel); context.registerChannel("outputChannel", outputChannel); - MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); - postProcessor.setBeanFactory(context.getBeanFactory()); - postProcessor.afterPropertiesSet(); + MessagingAnnotationPostProcessor postProcessor = prepareMessagingAnnotationPostProcessor(context); postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl"); context.refresh(); inputChannel.send(new GenericMessage<>("ABC")); @@ -287,9 +273,7 @@ public void testMessageEndpointAnnotationInheritedFromInterfaceWithProxy() { QueueChannel outputChannel = new QueueChannel(); context.registerChannel("inputChannel", inputChannel); context.registerChannel("outputChannel", outputChannel); - MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); - postProcessor.setBeanFactory(context.getBeanFactory()); - postProcessor.afterPropertiesSet(); + MessagingAnnotationPostProcessor postProcessor = prepareMessagingAnnotationPostProcessor(context); ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpointImplementation()); Object proxy = proxyFactory.getProxy(); postProcessor.postProcessAfterInitialization(proxy, "proxy"); @@ -307,9 +291,7 @@ public void testTransformer() { context.registerChannel("inputChannel", inputChannel); QueueChannel outputChannel = new QueueChannel(); context.registerChannel("outputChannel", outputChannel); - MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); - postProcessor.setBeanFactory(context.getBeanFactory()); - postProcessor.afterPropertiesSet(); + MessagingAnnotationPostProcessor postProcessor = prepareMessagingAnnotationPostProcessor(context); TransformerAnnotationTestBean testBean = new TransformerAnnotationTestBean(); postProcessor.postProcessAfterInitialization(testBean, "testBean"); context.refresh(); @@ -319,6 +301,15 @@ public void testTransformer() { context.close(); } + private MessagingAnnotationPostProcessor prepareMessagingAnnotationPostProcessor( + ConfigurableApplicationContext context) { + + MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); + postProcessor.setBeanFactory(context.getBeanFactory()); + postProcessor.afterPropertiesSet(); + postProcessor.afterSingletonsInstantiated(); + return postProcessor; + } @MessageEndpoint public static class OutboundOnlyTestBean { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java index ec7f3b77139..f40ce67957c 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java @@ -17,6 +17,7 @@ package org.springframework.integration.config.annotation; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.fail; import java.util.ArrayList; @@ -102,7 +103,8 @@ public class MessagingAnnotationsWithBeanAnnotationTests { private List> collector; @Autowired(required = false) - @Qualifier("messagingAnnotationsWithBeanAnnotationTests.ContextConfiguration.skippedMessageHandler.serviceActivator") + @Qualifier("messagingAnnotationsWithBeanAnnotationTests.ContextConfiguration.skippedMessageHandler" + + ".serviceActivator") private EventDrivenConsumer skippedServiceActivator; @Autowired(required = false) @@ -217,15 +219,9 @@ public void testMessagingAnnotationsFlow() { @Test public void testInvalidMessagingAnnotationsConfig() { - try { - new AnnotationConfigApplicationContext(InvalidContextConfiguration.class).close(); - fail("BeanCreationException expected"); - } - catch (Exception e) { - assertThat(e).isInstanceOf(BeanCreationException.class); - assertThat(e.getCause()).isInstanceOf(BeanDefinitionValidationException.class); - assertThat(e.getMessage()).contains("The attribute causing the ambiguity is: [applySequence]."); - } + assertThatExceptionOfType(BeanDefinitionValidationException.class) + .isThrownBy(() -> new AnnotationConfigApplicationContext(InvalidContextConfiguration.class)) + .withMessageContaining("The attribute causing the ambiguity is: [applySequence]."); } @Configuration diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java index c3151077730..ebd28719a05 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java @@ -72,6 +72,7 @@ public void testRouter() { MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); + postProcessor.afterSingletonsInstantiated(); TestRouter testRouter = new TestRouter(); postProcessor.postProcessAfterInitialization(testRouter, "test"); context.refresh(); @@ -86,6 +87,7 @@ public void testRouterWithListParam() { MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); + postProcessor.afterSingletonsInstantiated(); TestRouter testRouter = new TestRouter(); postProcessor.postProcessAfterInitialization(testRouter, "test"); context.refresh(); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java index 119e3949b5e..35b7d3128e6 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java @@ -62,6 +62,7 @@ public void testSplitterAnnotation() { MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); + postProcessor.afterSingletonsInstantiated(); TestSplitter splitter = new TestSplitter(); postProcessor.postProcessAfterInitialization(splitter, "testSplitter"); context.refresh(); From 8a1e1ff6cb709b9fef75822bcffb683419fe3cb7 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Wed, 27 Feb 2019 20:29:59 -0500 Subject: [PATCH 2/3] * Fix unused imports in the `MessagingAnnotationsWithBeanAnnotationTests` --- .../MessagingAnnotationsWithBeanAnnotationTests.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java index f40ce67957c..8729d13930e 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java @@ -18,7 +18,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.assertj.core.api.Assertions.fail; import java.util.ArrayList; import java.util.List; @@ -33,7 +32,6 @@ import org.junit.Test; import org.junit.runner.RunWith; -import org.springframework.beans.factory.BeanCreationException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.support.BeanDefinitionValidationException; @@ -79,7 +77,7 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit4.SpringRunner; /** * @author Artem Bilan @@ -89,7 +87,7 @@ * @since 4.0 */ @ContextConfiguration(classes = MessagingAnnotationsWithBeanAnnotationTests.ContextConfiguration.class) -@RunWith(SpringJUnit4ClassRunner.class) +@RunWith(SpringRunner.class) @DirtiesContext public class MessagingAnnotationsWithBeanAnnotationTests { @@ -103,8 +101,8 @@ public class MessagingAnnotationsWithBeanAnnotationTests { private List> collector; @Autowired(required = false) - @Qualifier("messagingAnnotationsWithBeanAnnotationTests.ContextConfiguration.skippedMessageHandler" + - ".serviceActivator") + @Qualifier( + "messagingAnnotationsWithBeanAnnotationTests.ContextConfiguration.skippedMessageHandler.serviceActivator") private EventDrivenConsumer skippedServiceActivator; @Autowired(required = false) From e011a2e379553bce059f668022e73f773d21d1ec Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 28 Feb 2019 11:46:46 -0500 Subject: [PATCH 3/3] * Fix `IntegrationEndpointsInitializer` in the testing framework to handle all the possible `AbstractEndpoint` beans registration. See its JavaDocs for more info * Fix `AbstractCorrelatingMessageHandlerParser` and `AbstractConsumerEndpointParser` to use bean names for `outputChannel` and `discardChannel` instead of bean references. Since `MessagingAnnotationPostProcessor` now registers endpoints and beans for channels much later, than parsers, we can't rely on bean references any more there. * Fixes for failing tests which expected `outputChannel/discardChannel` bean references, when it is already just their names for late binding. * Apply some code style polishing for the affected classes. * Add `@Nullable` for `MessageSelector` parameter in the `QueueChannel.purge()` --- .../integration/channel/QueueChannel.java | 4 +- .../channel/QueueChannelOperations.java | 11 +-- ...stractSimpleMessageHandlerFactoryBean.java | 29 ++++++-- .../xml/AbstractConsumerEndpointParser.java | 7 +- ...stractCorrelatingMessageHandlerParser.java | 5 +- .../channel/DirectChannelTests.java | 52 +++++++------- .../config/AggregatorParserTests.java | 10 ++- ...xml => ResequencerParserTests-context.xml} | 0 .../config/ResequencerParserTests.java | 69 +++++++++---------- .../config/xml/DelayerParserTests.java | 7 +- .../config/xml/EnricherParserTests.java | 3 +- ...icherParserTestsWithoutRequestChannel.java | 2 +- ...rserWithRequestPayloadExpressionTests.java | 3 +- .../file/config/FileSplitterParserTests.java | 10 +-- .../FtpStreamingMessageSourceTests.java | 4 +- .../SftpStreamingMessageSourceTests.java | 19 ++++- .../IntegrationEndpointsInitializer.java | 45 ++++++++---- .../mock/MockMessageSourceTests-context.xml | 2 +- .../test/mock/MockMessageSourceTests.java | 29 ++++---- 19 files changed, 186 insertions(+), 125 deletions(-) rename spring-integration-core/src/test/java/org/springframework/integration/config/{resequencerParserTests.xml => ResequencerParserTests-context.xml} (100%) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/QueueChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/QueueChannel.java index db7a9879070..d805218fa70 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/QueueChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/QueueChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -175,7 +175,7 @@ public List> clear() { } @Override - public List> purge(MessageSelector selector) { + public List> purge(@Nullable MessageSelector selector) { if (selector == null) { return this.clear(); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/QueueChannelOperations.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/QueueChannelOperations.java index f23cde74145..a32672d6da1 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/QueueChannelOperations.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/QueueChannelOperations.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,12 +19,15 @@ import java.util.List; import org.springframework.integration.core.MessageSelector; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; /** * Operations available on a channel that has queuing semantics. * * @author Gary Russell + * @author Artem Bilan + * * @since 3.0 * */ @@ -32,25 +35,25 @@ public interface QueueChannelOperations { /** * Remove all {@link Message Messages} from this channel. - * * @return The messages that were removed. */ List> clear(); /** * Remove any {@link Message Messages} that are not accepted by the provided selector. - * * @param selector The message selector. * @return The list of messages that were purged. */ - List> purge(MessageSelector selector); + List> purge(@Nullable MessageSelector selector); /** + * Obtain the current number of queued {@link Message Messages} in this channel. * @return The current number of queued {@link Message Messages} in this channel. */ int getQueueSize(); /** + * Obtain the remaining capacity of this channel. * @return The remaining capacity of this channel. */ int getRemainingCapacity(); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/AbstractSimpleMessageHandlerFactoryBean.java b/spring-integration-core/src/main/java/org/springframework/integration/config/AbstractSimpleMessageHandlerFactoryBean.java index 0643242c1e1..d0524f696b2 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/AbstractSimpleMessageHandlerFactoryBean.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/AbstractSimpleMessageHandlerFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -70,6 +70,8 @@ public abstract class AbstractSimpleMessageHandlerFactoryBean adviceChain; @@ -119,6 +121,15 @@ public void setOutputChannel(MessageChannel outputChannel) { this.outputChannel = outputChannel; } + /** + * Set the handler's output channel name. + * @param outputChannelName the output channel bean name to set. + * @since 5.1.4 + */ + public void setOutputChannelName(String outputChannelName) { + this.outputChannelName = outputChannelName; + } + /** * Set the order in which the handler will be subscribed to its channel * (when subscribable). @@ -197,9 +208,7 @@ protected final H createHandlerInternal() { ((ApplicationEventPublisherAware) this.handler) .setApplicationEventPublisher(this.applicationEventPublisher); } - if (this.handler instanceof MessageProducer && this.outputChannel != null) { - ((MessageProducer) this.handler).setOutputChannel(this.outputChannel); - } + configureOutputChannelIfAny(); Object actualHandler = extractTarget(this.handler); if (actualHandler == null) { actualHandler = this.handler; @@ -247,6 +256,18 @@ else if (this.logger.isDebugEnabled()) { return this.handler; } + private void configureOutputChannelIfAny() { + if (this.handler instanceof MessageProducer) { + MessageProducer messageProducer = (MessageProducer) this.handler; + if (this.outputChannel != null) { + messageProducer.setOutputChannel(this.outputChannel); + } + else if (this.outputChannelName != null) { + messageProducer.setOutputChannelName(this.outputChannelName); + } + } + } + protected abstract H createHandler(); @Override diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java index 297fc40fbf9..b2082d6b1f6 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -84,8 +84,9 @@ protected String getInputChannelAttributeName() { @Override protected final AbstractBeanDefinition parseInternal(Element element, ParserContext parserContext) { - BeanDefinitionBuilder handlerBuilder = this.parseHandler(element, parserContext); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(handlerBuilder, element, "output-channel"); + BeanDefinitionBuilder handlerBuilder = parseHandler(element, parserContext); + IntegrationNamespaceUtils.setValueIfAttributeDefined(handlerBuilder, element, "output-channel", + "outputChannelName"); IntegrationNamespaceUtils.setValueIfAttributeDefined(handlerBuilder, element, "order"); Element txElement = DomUtils.getChildElementByTagName(element, "transactional"); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java index 47ffcd7aceb..bf770f49dcf 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/config/xml/AbstractCorrelatingMessageHandlerParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -78,8 +78,9 @@ protected void doParse(BeanDefinitionBuilder builder, Element element, BeanMetad IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, MESSAGE_STORE_ATTRIBUTE); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "scheduler", "taskScheduler"); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "lock-registry"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE, + "discardChannelName"); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_EXPIRY_ATTRIBUTE); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "empty-group-min-timeout", diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/DirectChannelTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/DirectChannelTests.java index eb7992a87a5..b9018851149 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/channel/DirectChannelTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/DirectChannelTests.java @@ -29,7 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; -import org.junit.Test; +import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.springframework.beans.DirectFieldAccessor; @@ -38,6 +38,7 @@ import org.springframework.integration.dispatcher.UnicastingDispatcher; import org.springframework.integration.endpoint.EventDrivenConsumer; import org.springframework.integration.test.util.TestUtils; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -48,18 +49,19 @@ * @author Mark Fisher * @author Oleg Zhurakousky * @author Gary Russell + * @author Artem Bilan */ -public class DirectChannelTests { +class DirectChannelTests { @Test - public void testSend() { + void testSend() { DirectChannel channel = new DirectChannel(); Log logger = spy(TestUtils.getPropertyValue(channel, "logger", Log.class)); when(logger.isDebugEnabled()).thenReturn(true); new DirectFieldAccessor(channel).setPropertyValue("logger", logger); ThreadNameExtractingTestTarget target = new ThreadNameExtractingTestTarget(); channel.subscribe(target); - GenericMessage message = new GenericMessage("test"); + GenericMessage message = new GenericMessage<>("test"); assertThat(channel.send(message)).isTrue(); assertThat(target.threadName).isEqualTo(Thread.currentThread().getName()); DirectFieldAccessor channelAccessor = new DirectFieldAccessor(channel); @@ -76,7 +78,7 @@ public void testSend() { } @Test - public void testSendPerfOneHandler() { + void testSendPerfOneHandler() { /* * INT-3308 - used to run 12 million/sec * 1. optimize for single handler 20 million/sec @@ -89,7 +91,7 @@ public void testSendPerfOneHandler() { DirectChannel channel = new DirectChannel(); final AtomicInteger count = new AtomicInteger(); channel.subscribe(message -> count.incrementAndGet()); - GenericMessage message = new GenericMessage("test"); + GenericMessage message = new GenericMessage<>("test"); assertThat(channel.send(message)).isTrue(); for (int i = 0; i < 10000000; i++) { channel.send(message); @@ -97,7 +99,7 @@ public void testSendPerfOneHandler() { } @Test - public void testSendPerfTwoHandlers() { + void testSendPerfTwoHandlers() { /* * INT-3308 - used to run 6.4 million/sec * 1. Skip empty iterators as above 7.2 million/sec @@ -110,7 +112,7 @@ public void testSendPerfTwoHandlers() { final AtomicInteger count2 = new AtomicInteger(); channel.subscribe(message -> count1.incrementAndGet()); channel.subscribe(message -> count2.getAndIncrement()); - GenericMessage message = new GenericMessage("test"); + GenericMessage message = new GenericMessage<>("test"); assertThat(channel.send(message)).isTrue(); for (int i = 0; i < 10000000; i++) { channel.send(message); @@ -120,7 +122,7 @@ public void testSendPerfTwoHandlers() { } @Test - public void testSendPerfFixedSubscriberChannel() { + void testSendPerfFixedSubscriberChannel() { /* * INT-3308 - 96 million/sec * NOTE: in order to get a measurable time, I had to add some code to the handler - @@ -130,7 +132,7 @@ public void testSendPerfFixedSubscriberChannel() { */ final AtomicInteger count = new AtomicInteger(); FixedSubscriberChannel channel = new FixedSubscriberChannel(message -> count.incrementAndGet()); - GenericMessage message = new GenericMessage("test"); + GenericMessage message = new GenericMessage<>("test"); assertThat(channel.send(message)).isTrue(); for (int i = 0; i < 100000000; i++) { channel.send(message, 0); @@ -138,26 +140,27 @@ public void testSendPerfFixedSubscriberChannel() { } @Test - public void testSendInSeparateThread() throws InterruptedException { + void testSendInSeparateThread() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); final DirectChannel channel = new DirectChannel(); ThreadNameExtractingTestTarget target = new ThreadNameExtractingTestTarget(latch); channel.subscribe(target); - final GenericMessage message = new GenericMessage("test"); + final GenericMessage message = new GenericMessage<>("test"); new Thread((Runnable) () -> channel.send(message), "test-thread").start(); latch.await(1000, TimeUnit.MILLISECONDS); assertThat(target.threadName).isEqualTo("test-thread"); } - @Test // See INT-2434 - public void testChannelCreationWithBeanDefinitionOverrideTrue() throws Exception { - ClassPathXmlApplicationContext parentContext = new ClassPathXmlApplicationContext("parent-config.xml", this.getClass()); + @Test + void testChannelCreationWithBeanDefinitionOverrideTrue() throws Exception { + ClassPathXmlApplicationContext parentContext = + new ClassPathXmlApplicationContext("parent-config.xml", getClass()); MessageChannel parentChannelA = parentContext.getBean("parentChannelA", MessageChannel.class); MessageChannel parentChannelB = parentContext.getBean("parentChannelB", MessageChannel.class); ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(); context.setAllowBeanDefinitionOverriding(false); - context.setConfigLocations(new String[]{"classpath:org/springframework/integration/channel/channel-override-config.xml"}); + context.setConfigLocations("classpath:org/springframework/integration/channel/channel-override-config.xml"); context.setParent(parentContext); Method method = ReflectionUtils.findMethod(ClassPathXmlApplicationContext.class, "obtainFreshBeanFactory"); method.setAccessible(true); @@ -177,28 +180,28 @@ public void testChannelCreationWithBeanDefinitionOverrideTrue() throws Exception assertThat(context.containsBean("channelD")).isTrue(); EventDrivenConsumer consumerA = context.getBean("serviceA", EventDrivenConsumer.class); assertThat(TestUtils.getPropertyValue(consumerA, "inputChannel")).isEqualTo(context.getBean("channelA")); - assertThat(TestUtils.getPropertyValue(consumerA, "handler.outputChannel")) - .isEqualTo(context.getBean("channelB")); + assertThat(TestUtils.getPropertyValue(consumerA, "handler.outputChannelName")).isEqualTo("channelB"); EventDrivenConsumer consumerB = context.getBean("serviceB", EventDrivenConsumer.class); assertThat(TestUtils.getPropertyValue(consumerB, "inputChannel")).isEqualTo(context.getBean("channelB")); - assertThat(TestUtils.getPropertyValue(consumerB, "handler.outputChannel")) - .isEqualTo(context.getBean("channelC")); + assertThat(TestUtils.getPropertyValue(consumerB, "handler.outputChannelName")).isEqualTo("channelC"); EventDrivenConsumer consumerC = context.getBean("serviceC", EventDrivenConsumer.class); assertThat(TestUtils.getPropertyValue(consumerC, "inputChannel")).isEqualTo(context.getBean("channelC")); - assertThat(TestUtils.getPropertyValue(consumerC, "handler.outputChannel")) - .isEqualTo(context.getBean("channelD")); + assertThat(TestUtils.getPropertyValue(consumerC, "handler.outputChannelName")).isEqualTo("channelD"); EventDrivenConsumer consumerD = context.getBean("serviceD", EventDrivenConsumer.class); assertThat(TestUtils.getPropertyValue(consumerD, "inputChannel")).isEqualTo(parentChannelA); - assertThat(TestUtils.getPropertyValue(consumerD, "handler.outputChannel")).isEqualTo(parentChannelB); + assertThat(TestUtils.getPropertyValue(consumerD, "handler.outputChannelName")).isEqualTo("parentChannelB"); EventDrivenConsumer consumerE = context.getBean("serviceE", EventDrivenConsumer.class); assertThat(TestUtils.getPropertyValue(consumerE, "inputChannel")).isEqualTo(parentChannelB); EventDrivenConsumer consumerF = context.getBean("serviceF", EventDrivenConsumer.class); assertThat(TestUtils.getPropertyValue(consumerF, "inputChannel")).isEqualTo(channelEarly); + + context.close(); + parentContext.close(); } @@ -213,7 +216,7 @@ private static class ThreadNameExtractingTestTarget implements MessageHandler { this(null); } - ThreadNameExtractingTestTarget(CountDownLatch latch) { + ThreadNameExtractingTestTarget(@Nullable CountDownLatch latch) { this.latch = latch; } @@ -224,6 +227,7 @@ public void handleMessage(Message message) { this.latch.countDown(); } } + } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/AggregatorParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/AggregatorParserTests.java index 0ac6da90d66..452ea766ad7 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/AggregatorParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/AggregatorParserTests.java @@ -153,8 +153,6 @@ public void testPropertyAssignment() { EventDrivenConsumer endpoint = (EventDrivenConsumer) context.getBean("completelyDefinedAggregator"); ReleaseStrategy releaseStrategy = (ReleaseStrategy) context.getBean("releaseStrategy"); CorrelationStrategy correlationStrategy = (CorrelationStrategy) context.getBean("correlationStrategy"); - MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); - MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel"); Object consumer = new DirectFieldAccessor(endpoint).getPropertyValue("handler"); assertThat(consumer).isInstanceOf(AggregatingMessageHandler.class); DirectFieldAccessor accessor = new DirectFieldAccessor(consumer); @@ -172,12 +170,12 @@ public void testPropertyAssignment() { assertThat(accessor.getPropertyValue("correlationStrategy")) .as("The AggregatorEndpoint is not injected with the appropriate CorrelationStrategy instance") .isEqualTo(correlationStrategy); - assertThat(accessor.getPropertyValue("outputChannel")) + assertThat(accessor.getPropertyValue("outputChannelName")) .as("The AggregatorEndpoint is not injected with the appropriate output channel") - .isEqualTo(outputChannel); - assertThat(accessor.getPropertyValue("discardChannel")) + .isEqualTo("outputChannel"); + assertThat(accessor.getPropertyValue("discardChannelName")) .as("The AggregatorEndpoint is not injected with the appropriate discard channel") - .isEqualTo(discardChannel); + .isEqualTo("discardChannel"); assertThat(TestUtils.getPropertyValue(consumer, "messagingTemplate.sendTimeout")) .as("The AggregatorEndpoint is not set with the appropriate timeout value").isEqualTo(86420000L); assertThat(accessor.getPropertyValue("sendPartialResultOnExpiry")) diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/resequencerParserTests.xml b/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests-context.xml similarity index 100% rename from spring-integration-core/src/test/java/org/springframework/integration/config/resequencerParserTests.xml rename to spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests-context.xml diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java index 44052c84230..c6a10e1b574 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java @@ -21,11 +21,10 @@ import java.util.List; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; -import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.aggregator.CorrelationStrategy; import org.springframework.integration.aggregator.MethodInvokingCorrelationStrategy; import org.springframework.integration.aggregator.MethodInvokingReleaseStrategy; @@ -38,7 +37,7 @@ import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageChannel; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; /** * @author Marius Bogoevici @@ -49,17 +48,14 @@ * @author Artem Bilan * @author Gary Russell */ +@SpringJUnitConfig public class ResequencerParserTests { + @Autowired private ApplicationContext context; - @Before - public void setUp() { - this.context = new ClassPathXmlApplicationContext("resequencerParserTests.xml", this.getClass()); - } - @Test - public void testDefaultResequencerProperties() { + void testDefaultResequencerProperties() { EventDrivenConsumer endpoint = (EventDrivenConsumer) context.getBean("defaultResequencer"); ResequencingMessageHandler resequencer = TestUtils.getPropertyValue(endpoint, "handler", ResequencingMessageHandler.class); @@ -69,7 +65,9 @@ public void testDefaultResequencerProperties() { resequencer, "messagingTemplate.sendTimeout")) .as("The ResequencerEndpoint is not set with the appropriate timeout value").isEqualTo(-1L); assertThat(getPropertyValue(resequencer, "sendPartialResultOnExpiry")) - .as("The ResequencerEndpoint is not configured with the appropriate 'send partial results on timeout' " + + .as("The ResequencerEndpoint is not configured with the appropriate 'send partial results on " + + "timeout'" + + " " + "flag") .isEqualTo(false); assertThat(getPropertyValue(resequencer, "releasePartialSequences")) @@ -78,23 +76,21 @@ public void testDefaultResequencerProperties() { } @Test - public void testPropertyAssignment() throws Exception { - EventDrivenConsumer endpoint = (EventDrivenConsumer) context.getBean("completelyDefinedResequencer"); - MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); - MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel"); + void testPropertyAssignment() { + EventDrivenConsumer endpoint = this.context.getBean("completelyDefinedResequencer", EventDrivenConsumer.class); ResequencingMessageHandler resequencer = TestUtils.getPropertyValue(endpoint, "handler", ResequencingMessageHandler.class); - assertThat(getPropertyValue(resequencer, "outputChannel")) + assertThat(getPropertyValue(resequencer, "outputChannelName")) .as("The ResequencerEndpoint is not injected with the appropriate output channel") - .isEqualTo(outputChannel); - assertThat(getPropertyValue(resequencer, "discardChannel")) + .isEqualTo("outputChannel"); + assertThat(getPropertyValue(resequencer, "discardChannelName")) .as("The ResequencerEndpoint is not injected with the appropriate discard channel") - .isEqualTo(discardChannel); + .isEqualTo("discardChannel"); assertThat(getPropertyValue(resequencer, "messagingTemplate.sendTimeout")) .as("The ResequencerEndpoint is not set with the appropriate timeout value").isEqualTo(86420000L); assertThat(getPropertyValue(resequencer, "sendPartialResultOnExpiry")) - .as("The ResequencerEndpoint is not configured with the appropriate 'send partial results on timeout' " + - "flag") + .as("The ResequencerEndpoint is not configured with the appropriate " + + "'send partial results on timeout' flag") .isEqualTo(true); assertThat(getPropertyValue(getPropertyValue(resequencer, "releaseStrategy"), "releasePartialSequences")) .as("The ResequencerEndpoint is not configured with the appropriate 'release partial sequences' flag") @@ -104,7 +100,7 @@ public void testPropertyAssignment() throws Exception { } @Test - public void testCorrelationStrategyRefOnly() throws Exception { + void testCorrelationStrategyRefOnly() { EventDrivenConsumer endpoint = (EventDrivenConsumer) context .getBean("resequencerWithCorrelationStrategyRefOnly"); ResequencingMessageHandler resequencer = TestUtils.getPropertyValue(endpoint, "handler", @@ -116,7 +112,7 @@ public void testCorrelationStrategyRefOnly() throws Exception { } @Test - public void testReleaseStrategyRefOnly() throws Exception { + void testReleaseStrategyRefOnly() { EventDrivenConsumer endpoint = (EventDrivenConsumer) context.getBean("resequencerWithReleaseStrategyRefOnly"); ResequencingMessageHandler resequencer = getPropertyValue(endpoint, "handler", ResequencingMessageHandler.class); @@ -127,7 +123,7 @@ public void testReleaseStrategyRefOnly() throws Exception { } @Test - public void testReleaseStrategyRefAndMethod() throws Exception { + void testReleaseStrategyRefAndMethod() { EventDrivenConsumer endpoint = (EventDrivenConsumer) context .getBean("resequencerWithReleaseStrategyRefAndMethod"); ResequencingMessageHandler resequencer = getPropertyValue(endpoint, "handler", @@ -150,17 +146,17 @@ public void testReleaseStrategyRefAndMethod() throws Exception { } @Test - public void shouldSetReleasePartialSequencesFlag() { - EventDrivenConsumer endpoint = (EventDrivenConsumer) context.getBean("completelyDefinedResequencer"); - ResequencingMessageHandler resequencer = TestUtils.getPropertyValue(endpoint, "handler", - ResequencingMessageHandler.class); + void shouldSetReleasePartialSequencesFlag() { + EventDrivenConsumer endpoint = (EventDrivenConsumer) context.getBean("completelyDefinedResequencer"); + ResequencingMessageHandler resequencer = TestUtils.getPropertyValue(endpoint, "handler", + ResequencingMessageHandler.class); assertThat(getPropertyValue(getPropertyValue(resequencer, "releaseStrategy"), "releasePartialSequences")) .as("The ResequencerEndpoint is not configured with the appropriate 'release partial sequences' flag") .isEqualTo(true); } @Test - public void testCorrelationStrategyRefAndMethod() throws Exception { + void testCorrelationStrategyRefAndMethod() { EventDrivenConsumer endpoint = (EventDrivenConsumer) context .getBean("resequencerWithCorrelationStrategyRefAndMethod"); ResequencingMessageHandler resequencer = TestUtils.getPropertyValue(endpoint, "handler", @@ -173,19 +169,13 @@ public void testCorrelationStrategyRefAndMethod() throws Exception { assertThat(adapter.getCorrelationKey(MessageBuilder.withPayload("not important").build())).isEqualTo("foo"); } - @SuppressWarnings("unused") - private static Message createMessage(T payload, Object correlationId, int sequenceSize, int sequenceNumber, - MessageChannel outputChannel) { - return MessageBuilder.withPayload(payload).setCorrelationId(correlationId).setSequenceSize(sequenceSize) - .setSequenceNumber(sequenceNumber).setReplyChannel(outputChannel).build(); - } - static class TestCorrelationStrategy implements CorrelationStrategy { @Override public Object getCorrelationKey(Message message) { return "test"; } + } static class TestCorrelationStrategyPojo { @@ -193,22 +183,27 @@ static class TestCorrelationStrategyPojo { public Object foo(Object o) { return "foo"; } + } static class TestReleaseStrategy implements ReleaseStrategy { + @Override public boolean canRelease(MessageGroup group) { return true; } + } static class TestReleaseStrategyPojo { + private int invocationCount = 0; - public boolean bar(List> messages) { + public boolean bar(List> __) { invocationCount++; return true; } + } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/DelayerParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/DelayerParserTests.java index 1f450eee42b..edb327e0cbe 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/DelayerParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/DelayerParserTests.java @@ -48,6 +48,7 @@ * @author Artem Bilan * @author Gunnar Hillert * @author Gary Russell + * * @since 1.0.3 */ @RunWith(SpringJUnit4ClassRunner.class) @@ -61,7 +62,7 @@ public class DelayerParserTests { public void defaultScheduler() { DelayHandler delayHandler = context.getBean("delayerWithDefaultScheduler.handler", DelayHandler.class); assertThat(delayHandler.getOrder()).isEqualTo(99); - assertThat(TestUtils.getPropertyValue(delayHandler, "outputChannel")).isEqualTo(context.getBean("output")); + assertThat(delayHandler.getOutputChannel()).isSameAs(this.context.getBean("output")); assertThat(TestUtils.getPropertyValue(delayHandler, "defaultDelay", Long.class)).isEqualTo(new Long(1234)); //INT-2243 assertThat(TestUtils.getPropertyValue(delayHandler, "delayExpression")).isNotNull(); @@ -81,8 +82,8 @@ public void customScheduler() { DelayHandler delayHandler = (DelayHandler) handler; assertThat(delayHandler.getOrder()).isEqualTo(Ordered.LOWEST_PRECEDENCE); DirectFieldAccessor accessor = new DirectFieldAccessor(delayHandler); - assertThat(accessor.getPropertyValue("outputChannel")).isEqualTo(context.getBean("output")); - assertThat(accessor.getPropertyValue("defaultDelay")).isEqualTo(new Long(0)); + assertThat(delayHandler.getOutputChannel()).isSameAs(this.context.getBean("output")); + assertThat(accessor.getPropertyValue("defaultDelay")).isEqualTo(0L); assertThat(accessor.getPropertyValue("taskScheduler")).isEqualTo(context.getBean("testScheduler")); assertThat(accessor.getPropertyValue("taskScheduler")).isNotNull(); assertThat(new DirectFieldAccessor( diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EnricherParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EnricherParserTests.java index 25be4f6e6e6..34837214d9d 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EnricherParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EnricherParserTests.java @@ -71,7 +71,7 @@ public void configurationCheck() { ContentEnricher enricher = (ContentEnricher) handler; assertThat(enricher.getOrder()).isEqualTo(99); DirectFieldAccessor accessor = new DirectFieldAccessor(enricher); - assertThat(accessor.getPropertyValue("outputChannel")).isEqualTo(context.getBean("output")); + assertThat(accessor.getPropertyValue("outputChannelName")).isEqualTo("output"); assertThat(accessor.getPropertyValue("shouldClonePayload")).isEqualTo(true); assertThat(accessor.getPropertyValue("requestPayloadExpression")).isNull(); assertThat(TestUtils.getPropertyValue(enricher, "gateway.beanFactory")).isNotNull(); @@ -136,6 +136,7 @@ class Foo extends AbstractReplyProducingMessageHandler { protected Object handleRequestMessage(Message requestMessage) { return new Source("foo"); } + } Foo foo = new Foo(); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EnricherParserTestsWithoutRequestChannel.java b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EnricherParserTestsWithoutRequestChannel.java index ce9500b87c1..f2e710d32fd 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EnricherParserTestsWithoutRequestChannel.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EnricherParserTestsWithoutRequestChannel.java @@ -64,7 +64,7 @@ public void configurationCheck() { DirectFieldAccessor accessor = new DirectFieldAccessor(enricher); assertThat(accessor.getPropertyValue("gateway")).isNull(); - assertThat(accessor.getPropertyValue("outputChannel")).isEqualTo(context.getBean("output")); + assertThat(accessor.getPropertyValue("outputChannelName")).isEqualTo("output"); assertThat(accessor.getPropertyValue("shouldClonePayload")).isEqualTo(false); assertThat(accessor.getPropertyValue("requestPayloadExpression")).isNull(); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EnricherParserWithRequestPayloadExpressionTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EnricherParserWithRequestPayloadExpressionTests.java index e9d73cb7d1c..d37fb282351 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EnricherParserWithRequestPayloadExpressionTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/EnricherParserWithRequestPayloadExpressionTests.java @@ -42,6 +42,7 @@ /** * @author Mark Fisher * @author Artem Bilan + * * @since 2.1 */ @RunWith(SpringJUnit4ClassRunner.class) @@ -62,7 +63,7 @@ public void configurationCheck() { ContentEnricher enricher = (ContentEnricher) handler; assertThat(enricher.getOrder()).isEqualTo(99); DirectFieldAccessor accessor = new DirectFieldAccessor(enricher); - assertThat(accessor.getPropertyValue("outputChannel")).isEqualTo(context.getBean("output")); + assertThat(accessor.getPropertyValue("outputChannelName")).isEqualTo("output"); assertThat(accessor.getPropertyValue("shouldClonePayload")).isEqualTo(false); Expression requestPayloadExpression = (Expression) accessor.getPropertyValue("requestPayloadExpression"); diff --git a/spring-integration-file/src/test/java/org/springframework/integration/file/config/FileSplitterParserTests.java b/spring-integration-file/src/test/java/org/springframework/integration/file/config/FileSplitterParserTests.java index 6dcde890066..8c090c7d704 100644 --- a/spring-integration-file/src/test/java/org/springframework/integration/file/config/FileSplitterParserTests.java +++ b/spring-integration-file/src/test/java/org/springframework/integration/file/config/FileSplitterParserTests.java @@ -65,12 +65,12 @@ public void testComplete() { assertThat(TestUtils.getPropertyValue(this.splitter, "applySequence", Boolean.class)).isTrue(); assertThat(TestUtils.getPropertyValue(this.splitter, "charset")).isEqualTo(Charset.forName("UTF-8")); assertThat(TestUtils.getPropertyValue(this.splitter, "messagingTemplate.sendTimeout")).isEqualTo(5L); - assertThat(TestUtils.getPropertyValue(this.splitter, "outputChannel")).isEqualTo(this.out); - assertThat(TestUtils.getPropertyValue(this.splitter, "order")).isEqualTo(2); assertThat(TestUtils.getPropertyValue(this.splitter, "firstLineHeaderName")).isEqualTo("foo"); - assertThat(TestUtils.getPropertyValue(this.fullBoat, "inputChannel")).isEqualTo(this.in); - assertThat(TestUtils.getPropertyValue(this.fullBoat, "autoStartup", Boolean.class)).isFalse(); - assertThat(TestUtils.getPropertyValue(this.fullBoat, "phase")).isEqualTo(1); + assertThat(this.splitter.getOutputChannel()).isSameAs(this.out); + assertThat(this.splitter.getOrder()).isEqualTo(2); + assertThat(this.fullBoat.getInputChannel()).isSameAs(this.in); + assertThat(this.fullBoat.isAutoStartup()).isFalse(); + assertThat(this.fullBoat.getPhase()).isEqualTo(1); } } diff --git a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSourceTests.java b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSourceTests.java index 0477e60e218..c8f2fbbe11f 100644 --- a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSourceTests.java +++ b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/FtpStreamingMessageSourceTests.java @@ -115,11 +115,11 @@ public void testAllContents() { this.data.purge(null); this.metadataMap.clear(); this.adapter.start(); + assertThat(this.data.receive(10000)).isNotNull(); received = (Message) this.data.receive(10000); assertThat(received).isNotNull(); - this.adapter.stop(); - assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE_INFO)).isInstanceOf(FtpFileInfo.class); + this.adapter.stop(); } @Test diff --git a/spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSourceTests.java b/spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSourceTests.java index 0145f1abe99..22803a5af70 100644 --- a/spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSourceTests.java +++ b/spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSourceTests.java @@ -21,6 +21,8 @@ import java.io.InputStream; import java.util.Arrays; import java.util.Comparator; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.junit.Test; import org.junit.runner.RunWith; @@ -38,8 +40,10 @@ import org.springframework.integration.file.FileHeaders; import org.springframework.integration.file.filters.AcceptAllFileListFilter; import org.springframework.integration.file.remote.session.SessionFactory; +import org.springframework.integration.metadata.SimpleMetadataStore; import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.sftp.SftpTestSupport; +import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter; import org.springframework.integration.sftp.session.SftpFileInfo; import org.springframework.integration.sftp.session.SftpRemoteFileTemplate; import org.springframework.integration.transformer.StreamTransformer; @@ -76,6 +80,9 @@ public class SftpStreamingMessageSourceTests extends SftpTestSupport { @Autowired private ApplicationContext context; + @Autowired + private ConcurrentMap metadataMap; + @SuppressWarnings("unchecked") @Test public void testAllContents() { @@ -106,7 +113,9 @@ public void testAllContents() { this.adapter.stop(); this.source.setFileInfoJson(false); this.data.purge(null); + this.metadataMap.clear(); this.adapter.start(); + assertThat(this.data.receive(10000)).isNotNull(); received = (Message) this.data.receive(10000); assertThat(received).isNotNull(); assertThat(received.getHeaders().get(FileHeaders.REMOTE_FILE_INFO)).isInstanceOf(SftpFileInfo.class); @@ -179,12 +188,20 @@ public PollerMetadata defaultPoller() { return pollerMetadata; } + @Bean + public ConcurrentMap metadataMap() { + return new ConcurrentHashMap<>(); + } + + @Bean @InboundChannelAdapter(channel = "stream", autoStartup = "false") public MessageSource sftpMessageSource() { SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template(), Comparator.comparing(LsEntry::getFilename)); - messageSource.setFilter(new AcceptAllFileListFilter<>()); + messageSource.setFilter( + new SftpPersistentAcceptOnceFileListFilter( + new SimpleMetadataStore(metadataMap()), "testStreaming")); messageSource.setRemoteDirectory("sftpSource/"); return messageSource; } diff --git a/spring-integration-test/src/main/java/org/springframework/integration/test/context/IntegrationEndpointsInitializer.java b/spring-integration-test/src/main/java/org/springframework/integration/test/context/IntegrationEndpointsInitializer.java index 593658be1d1..5bff7f6b35c 100644 --- a/spring-integration-test/src/main/java/org/springframework/integration/test/context/IntegrationEndpointsInitializer.java +++ b/spring-integration-test/src/main/java/org/springframework/integration/test/context/IntegrationEndpointsInitializer.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 the original author or authors. + * Copyright 2017-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,13 @@ package org.springframework.integration.test.context; -import java.util.Map; +import java.util.Arrays; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.SmartInitializingSingleton; +import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.util.Assert; @@ -31,19 +32,30 @@ * A component to customize {@link AbstractEndpoint} beans according * to the provided options in the {@link SpringIntegrationTest} annotation * after all beans are registered in the application context but before its refresh. + *

+ * This class implements both {@link SmartInitializingSingleton} and {@link BeanPostProcessor} + * to cover all the possible variants of {@link AbstractEndpoint} beans registration. + * First of all a bean for this class is registered in the application context, when XML configurations + * are parsed and registered already by the Spring Testing Framework, therefore a + * {@link BeanPostProcessor#postProcessBeforeInitialization(Object, String)} hook is not called for those beans. + * On the other hand we can't always rely on just a {@link SmartInitializingSingleton#afterSingletonsInstantiated()} + * because {@link SmartInitializingSingleton} beans are not ordered and some implementations may register beans + * later, than this {@link #afterSingletonsInstantiated()} is called. + * Plus beans might be registered at runtime, therefore {@link #postProcessBeforeInitialization(Object, String)} + * is still applied. * * @author Artem Bilan * * @since 5.0 */ -class IntegrationEndpointsInitializer implements SmartInitializingSingleton, BeanFactoryAware { +class IntegrationEndpointsInitializer implements SmartInitializingSingleton, BeanPostProcessor, BeanFactoryAware { - private final SpringIntegrationTest springIntegrationTest; + private final String[] patterns; private ConfigurableListableBeanFactory beanFactory; IntegrationEndpointsInitializer(SpringIntegrationTest springIntegrationTest) { - this.springIntegrationTest = springIntegrationTest; + this.patterns = springIntegrationTest.noAutoStartup(); } @Override @@ -53,22 +65,25 @@ public void setBeanFactory(BeanFactory beanFactory) throws BeansException { } @Override - public void afterSingletonsInstantiated() { - Map endpoints = this.beanFactory.getBeansOfType(AbstractEndpoint.class); + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + if (bean instanceof AbstractEndpoint && ((AbstractEndpoint) bean).isAutoStartup() && match(beanName)) { + ((AbstractEndpoint) bean).setAutoStartup(false); + } + return bean; + } - endpoints.entrySet() + @Override + public void afterSingletonsInstantiated() { + this.beanFactory.getBeansOfType(AbstractEndpoint.class) + .entrySet() .stream() - .filter(entry -> match(entry.getKey())) + .filter(entry -> entry.getValue().isAutoStartup() && match(entry.getKey())) .forEach(entry -> entry.getValue().setAutoStartup(false)); } private boolean match(String name) { - for (String pattern : this.springIntegrationTest.noAutoStartup()) { - if (PatternMatchUtils.simpleMatch(pattern, name)) { - return true; - } - } - return false; + return Arrays.stream(this.patterns) + .anyMatch(pattern -> PatternMatchUtils.simpleMatch(pattern, name)); } } diff --git a/spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageSourceTests-context.xml b/spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageSourceTests-context.xml index 43297e9def8..004846bd9c7 100644 --- a/spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageSourceTests-context.xml +++ b/spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageSourceTests-context.xml @@ -3,7 +3,7 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd -http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> + http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> diff --git a/spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageSourceTests.java b/spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageSourceTests.java index aa1159ff698..a562e3ffb99 100644 --- a/spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageSourceTests.java +++ b/spring-integration-test/src/test/java/org/springframework/integration/test/mock/MockMessageSourceTests.java @@ -57,7 +57,7 @@ */ @RunWith(SpringRunner.class) @ContextConfiguration(classes = MockMessageSourceTests.Config.class) -@SpringIntegrationTest(noAutoStartup = {"inboundChannelAdapter", "*Source*"}) +@SpringIntegrationTest(noAutoStartup = { "inboundChannelAdapter", "*Source*" }) @DirtiesContext public class MockMessageSourceTests { @@ -76,7 +76,7 @@ public class MockMessageSourceTests { @After public void tearDown() { this.mockIntegrationContext.resetBeans("mySourceEndpoint", "inboundChannelAdapter"); - results.purge(null); + this.results.purge(null); } @Test @@ -103,8 +103,11 @@ public void testMockMessageSource() { @Test public void testMockMessageSourceInConfig() { - this.applicationContext.getBean("mockMessageSourceTests.Config.testingMessageSource.inboundChannelAdapter", - Lifecycle.class).start(); + Lifecycle channelAdapter = + this.applicationContext + .getBean("mockMessageSourceTests.Config.testingMessageSource.inboundChannelAdapter", + Lifecycle.class); + channelAdapter.start(); Message receive = this.results.receive(10_000); assertThat(receive).isNotNull(); @@ -120,13 +123,13 @@ public void testMockMessageSourceInConfig() { assertThat(receive.getPayload()).isEqualTo(3); } - this.applicationContext.getBean("mockMessageSourceTests.Config.testingMessageSource.inboundChannelAdapter", - Lifecycle.class).stop(); + channelAdapter.stop(); } @Test public void testMockMessageSourceInXml() { - this.applicationContext.getBean("inboundChannelAdapter", Lifecycle.class).start(); + Lifecycle channelAdapter = this.applicationContext.getBean("inboundChannelAdapter", Lifecycle.class); + channelAdapter.start(); Message receive = this.results.receive(10_000); assertThat(receive).isNotNull(); @@ -142,7 +145,7 @@ public void testMockMessageSourceInXml() { assertThat(receive.getPayload()).isEqualTo("c"); } - this.applicationContext.getBean("inboundChannelAdapter", Lifecycle.class).stop(); + channelAdapter.stop(); } @Test @@ -198,6 +201,11 @@ public PollerSpec defaultPoller() { return Pollers.fixedDelay(10); } + @Bean + public QueueChannel results() { + return new QueueChannel(); + } + @Bean public IntegrationFlow myFlow() { return IntegrationFlows @@ -208,11 +216,6 @@ public IntegrationFlow myFlow() { .get(); } - @Bean - public QueueChannel results() { - return new QueueChannel(); - } - @InboundChannelAdapter(channel = "results") @Bean public MessageSource testingMessageSource() {