Skip to content

Exception in a Flux created by a subscriber will be ignored by reactor-netty  #362

@dantesun

Description

@dantesun

Following link https://pivotal.io/security should be used to report security related issues

Expected behavior

Retry works

Actual behavior

Exception ignored by FluxReceive

Steps to reproduce

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import java.io.IOException;
import java.time.Duration;
import java.util.logging.Level;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.resources.LoopResources;
import reactor.retry.Retry;

public class PollingTest {

  private static final Logger logger = LoggerFactory.getLogger(PollingTest.class);

  private Flux<Integer> createFlux() {
    return Flux.just(1, 2, 3, 4, 5, 6, 7);
  }

  @Test
  public void polling() throws InterruptedException {
    HttpClient client =
        HttpClient.builder()
            .options(
                builder -> {
                  LoopResources channelResources = LoopResources.create("my-loop", 2, false);
                  builder.loopResources(channelResources);
                  builder.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000);
                  builder.disablePool();
                })
            .build();

    Retry<Object> ioError =
        Retry.anyOf(IOException.class)
            .retryMax(Integer.MAX_VALUE)
            .fixedBackoff(Duration.ofSeconds(1));

    client
        .get(
            "http://releases.ubuntu.com/16.04.4/ubuntu-16.04.4-desktop-amd64.iso",
            request -> {
              request
                  .context()
                  .addHandlerFirst(new IdleStateHandler(1, 0, 0))
                  .addHandlerLast(
                      new ChannelDuplexHandler() {
                        @Override
                        public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                            throws Exception {
                          if (evt instanceof IdleStateEvent) {
                            ctx.close();
                          } else {
                            super.userEventTriggered(ctx, evt);
                          }
                        }
                      });
              return request.send();
            })
        .doOnError(e -> logger.error("Error!", e))
        .flatMapMany(NettyInbound::receive)
        .log(logger.getName(), Level.SEVERE, SignalType.ON_ERROR, SignalType.CANCEL)
        .retryWhen(ioError)
        .subscribe(
            byteBuf -> {
              logger.info("Msg: {}", byteBuf);
              Flux.just(1,2,3)
                  .onErrorReturn(8)
                  .subscribe(
                      i -> {
                          throw new RuntimeException(i + " error");
                      });
            });
    Thread.sleep(Duration.ofMinutes(5).toMillis())

Reactor Netty version

0.7.7.RELEASE

JVM version (e.g. java -version)

java version "1.8.0_144"
Java(TM) SE Runtime Environment (build 1.8.0_144-b01)
Java HotSpot(TM) 64-Bit Server VM (build 25.144-b01, mixed mode)

OS version (e.g. uname -a)

Darwin 15.6.0 Darwin Kernel Version 15.6.0: Tue Jan 30 11:45:51 PST 2018; root:xnu-3248.73.8~1/RELEASE_X86_64 x86_64

Metadata

Metadata

Assignees

No one assigned

    Labels

    for/stackoverflowQuestions are best asked on SO or Gitter

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions