WebClient subscribing twice #1747

Closed
Sam-Kruglov opened this issue Jun 12, 2019 · 2 comments
Labels
status/invalid: We don't feel this issue is valid, or the root cause was found outside of Reactor

Comments

Sam-Kruglov commented Jun 12, 2019

Expected behavior

Only hit reactor.netty.channel.FluxReceive#startReceiver once

Actual behavior

It is hit twice, and the second call silently throws "Only one connection receive subscriber allowed."

In production, this exception is sometimes reported by reactor.core.publisher.Operators : Operator called default onErrorDropped ....
Locally, I could only see this exception with a debugger, at the point where it is thrown inside FluxReceive#startReceiver. Later, in reactor.core.publisher.MonoFlatMap.FlatMapMain#onError, done was false, so the exception was never shown anywhere.
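
For readers unfamiliar with the single-subscriber constraint, here is a minimal sketch of the behavior described above, written against the JDK's java.util.concurrent.Flow API so that it runs without Reactor on the classpath. It is not Reactor Netty's implementation: OneShotPublisher, RecordingSubscriber and subscribeTwice are hypothetical names, and only the error message is copied from the report.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a body publisher that accepts one subscriber only. */
public class SingleSubscriberDemo {

    /** Hands its single value to the first subscriber and rejects any later one. */
    static final class OneShotPublisher implements Flow.Publisher<String> {
        private final AtomicBoolean subscribed = new AtomicBoolean();
        private final String value;

        OneShotPublisher(String value) {
            this.value = value;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            if (!subscribed.compareAndSet(false, true)) {
                // A second subscriber still gets onSubscribe first, then the error.
                subscriber.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) { }
                    @Override public void cancel() { }
                });
                subscriber.onError(new IllegalStateException(
                        "Only one connection receive subscriber allowed."));
                return;
            }
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override public void request(long n) {
                    if (done || n <= 0) {
                        return;
                    }
                    done = true;
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }

                @Override public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Records every signal it receives as a string. */
    static final class RecordingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new ArrayList<>();

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { events.add("onNext(" + item + ")"); }
        @Override public void onError(Throwable t) { events.add("onError(" + t.getMessage() + ")"); }
        @Override public void onComplete() { events.add("onComplete"); }
    }

    /** Subscribes twice to the same publisher and returns both event logs. */
    public static List<List<String>> subscribeTwice() {
        OneShotPublisher body = new OneShotPublisher("payload");
        RecordingSubscriber first = new RecordingSubscriber();
        RecordingSubscriber second = new RecordingSubscriber();
        body.subscribe(first);
        body.subscribe(second);
        return List.of(first.events, second.events);
    }

    public static void main(String[] args) {
        List<List<String>> logs = subscribeTwice();
        System.out.println("first:  " + logs.get(0));
        System.out.println("second: " + logs.get(1));
    }
}
```

The first subscriber receives the value and completes normally, while the second receives only the error. If nothing downstream is still listening for that error, it can go unnoticed, which would be consistent with the exception being visible only in a debugger.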

Steps to reproduce

My calling code is the following:

webClient.get()
         .uri("myUri")
         .retrieve()
         .onStatus(HttpStatus::isError, response ->
                 response.bodyToMono(ErrorResponse.class)
                         .log("response.bodyToMono")
                         .flatMap(error -> Mono.error(
                                 new MyException(
                                         MyErrorType.ofCode(error.getCode()).orElse(null),
                                         error.getDescription()
                                 )
                         ))
         )
         .bodyToMono(MyDto.class)

the logs:

client.bodyToMono                        : | onSubscribe([Fuseable] MonoFlatMap.FlatMapMain)
client.bodyToMono                        : | request(unbounded)
response.bodyToMono                      : | onSubscribe([Fuseable] MonoSingle.SingleSubscriber)
response.bodyToMono                      : | request(unbounded)
response.bodyToMono                      : | onNext(ErrorResponse(code=500, description=message))
response.bodyToMono                      : | cancel()
client.bodyToMono                        : | onError(com.bla.MyException: message)
client.bodyToMono                        :  the exception stacktrace

Reactor Core version

org.springframework:spring-webflux:5.1.6.RELEASE

JVM version (e.g. java -version)

openjdk version "11.0.1" 2018-10-16

========

The workaround for me is to use an ExchangeFilterFunction (https://stackoverflow.com/a/48984852/6166627) instead of onStatus. With this approach, the debugger shows FluxReceive#startReceiver being hit only once. Hopefully the underlying problem gets fixed.

Anyway, this ERROR log does not actually seem to affect anything: the code still throws MyException.
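
A minimal sketch of what such a filter-based workaround might look like, assuming Spring WebFlux and Reactor are on the classpath. ErrorResponse, MyErrorType and MyException are hypothetical stand-ins for the types named in the report (their real definitions are not shown in the issue), and ErrorFilterSketch, mapErrors and buildClient are illustrative names only.

```java
import java.util.Arrays;
import java.util.Optional;

import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

public class ErrorFilterSketch {

    // Hypothetical stand-ins for the types used in the report.
    public static class ErrorResponse {
        private int code;
        private String description;
        public int getCode() { return code; }
        public void setCode(int code) { this.code = code; }
        public String getDescription() { return description; }
        public void setDescription(String description) { this.description = description; }
    }

    public enum MyErrorType {
        INTERNAL(500);

        private final int code;

        MyErrorType(int code) { this.code = code; }

        public static Optional<MyErrorType> ofCode(int code) {
            return Arrays.stream(values()).filter(t -> t.code == code).findFirst();
        }
    }

    public static class MyException extends RuntimeException {
        private final MyErrorType type;

        public MyException(MyErrorType type, String message) {
            super(message);
            this.type = type;
        }

        public MyErrorType getType() { return type; }
    }

    /** Error responses become MyException; all other responses pass through untouched. */
    public static Mono<ClientResponse> mapErrors(ClientResponse response) {
        if (!response.statusCode().isError()) {
            return Mono.just(response);
        }
        // The error body is read here, and only here.
        return response.bodyToMono(ErrorResponse.class)
                .flatMap(error -> Mono.<ClientResponse>error(new MyException(
                        MyErrorType.ofCode(error.getCode()).orElse(null),
                        error.getDescription())));
    }

    /** Builds a client with the filter installed, replacing the onStatus handler. */
    public static WebClient buildClient(String baseUrl) {
        return WebClient.builder()
                .baseUrl(baseUrl)
                .filter(ExchangeFilterFunction.ofResponseProcessor(ErrorFilterSketch::mapErrors))
                .build();
    }
}
```

With a client built this way, the call site reduces to webClient.get().uri("myUri").retrieve().bodyToMono(MyDto.class), with no onStatus handler. The error is raised inside the filter before retrieve() sees the response, so the error body is read in one place only.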

@Sam-Kruglov
Author

This issue probably belongs in the webflux repo, but I am not entirely sure.

@bsideup
Contributor

bsideup commented Jun 20, 2019

I would say the issue should definitely be moved to webflux's issue tracker
/cc @rstoyanchev

bsideup added the status/invalid label on Jun 20, 2019