Skip to content

Commit c84d264

Browse files
authored
GH-3183: Add ReactiveRequestHandlerAdvice (#3197)
* GH-3183: Add ReactiveRequestHandlerAdvice Fixes #3183 * Introduce a `ReactiveRequestHandlerAdvice` with a `BiFunction<Message<?>, Mono<?>, Publisher<?>>` logic to apply a `Mono.transform()` operator for a returned from the handler `Mono` reply * Fix `WebFluxRequestExecutingMessageHandler` to return a `Mono.then()` instead of an explicit subscription - it happens downstream anyway during reply producing with a proper error handling, too * Demonstrate `ReactiveRequestHandlerAdvice` in the `RSocketDslTests` - without `retry()` it fails * Add `ConsumerEndpointSpec.customizeMonoReply()` for convenience * Document `ReactiveRequestHandlerAdvice` feature * * Fix language in docs
1 parent 8a38a5e commit c84d264

File tree

10 files changed

+169
-20
lines changed

10 files changed

+169
-20
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java

+16
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,27 @@
1919
import java.util.Arrays;
2020
import java.util.LinkedList;
2121
import java.util.List;
22+
import java.util.function.BiFunction;
2223

2324
import org.aopalliance.aop.Advice;
25+
import org.reactivestreams.Publisher;
2426

2527
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
2628
import org.springframework.integration.handler.AbstractMessageHandler;
2729
import org.springframework.integration.handler.AbstractMessageProducingHandler;
2830
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
31+
import org.springframework.integration.handler.advice.ReactiveRequestHandlerAdvice;
2932
import org.springframework.integration.router.AbstractMessageRouter;
3033
import org.springframework.integration.scheduling.PollerMetadata;
3134
import org.springframework.integration.transaction.TransactionInterceptorBuilder;
35+
import org.springframework.messaging.Message;
3236
import org.springframework.messaging.MessageHandler;
3337
import org.springframework.scheduling.TaskScheduler;
3438
import org.springframework.transaction.TransactionManager;
3539
import org.springframework.transaction.interceptor.TransactionInterceptor;
3640
import org.springframework.util.Assert;
3741

42+
import reactor.core.publisher.Mono;
3843
import reactor.util.function.Tuple2;
3944

4045
/**
@@ -176,6 +181,17 @@ public S transactional(boolean handleMessageAdvice) {
176181
return transactional(transactionInterceptor);
177182
}
178183

184+
/**
185+
* Specify a {@link BiFunction} for customizing {@link Mono} replies via {@link ReactiveRequestHandlerAdvice}.
186+
* @param replyCustomizer the {@link BiFunction} to propagate into {@link ReactiveRequestHandlerAdvice}.
187+
* @return the spec.
188+
* @since 5.3
189+
* @see ReactiveRequestHandlerAdvice
190+
*/
191+
public S customizeMonoReply(BiFunction<Message<?>, Mono<?>, Publisher<?>> replyCustomizer) {
192+
return advice(new ReactiveRequestHandlerAdvice(replyCustomizer));
193+
}
194+
179195
/**
180196
* @param requiresReply the requiresReply.
181197
* @return the endpoint spec.

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

+11-9
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import org.reactivestreams.Publisher;
2929

30+
import org.springframework.beans.factory.BeanFactory;
3031
import org.springframework.core.ReactiveAdapter;
3132
import org.springframework.core.ReactiveAdapterRegistry;
3233
import org.springframework.integration.IntegrationMessageHeaderAccessor;
@@ -202,8 +203,9 @@ protected void onInit() {
202203
super.onInit();
203204
Assert.state(!(this.outputChannelName != null && this.outputChannel != null), //NOSONAR (inconsistent sync)
204205
"'outputChannelName' and 'outputChannel' are mutually exclusive.");
205-
if (getBeanFactory() != null) {
206-
this.messagingTemplate.setBeanFactory(getBeanFactory());
206+
BeanFactory beanFactory = getBeanFactory();
207+
if (beanFactory != null) {
208+
this.messagingTemplate.setBeanFactory(beanFactory);
207209
}
208210
this.messagingTemplate.setDestinationResolver(getChannelResolver());
209211
}
@@ -222,11 +224,11 @@ public MessageChannel getOutputChannel() {
222224
protected void sendOutputs(Object result, Message<?> requestMessage) {
223225
if (result instanceof Iterable<?> && shouldSplitOutput((Iterable<?>) result)) {
224226
for (Object o : (Iterable<?>) result) {
225-
this.produceOutput(o, requestMessage);
227+
produceOutput(o, requestMessage);
226228
}
227229
}
228230
else if (result != null) {
229-
this.produceOutput(result, requestMessage);
231+
produceOutput(result, requestMessage);
230232
}
231233
}
232234

@@ -246,8 +248,7 @@ protected void produceOutput(Object replyArg, final Message<?> requestMessage) {
246248
if (getOutputChannel() == null) {
247249
Map<?, ?> routingSlipHeader = obtainRoutingSlipHeader(requestHeaders, reply);
248250
if (routingSlipHeader != null) {
249-
Assert.isTrue(routingSlipHeader.size() == 1,
250-
"The RoutingSlip header value must be a SingletonMap");
251+
Assert.isTrue(routingSlipHeader.size() == 1, "The RoutingSlip header value must be a SingletonMap");
251252
Object key = routingSlipHeader.keySet().iterator().next();
252253
Object value = routingSlipHeader.values().iterator().next();
253254
Assert.isInstanceOf(List.class, key, "The RoutingSlip key must be List");
@@ -298,7 +299,7 @@ else if (reply instanceof AbstractIntegrationMessageBuilder<?>) {
298299
}
299300

300301
private void doProduceOutput(Message<?> requestMessage, MessageHeaders requestHeaders, Object reply,
301-
Object replyChannel) {
302+
@Nullable Object replyChannel) {
302303

303304
if (this.async && (reply instanceof ListenableFuture<?> || reply instanceof Publisher<?>)) {
304305
MessageChannel messageChannel = getOutputChannel();
@@ -341,7 +342,7 @@ else if (reply instanceof AbstractIntegrationMessageBuilder) {
341342
return builder;
342343
}
343344

344-
private void asyncNonReactiveReply(Message<?> requestMessage, Object reply, Object replyChannel) {
345+
private void asyncNonReactiveReply(Message<?> requestMessage, Object reply, @Nullable Object replyChannel) {
345346
ListenableFuture<?> future;
346347
if (reply instanceof ListenableFuture<?>) {
347348
future = (ListenableFuture<?>) reply;
@@ -508,9 +509,10 @@ private final class ReplyFutureCallback implements ListenableFutureCallback<Obje
508509

509510
private final Message<?> requestMessage;
510511

512+
@Nullable
511513
private final Object replyChannel;
512514

513-
ReplyFutureCallback(Message<?> requestMessage, Object replyChannel) {
515+
ReplyFutureCallback(Message<?> requestMessage, @Nullable Object replyChannel) {
514516
this.requestMessage = requestMessage;
515517
this.replyChannel = replyChannel;
516518
}

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.handler.advice;
18+
19+
import java.lang.reflect.Method;
20+
import java.util.function.BiFunction;
21+
import java.util.function.Function;
22+
23+
import org.aopalliance.intercept.MethodInterceptor;
24+
import org.aopalliance.intercept.MethodInvocation;
25+
import org.apache.commons.logging.Log;
26+
import org.apache.commons.logging.LogFactory;
27+
import org.reactivestreams.Publisher;
28+
29+
import org.springframework.messaging.Message;
30+
import org.springframework.util.Assert;
31+
32+
import reactor.core.publisher.Mono;
33+
34+
/**
35+
* A {@link MethodInterceptor} for message handlers producing a {@link Mono} as a payload for reply.
36+
* The returned {@link Mono} is customized via {@link Mono#transform(Function)} operator
37+
* calling provided {@code replyCustomizer} {@link BiFunction} with request message as a context.
38+
*
39+
* A customization assumes to use supporting reactive operators like {@link Mono#timeout},
40+
* {@link Mono#retry}, {@link Mono#tag} etc.
41+
* A {@link Mono#transform(Function)} also can be used for further customization like reactive circuit breaker.
42+
*
43+
* @author Artem Bilan
44+
*
45+
* @since 5.3
46+
*/
47+
public class ReactiveRequestHandlerAdvice implements MethodInterceptor {
48+
49+
private static final Log LOGGER = LogFactory.getLog(ReactiveRequestHandlerAdvice.class);
50+
51+
private final BiFunction<Message<?>, Mono<?>, Publisher<?>> replyCustomizer;
52+
53+
/**
54+
* Instantiate advice based on a provided {@link BiFunction} customizer.
55+
* @param replyCustomizer the {@link BiFunction} to customize produced {@link Mono}.
56+
*/
57+
public ReactiveRequestHandlerAdvice(BiFunction<Message<?>, Mono<?>, Publisher<?>> replyCustomizer) {
58+
Assert.notNull(replyCustomizer, "'replyCustomizer' must not be null");
59+
this.replyCustomizer = replyCustomizer;
60+
}
61+
62+
@Override
63+
public final Object invoke(MethodInvocation invocation) throws Throwable {
64+
Object result = invocation.proceed();
65+
66+
Method method = invocation.getMethod();
67+
Object invocationThis = invocation.getThis();
68+
Object[] arguments = invocation.getArguments();
69+
boolean isReactiveMethod =
70+
method.getName().equals("handleRequestMessage") &&
71+
(arguments.length == 1 && arguments[0] instanceof Message) &&
72+
result instanceof Mono<?>;
73+
if (!isReactiveMethod) {
74+
if (LOGGER.isWarnEnabled()) {
75+
String clazzName =
76+
invocationThis == null
77+
? method.getDeclaringClass().getName()
78+
: invocationThis.getClass().getName();
79+
LOGGER.warn("This advice " + getClass().getName() +
80+
" can only be used for MessageHandlers with reactive reply; an attempt to advise method '"
81+
+ method.getName() + "' in '" + clazzName + "' is ignored.");
82+
}
83+
return result;
84+
}
85+
86+
Mono<?> replyMono = (Mono<?>) result;
87+
88+
Message<?> requestMessage = (Message<?>) arguments[0];
89+
90+
return replyMono
91+
.transform(mono -> this.replyCustomizer.apply(requestMessage, mono));
92+
}
93+
94+
}

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/outbound/RSocketOutboundGateway.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,8 @@ private RSocketRequester.RetrieveSpec prepareRequestSpecForPublisher(RSocketRequ
280280
private Mono<?> performRetrieve(RSocketRequester.RetrieveSpec retrieveSpec, Message<?> requestMessage) {
281281
RSocketInteractionModel interactionModel = evaluateInteractionModel(requestMessage);
282282
Assert.notNull(interactionModel,
283-
() -> "The 'interactionModelExpression' [" + this.interactionModelExpression + "] must not evaluate to null");
283+
() -> "The 'interactionModelExpression' [" + this.interactionModelExpression +
284+
"] must not evaluate to null");
284285

285286
Object expectedResponseType = null;
286287
if (!RSocketInteractionModel.fireAndForget.equals(interactionModel)) {

spring-integration-rsocket/src/test/java/org/springframework/integration/rsocket/dsl/RSocketDslTests.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.rsocket.dsl;
1818

19+
import java.time.Duration;
1920
import java.util.function.Function;
2021

2122
import org.junit.jupiter.api.Test;
@@ -80,9 +81,13 @@ public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector client
8081
return IntegrationFlows
8182
.from(Function.class)
8283
.handle(RSockets.outboundGateway("/uppercase")
83-
.interactionModel((message) -> RSocketInteractionModel.requestChannel)
84-
.expectedResponseType("T(java.lang.String)")
85-
.clientRSocketConnector(clientRSocketConnector))
84+
.interactionModel((message) -> RSocketInteractionModel.requestChannel)
85+
.expectedResponseType("T(java.lang.String)")
86+
.clientRSocketConnector(clientRSocketConnector),
87+
e -> e.customizeMonoReply(
88+
(message, mono) ->
89+
mono.timeout(Duration.ofMillis(100))
90+
.retry()))
8691
.get();
8792
}
8893

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,7 @@ protected Object exchange(Object uri, HttpMethod httpMethod, HttpEntity<?> httpR
217217
return createReplyFromResponse(expectedResponseType, responseMono);
218218
}
219219
else {
220-
responseMono.subscribe(v -> { }, ex -> sendErrorMessage(requestMessage, ex));
221-
return null;
220+
return responseMono.then();
222221
}
223222
}
224223

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -383,7 +383,12 @@ public IntegrationFlow webFluxFlowWithReplyPayloadToFlux() {
383383
.httpMethod(HttpMethod.GET)
384384
.replyPayloadToFlux(true)
385385
.expectedResponseType(String.class),
386-
e -> e.id("webFluxWithReplyPayloadToFlux"));
386+
e -> e
387+
.id("webFluxWithReplyPayloadToFlux")
388+
.customizeMonoReply(
389+
(message, mono) ->
390+
mono.timeout(Duration.ofMillis(100))
391+
.retry()));
387392
}
388393

