Description
Hi,
I'm not sure whether this is an actual issue or intended behavior, so feel free to let me know!
Describe the issue
We are using Spring Cloud Stream with function composition.
In some cases we want to continue processing a message, and in other cases we don't.
When we don't, we can return null from the last function in the composition, and no message is output by the function chain.
The issue appears when a function returns null but isn't the last one in the chain.
To Reproduce
I've got a minimal project demonstrating this in two integration tests at https://github.com/mborgraeve/demo-scs-bean-loop/tree/npe
For example:
- Some value -> PassThroughFunction|NullReturningFunction -> no issue, no message output
- Some value -> NullReturningFunction|PassThroughFunction -> an NPE stack trace (and still no message output)
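To illustrate the two orderings outside the framework, here is a minimal sketch using plain java.util.function.Function. It is only an analogy: the class and field names are hypothetical, DESCRIBE_TYPE stands in for any downstream step that inspects its input (it is not the PassThroughFunction from the demo project), and none of this is Spring Cloud Function's actual composition code.

```java
import java.util.function.Function;

final class CompositionOrderDemo {
    // Hypothetical stand-ins for the functions named in the report.
    static final Function<String, String> PASS_THROUGH = value -> value;
    static final Function<String, String> NULL_RETURNING = value -> null;

    // Assumption: a downstream step that dereferences its input,
    // similar in spirit to an input-conversion step.
    static final Function<Object, String> DESCRIBE_TYPE =
            input -> input.getClass().getSimpleName();

    // The null-returning function is last: the chain simply yields null.
    static String nullLast(String value) {
        return PASS_THROUGH.andThen(NULL_RETURNING).apply(value);
    }

    // The null-returning function is first: null reaches the next step,
    // which throws a NullPointerException when it dereferences it.
    static String nullFirst(String value) {
        return NULL_RETURNING.andThen(DESCRIBE_TYPE).apply(value);
    }

    public static void main(String[] args) {
        System.out.println("null last: " + nullLast("some value"));
        try {
            nullFirst("some value");
        } catch (NullPointerException e) {
            System.out.println("null first: NullPointerException");
        }
    }
}
```

Function.andThen performs no null check on the intermediate value, so whether a null is harmless depends entirely on where in the chain it is produced.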
Version of the framework
This happens in 2022.0.3, 2022.0.4, 2023.0.0.
Expected behavior
We wouldn't expect an NPE stack trace; we would expect the message to be silently dropped.
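The behavior we have in mind can be sketched with a small null-safe composition helper. This is an assumption about the desired semantics, not the framework's implementation: thenIfPresent is a hypothetical name, and the helper works on plain java.util.function.Function rather than on Spring Cloud Function's wrappers.

```java
import java.util.function.Function;

final class NullSafeChain {
    // Hypothetical helper: compose two steps so that a null result from
    // the first step short-circuits the chain instead of reaching the second.
    static <A, B, C> Function<A, C> thenIfPresent(Function<A, B> first,
                                                  Function<B, C> second) {
        return input -> {
            B intermediate = first.apply(input);
            // A null here means "drop the message": skip the remaining step.
            return intermediate == null ? null : second.apply(intermediate);
        };
    }

    public static void main(String[] args) {
        Function<String, String> dropAll = value -> null;
        Function<String, String> upper = String::toUpperCase;

        // The first step returns null, so upper is never invoked.
        System.out.println(thenIfPresent(dropAll, upper).apply("some value"));
        // Both steps return values, so the chain runs to the end.
        System.out.println(thenIfPresent(upper, upper).apply("some value"));
    }
}
```

With these semantics a null means the same thing at every position in the chain: the message is dropped and nothing is output.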
Additional context
For reference, the stack trace is copied below. It looks like the framework tries to convert the input of the next function, but fails because that input is null.
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2921) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2862) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2826) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$56(KafkaMessageListenerContainer.java:2744) ~[spring-kafka-3.1.1.jar:3.1.1]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.12.2.jar:1.12.2]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2742) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2595) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2481) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2123) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1478) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1442) ~[spring-kafka-3.1.1.jar:3.1.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1313) ~[spring-kafka-3.1.1.jar:3.1.1]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: org.springframework.kafka.KafkaException: Failed to execute runnable
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:82) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:457) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:422) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2848) ~[spring-kafka-3.1.1.jar:3.1.1]
... 12 common frames omitted
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@3b9906f1]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:332) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.1.3.jar:6.1.3]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.1.3.jar:6.1.3]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.1.3.jar:6.1.3]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.1.3.jar:6.1.3]
at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262) ~[spring-integration-core-6.2.1.jar:6.2.1]
at io.micrometer.observation.Observation.observe(Observation.java:499) ~[micrometer-observation-1.12.2.jar:1.12.2]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262) ~[spring-integration-core-6.2.1.jar:6.2.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:391) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.lambda$onMessage$0(KafkaMessageDrivenChannelAdapter.java:460) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.lambda$doWithRetry$0(KafkaInboundEndpoint.java:77) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:335) ~[spring-retry-2.0.5.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:227) ~[spring-retry-2.0.5.jar:na]
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:70) ~[spring-integration-kafka-6.2.1.jar:6.2.1]
... 15 common frames omitted
Caused by: java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "input" is null
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertNonMessageInputIfNecessary(SimpleFunctionRegistry.java:1283) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertInputIfNecessary(SimpleFunctionRegistry.java:1121) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:722) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.lambda$andThen$0(SimpleFunctionRegistry.java:646) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:725) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:577) ~[spring-cloud-function-context-4.0.3.jar:4.0.3]
at org.springframework.cloud.stream.function.PartitionAwareFunctionWrapper.apply(PartitionAwareFunctionWrapper.java:88) ~[spring-cloud-stream-4.0.3.jar:4.0.3]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionWrapper.apply(FunctionConfiguration.java:789) ~[spring-cloud-stream-4.0.3.jar:4.0.3]
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:624) ~[spring-cloud-stream-4.0.3.jar:4.0.3]
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105) ~[spring-integration-core-6.2.1.jar:6.2.1]
... 36 common frames omitted