Skip to content

Commit 33894dc

Browse files
artembilanspring-builds
authored andcommitted
GH-9866: Fix FluxMessageChannel for race condition on destroy
Fixes: #9866 Issue link: #9866 Due to race condition between `sink.tryEmitNext()` and `sink.emitComplete()``, there could be a situation when `onNext` signal slips after `onComplete`. Appears on fast producers into this `FluxMessageChannel`. That leads to error like `Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.` * Fix `FluxMessageChannel` to check for `this.active` one more time just before `sink.tryEmitNext()` call * Also mitigate a race condition with a `Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(1))` in the `destroy()` instead of `Sinks.EmitFailureHandler.FAIL_FAST`. The `busyLooping()` would retry requested `onComplete()` signal for that specific error until success or timeout (cherry picked from commit 7680e02)
1 parent 3a3ea9b commit 33894dc

File tree

1 file changed

+17
-11
lines changed

1 file changed

+17
-11
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

+17-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2024 the original author or authors.
2+
* Copyright 2015-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -86,15 +86,21 @@ private boolean tryEmitMessage(Message<?> message) {
8686
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
8787
.build();
8888
}
89-
return switch (this.sink.tryEmitNext(messageToEmit)) {
90-
case OK -> true;
91-
case FAIL_NON_SERIALIZED, FAIL_OVERFLOW -> false;
92-
case FAIL_ZERO_SUBSCRIBER ->
93-
throw new IllegalStateException("The [" + this + "] doesn't have subscribers to accept messages");
94-
case FAIL_TERMINATED, FAIL_CANCELLED ->
95-
throw new IllegalStateException("Cannot emit messages into the cancelled or terminated sink: "
96-
+ this.sink);
97-
};
89+
90+
if (this.active) {
91+
return switch (this.sink.tryEmitNext(messageToEmit)) {
92+
case OK -> true;
93+
case FAIL_NON_SERIALIZED, FAIL_OVERFLOW -> false;
94+
case FAIL_ZERO_SUBSCRIBER ->
95+
throw new IllegalStateException("The [" + this + "] doesn't have subscribers to accept messages");
96+
case FAIL_TERMINATED, FAIL_CANCELLED ->
97+
throw new IllegalStateException("Cannot emit messages into the cancelled or terminated sink: "
98+
+ this.sink);
99+
};
100+
}
101+
else {
102+
return false;
103+
}
98104
}
99105

100106
@Override
@@ -171,7 +177,7 @@ private void sendReactiveMessage(Message<?> message) {
171177
public void destroy() {
172178
this.active = false;
173179
this.upstreamSubscriptions.dispose();
174-
this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
180+
this.sink.emitComplete(Sinks.EmitFailureHandler.busyLooping(Duration.ofSeconds(1)));
175181
super.destroy();
176182
}
177183

0 commit comments

Comments
 (0)