Skip to content

Commit a55f71f

Browse files
committed
GH-3183: Add ReactiveRequestHandlerAdvice
Fixes #3183 * Introduce a `BaseReplyProducingMessageHandler` with common advices logic for target message handler * Implement `BaseReplyProducingMessageHandler` from the `AbstractReplyProducingMessageHandler` and newly introduced `AbstractReactiveReplyProducingMessageHandler` for message handlers with `Mono` replies * Introduce a `ReactiveRequestHandlerAdvice` with a `BiFunction<Message<?>, Mono<?>, Publisher<?>>` logic to apply a `Mono.transform()` operator for a returned from the handler `Mono` reply * Make `RSocketOutboundGateway` implementing an `AbstractReactiveReplyProducingMessageHandler` * Fix `WebFluxRequestExecutingMessageHandler` to return a `Mono.then()` instead of an explicit subscription - it happens downstream anyway during reply producing with a proper error handling, too * Fix `ConsumerEndpointSpec` & `ConsumerEndpointFactoryBean` to deal with an `AbstractReactiveReplyProducingMessageHandler` as well * Demonstrate `ReactiveRequestHandlerAdvice` in the `RSocketDslTests` - without `retry()` it fails
1 parent 87c8e47 commit a55f71f

File tree

9 files changed

+314
-85
lines changed

9 files changed

