Skip to content

gRPC Outbound gateway fails on server streaming when producing output #10793

@cppwfs

Description

@cppwfs

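AbstractMessageProducingHandler's sendOutputs method does not handle a Flux as an output type.
It treats the Flux as a Queue (the Flux here is a Reactor QueueSubscription, which extends Queue) and, in shouldSplitOutput, calls iterator() on it, so the Flux throws the following exception.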

Version found: 7.1.M1

Stack trace

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'grpcOutboundFlowStreamResponse.message-handler#0' for component 'grpcOutboundFlowStreamResponse.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; from source: 'org.springframework.integration.dsl.IntegrationFlow org.springframework.integration.samples.grpc.configuration.ClientHelloWorldConfiguration.grpcOutboundFlowStreamResponse(io.grpc.ManagedChannel,org.springframework.messaging.MessageChannel,org.springframework.integration.channel.FluxMessageChannel)'], failedMessage=GenericMessage [payload=name: "Jack"
, headers={id=cd9fdbed-79bb-e51a-421d-24370870abc1, timestamp=1770997645833}]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'grpcOutboundFlowStreamResponse.message-handler#0' for component 'grpcOutboundFlowStreamResponse.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; from source: 'org.springframework.integration.dsl.IntegrationFlow org.springframework.integration.samples.grpc.configuration.ClientHelloWorldConfiguration.grpcOutboundFlowStreamResponse(io.grpc.ManagedChannel,org.springframework.messaging.MessageChannel,org.springframework.integration.channel.FluxMessageChannel)'], failedMessage=GenericMessage [payload=name: "Jack"
, headers={id=cd9fdbed-79bb-e51a-421d-24370870abc1, timestamp=1770997645833}]
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:190) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:133) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:150) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:123) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:438) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:341) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:310) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.samples.grpc.configuration.ClientHelloWorldConfiguration.lambda$grpcClientStreamResponse$1(ClientHelloWorldConfiguration.java:148) ~[main/:na]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:165) ~[reactor-core-3.8.3.jar:3.8.3]
	at reactor.core.publisher.FluxRefCount$RefCountInner.setRefCountMonitor(FluxRefCount.java:214) ~[reactor-core-3.8.3.jar:3.8.3]
	at reactor.core.publisher.FluxRefCount.subscribe(FluxRefCount.java:87) ~[reactor-core-3.8.3.jar:3.8.3]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8888) ~[reactor-core-3.8.3.jar:3.8.3]
	at org.springframework.integration.channel.FluxMessageChannel.subscribe(FluxMessageChannel.java:120) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:70) ~[reactor-core-3.8.3.jar:3.8.3]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8888) ~[reactor-core-3.8.3.jar:3.8.3]
	at reactor.core.publisher.Flux.subscribeWith(Flux.java:9009) ~[reactor-core-3.8.3.jar:3.8.3]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8853) ~[reactor-core-3.8.3.jar:3.8.3]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8777) ~[reactor-core-3.8.3.jar:3.8.3]
	at reactor.core.publisher.Flux.subscribe(Flux.java:8695) ~[reactor-core-3.8.3.jar:3.8.3]
	at org.springframework.integration.samples.grpc.configuration.ClientHelloWorldConfiguration.lambda$grpcClientStreamResponse$0(ClientHelloWorldConfiguration.java:158) ~[main/:na]
	at org.springframework.boot.SpringApplication.lambda$callRunner$0(SpringApplication.java:788) ~[spring-boot-4.1.0-SNAPSHOT.jar:4.1.0-SNAPSHOT]
	at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:82) ~[spring-core-7.0.4.jar:7.0.4]
	at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60) ~[spring-core-7.0.4.jar:7.0.4]
	at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:86) ~[spring-core-7.0.4.jar:7.0.4]
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:800) ~[spring-boot-4.1.0-SNAPSHOT.jar:4.1.0-SNAPSHOT]
	at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:788) ~[spring-boot-4.1.0-SNAPSHOT.jar:4.1.0-SNAPSHOT]
	at org.springframework.boot.SpringApplication.lambda$callRunners$0(SpringApplication.java:776) ~[spring-boot-4.1.0-SNAPSHOT.jar:4.1.0-SNAPSHOT]
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:186) ~[na:na]
	at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:357) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:571) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:560) ~[na:na]
	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:153) ~[na:na]
	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:176) ~[na:na]
	at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265) ~[na:na]
	at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:632) ~[na:na]
	at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:776) ~[spring-boot-4.1.0-SNAPSHOT.jar:4.1.0-SNAPSHOT]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:328) ~[spring-boot-4.1.0-SNAPSHOT.jar:4.1.0-SNAPSHOT]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1365) ~[spring-boot-4.1.0-SNAPSHOT.jar:4.1.0-SNAPSHOT]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1354) ~[spring-boot-4.1.0-SNAPSHOT.jar:4.1.0-SNAPSHOT]
	at org.springframework.integration.samples.grpc.Application.main(Application.java:41) ~[main/:na]
Caused by: java.lang.UnsupportedOperationException: Although QueueSubscription extends Queue it is purely internal and only guarantees support for poll/clear/size/isEmpty. Instances shouldn't be used/exposed as Queue outside of Reactor operators.
	at reactor.core.Fuseable$QueueSubscription.iterator(Fuseable.java:204) ~[reactor-core-3.8.3.jar:3.8.3]
	at org.springframework.integration.handler.AbstractMessageProducingHandler.shouldSplitOutput(AbstractMessageProducingHandler.java:255) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:244) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:152) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:102) ~[spring-integration-core-7.1.0-SNAPSHOT.jar:7.1.0-SNAPSHOT]
	... 41 common frames omitted

AbstractMessageProducingHandler sendOutputs method does not support Flux

Metadata

Metadata

Assignees

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions