Skip to content

Commit 139f946

Browse files
committed
Add support for Kotlin suspending functions
Closes spring-projectsgh-28515
1 parent 3c11661 commit 139f946

File tree

5 files changed

+360
-48
lines changed

5 files changed

+360
-48
lines changed

spring-context/spring-context.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ dependencies {
3838
testImplementation("org.apache.commons:commons-pool2")
3939
testImplementation("org.awaitility:awaitility")
4040
testImplementation("jakarta.inject:jakarta.inject-tck")
41-
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core")
41+
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
4242
testRuntimeOnly("jakarta.xml.bind:jakarta.xml.bind-api")
4343
testRuntimeOnly("org.glassfish:jakarta.el")
4444
// Substitute for javax.management:jmxremote_optional:1.0.1_04 (not available on Maven Central)

spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -396,12 +396,15 @@ public Object postProcessAfterInitialization(Object bean, String beanName) {
396396
}
397397

398398
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
399-
// If Reactor is on the classpath, check for methods that return a Publisher
400-
if (ScheduledAnnotationReactiveSupport.reactorPresent) {
401-
if (ScheduledAnnotationReactiveSupport.isReactive(method)) {
402-
processScheduledReactive(scheduled, method, bean);
403-
return;
404-
}
399+
// Check for Kotlin suspending functions. If one is found but reactor bridge isn't on the classpath, throw
400+
if (ScheduledAnnotationReactiveSupport.checkKotlinRuntimeIfNeeded(method)) {
401+
processScheduledReactive(scheduled, method, bean, true);
402+
return;
403+
}
404+
// Check for Publisher-returning methods. If found but Reactor isn't on the classpath, throw.
405+
if (ScheduledAnnotationReactiveSupport.checkReactorRuntimeIfNeeded(method)) {
406+
processScheduledReactive(scheduled, method, bean, false);
407+
return;
405408
}
406409
processScheduledSync(scheduled, method, bean);
407410
}
@@ -539,16 +542,18 @@ protected void processScheduledSync(Scheduled scheduled, Method method, Object b
539542
}
540543

541544
/**
542-
* Process the given {@code @Scheduled} method declaration on the given bean which
543-
* returns a {@code Publisher}, repeatedly subscribing to the returned publisher
544-
* according to the fixedDelay/fixedRate configuration. Cron configuration isn't supported,
545-
* nor is non-Publisher return types (even if a {@code ReactiveAdapter} is registered).
545+
* Process the given {@code @Scheduled} bean method declaration which returns
546+
* a {@code Publisher}, or the given Kotlin suspending function converted to a
547+
* Publisher. The publisher is then repeatedly subscribed to, according to the
548+
* fixedDelay/fixedRate configuration. Cron configuration isn't supported,nor
549+
* is non-Publisher return types (even if a {@code ReactiveAdapter} is registered).
546550
* @param scheduled the {@code @Scheduled} annotation
547-
* @param method the method that the annotation has been declared on, which MUST return a Publisher
551+
* @param method the method that the annotation has been declared on, which
552+
* MUST either return a Publisher or be a Kotlin suspending function
548553
* @param bean the target bean instance
549554
* @see #createRunnable(Object, Method)
550555
*/
551-
protected void processScheduledReactive(Scheduled scheduled, Method method, Object bean) {
556+
protected void processScheduledReactive(Scheduled scheduled, Method method, Object bean, boolean isSuspendingFunction) {
552557
try {
553558
boolean processedSchedule = false;
554559
String errorMessage =
@@ -587,7 +592,7 @@ protected void processScheduledReactive(Scheduled scheduled, Method method, Obje
587592
Duration fixedDelay = toDuration(scheduled.fixedDelay(), scheduled.timeUnit());
588593
if (!fixedDelay.isNegative()) {
589594
processedSchedule = true;
590-
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedDelay, false));
595+
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedDelay, false, isSuspendingFunction));
591596
}
592597

593598
String fixedDelayString = scheduled.fixedDelayString();
@@ -605,7 +610,7 @@ protected void processScheduledReactive(Scheduled scheduled, Method method, Obje
605610
throw new IllegalArgumentException(
606611
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
607612
}
608-
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedDelay, false));
613+
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedDelay, false, isSuspendingFunction));
609614
}
610615
}
611616

@@ -614,7 +619,7 @@ protected void processScheduledReactive(Scheduled scheduled, Method method, Obje
614619
if (!fixedRate.isNegative()) {
615620
Assert.isTrue(!processedSchedule, errorMessage);
616621
processedSchedule = true;
617-
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedRate, true));
622+
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedRate, true, isSuspendingFunction));
618623
}
619624
String fixedRateString = scheduled.fixedRateString();
620625
if (StringUtils.hasText(fixedRateString)) {
@@ -631,7 +636,7 @@ protected void processScheduledReactive(Scheduled scheduled, Method method, Obje
631636
throw new IllegalArgumentException(
632637
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
633638
}
634-
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedRate, true));
639+
reactiveTasks.add(new ScheduledAnnotationReactiveSupport.ReactiveTask(method, bean, initialDelay, fixedRate, true, isSuspendingFunction));
635640
}
636641
}
637642

spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupport.java

Lines changed: 84 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import reactor.core.publisher.Mono;
3030

3131
import org.springframework.aop.support.AopUtils;
32+
import org.springframework.core.CoroutinesUtils;
33+
import org.springframework.core.KotlinDetector;
3234
import org.springframework.util.Assert;
3335
import org.springframework.util.ClassUtils;
3436
import org.springframework.util.ReflectionUtils;
@@ -41,11 +43,86 @@
4143
*/
4244
abstract class ScheduledAnnotationReactiveSupport {
4345

46+
static final boolean publisherPresent = ClassUtils.isPresent(
47+
"org.reactivestreams.Publisher", ScheduledAnnotationReactiveSupport.class.getClassLoader());
48+
4449
static final boolean reactorPresent = ClassUtils.isPresent(
4550
"reactor.core.publisher.Flux", ScheduledAnnotationReactiveSupport.class.getClassLoader());
4651

47-
static boolean isReactive(Method method) {
48-
return Publisher.class.isAssignableFrom(method.getReturnType());
52+
static final boolean coroutinesReactorPresent = ClassUtils.isPresent(
53+
"kotlinx.coroutines.reactor.MonoKt", ScheduledAnnotationReactiveSupport.class.getClassLoader());
54+
55+
/**
56+
* Checks that if the method is reactive, it can be scheduled. If it isn't reactive
57+
* (Reactive Streams {@code Publisher} is not present at runtime or the method doesn't
58+
* return a form of Publisher) then this check returns {@code false}. Otherwise, it is
59+
* eligible for reactive scheduling and Reactor MUST also be present at runtime.
60+
* Provided that is the case, this method returns {@code true}. Otherwise, it throws
61+
* an {@code IllegalStateException}.
62+
*/
63+
static boolean checkReactorRuntimeIfNeeded(Method method) {
64+
if (!(publisherPresent && Publisher.class.isAssignableFrom(method.getReturnType()))) {
65+
return false;
66+
}
67+
if (reactorPresent) {
68+
return true;
69+
}
70+
throw new IllegalStateException("Reactive methods returning a Publisher may only be annotated with @Scheduled"
71+
+ "if Reactor is present at runtime");
72+
}
73+
74+
/**
75+
* Checks that if the method is a Kotlin suspending function, it can be scheduled.
76+
* If it isn't a suspending function (or Kotlin is not detected at runtime) then this
77+
* check returns {@code false}. Otherwise, it is eligible for conversion to a
78+
* {@code Mono} for reactive scheduling and both Reactor and the
79+
* {@code kotlinx.coroutines.reactor} bridge MUST also be present at runtime.
80+
* Provided that is the case, this method returns {@code true}. Otherwise, it throws
81+
* an {@code IllegalStateException}.
82+
*/
83+
static boolean checkKotlinRuntimeIfNeeded(Method method) {
84+
if (!(KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method))) {
85+
return false;
86+
}
87+
if (coroutinesReactorPresent) {
88+
return true;
89+
}
90+
throw new IllegalStateException("Kotlin suspending functions may only be annotated with @Scheduled"
91+
+ "if Reactor and the Reactor-Coroutine bridge (kotlinx.coroutines.reactor) are present at runtime");
92+
}
93+
94+
/**
95+
* Reflectively invoke a reactive method and return the resulting {@code Publisher}.
96+
* The {@link #checkReactorRuntimeIfNeeded(Method)} check is a precondition to calling
97+
* this method.
98+
*/
99+
static Publisher<?> getPublisherForReactiveMethod(Method method, Object bean) {
100+
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg reactive methods may be annotated with @Scheduled");
101+
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
102+
try {
103+
ReflectionUtils.makeAccessible(invocableMethod);
104+
Object r = invocableMethod.invoke(bean);
105+
return (Publisher<?>) r;
106+
}
107+
catch (InvocationTargetException ex) {
108+
throw new IllegalArgumentException("Cannot obtain a Publisher from the @Scheduled reactive method", ex.getTargetException());
109+
}
110+
catch (IllegalAccessException ex) {
111+
throw new IllegalArgumentException("Cannot obtain a Publisher from the @Scheduled reactive method", ex);
112+
}
113+
}
114+
115+
/**
116+
* Turn the provided suspending function method into a {@code Publisher} via
117+
* {@link CoroutinesUtils} and return that Publisher.
118+
* The {@link #checkKotlinRuntimeIfNeeded(Method)} check is a precondition to calling
119+
* this method.
120+
*/
121+
static Publisher<?> getPublisherForSuspendingFunction(Method method, Object bean) {
122+
Assert.isTrue(method.getParameterCount() == 1,"Only no-args suspending functions may be "
123+
+ "annotated with @Scheduled (with 1 self-referencing synthetic arg expected)");
124+
125+
return CoroutinesUtils.invokeSuspendingFunction(method, bean, (Object[]) method.getParameters());
49126
}
50127

51128
/**
@@ -64,20 +141,12 @@ static class ReactiveTask {
64141

65142

66143
protected ReactiveTask(Method method, Object bean, Duration initialDelay,
67-
Duration otherDelay, boolean isFixedRate) {
68-
69-
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
70-
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
71-
try {
72-
ReflectionUtils.makeAccessible(invocableMethod);
73-
Object r = invocableMethod.invoke(bean);
74-
this.publisher = (Publisher<?>) r;
144+
Duration otherDelay, boolean isFixedRate, boolean isSuspendingFunction) {
145+
if (isSuspendingFunction) {
146+
this.publisher = getPublisherForSuspendingFunction(method, bean);
75147
}
76-
catch (InvocationTargetException ex) {
77-
throw new IllegalArgumentException("Cannot obtain a Publisher from the @Scheduled reactive method", ex.getTargetException());
78-
}
79-
catch (IllegalAccessException ex) {
80-
throw new IllegalArgumentException("Cannot obtain a Publisher from the @Scheduled reactive method", ex);
148+
else {
149+
this.publisher = getPublisherForReactiveMethod(method, bean);
81150
}
82151

83152
this.initialDelay = initialDelay;
@@ -89,7 +158,6 @@ protected ReactiveTask(Method method, Object bean, Duration initialDelay,
89158
+ "#" + method.getName() + "()' [ScheduledAnnotationReactiveSupport]";
90159
}
91160

92-
93161
private Mono<Void> safeExecutionMono() {
94162
Mono<Void> executionMono;
95163
if (this.publisher instanceof Mono) {

spring-context/src/test/java/org/springframework/scheduling/annotation/ScheduledAnnotationReactiveSupportTests.java

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import static org.assertj.core.api.Assertions.assertThat;
3636
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
37+
import static org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupport.getPublisherForReactiveMethod;
3738

3839
class ScheduledAnnotationReactiveSupportTests {
3940

@@ -47,17 +48,17 @@ void ensureReactor() {
4748
"publisherString", "monoThrows" }) //note: monoWithParams can't be found by this test
4849
void checkIsReactive(String method) {
4950
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, method);
50-
assertThat(ScheduledAnnotationReactiveSupport.isReactive(m)).as(m.getName()).isTrue();
51+
assertThat(ScheduledAnnotationReactiveSupport.checkReactorRuntimeIfNeeded(m)).as(m.getName()).isTrue();
5152
}
5253

5354
@Test
5455
void checkNotReactive() {
5556
Method string = ReflectionUtils.findMethod(ReactiveMethods.class, "oops");
5657
Method future = ReflectionUtils.findMethod(ReactiveMethods.class, "future");
5758

58-
assertThat(ScheduledAnnotationReactiveSupport.isReactive(string))
59+
assertThat(ScheduledAnnotationReactiveSupport.checkReactorRuntimeIfNeeded(string))
5960
.as("String-returning").isFalse();
60-
assertThat(ScheduledAnnotationReactiveSupport.isReactive(future))
61+
assertThat(ScheduledAnnotationReactiveSupport.checkReactorRuntimeIfNeeded(future))
6162
.as("Future-returning").isFalse();
6263
}
6364

@@ -131,24 +132,37 @@ void init() {
131132
this.target = new ReactiveMethods();
132133
}
133134

135+
@SuppressWarnings("ReactiveStreamsUnusedPublisher")
134136
@Test
135137
void rejectWithParams() {
136138
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "monoWithParam", String.class);
137139

138-
assertThat(ScheduledAnnotationReactiveSupport.isReactive(m)).as("isReactive").isTrue();
140+
assertThat(ScheduledAnnotationReactiveSupport.checkReactorRuntimeIfNeeded(m)).as("isReactive").isTrue();
139141

142+
//static helper method
143+
assertThatIllegalArgumentException().isThrownBy(() -> getPublisherForReactiveMethod(m, target))
144+
.withMessage("Only no-arg reactive methods may be annotated with @Scheduled")
145+
.withNoCause();
146+
147+
//constructor of task
140148
assertThatIllegalArgumentException().isThrownBy(() -> new ScheduledAnnotationReactiveSupport.ReactiveTask(
141-
m, target, Duration.ZERO, Duration.ZERO, false))
142-
.withMessage("Only no-arg methods may be annotated with @Scheduled")
149+
m, target, Duration.ZERO, Duration.ZERO, false, false))
150+
.withMessage("Only no-arg reactive methods may be annotated with @Scheduled")
143151
.withNoCause();
144152
}
145153

146154
@Test
147155
void rejectCantProducePublisher() {
148156
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "monoThrows");
149157

158+
//static helper method
159+
assertThatIllegalArgumentException().isThrownBy(() -> getPublisherForReactiveMethod(m, target))
160+
.withMessage("Cannot obtain a Publisher from the @Scheduled reactive method")
161+
.withCause(new IllegalStateException("expected"));
162+
163+
//constructor of task
150164
assertThatIllegalArgumentException().isThrownBy(() -> new ScheduledAnnotationReactiveSupport.ReactiveTask(
151-
m, target, Duration.ZERO, Duration.ZERO, false))
165+
m, target, Duration.ZERO, Duration.ZERO, false, false))
152166
.withMessage("Cannot obtain a Publisher from the @Scheduled reactive method")
153167
.withCause(new IllegalStateException("expected"));
154168
}
@@ -157,8 +171,14 @@ void rejectCantProducePublisher() {
157171
void rejectCantAccessMethod() {
158172
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "monoThrowsIllegalAccess");
159173

174+
//static helper method
175+
assertThatIllegalArgumentException().isThrownBy(() -> getPublisherForReactiveMethod(m, target))
176+
.withMessage("Cannot obtain a Publisher from the @Scheduled reactive method")
177+
.withCause(new IllegalAccessException("expected"));
178+
179+
//constructor of task
160180
assertThatIllegalArgumentException().isThrownBy(() -> new ScheduledAnnotationReactiveSupport.ReactiveTask(
161-
m, target, Duration.ZERO, Duration.ZERO, false))
181+
m, target, Duration.ZERO, Duration.ZERO, false, false))
162182
.withMessage("Cannot obtain a Publisher from the @Scheduled reactive method")
163183
.withCause(new IllegalAccessException("expected"));
164184
}
@@ -167,7 +187,7 @@ void rejectCantAccessMethod() {
167187
void hasCheckpointToString() {
168188
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "mono");
169189
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
170-
m, target, Duration.ZERO, Duration.ZERO, false);
190+
m, target, Duration.ZERO, Duration.ZERO, false, false);
171191

172192
assertThat(reactiveTask).hasToString("@Scheduled 'org.springframework.scheduling.annotation.ScheduledAnnotationReactiveSupportTests$ReactiveMethods#mono()' [ScheduledAnnotationReactiveSupport]");
173193
}
@@ -176,7 +196,7 @@ void hasCheckpointToString() {
176196
void cancelledEarlyPreventsSubscription() {
177197
Method m = ReflectionUtils.findMethod(ReactiveMethods.class, "trackingMono");
178198
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
179-
m, target, Duration.ZERO, Duration.ofSeconds(10), false);
199+
m, target, Duration.ZERO, Duration.ofSeconds(10), false, false);
180200
reactiveTask.cancel();
181201
reactiveTask.subscribe();
182202

@@ -187,7 +207,7 @@ void cancelledEarlyPreventsSubscription() {
187207
void noInitialDelayFixedDelay() throws InterruptedException {
188208
Method m = ReflectionUtils.findMethod(target.getClass(), "trackingMono");
189209
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
190-
m, target, Duration.ZERO, Duration.ofSeconds(10), false);
210+
m, target, Duration.ZERO, Duration.ofSeconds(10), false, false);
191211
reactiveTask.subscribe();
192212
Thread.sleep(500);
193213
reactiveTask.cancel();
@@ -199,7 +219,7 @@ void noInitialDelayFixedDelay() throws InterruptedException {
199219
void noInitialDelayFixedRate() throws InterruptedException {
200220
Method m = ReflectionUtils.findMethod(target.getClass(), "trackingMono");
201221
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
202-
m, target, Duration.ZERO, Duration.ofSeconds(10), true);
222+
m, target, Duration.ZERO, Duration.ofSeconds(10), true, false);
203223
reactiveTask.subscribe();
204224
Thread.sleep(500);
205225
reactiveTask.cancel();
@@ -211,7 +231,7 @@ void noInitialDelayFixedRate() throws InterruptedException {
211231
void initialDelayFixedDelay() throws InterruptedException {
212232
Method m = ReflectionUtils.findMethod(target.getClass(), "trackingMono");
213233
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
214-
m, target, Duration.ofSeconds(10), Duration.ofMillis(500), false);
234+
m, target, Duration.ofSeconds(10), Duration.ofMillis(500), false, false);
215235
reactiveTask.subscribe();
216236
Thread.sleep(500);
217237
reactiveTask.cancel();
@@ -223,7 +243,7 @@ void initialDelayFixedDelay() throws InterruptedException {
223243
void initialDelayFixedRate() throws InterruptedException {
224244
Method m = ReflectionUtils.findMethod(target.getClass(), "trackingMono");
225245
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
226-
m, target, Duration.ofSeconds(10), Duration.ofMillis(500), true);
246+
m, target, Duration.ofSeconds(10), Duration.ofMillis(500), true, false);
227247
reactiveTask.subscribe();
228248
Thread.sleep(500);
229249
reactiveTask.cancel();
@@ -235,7 +255,7 @@ void initialDelayFixedRate() throws InterruptedException {
235255
void monoErrorHasCheckpoint() throws InterruptedException {
236256
Method m = ReflectionUtils.findMethod(target.getClass(), "monoError");
237257
final ScheduledAnnotationReactiveSupport.ReactiveTask reactiveTask = new ScheduledAnnotationReactiveSupport.ReactiveTask(
238-
m, target, Duration.ZERO, Duration.ofSeconds(10), true);
258+
m, target, Duration.ZERO, Duration.ofSeconds(10), true, false);
239259

240260
assertThat(reactiveTask.checkpoint).isEqualTo("@Scheduled 'org.springframework.scheduling.annotation"
241261
+ ".ScheduledAnnotationReactiveSupportTests$ReactiveMethods#monoError()'"

0 commit comments

Comments
 (0)