Skip to content

Commit 7a5f8e0

Browse files
committed
Refine check for multiple subscribers
Commit #c187cb2 introduced proactive rejection of multiple subscribers in ReactorClientHttpResponse, instead of hanging indefinitely as per reactor/reactor-netty#503. However FluxReceive also rejects subsequent subscribers if the response is consumed fully, as opposed to being canceled, e.g. as with bodyToMono(Void.class). In that case, a subsequent subscriber causes two competing error signals to be sent, and one gets dropped and logged by reactor-core. This fix ensures that a rejection is raised in ReactorClientHttpResponse only after a cancel() was detected. Issue: SPR-17564
1 parent 50e5bdb commit 7a5f8e0

File tree

2 files changed

+16
-8
lines changed

2 files changed

+16
-8
lines changed

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.springframework.http.HttpHeaders;
3030
import org.springframework.http.HttpStatus;
3131
import org.springframework.http.ResponseCookie;
32-
import org.springframework.util.Assert;
3332
import org.springframework.util.CollectionUtils;
3433
import org.springframework.util.LinkedMultiValueMap;
3534
import org.springframework.util.MultiValueMap;
@@ -49,7 +48,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
4948

5049
private final NettyInbound inbound;
5150

52-
private final AtomicBoolean bodyConsumed = new AtomicBoolean();
51+
private final AtomicBoolean rejectSubscribers = new AtomicBoolean();
5352

5453

5554
public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) {
@@ -62,10 +61,18 @@ public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbou
6261
@Override
6362
public Flux<DataBuffer> getBody() {
6463
return this.inbound.receive()
65-
.doOnSubscribe(s ->
66-
// See https://github.com/reactor/reactor-netty/issues/503
67-
Assert.state(this.bodyConsumed.compareAndSet(false, true),
68-
"The client response body can only be consumed once."))
64+
.doOnSubscribe(s -> {
65+
if (this.rejectSubscribers.get()) {
66+
throw new IllegalStateException("The client response body can only be consumed once.");
67+
}
68+
})
69+
.doOnCancel(() -> {
70+
// https://github.com/reactor/reactor-netty/issues/503
71+
// FluxReceive rejects multiple subscribers, but not after a cancel().
72+
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
73+
// So we need to intercept and reject them in that case.
74+
this.rejectSubscribers.set(true);
75+
})
6976
.map(byteBuf -> {
7077
byteBuf.retain();
7178
return this.bufferFactory.wrap(byteBuf);

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,8 +447,9 @@ private <T extends Publisher<?>> T handleBody(ClientResponse response,
447447
@SuppressWarnings("unchecked")
448448
private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) {
449449
// Ensure the body is drained, even if the StatusHandler didn't consume it,
450-
// but ignore errors in case it did consume it.
451-
return (Mono<T>) response.bodyToMono(Void.class).onErrorMap(ex2 -> ex).thenReturn(ex);
450+
// but ignore exception, in case the handler did consume.
451+
return (Mono<T>) response.bodyToMono(Void.class)
452+
.onErrorResume(ex2 -> Mono.empty()).thenReturn(ex);
452453
}
453454

454455
private static Mono<WebClientResponseException> createResponseException(ClientResponse response) {

0 commit comments

Comments
 (0)