Skip to content

Rest endpoint echoing request attribute fails with: Only one connection receive subscriber allowed. #718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
hmble2 opened this issue Apr 18, 2019 · 4 comments
Labels
for/springframework This belongs to the Spring Framework project

Comments

@hmble2
Copy link

hmble2 commented Apr 18, 2019

A basic REST endpoint that simply echoes headers (and other request attributes), fails with:
Only one connection receive subscriber allowed.

C:\>curl -v --data "testdata" localhost:8083
* Rebuilt URL to: localhost:8083/
* timeout on name lookup is not supported
*   Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 8083 (#0)
> POST / HTTP/1.1
> Host: localhost:8083
> User-Agent: curl/7.51.0
> Accept: */*
> Content-Length: 8
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 8 out of 8 bytes
< HTTP/1.1 500 Internal Server Error
< Content-Type: application/json;charset=UTF-8
< Content-Length: 160
<
{"timestamp":"2019-04-18T17:50:46.050+0000","path":"/","status":500,"error":"Internal Server Error","message":"Only one connection receive subscriber allowed."}* Curl_http_done: called premature == 0
* Connection #0 to host localhost left intact

reactor service signature looks like this (It does nothing but accumulate headers, body, params in the returning Map):

public Mono<ResponseEntity<Map<String, Object>>> service(@RequestBody(required = false) byte[] rawBody, ServerHttpRequest request)

service uses latest spring boot 2.14 (bundles reactor-netty 0.8.6 and reactor-core 3.2.8)

Exception stacktrace:

2019-04-18 13:55:40.375 [reactor-http-nio-6] ERROR o.s.b.a.w.r.e.AbstractErrorWebExceptionHandler.error(117) - [358b40a8] 500 Server Error for HTTP POST "/"
java.lang.IllegalStateException: Only one connection receive subscriber allowed.
	at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:271)
	at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:121)
	at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62)
	at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:307)
	at reactor.core.publisher.FluxPeek.subscribe(FluxPeek.java:83)
	at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:307)
	at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62)
	at reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)
	at reactor.core.publisher.MonoFilterFuseable.subscribe(MonoFilterFuseable.java:44)
	at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:56)
	at reactor.core.publisher.MonoSubscriberContext.subscribe(MonoSubscriberContext.java:47)
	at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
	at reactor.core.publisher.MonoDefaultIfEmpty.subscribe(MonoDefaultIfEmpty.java:37)
	at reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3710)
	at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)
	at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74)
	at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74)
	at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)
	at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2070)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1878)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1752)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
	at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
	at reactor.core.publisher.Mono.subscribe(Mono.java:3710)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:442)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:212)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:139)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:63)
	at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)
	at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40)
	at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44)
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)
	at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
	at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:331)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
	at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:123)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
	at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:333)
	at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:453)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:191)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:677)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	at java.lang.Thread.run(Thread.java:748)

@hmble2 hmble2 added the type/bug A general bug label Apr 18, 2019
@violetagg
Copy link
Member

@hmble2 Please provide a reproducible scenario. Typically this exception mean that you try to obtain the body more than once.

@violetagg violetagg added the for/user-attention This issue needs user attention (feedback, rework, etc...) label Apr 19, 2019
@hmble2
Copy link
Author

hmble2 commented Apr 22, 2019

@violetagg below sample reproduces the error

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.ResponseEntity;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;

@SpringBootApplication
@RestController
public class Controller {

	private static final Log LOG = LogFactory.getLog(Controller.class);

	@RequestMapping
	public Mono<ResponseEntity<Map<String, Object>>> service(@RequestBody(required = false) byte[] rawBody, ServerHttpRequest request) {
		return Mono.just(ResponseEntity.ok(buildResponseMap(rawBody, request)));
	}

	Map<String, Object> buildResponseMap(byte[] rawBody, ServerHttpRequest request) {
		Map<String, String> headers = request.getHeaders().toSingleValueMap();
		Map<String, Object> responseMap = new HashMap<String, Object>();
		responseMap.put("method", request.getMethod().name());
		responseMap.put("headers", headers);
		responseMap.put("cookies", request.getCookies().toSingleValueMap());
		responseMap.put("parameters", request.getQueryParams().toSingleValueMap());
		responseMap.put("path", request.getPath().value());
		responseMap.put("body", rawBody);
		return responseMap;
	}

	public static void main(final String[] args) {
		SpringApplication.run(Controller.class, args);
	}
}

@violetagg
Copy link
Member

This issue is related to spring-projects/spring-framework#22284

@rstoyanchev The application does not get the request body but declares
@RequestBody(required = false) byte[] rawBody isn't it supposed that the framework will handle this use case?

@violetagg
Copy link
Member

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
for/springframework This belongs to the Spring Framework project
Projects
None yet
Development

No branches or pull requests

2 participants