389394
@Bean

src/reference/asciidoc/handler-advice.adoc

+23-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ In addition to providing the general mechanism to apply AOP advice classes, Spri
5656
* `ExpressionEvaluatingRequestHandlerAdvice` (described in <<expression-advice>>)
5757
* `RateLimiterRequestHandlerAdvice` (described in <<rate-limiter-advice>>)
5858
* `CacheRequestHandlerAdvice` (described in <<cache-advice>>)
59+
* `ReactiveRequestHandlerAdvice` (described in <<reactive-advice>>)
5960

6061
[[retry-advice]]
6162
===== Retry Advice
@@ -514,6 +515,7 @@ This configuration functionality is similar to Spring Framework's `@CacheConfig`
514515
If a `CacheManager` is not provided, a single bean is resolved by default from the `BeanFactory` in the `CacheAspectSupport`.
515516

516517
The following example configures two advices with different set of caching operations:
518+
517519
====
518520
[source, java]
519521
----
@@ -549,6 +551,26 @@ public Message<?> service(Message<?> message) {
549551
----
550552
====
551553

554+
[[reactive-advice]]
555+
==== Reactive Advice
556+
557+
Starting with version 5.3, a `ReactiveRequestHandlerAdvice` can be used for request message handlers producing a `Mono` replies.
558+
A `BiFunction<Message<?>, Mono<?>, Publisher<?>>` has to be provided for this advice and it is called from the `Mono.transform()` operator on a reply produced by the intercepted `handleRequestMessage()` method implementation.
559+
Typically such a `Mono` customization is necessary when we would like to control network fluctuations via `timeout()`, `retry()` and similar support operators.
560+
For example when we can an HTTP request over WebFlux client, we could use below configuration to not wait for response more than 5 seconds:
561+
562+
====
563+
[source, java]
564+
----
565+
.handle(WebFlux.outboundGateway("https://somehost/"),
566+
e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));
567+
----
568+
====
569+
570+
The `message` argument is the request message for the message handler and can be used to determine request-scope attributes.
571+
The `mono` argument is the result of this message handler's `handleRequestMessage()` method implementation.
572+
A nested `Mono.transform()` can also be called from this function to apply, for example, a https://spring.io/projects/spring-cloud-circuitbreaker[Reactive Circuit Breaker].
573+
552574
[[custom-advice]]
553575
==== Custom Advice Classes
554576

@@ -678,7 +700,7 @@ The following example shows `<transactional>` in use:
678700
</int-rmi:outbound-gateway>
679701
680702
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
681-
<constructor-arg value="org.springframework.transaction.PlatformTransactionManager"/>
703+
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
682704
</bean>
683705
----
684706

src/reference/asciidoc/whats-new.adoc

+5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ A new `IntegrationFlowExtension` API has been introduced to allow extension of t
3434
This also can be used to introduce customizers for any out-of-the-box `IntegrationComponentSpec` extensions.
3535
See <<./dsl.adoc#java-dsl-extensions,DSL Extensions>> for more information.
3636

37+
[[x5.3-reactive-request-handler-advice]]
38+
==== ReactiveRequestHandlerAdvice
39+
40+
A `ReactiveRequestHandlerAdvice` is provided to customize `Mono` replies from message handlers.
41+
See <<./handler-advice.adoc#reactive-advice,Reactive Advice>> for more information.
3742

3843
[[x5.3-mongodb-reactive-channel-adapters]]
3944
==== MongoDB Reactive Channel Adapters

0 commit comments

Comments
 (0)