WebClient logs "Only one connection receive subscriber allowed" when response status is an error [SPR-17564] #22096

Closed
spring-projects-issues opened this issue Dec 4, 2018 · 14 comments
Labels
in: web Issues in web modules (web, webmvc, webflux, websocket) type: bug A general bug

@spring-projects-issues
Collaborator

spring-projects-issues commented Dec 4, 2018

Fethullah Misir opened SPR-17564 and commented

WebClient throws an IllegalStateException with:

Only one connection receive subscriber allowed

 when the server response status is 4xx / 5xx.

The issue is not reproducible with Spring Boot 2.1.0.RELEASE / WebFlux 5.1.2.RELEASE. 

 

I reproduced the issue with the test below:

@Test
public void testRetrieveDoesThrowMultipleSubscriberError() {
   stubFor( get( urlEqualTo( "/notFound" ) )
         .willReturn( aResponse().withHeader( HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE )
                                 .withBody( "{}" )
                                 .withStatus( HttpStatus.NOT_FOUND_404 ) ) );

   StepVerifier.create( webClient.get()
                                 .uri( "/notFound" )
                                 .accept( MediaType.APPLICATION_JSON )
                                 .retrieve()

                                 .bodyToMono( String.class ) )
               .expectError( WebClientException.class )
               .verify();
}
 

The test passes; however, the stack trace below is logged:

Caused by: java.lang.IllegalStateException: Only one connection receive subscriber allowed.
        ... 143 common frames omitted
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxMap] :
        reactor.core.publisher.Flux.map(Flux.java:5655)
        reactor.netty.ByteBufFlux.fromInbound(ByteBufFlux.java:70)
        reactor.netty.channel.ChannelOperations.receive(ChannelOperations.java:225)
        org.springframework.http.client.reactive.ReactorClientHttpResponse.getBody(ReactorClientHttpResponse.java:64)
        org.springframework.web.reactive.function.BodyExtractors.consumeAndCancel(BodyExtractors.java:268)
        org.springframework.web.reactive.function.BodyExtractors.lambda$skipBodyAsMono$21(BodyExtractors.java:264)
.....
.....
        io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500)
        io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
        io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
Error has been observed by the following operator(s):
        |_      Flux.map ? reactor.netty.ByteBufFlux.fromInbound(ByteBufFlux.java:70)
        |_      Flux.doOnSubscribe ? org.springframework.http.client.reactive.ReactorClientHttpResponse.getBody(ReactorClientHttpResponse.java:65)
 

 

I attached a sample to reproduce the issue.

It seems that the issue is related to the default status handler. When registering a custom one, the stack trace is not logged and it works as expected.

StepVerifier.create( webClient.get()
                              .uri( "/notFound" )
                              .accept( MediaType.APPLICATION_JSON )
                              .retrieve()
                              .onStatus( status -> true, response -> Mono.error(IllegalStateException::new ) )
                              .bodyToMono( String.class ) )
            .expectError( IllegalStateException.class )
            .verify(); 

 

 

 

 


Affects: 5.1.3

Attachments:

Issue Links:

Referenced from: commits 7a5f8e0

@spring-projects-issues
Collaborator Author

spring-projects-issues commented Dec 4, 2018

Rossen Stoyanchev commented

This is a result of the work for #22005. We try to ensure the body is consumed, and that may result in attempting to consume it twice, hence the error. We then ignore the error, so in that sense it doesn't do any harm, but the error is logged by Reactor Core.
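
To illustrate the "consumed twice" point above, here is a minimal sketch that uses only the JDK's java.util.concurrent.Flow API. It is not the Reactor Netty or Spring code: OneShotBody, Recorder, consume and the error message are hypothetical names chosen for this illustration. The assumption is simply that a response body is a publisher that accepts one subscriber, so a second attempt to read it is rejected with an IllegalStateException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

public class SingleSubscriberBodyDemo {

    /** Hypothetical stand-in for a response body that may be consumed only once. */
    static final class OneShotBody implements Flow.Publisher<String> {
        private final List<String> chunks;
        private final AtomicBoolean subscribed = new AtomicBoolean();

        OneShotBody(List<String> chunks) {
            this.chunks = List.copyOf(chunks);
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            if (!subscribed.compareAndSet(false, true)) {
                // A second subscriber is rejected instead of receiving the data again.
                subscriber.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) { }
                    @Override public void cancel() { }
                });
                subscriber.onError(new IllegalStateException("Only one subscriber allowed"));
                return;
            }
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    done = true;
                    // Simplification: demand is ignored and everything is emitted at once.
                    for (String chunk : chunks) {
                        subscriber.onNext(chunk);
                    }
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Records every signal it receives so the outcome can be inspected. */
    static final class Recorder implements Flow.Subscriber<String> {
        final List<String> events = new ArrayList<>();

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { events.add("next:" + item); }
        @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
        @Override public void onComplete() { events.add("complete"); }
    }

    /** Subscribes a fresh Recorder to the body and returns the signals it saw. */
    static List<String> consume(Flow.Publisher<String> body) {
        Recorder recorder = new Recorder();
        body.subscribe(recorder);
        return recorder.events;
    }

    public static void main(String[] args) {
        OneShotBody body = new OneShotBody(List.of("{}"));
        System.out.println(consume(body)); // first read: [next:{}, complete]
        System.out.println(consume(body)); // second read: [error:Only one subscriber allowed]
    }
}
```

In this sketch the first read succeeds and the second one fails, which mirrors the situation described above: if one code path reads the body and another tries to drain it afterwards, the second attempt surfaces as an error even when the caller ignores it.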

@spring-projects-issues
Collaborator Author

Rossen Stoyanchev commented

The same issue was also reported in Reactor Netty #540.

@spring-projects-issues
Collaborator Author

Rossen Stoyanchev commented

There is a fix available in master.

@spring-projects-issues
Collaborator Author

Fethullah Misir commented

 

In an application that I'm working on, the issue does cause the unit tests to fail.

The use case is to fall back, using the same WebClient, when the first request results in an error.

I can reproduce the same issue with the test below:

 

@Test
public void testRetrieveDoesThrowMultipleSubscriberError() {
   //Stub 404
   stubFor( get( urlEqualTo( "/notFound" ) )
         .willReturn( aResponse().withHeader( HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE )
                                 .withBody( "{}" )
                                 .withStatus( HttpStatus.NOT_FOUND_404 ) ) );
   //Stub 200
   stubFor( get( urlEqualTo( "/ok" ) )
         .willReturn( aResponse().withHeader( HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE )
                                 .withBody( "{}" )
                                 .withStatus( HttpStatus.OK_200 ) ) );

   StepVerifier.create( webClient.get()
                                 .uri( "/notFound" )
                                 .accept( MediaType.APPLICATION_JSON )
                                 .retrieve()
                                 .bodyToMono( String.class )
                                 // ---> fallback
                                 .onErrorResume( error -> webClient.get().uri( "/ok" )
                                                                   .accept( MediaType.APPLICATION_JSON )
                                                                   .retrieve()
                                                                   .bodyToMono( String.class ) ) )
               .expectNext( "{}" )
               .verifyComplete();
} 

//results in: 

java.lang.AssertionError: expectation "expectNext({})" failed (expected: onNext({}); actual: onError(reactor.core.Exceptions$BubblingException: java.lang.IllegalStateException: Only one connection receive subscriber allowed.))
 at reactor.test.ErrorFormatter.assertionError(ErrorFormatter.java:105)
 at reactor.test.ErrorFormatter.failPrefix(ErrorFormatter.java:94)

I think the reuse of the WebClient causes the error. When replacing the fallback with 

.onErrorResume( error -> Mono.just( "{}" ) )

the test passes successfully (the reactor error is still logged).

 

I also tried your fix with the latest snapshot build.

[INFO] +- org.springframework:spring-webflux:jar:5.1.4.BUILD-SNAPSHOT:compile
[INFO] |  +- io.projectreactor:reactor-core:jar:3.2.3.RELEASE:compile
[INFO] |  |  \- org.reactivestreams:reactive-streams:jar:1.0.2:compile
 

The exception is still logged and, additionally, the test no longer terminates.

(I ran the initial test without the fallback)

reactor-http-nio-42018-12-05 09:07:50 ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
java.lang.IllegalStateException: Only one connection receive subscriber allowed. 

 

 

@spring-projects-issues
Collaborator Author

Fethullah Misir commented

Forget about my last comment. I forgot to update the spring-web artifact to the latest build.

The reactor error is gone and my tests are passing now. Awesome, thank you! 

 

 

@spring-projects-issues
Collaborator Author

Rossen Stoyanchev commented

Thanks for checking! I did verify the fix with your test, and with the sample from the Reactor Netty issue, but always good to know it works in your environment.

@awilhelmer

I don't know why this issue is closed, because I'm getting the same error on Spring Boot v2.1.2

java.lang.IllegalStateException: Only one connection receive subscriber allowed.
	at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:271)
	at reactor.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:124)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:466)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
	at java.lang.Thread.run(Thread.java:745)

Code:

@PostMapping(value = "/route")
   public Flux<DataBuffer> postSlice(ServerHttpRequest request, ServerHttpResponse response) {
      Mono<ClientResponse> exchange = getWebclient()
            .post()
            .uri(urlStr)
            .body(BodyInserters.fromDataBuffers(request.getBody()))
            .exchange();
      return handleRequest(response, exchange);
   }

private Flux<DataBuffer> handleRequest(ServerHttpResponse response, Mono<ClientResponse> exchange) {
      return exchange.flatMapMany(clientResponse -> {
         Utils.copyClientResponseHeaders(response, clientResponse);
         return clientResponse.bodyToFlux(DataBuffer.class);
      });
   }

Should I open a new one?

@snicoll
Member

snicoll commented Jan 18, 2019

@awilhelmer if you believe the issue has not been fixed, please open a separate issue with a small sample that reproduces the behavior you've described (i.e. not some code snippet but something we can run ourselves).

@awilhelmer

awilhelmer commented Jan 18, 2019

Okay, @snicoll thanks for your very fast reply :)
BTW: it only happens on POST, not on GET or PUT ...

@rstoyanchev
Contributor

The same exception could have a completely different cause. It could also be that the server request body is actually read twice (e.g. on a form post colliding with exchange.getFormData()).

@thekalinga

thekalinga commented Jan 21, 2019

@awilhelmer Can you share what this part of the code is doing with ServerHttpResponse?

Utils.copyClientResponseHeaders(response, clientResponse);

@awilhelmer

awilhelmer commented Jan 21, 2019

@thekalinga The method copies the HTTP headers from the WebClient response to the initial HTTP caller. You can comment it out.

@awilhelmer

awilhelmer commented Jan 21, 2019

I tried to reproduce it with a sample application and realized that Postman had built a wrong request. The error only appears when I send a wrong content-type:
curl -X POST "http://localhost:8080/test/post" -H "content-type: application/x-www-form-urlencoded"

I'm not sure whether it's an issue anymore, but I don't think this exception is the right behaviour.
Edit: I created a new issue, because x-www-form-urlencoded requests should work too.

@rstoyanchev
Contributor

See also explanation under #22284.


5 participants