Skip to content

Commit 2b4eb61

Browse files
committed
Add support for Coroutines transactions
This commit adds Coroutines extensions for TransactionalOperator.transactional that accept suspending lambda or Kotlin Flow parameters. @transactional on suspending functions is not supported yet, gh-23575 has been created for that purpose. Closes gh-22915
1 parent aeb857c commit 2b4eb61

File tree

5 files changed

+161
-0
lines changed

5 files changed

+161
-0
lines changed

spring-tx/spring-tx.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,24 @@
11
description = "Spring Transaction"
22

3+
apply plugin: "kotlin"
4+
35
dependencies {
46
compile(project(":spring-beans"))
57
compile(project(":spring-core"))
68
optional(project(":spring-aop"))
79
optional(project(":spring-context")) // for JCA, @EnableTransactionManagement
10+
optional(project(":kotlin-coroutines"))
811
optional("javax.ejb:javax.ejb-api")
912
optional("javax.interceptor:javax.interceptor-api")
1013
optional("javax.resource:javax.resource-api")
1114
optional("javax.transaction:javax.transaction-api")
1215
optional("com.ibm.websphere:uow")
1316
optional("io.projectreactor:reactor-core")
1417
optional("io.vavr:vavr")
18+
optional("org.jetbrains.kotlin:kotlin-reflect")
19+
optional("org.jetbrains.kotlin:kotlin-stdlib")
20+
optional("org.jetbrains.kotlinx:kotlinx-coroutines-core")
21+
optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
1522
testCompile("org.aspectj:aspectjweaver")
1623
testCompile("org.codehaus.groovy:groovy")
1724
testCompile("org.eclipse.persistence:javax.persistence")

spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.concurrent.ConcurrentMap;
2222

2323
import io.vavr.control.Try;
24+
import kotlin.reflect.KFunction;
25+
import kotlin.reflect.jvm.ReflectJvmMapping;
2426
import org.apache.commons.logging.Log;
2527
import org.apache.commons.logging.LogFactory;
2628
import reactor.core.publisher.Flux;
@@ -30,6 +32,7 @@
3032
import org.springframework.beans.factory.BeanFactoryAware;
3133
import org.springframework.beans.factory.InitializingBean;
3234
import org.springframework.beans.factory.annotation.BeanFactoryAnnotationUtils;
35+
import org.springframework.core.KotlinDetector;
3336
import org.springframework.core.NamedThreadLocal;
3437
import org.springframework.core.ReactiveAdapter;
3538
import org.springframework.core.ReactiveAdapterRegistry;
@@ -41,6 +44,7 @@
4144
import org.springframework.transaction.TransactionManager;
4245
import org.springframework.transaction.TransactionStatus;
4346
import org.springframework.transaction.TransactionSystemException;
47+
import org.springframework.transaction.TransactionUsageException;
4448
import org.springframework.transaction.reactive.TransactionContextManager;
4549
import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager;
4650
import org.springframework.util.Assert;
@@ -322,6 +326,10 @@ protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targe
322326
final InvocationCallback invocation) throws Throwable {
323327

324328
if (this.reactiveAdapterRegistry != null) {
329+
if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
330+
throw new TransactionUsageException("Annotated transactions on suspending functions are not supported," +
331+
" use TransactionalOperator.transactional extensions instead.");
332+
}
325333
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
326334
if (adapter != null) {
327335
return new ReactiveTransactionSupport(adapter).invokeWithinTransaction(method, targetClass, invocation);
@@ -809,6 +817,17 @@ public static Object evaluateTryFailure(Object retVal, TransactionAttribute txAt
809817
}
810818
}
811819

820+
/**
821+
* Inner class to avoid a hard dependency on Kotlin at runtime.
822+
*/
823+
private static class KotlinDelegate {
824+
825+
static private boolean isSuspend(Method method) {
826+
KFunction<?> function = ReflectJvmMapping.getKotlinFunction(method);
827+
return function != null && function.isSuspend();
828+
}
829+
}
830+
812831

813832
/**
814833
* Delegate for Reactor-based management of transactional methods with a
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.springframework.transaction.reactive
2+
3+
import kotlinx.coroutines.Dispatchers
4+
import kotlinx.coroutines.ExperimentalCoroutinesApi
5+
import kotlinx.coroutines.flow.Flow
6+
import kotlinx.coroutines.reactive.asFlow
7+
import kotlinx.coroutines.reactive.awaitFirstOrNull
8+
import kotlinx.coroutines.reactor.asFlux
9+
import kotlinx.coroutines.reactor.mono
10+
11+
/**
12+
* Coroutines variant of [TransactionalOperator.transactional] with a [Flow] parameter.
13+
*
14+
* @author Sebastien Deleuze
15+
* @since 5.2
16+
*/
17+
@ExperimentalCoroutinesApi
18+
fun <T : Any> TransactionalOperator.transactional(flow: Flow<T>): Flow<T> =
19+
transactional(flow.asFlux()).asFlow()
20+
21+
/**
22+
* Coroutines variant of [TransactionalOperator.transactional] with a suspending lambda
23+
* parameter.
24+
*
25+
* @author Sebastien Deleuze
26+
* @since 5.2
27+
*/
28+
suspend fun <T : Any> TransactionalOperator.transactional(f: suspend () -> T?): T? =
29+
transactional(mono(Dispatchers.Unconfined) { f() }).awaitFirstOrNull()
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright 2002-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.transaction.reactive
18+
19+
import kotlinx.coroutines.ExperimentalCoroutinesApi
20+
import kotlinx.coroutines.delay
21+
import kotlinx.coroutines.flow.flow
22+
import kotlinx.coroutines.flow.toList
23+
import kotlinx.coroutines.runBlocking
24+
import org.assertj.core.api.Assertions.assertThat
25+
import org.junit.jupiter.api.Test
26+
import org.junit.jupiter.api.fail
27+
import org.springframework.transaction.support.DefaultTransactionDefinition
28+
29+
class TransactionalOperatorExtensionsTests {
30+
31+
private val tm = ReactiveTestTransactionManager(false, true)
32+
33+
@Test
34+
fun commitWithSuspendingFunction() {
35+
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
36+
runBlocking {
37+
operator.transactional {
38+
delay(1)
39+
true
40+
}
41+
}
42+
assertThat(tm.commit).isTrue()
43+
assertThat(tm.rollback).isFalse()
44+
}
45+
46+
@Test
47+
fun rollbackWithSuspendingFunction() {
48+
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
49+
runBlocking {
50+
try {
51+
operator.transactional {
52+
delay(1)
53+
throw IllegalStateException()
54+
}
55+
} catch (ex: IllegalStateException) {
56+
assertThat(tm.commit).isFalse()
57+
assertThat(tm.rollback).isTrue()
58+
return@runBlocking
59+
}
60+
fail("")
61+
}
62+
}
63+
64+
@Test
65+
@ExperimentalCoroutinesApi
66+
fun commitWithFlow() {
67+
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
68+
val flow = flow {
69+
emit(1)
70+
emit(2)
71+
emit(3)
72+
emit(4)
73+
}
74+
runBlocking {
75+
val list = operator.transactional(flow).toList()
76+
assertThat(list).hasSize(4)
77+
}
78+
assertThat(tm.commit).isTrue()
79+
assertThat(tm.rollback).isFalse()
80+
}
81+
82+
@Test
83+
@ExperimentalCoroutinesApi
84+
fun rollbackWithFlow() {
85+
val operator = TransactionalOperator.create(tm, DefaultTransactionDefinition())
86+
val flow = flow<Int> {
87+
delay(1)
88+
throw IllegalStateException()
89+
}
90+
runBlocking {
91+
try {
92+
operator.transactional(flow).toList()
93+
} catch (ex: IllegalStateException) {
94+
assertThat(tm.commit).isFalse()
95+
assertThat(tm.rollback).isTrue()
96+
return@runBlocking
97+
}
98+
}
99+
}
100+
}

src/docs/asciidoc/languages/kotlin.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,12 @@ class UserHandler(builder: WebClient.Builder) {
574574
}
575575
----
576576

577+
=== Transactions
578+
579+
Transactions on Coroutines are supported via the programmatic variant of the Reactive
580+
transaction management provided as of Spring Framework 5.2. `TransactionalOperator.transactional`
581+
extensions with suspending lambda and Kotlin `Flow` parameter are provided for that purpose.
582+
577583
[[kotlin-spring-projects-in-kotlin]]
578584
== Spring Projects in Kotlin
579585

0 commit comments

Comments
 (0)