+314
-85
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.springframework.integration.endpoint.EventDrivenConsumer;
4242
import org.springframework.integration.endpoint.PollingConsumer;
4343
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
44+
import org.springframework.integration.handler.AbstractReactiveReplyProducingMessageHandler;
4445
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
4546
import org.springframework.integration.handler.ReactiveMessageHandlerAdapter;
4647
import org.springframework.integration.handler.advice.HandleMessageAdvice;
@@ -244,7 +245,9 @@ private void adviceChain() {
244245
* add the configured advices to its chain, otherwise create a proxy.
245246
*/
246247
Class<?> targetClass = AopUtils.getTargetClass(this.handler);
247-
boolean replyMessageHandler = AbstractReplyProducingMessageHandler.class.isAssignableFrom(targetClass);
248+
boolean replyMessageHandler =
249+
AbstractReplyProducingMessageHandler.class.isAssignableFrom(targetClass) ||
250+
AbstractReactiveReplyProducingMessageHandler.class.isAssignableFrom(targetClass);
248251

249252
for (Advice advice : this.adviceChain) {
250253
if (!replyMessageHandler || advice instanceof HandleMessageAdvice) {

spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import org.springframework.integration.config.ConsumerEndpointFactoryBean;
2626
import org.springframework.integration.handler.AbstractMessageHandler;
2727
import org.springframework.integration.handler.AbstractMessageProducingHandler;
28+
import org.springframework.integration.handler.AbstractReactiveReplyProducingMessageHandler;
2829
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
2930
import org.springframework.integration.router.AbstractMessageRouter;
3031
import org.springframework.integration.scheduling.PollerMetadata;
@@ -271,8 +272,13 @@ public S notPropagatedHeaders(String... headerPatterns) {
271272
@Override
272273
protected Tuple2<ConsumerEndpointFactoryBean, H> doGet() {
273274
this.endpointFactoryBean.setAdviceChain(this.adviceChain);
274-
if (this.handler instanceof AbstractReplyProducingMessageHandler && !this.adviceChain.isEmpty()) {
275-
((AbstractReplyProducingMessageHandler) this.handler).setAdviceChain(this.adviceChain);
275+
if (!this.adviceChain.isEmpty()) {
276+
if (this.handler instanceof AbstractReplyProducingMessageHandler) {
277+
((AbstractReplyProducingMessageHandler) this.handler).setAdviceChain(this.adviceChain);
278+
}
279+
else if (this.handler instanceof AbstractReactiveReplyProducingMessageHandler) {
280+
((AbstractReactiveReplyProducingMessageHandler) this.handler).setAdviceChain(this.adviceChain);
281+
}
276282
}
277283
this.endpointFactoryBean.setHandler(this.handler);
278284
return super.doGet();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package org.springframework.integration.handler;
2+
3+
import org.springframework.messaging.Message;
4+
5+
import reactor.core.publisher.Mono;
6+
7+
public abstract class AbstractReactiveReplyProducingMessageHandler
8+
extends BaseReplyProducingMessageHandler<AbstractReactiveReplyProducingMessageHandler.RequestHandler> {
9+
10+
protected AbstractReactiveReplyProducingMessageHandler() {
11+
setAsync(true);
12+
}
13+
14+
@Override
15+
protected AbstractReactiveReplyProducingMessageHandler.RequestHandler createdAdvisedRequestHandler() {
16+
return new AbstractReactiveReplyProducingMessageHandler.AdvisedRequestHandler();
17+
}
18+
19+
@Override
20+
protected final void handleMessageInternal(Message<?> message) {
21+
Object result;
22+
if (getAdvisedRequestHandler() == null) {
23+
result = handleRequestMessage(message);
24+
}
25+
else {
26+
result = doInvokeAdvisedRequestHandler(message);
27+
}
28+
if (result != null) {
29+
sendOutputs(result, message);
30+
}
31+
else if (logger.isDebugEnabled()) {
32+
logger.debug("handler '" + this + "' produced no reply for request Message: " + message);
33+
}
34+
}
35+
36+
@Override
37+
protected Object doInvokeAdvisedRequestHandler(Message<?> message) {
38+
return getAdvisedRequestHandler().handleRequestMessage(message); // NOSONAR - cannot be null after adviceChain
39+
}
40+
41+
/**
42+
* Subclasses must implement this method to handle the request Message. The return
43+
* value may be a Mono<?> or null. The base class
44+
* will handle the final creation of a reply Message from any of those starting
45+
* points. If the return value is null, the Message flow will end here.
46+
* @param requestMessage The request message.
47+
* @return The result {@link Mono} of handling the message.
48+
*/
49+
protected abstract Mono<?> handleRequestMessage(Message<?> requestMessage);
50+
51+
/**
52+
* An implementation of this interface is used to wrap the
53+
* {@link AbstractReplyProducingMessageHandler#handleRequestMessage(Message)}
54+
* method. Also allows access to the underlying
55+
* {@link AbstractReplyProducingMessageHandler} to obtain properties.
56+
* @see #getAdvisedHandler()
57+
*/
58+
public interface RequestHandler {
59+
60+
Mono<?> handleRequestMessage(Message<?> requestMessage);
61+
62+
/**
63+
* Utility method, intended for use in message handler advice classes to get
64+
* information about the advised object. For example:
65+
* <p>
66+
* {@code ((AbstractReplyProducingMessageHandler.RequestHandler)
67+
* invocation.getThis()).getAdvisedHandler().getComponentName()}
68+
* @return the outer class instance.
69+
* @since 4.3.2
70+
*/
71+
AbstractReactiveReplyProducingMessageHandler getAdvisedHandler();
72+
73+
}
74+
75+
private class AdvisedRequestHandler implements RequestHandler {
76+
77+
AdvisedRequestHandler() {
78+
}
79+
80+
@Override
81+
public Mono<?> handleRequestMessage(Message<?> requestMessage) {
82+
return AbstractReactiveReplyProducingMessageHandler.this.handleRequestMessage(requestMessage);
83+
}
84+
85+
@Override
86+
public String toString() {
87+
return AbstractReactiveReplyProducingMessageHandler.this.toString();
88+
}
89+
90+
@Override
91+
public AbstractReactiveReplyProducingMessageHandler getAdvisedHandler() {
92+
return AbstractReactiveReplyProducingMessageHandler.this;
93+
}
94+
95+
}
96+
97+
}

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandler.java

+8-71
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,17 +40,11 @@
4040
* @author Artem Bilan
4141
* @author David Liu
4242
*/
43-
public abstract class AbstractReplyProducingMessageHandler extends AbstractMessageProducingHandler
44-
implements BeanClassLoaderAware {
45-
46-
private final List<Advice> adviceChain = new LinkedList<>();
47-
48-
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
43+
public abstract class AbstractReplyProducingMessageHandler
44+
extends BaseReplyProducingMessageHandler<AbstractReplyProducingMessageHandler.RequestHandler> {
4945

5046
private boolean requiresReply = false;
5147

52-
private volatile RequestHandler advisedRequestHandler;
53-
5448
/**
5549
* Flag whether a reply is required. If true an incoming message MUST result in a reply message being sent.
5650
* If false an incoming message MAY result in a reply message being sent. Default is false.
@@ -64,73 +58,15 @@ protected boolean getRequiresReply() {
6458
return this.requiresReply;
6559
}
6660

67-
/**
68-
* Configure a list of {@link Advice}s to proxy a {@link #handleRequestMessage(Message)} method.
69-
* @param adviceChain the list of {@link Advice}s to use.
70-
*/
71-
public void setAdviceChain(List<Advice> adviceChain) {
72-
Assert.notEmpty(adviceChain, "adviceChain cannot be empty");
73-
synchronized (this.adviceChain) {
74-
this.adviceChain.clear();
75-
this.adviceChain.addAll(adviceChain);
76-
if (isInitialized()) {
77-
initAdvisedRequestHandlerIfAny();
78-
}
79-
}
80-
}
81-
82-
protected boolean hasAdviceChain() {
83-
return this.adviceChain.size() > 0;
84-
}
85-
86-
@Override
87-
public void setBeanClassLoader(ClassLoader beanClassLoader) {
88-
this.beanClassLoader = beanClassLoader;
89-
}
90-
91-
protected ClassLoader getBeanClassLoader() {
92-
return this.beanClassLoader;
93-
}
94-
95-
@Override
96-
public IntegrationPatternType getIntegrationPatternType() {
97-
// Most out-of-the-box Spring Integration implementations provide an outbound gateway
98-
// for particular external protocol. If an implementation doesn't belong to this category,
99-
// it overrides this method to provide its own specific integration pattern type:
100-
// service-activator, splitter, aggregator, router etc.
101-
return IntegrationPatternType.outbound_gateway;
102-
}
103-
10461
@Override
105-
protected final void onInit() {
106-
super.onInit();
107-
initAdvisedRequestHandlerIfAny();
108-
doInit();
109-
}
110-
111-
private void initAdvisedRequestHandlerIfAny() {
112-
if (!this.adviceChain.isEmpty()) {
113-
ProxyFactory proxyFactory = new ProxyFactory(new AdvisedRequestHandler());
114-
boolean advised = false;
115-
for (Advice advice : this.adviceChain) {
116-
if (!(advice instanceof HandleMessageAdvice)) {
117-
proxyFactory.addAdvice(advice);
118-
advised = true;
119-
}
120-
}
121-
if (advised) {
122-
this.advisedRequestHandler = (RequestHandler) proxyFactory.getProxy(this.beanClassLoader);
123-
}
124-
}
125-
}
126-
127-
protected void doInit() {
62+
protected RequestHandler createdAdvisedRequestHandler() {
63+
return new AdvisedRequestHandler();
12864
}
12965

13066
@Override
13167
protected final void handleMessageInternal(Message<?> message) {
13268
Object result;
133-
if (this.advisedRequestHandler == null) {
69+
if (getAdvisedRequestHandler() == null) {
13470
result = handleRequestMessage(message);
13571
}
13672
else {
@@ -148,9 +84,10 @@ else if (!isAsync() && logger.isDebugEnabled()) {
14884
}
14985
}
15086

87+
@Override
15188
@Nullable
15289
protected Object doInvokeAdvisedRequestHandler(Message<?> message) {
153-
return this.advisedRequestHandler.handleRequestMessage(message);
90+
return getAdvisedRequestHandler().handleRequestMessage(message); // NOSONAR - cannot be null after adviceChain
15491
}
15592

15693
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package org.springframework.integration.handler;
2+
3+
import java.util.LinkedList;
4+
import java.util.List;
5+
6+
import org.aopalliance.aop.Advice;
7+
8+
import org.springframework.aop.framework.ProxyFactory;
9+
import org.springframework.beans.factory.BeanClassLoaderAware;
10+
import org.springframework.integration.IntegrationPatternType;
11+
import org.springframework.integration.handler.advice.HandleMessageAdvice;
12+
import org.springframework.lang.Nullable;
13+
import org.springframework.messaging.Message;
14+
import org.springframework.util.Assert;
15+
import org.springframework.util.ClassUtils;
16+
17+
public abstract class BaseReplyProducingMessageHandler<H> extends AbstractMessageProducingHandler
18+
implements BeanClassLoaderAware {
19+
20+
private final List<Advice> adviceChain = new LinkedList<>();
21+
22+
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
23+
24+
private volatile H advisedRequestHandler;
25+
26+
@Nullable
27+
protected H getAdvisedRequestHandler() {
28+
return this.advisedRequestHandler;
29+
}
30+
31+
/**
32+
* Configure a list of {@link Advice}s to proxy a {@code handleRequestMessage(Message)} method.
33+
* @param adviceChain the list of {@link Advice}s to use.
34+
*/
35+
public void setAdviceChain(List<Advice> adviceChain) {
36+
Assert.notEmpty(adviceChain, "adviceChain cannot be empty");
37+
synchronized (this.adviceChain) {
38+
this.adviceChain.clear();
39+
this.adviceChain.addAll(adviceChain);
40+
if (isInitialized()) {
41+
initAdvisedRequestHandlerIfAny();
42+
}
43+
}
44+
}
45+
46+
protected boolean hasAdviceChain() {
47+
return this.adviceChain.size() > 0;
48+
}
49+
50+
@Override
51+
public void setBeanClassLoader(ClassLoader beanClassLoader) {
52+
this.beanClassLoader = beanClassLoader;
53+
}
54+
55+
protected ClassLoader getBeanClassLoader() {
56+
return this.beanClassLoader;
57+
}
58+
59+
@Override
60+
public IntegrationPatternType getIntegrationPatternType() {
61+
// Most out-of-the-box Spring Integration implementations provide an outbound gateway
62+
// for particular external protocol. If an implementation doesn't belong to this category,
63+
// it overrides this method to provide its own specific integration pattern type:
64+
// service-activator, splitter, aggregator, router etc.
65+
return IntegrationPatternType.outbound_gateway;
66+
}
67+
68+
@Override
69+
protected final void onInit() {
70+
super.onInit();
71+
initAdvisedRequestHandlerIfAny();
72+
doInit();
73+
}
74+
75+
@SuppressWarnings("unchecked")
76+
private void initAdvisedRequestHandlerIfAny() {
77+
if (!this.adviceChain.isEmpty()) {
78+
ProxyFactory proxyFactory = new ProxyFactory(createdAdvisedRequestHandler());
79+
boolean advised = false;
80+
for (Advice advice : this.adviceChain) {
81+
if (!(advice instanceof HandleMessageAdvice)) {
82+
proxyFactory.addAdvice(advice);
83+
advised = true;
84+
}
85+
}
86+
if (advised) {
87+
this.advisedRequestHandler = (H) proxyFactory.getProxy(this.beanClassLoader);
88+
}
89+
}
90+
}
91+
92+
protected void doInit() {
93+
}
94+
95+
protected abstract H createdAdvisedRequestHandler();
96+
97+
@Nullable
98+
protected abstract Object doInvokeAdvisedRequestHandler(Message<?> message);
99+
100+
}

0 commit comments

Comments
 (0)