Skip to content

Commit 45a91b2

Browse files
authored
Add Reactive TX Manager support (#3201)
* Add Reactive TX Manager support * Change `TransactionInterceptorBuilder` and `TransactionHandleMessageAdvice` to reply on a generic `TransactionManager`, so we can configure reactive one as well * Change a `<transactional>` XML element to support a generic `TransactionManager` reference, so we can configure reactive one as well * Support `adviceChain` configuration for the `ReactiveMessageHandler` in the `ConsumerEndpointFactoryBean` * Attemp to apply reactive transaction for the Reactive MongoDB channel adapter * * Revert `MongoDbTests` change - to support transactions we need the latest MongoDb server with replica enabled * Document reactive transactions * * Address PR reviews
1 parent 294cfc1 commit 45a91b2

File tree

8 files changed

+69
-47
lines changed

8 files changed

+69
-47
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,12 @@ public void afterPropertiesSet() {
200200
}
201201

202202
if (!(this.handler instanceof ReactiveMessageHandlerAdapter)) {
203-
adviceChain();
203+
this.handler = adviceChain(this.handler);
204204
}
205205
else if (!CollectionUtils.isEmpty(this.adviceChain)) {
206-
LOGGER.warn("the advice chain cannot be applied to a 'ReactiveMessageHandler'");
206+
this.handler =
207+
new ReactiveMessageHandlerAdapter(
208+
adviceChain(((ReactiveMessageHandlerAdapter) this.handler).getDelegate()));
207209
}
208210
if (this.channelResolver == null) {
209211
this.channelResolver = ChannelResolverUtils.getChannelResolver(this.beanFactory);
@@ -234,7 +236,9 @@ private void populateComponentNameIfAny() {
234236
}
235237
}
236238

237-
private void adviceChain() {
239+
@SuppressWarnings("unchecked")
240+
private <H> H adviceChain(H handler) {
241+
H theHandler = handler;
238242
if (!CollectionUtils.isEmpty(this.adviceChain)) {
239243
/*
240244
* ARPMHs advise the handleRequestMessage method internally and already have the advice chain injected.
@@ -243,24 +247,25 @@ private void adviceChain() {
243247
* If the handler is already advised,
244248
* add the configured advices to its chain, otherwise create a proxy.
245249
*/
246-
Class<?> targetClass = AopUtils.getTargetClass(this.handler);
250+
Class<?> targetClass = AopUtils.getTargetClass(theHandler);
247251
boolean replyMessageHandler = AbstractReplyProducingMessageHandler.class.isAssignableFrom(targetClass);
248252

249253
for (Advice advice : this.adviceChain) {
250254
if (!replyMessageHandler || advice instanceof HandleMessageAdvice) {
251255
NameMatchMethodPointcutAdvisor handlerAdvice = new NameMatchMethodPointcutAdvisor(advice);
252256
handlerAdvice.addMethodName("handleMessage");
253-
if (this.handler instanceof Advised) {
254-
((Advised) this.handler).addAdvisor(handlerAdvice);
257+
if (theHandler instanceof Advised) {
258+
((Advised) theHandler).addAdvisor(handlerAdvice);
255259
}
256260
else {
257-
ProxyFactory proxyFactory = new ProxyFactory(this.handler);
261+
ProxyFactory proxyFactory = new ProxyFactory(theHandler);
258262
proxyFactory.addAdvisor(handlerAdvice);
259-
this.handler = (MessageHandler) proxyFactory.getProxy(this.beanClassLoader);
263+
theHandler = (H) proxyFactory.getProxy(this.beanClassLoader);
260264
}
261265
}
262266
}
263267
}
268+
return theHandler;
264269
}
265270

266271
@Override

spring-integration-core/src/main/java/org/springframework/integration/dsl/ConsumerEndpointSpec.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@
3131
import org.springframework.integration.transaction.TransactionInterceptorBuilder;
3232
import org.springframework.messaging.MessageHandler;
3333
import org.springframework.scheduling.TaskScheduler;
34-
import org.springframework.transaction.PlatformTransactionManager;
34+
import org.springframework.transaction.TransactionManager;
3535
import org.springframework.transaction.interceptor.TransactionInterceptor;
3636
import org.springframework.util.Assert;
3737

@@ -112,10 +112,10 @@ public S advice(Advice... advice) {
112112
* {@code PlatformTransactionManager} and default
113113
* {@link org.springframework.transaction.interceptor.DefaultTransactionAttribute}
114114
* for the {@link MessageHandler}.
115-
* @param transactionManager the {@link PlatformTransactionManager} to use.
115+
* @param transactionManager the {@link TransactionManager} to use.
116116
* @return the spec.
117117
*/
118-
public S transactional(PlatformTransactionManager transactionManager) {
118+
public S transactional(TransactionManager transactionManager) {
119119
return transactional(transactionManager, false);
120120
}
121121

@@ -124,14 +124,14 @@ public S transactional(PlatformTransactionManager transactionManager) {
124124
* {@code PlatformTransactionManager} and default
125125
* {@link org.springframework.transaction.interceptor.DefaultTransactionAttribute}
126126
* for the {@link MessageHandler}.
127-
* @param transactionManager the {@link PlatformTransactionManager} to use.
127+
* @param transactionManager the {@link TransactionManager} to use.
128128
* @param handleMessageAdvice the flag to indicate the target {@link Advice} type:
129129
* {@code false} - regular {@link TransactionInterceptor}; {@code true} -
130130
* {@link org.springframework.integration.transaction.TransactionHandleMessageAdvice}
131131
* extension.
132132
* @return the spec.
133133
*/
134-
public S transactional(PlatformTransactionManager transactionManager, boolean handleMessageAdvice) {
134+
public S transactional(TransactionManager transactionManager, boolean handleMessageAdvice) {
135135
return transactional(new TransactionInterceptorBuilder(handleMessageAdvice)
136136
.transactionManager(transactionManager)
137137
.build());

spring-integration-core/src/main/java/org/springframework/integration/dsl/PollerSpec.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@
3131
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
3232
import org.springframework.messaging.MessageChannel;
3333
import org.springframework.scheduling.Trigger;
34-
import org.springframework.transaction.PlatformTransactionManager;
34+
import org.springframework.transaction.TransactionManager;
3535
import org.springframework.transaction.interceptor.TransactionInterceptor;
3636
import org.springframework.util.ErrorHandler;
3737

@@ -147,10 +147,10 @@ public PollerSpec advice(Advice... advice) {
147147
* provided {@code PlatformTransactionManager} and default
148148
* {@link org.springframework.transaction.interceptor.DefaultTransactionAttribute}
149149
* for the {@code pollingTask}.
150-
* @param transactionManager the {@link PlatformTransactionManager} to use.
150+
* @param transactionManager the {@link TransactionManager} to use.
151151
* @return the spec.
152152
*/
153-
public PollerSpec transactional(PlatformTransactionManager transactionManager) {
153+
public PollerSpec transactional(TransactionManager transactionManager) {
154154
return transactional(new TransactionInterceptorBuilder()
155155
.transactionManager(transactionManager)
156156
.build());

spring-integration-core/src/main/java/org/springframework/integration/transaction/TransactionHandleMessageAdvice.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@
1919
import java.util.Properties;
2020

2121
import org.springframework.integration.handler.advice.HandleMessageAdvice;
22-
import org.springframework.transaction.PlatformTransactionManager;
22+
import org.springframework.transaction.TransactionManager;
2323
import org.springframework.transaction.interceptor.TransactionAttributeSource;
2424
import org.springframework.transaction.interceptor.TransactionInterceptor;
2525

@@ -46,12 +46,16 @@ public class TransactionHandleMessageAdvice extends TransactionInterceptor imple
4646
public TransactionHandleMessageAdvice() {
4747
}
4848

49-
public TransactionHandleMessageAdvice(PlatformTransactionManager ptm, Properties attributes) {
50-
super(ptm, attributes);
49+
public TransactionHandleMessageAdvice(TransactionManager transactionManager, Properties transactionAttributes) {
50+
setTransactionManager(transactionManager);
51+
setTransactionAttributes(transactionAttributes);
5152
}
5253

53-
public TransactionHandleMessageAdvice(PlatformTransactionManager ptm, TransactionAttributeSource tas) {
54-
super(ptm, tas);
54+
public TransactionHandleMessageAdvice(TransactionManager transactionManager,
55+
TransactionAttributeSource transactionAttributeSource) {
56+
57+
setTransactionManager(transactionManager);
58+
setTransactionAttributeSource(transactionAttributeSource);
5559
}
5660

5761
}

spring-integration-core/src/main/java/org/springframework/integration/transaction/TransactionInterceptorBuilder.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
1616

1717
package org.springframework.integration.transaction;
1818

19-
import org.springframework.transaction.PlatformTransactionManager;
19+
import org.springframework.transaction.TransactionManager;
2020
import org.springframework.transaction.annotation.Isolation;
2121
import org.springframework.transaction.annotation.Propagation;
2222
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
@@ -29,8 +29,8 @@
2929
* Provides a fluent API to build a transaction interceptor. See
3030
* {@link TransactionAttribute} for property meanings; if a {@link TransactionAttribute}
3131
* is provided, the individual properties are ignored. If a
32-
* {@link PlatformTransactionManager} is not provided, a single instance of
33-
* {@link PlatformTransactionManager} will be discovered at runtime; if you have more
32+
* {@link TransactionManager} is not provided, a single instance of
33+
* {@link TransactionManager} will be discovered at runtime; if you have more
3434
* than one transaction manager, you must inject the one you want to use here.
3535
* <p>
3636
* When the {@code handleMessageAdvice} option is in use, this builder produces
@@ -91,7 +91,7 @@ public final TransactionInterceptorBuilder transactionAttribute(TransactionAttri
9191
return this;
9292
}
9393

94-
public TransactionInterceptorBuilder transactionManager(PlatformTransactionManager transactionManager) {
94+
public TransactionInterceptorBuilder transactionManager(TransactionManager transactionManager) {
9595
this.transactionInterceptor.setTransactionManager(transactionManager);
9696
return this;
9797
}

spring-integration-core/src/main/resources/org/springframework/integration/config/spring-integration.xsd

+18-18
Original file line numberDiff line numberDiff line change
@@ -4155,56 +4155,56 @@
41554155
<xsd:complexType name="transactionalType">
41564156
<xsd:attribute name="transaction-manager" type="xsd:string" default="transactionManager">
41574157
<xsd:annotation>
4158-
<xsd:documentation><![CDATA[
4159-
The bean name of the PlatformTransactionManager to use.
4160-
]]></xsd:documentation>
4158+
<xsd:documentation>
4159+
The bean name of the PlatformTransactionManager to use.
4160+
</xsd:documentation>
41614161
<xsd:appinfo>
41624162
<tool:annotation kind="ref">
4163-
<tool:expected-type type="org.springframework.transaction.PlatformTransactionManager"/>
4163+
<tool:expected-type type="org.springframework.transaction.TransactionManager"/>
41644164
</tool:annotation>
41654165
</xsd:appinfo>
41664166
</xsd:annotation>
41674167
</xsd:attribute>
41684168
<xsd:attribute name="propagation" default="REQUIRED">
41694169
<xsd:annotation>
4170-
<xsd:documentation source="java:org.springframework.transaction.annotation.Propagation"><![CDATA[
4171-
The transaction propagation behavior.
4172-
]]></xsd:documentation>
4170+
<xsd:documentation source="java:org.springframework.transaction.annotation.Propagation">
4171+
The transaction propagation behavior.
4172+
</xsd:documentation>
41734173
</xsd:annotation>
41744174
<xsd:simpleType>
41754175
<xsd:union memberTypes="propagationEnumeration xsd:string"/>
41764176
</xsd:simpleType>
41774177
</xsd:attribute>
41784178
<xsd:attribute name="isolation" default="DEFAULT">
41794179
<xsd:annotation>
4180-
<xsd:documentation source="java:org.springframework.transaction.annotation.Isolation"><![CDATA[
4181-
The transaction isolation level.
4182-
]]></xsd:documentation>
4180+
<xsd:documentation source="java:org.springframework.transaction.annotation.Isolation">
4181+
The transaction isolation level.
4182+
</xsd:documentation>
41834183
</xsd:annotation>
41844184
<xsd:simpleType>
41854185
<xsd:union memberTypes="isolationEnumeration xsd:string"/>
41864186
</xsd:simpleType>
41874187
</xsd:attribute>
41884188
<xsd:attribute name="timeout" type="xsd:string" default="-1">
41894189
<xsd:annotation>
4190-
<xsd:documentation><![CDATA[
4191-
The transaction timeout value (in seconds).
4192-
]]></xsd:documentation>
4190+
<xsd:documentation>
4191+
The transaction timeout value (in seconds).
4192+
</xsd:documentation>
41934193
</xsd:annotation>
41944194
</xsd:attribute>
41954195
<xsd:attribute name="read-only" type="xsd:string" default="false">
41964196
<xsd:annotation>
4197-
<xsd:documentation><![CDATA[
4198-
Is this transaction read-only?
4199-
]]></xsd:documentation>
4197+
<xsd:documentation>
4198+
Is this transaction read-only?
4199+
</xsd:documentation>
42004200
</xsd:annotation>
42014201
</xsd:attribute>
42024202
<xsd:attribute name="synchronization-factory" type="xsd:string">
42034203
<xsd:annotation>
4204-
<xsd:documentation><![CDATA[
4204+
<xsd:documentation>
42054205
Reference to an instance of org.springframework.integration.transaction.TransactionSynchronizationFactory
42064206
which will return an instance of org.springframework.transaction.support.TransactionSynchronization via its create(..) method.
4207-
]]></xsd:documentation>
4207+
</xsd:documentation>
42084208
<xsd:appinfo>
42094209
<tool:annotation kind="ref">
42104210
<tool:expected-type

src/reference/asciidoc/transactions.adoc

+9
Original file line numberDiff line numberDiff line change
@@ -255,3 +255,12 @@ The following example shows how to do so:
255255
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />
256256
----
257257
====
258+
259+
[[reactive-transactions]]
260+
=== Reactive Transactions
261+
262+
Starting with version 5.3, a `ReactiveTransactionManager` can also be used together with a `TransactionInterceptor` advice for endpoints returning a reactive type.
263+
This includes `MessageSource` and `ReactiveMessageHandler` implementations (e.g. `ReactiveMongoDbMessageSource`) which produce a message with a `Flux` or `Mono` payload.
264+
All other reply producing message handler implementations can rely on a `ReactiveTransactionManager` when their reply payload is also some reactive type.
265+
266+

src/reference/asciidoc/whats-new.adoc

+4
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ A new `publishSubscribeChannel()` operator, based on the `BroadcastCapableChanne
5757
This fluent API has its advantage when we configure sub-flows as pub-sub subscribers for broker-backed channels like `SubscribableJmsChannel`, `SubscribableRedisChannel` etc.
5858
See <<./dsl.adoc#java-dsl-subflows,Sub-flows support>> for more information.
5959

60+
Transactional support in Spring Integration now also includes options to configure a `ReactiveTransactionManager` if a `MessageSource` or `MessageHandler` implementation produces a reactive type for payload to send.
61+
See `TransactionInterceptorBuilder` for more information.
62+
See also <<./transactions.adoc#reactive-transactions,Reactive Transactions>>.
63+
6064
[[x5.3-amqp]]
6165
=== AMQP Changes
6266

0 commit comments

Comments
 (0)