diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java index 16f4c5da8d6..e827c705ae6 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java @@ -19,22 +19,27 @@ import java.util.Arrays; import java.util.LinkedList; import java.util.List; +import java.util.function.BiFunction; import org.aopalliance.aop.Advice; +import org.reactivestreams.Publisher; import org.springframework.integration.config.ConsumerEndpointFactoryBean; import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.handler.AbstractMessageProducingHandler; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; +import org.springframework.integration.handler.advice.ReactiveRequestHandlerAdvice; import org.springframework.integration.router.AbstractMessageRouter; import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.transaction.TransactionInterceptorBuilder; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.scheduling.TaskScheduler; import org.springframework.transaction.TransactionManager; import org.springframework.transaction.interceptor.TransactionInterceptor; import org.springframework.util.Assert; +import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; /** @@ -176,6 +181,17 @@ public S transactional(boolean handleMessageAdvice) { return transactional(transactionInterceptor); } + /** + * Specify a {@link BiFunction} for customizing {@link Mono} replies via {@link ReactiveRequestHandlerAdvice}. + * @param replyCustomizer the {@link BiFunction} to propagate into {@link ReactiveRequestHandlerAdvice}. + * @return the spec. + * @since 5.3 + * @see ReactiveRequestHandlerAdvice + */ + public S customizeMonoReply(BiFunction, Mono, Publisher> replyCustomizer) { + return advice(new ReactiveRequestHandlerAdvice(replyCustomizer)); + } + /** * @param requiresReply the requiresReply. * @return the endpoint spec. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java index 23aa6fee12a..df685485c44 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java @@ -27,6 +27,7 @@ import org.reactivestreams.Publisher; +import org.springframework.beans.factory.BeanFactory; import org.springframework.core.ReactiveAdapter; import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.integration.IntegrationMessageHeaderAccessor; @@ -202,8 +203,9 @@ protected void onInit() { super.onInit(); Assert.state(!(this.outputChannelName != null && this.outputChannel != null), //NOSONAR (inconsistent sync) "'outputChannelName' and 'outputChannel' are mutually exclusive."); - if (getBeanFactory() != null) { - this.messagingTemplate.setBeanFactory(getBeanFactory()); + BeanFactory beanFactory = getBeanFactory(); + if (beanFactory != null) { + this.messagingTemplate.setBeanFactory(beanFactory); } this.messagingTemplate.setDestinationResolver(getChannelResolver()); } @@ -222,11 +224,11 @@ public MessageChannel getOutputChannel() { protected void sendOutputs(Object result, Message requestMessage) { if (result instanceof Iterable && shouldSplitOutput((Iterable) result)) { for (Object o : (Iterable) result) { - this.produceOutput(o, requestMessage); + produceOutput(o, requestMessage); } } else if (result != null) { - this.produceOutput(result, requestMessage); + produceOutput(result, requestMessage); } } @@ -246,8 +248,7 @@ protected void produceOutput(Object replyArg, final Message requestMessage) { if (getOutputChannel() == null) { Map routingSlipHeader = obtainRoutingSlipHeader(requestHeaders, reply); if (routingSlipHeader != null) { - Assert.isTrue(routingSlipHeader.size() == 1, - "The RoutingSlip header value must be a SingletonMap"); + Assert.isTrue(routingSlipHeader.size() == 1, "The RoutingSlip header value must be a SingletonMap"); Object key = routingSlipHeader.keySet().iterator().next(); Object value = routingSlipHeader.values().iterator().next(); Assert.isInstanceOf(List.class, key, "The RoutingSlip key must be List"); @@ -298,7 +299,7 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) { } private void doProduceOutput(Message requestMessage, MessageHeaders requestHeaders, Object reply, - Object replyChannel) { + @Nullable Object replyChannel) { if (this.async && (reply instanceof ListenableFuture || reply instanceof Publisher)) { MessageChannel messageChannel = getOutputChannel(); @@ -341,7 +342,7 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) { return builder; } - private void asyncNonReactiveReply(Message requestMessage, Object reply, Object replyChannel) { + private void asyncNonReactiveReply(Message requestMessage, Object reply, @Nullable Object replyChannel) { ListenableFuture future; if (reply instanceof ListenableFuture) { future = (ListenableFuture) reply; @@ -508,9 +509,10 @@ private final class ReplyFutureCallback implements ListenableFutureCallback requestMessage; + @Nullable private final Object replyChannel; - ReplyFutureCallback(Message requestMessage, Object replyChannel) { + ReplyFutureCallback(Message requestMessage, @Nullable Object replyChannel) { this.requestMessage = requestMessage; this.replyChannel = replyChannel; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java index 357fca5427a..79efba71c3b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/ReactiveRequestHandlerAdvice.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/ReactiveRequestHandlerAdvice.java new file mode 100644 index 00000000000..a1696bfc76d --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/advice/ReactiveRequestHandlerAdvice.java @@ -0,0 +1,94 @@ +/* + * Copyright 2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.handler.advice; + +import java.lang.reflect.Method; +import java.util.function.BiFunction; +import java.util.function.Function; + +import org.aopalliance.intercept.MethodInterceptor; +import org.aopalliance.intercept.MethodInvocation; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; + +import org.springframework.messaging.Message; +import org.springframework.util.Assert; + +import reactor.core.publisher.Mono; + +/** + * A {@link MethodInterceptor} for message handlers producing a {@link Mono} as a payload for reply. + * The returned {@link Mono} is customized via {@link Mono#transform(Function)} operator + * calling provided {@code replyCustomizer} {@link BiFunction} with request message as a context. + * + * A customization assumes to use supporting reactive operators like {@link Mono#timeout}, + * {@link Mono#retry}, {@link Mono#tag} etc. + * A {@link Mono#transform(Function)} also can be used for further customization like reactive circuit breaker. + * + * @author Artem Bilan + * + * @since 5.3 + */ +public class ReactiveRequestHandlerAdvice implements MethodInterceptor { + + private static final Log LOGGER = LogFactory.getLog(ReactiveRequestHandlerAdvice.class); + + private final BiFunction, Mono, Publisher> replyCustomizer; + + /** + * Instantiate advice based on a provided {@link BiFunction} customizer. + * @param replyCustomizer the {@link BiFunction} to customize produced {@link Mono}. + */ + public ReactiveRequestHandlerAdvice(BiFunction, Mono, Publisher> replyCustomizer) { + Assert.notNull(replyCustomizer, "'replyCustomizer' must not be null"); + this.replyCustomizer = replyCustomizer; + } + + @Override + public final Object invoke(MethodInvocation invocation) throws Throwable { + Object result = invocation.proceed(); + + Method method = invocation.getMethod(); + Object invocationThis = invocation.getThis(); + Object[] arguments = invocation.getArguments(); + boolean isReactiveMethod = + method.getName().equals("handleRequestMessage") && + (arguments.length == 1 && arguments[0] instanceof Message) && + result instanceof Mono; + if (!isReactiveMethod) { + if (LOGGER.isWarnEnabled()) { + String clazzName = + invocationThis == null + ? method.getDeclaringClass().getName() + : invocationThis.getClass().getName(); + LOGGER.warn("This advice " + getClass().getName() + + " can only be used for MessageHandlers with reactive reply; an attempt to advise method '" + + method.getName() + "' in '" + clazzName + "' is ignored."); + } + return result; + } + + Mono replyMono = (Mono) result; + + Message requestMessage = (Message) arguments[0]; + + return replyMono + .transform(mono -> this.replyCustomizer.apply(requestMessage, mono)); + } + +} diff --git a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java index 2b6502087e0..2651bac319f 100644 --- a/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java +++ b/spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java @@ -280,7 +280,8 @@ private RSocketRequester.RetrieveSpec prepareRequestSpecForPublisher(RSocketRequ private Mono performRetrieve(RSocketRequester.RetrieveSpec retrieveSpec, Message requestMessage) { RSocketInteractionModel interactionModel = evaluateInteractionModel(requestMessage); Assert.notNull(interactionModel, - () -> "The 'interactionModelExpression' [" + this.interactionModelExpression + "] must not evaluate to null"); + () -> "The 'interactionModelExpression' [" + this.interactionModelExpression + + "] must not evaluate to null"); Object expectedResponseType = null; if (!RSocketInteractionModel.fireAndForget.equals(interactionModel)) { diff --git a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java index a3a589dbfd0..de6383623c0 100644 --- a/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java +++ b/spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.integration.rsocket.dsl; +import java.time.Duration; import java.util.function.Function; import org.junit.jupiter.api.Test; @@ -80,9 +81,13 @@ public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector client return IntegrationFlows .from(Function.class) .handle(RSockets.outboundGateway("/uppercase") - .interactionModel((message) -> RSocketInteractionModel.requestChannel) - .expectedResponseType("T(java.lang.String)") - .clientRSocketConnector(clientRSocketConnector)) + .interactionModel((message) -> RSocketInteractionModel.requestChannel) + .expectedResponseType("T(java.lang.String)") + .clientRSocketConnector(clientRSocketConnector), + e -> e.customizeMonoReply( + (message, mono) -> + mono.timeout(Duration.ofMillis(100)) + .retry())) .get(); } diff --git a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java index 525ad8565f6..2c34c7fa2e8 100644 --- a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java +++ b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java @@ -217,8 +217,7 @@ protected Object exchange(Object uri, HttpMethod httpMethod, HttpEntity httpR return createReplyFromResponse(expectedResponseType, responseMono); } else { - responseMono.subscribe(v -> { }, ex -> sendErrorMessage(requestMessage, ex)); - return null; + return responseMono.then(); } } diff --git a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java index 72ac27815d9..026f3242319 100644 --- a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java +++ b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -383,7 +383,12 @@ public IntegrationFlow webFluxFlowWithReplyPayloadToFlux() { .httpMethod(HttpMethod.GET) .replyPayloadToFlux(true) .expectedResponseType(String.class), - e -> e.id("webFluxWithReplyPayloadToFlux")); + e -> e + .id("webFluxWithReplyPayloadToFlux") + .customizeMonoReply( + (message, mono) -> + mono.timeout(Duration.ofMillis(100)) + .retry())); } @Bean diff --git a/src/reference/asciidoc/handler-advice.adoc b/src/reference/asciidoc/handler-advice.adoc index 431ac278ca9..14c51cb78bf 100644 --- a/src/reference/asciidoc/handler-advice.adoc +++ b/src/reference/asciidoc/handler-advice.adoc @@ -56,6 +56,7 @@ In addition to providing the general mechanism to apply AOP advice classes, Spri * `ExpressionEvaluatingRequestHandlerAdvice` (described in <>) * `RateLimiterRequestHandlerAdvice` (described in <>) * `CacheRequestHandlerAdvice` (described in <>) +* `ReactiveRequestHandlerAdvice` (described in <>) [[retry-advice]] ===== Retry Advice @@ -514,6 +515,7 @@ This configuration functionality is similar to Spring Framework's `@CacheConfig` If a `CacheManager` is not provided, a single bean is resolved by default from the `BeanFactory` in the `CacheAspectSupport`. The following example configures two advices with different set of caching operations: + ==== [source, java] ---- @@ -549,6 +551,26 @@ public Message service(Message message) { ---- ==== +[[reactive-advice]] +==== Reactive Advice + +Starting with version 5.3, a `ReactiveRequestHandlerAdvice` can be used for request message handlers producing a `Mono` replies. +A `BiFunction, Mono, Publisher>` has to be provided for this advice and it is called from the `Mono.transform()` operator on a reply produced by the intercepted `handleRequestMessage()` method implementation. +Typically such a `Mono` customization is necessary when we would like to control network fluctuations via `timeout()`, `retry()` and similar support operators. +For example when we can an HTTP request over WebFlux client, we could use below configuration to not wait for response more than 5 seconds: + +==== +[source, java] +---- +.handle(WebFlux.outboundGateway("https://somehost/"), + e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5)))); +---- +==== + +The `message` argument is the request message for the message handler and can be used to determine request-scope attributes. +The `mono` argument is the result of this message handler's `handleRequestMessage()` method implementation. +A nested `Mono.transform()` can also be called from this function to apply, for example, a https://spring.io/projects/spring-cloud-circuitbreaker[Reactive Circuit Breaker]. + [[custom-advice]] ==== Custom Advice Classes @@ -678,7 +700,7 @@ The following example shows `` in use: - + ---- diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index e9b02328a62..66f5ddbc3f5 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -34,6 +34,11 @@ A new `IntegrationFlowExtension` API has been introduced to allow extension of t This also can be used to introduce customizers for any out-of-the-box `IntegrationComponentSpec` extensions. See <<./dsl.adoc#java-dsl-extensions,DSL Extensions>> for more information. +[[x5.3-reactive-request-handler-advice]] +==== ReactiveRequestHandlerAdvice + +A `ReactiveRequestHandlerAdvice` is provided to customize `Mono` replies from message handlers. +See <<./handler-advice.adoc#reactive-advice,Reactive Advice>> for more information. [[x5.3-mongodb-reactive-channel-adapters]] ==== MongoDB Reactive Channel Adapters