diff --git a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java index 0b886df8877..e33d9b72ab5 100644 --- a/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java +++ b/spring-integration-mqtt/src/main/java/org/springframework/integration/mqtt/inbound/Mqttv5PahoMessageDrivenChannelAdapter.java @@ -60,8 +60,8 @@ import org.springframework.integration.mqtt.support.MqttMessageConverter; import org.springframework.integration.mqtt.support.MqttUtils; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessagingException; +import org.springframework.messaging.converter.MessageConversionException; import org.springframework.messaging.converter.SmartMessageConverter; import org.springframework.messaging.support.GenericMessage; import org.springframework.util.Assert; @@ -451,7 +451,12 @@ public void messageArrived(String topic, MqttMessage mqttMessage) { message = new GenericMessage<>(payload, headers); } else { - message = this.messageConverter.toMessage(payload, new MessageHeaders(headers), this.payloadType); + Message messageToConvert = new GenericMessage<>(payload, headers); + Object convertedPayload = this.messageConverter.fromMessage(messageToConvert, this.payloadType); + if (convertedPayload == null) { + throw new MessageConversionException(messageToConvert, "Failed to convert from MQTT Message"); + } + message = new GenericMessage<>(convertedPayload, headers); } try { diff --git a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5AdapterTests.java b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5AdapterTests.java index 481ee94adea..8bf2f575d17 100644 --- a/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5AdapterTests.java +++ b/spring-integration-mqtt/src/test/java/org/springframework/integration/mqtt/Mqttv5AdapterTests.java @@ -16,6 +16,8 @@ package org.springframework.integration.mqtt; +import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Map; import org.eclipse.paho.mqttv5.client.IMqttAsyncClient; @@ -23,14 +25,22 @@ import org.eclipse.paho.mqttv5.client.IMqttToken; import org.eclipse.paho.mqttv5.client.MqttConnectionOptions; import org.eclipse.paho.mqttv5.common.MqttException; +import org.eclipse.paho.mqttv5.common.MqttMessage; import org.eclipse.paho.mqttv5.common.MqttSubscription; +import org.eclipse.paho.mqttv5.common.packet.MqttProperties; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.BeanFactory; import org.springframework.context.ApplicationEventPublisher; import org.springframework.integration.channel.NullChannel; +import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.mqtt.inbound.Mqttv5PahoMessageDrivenChannelAdapter; import org.springframework.integration.test.util.TestUtils; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.converter.ByteArrayMessageConverter; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.converter.StringMessageConverter; import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; @@ -98,9 +108,36 @@ public void sharedSubscriptionsPopulation() throws Exception { .containsEntry("$SYS/broker/#", "$SYS/broker/#"); } + @Test // GH-10990 + public void payloadTypeIsUsedForInboundMqttV5PayloadConversion() throws Exception { + IMqttAsyncClient client = mock(); + QueueChannel outputChannel = new QueueChannel(); + Mqttv5PahoMessageDrivenChannelAdapter adapter = buildAdapterIn(client, false, outputChannel); + adapter.setPayloadType(String.class); + adapter.setMessageConverter(new CompositeMessageConverter(List.of( + new ByteArrayMessageConverter(), + new StringMessageConverter()))); + + MqttMessage mqttMessage = new MqttMessage("bar".getBytes(StandardCharsets.UTF_8)); + mqttMessage.setProperties(new MqttProperties()); + + adapter.messageArrived("foo", mqttMessage); + + Message message = outputChannel.receive(0); + + assertThat(message).isNotNull(); + assertThat(message.getPayload()).isEqualTo("bar"); + } + private static Mqttv5PahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client, boolean cleanStart) throws MqttException { + return buildAdapterIn(client, cleanStart, new NullChannel()); + } + + private static Mqttv5PahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttAsyncClient client, + boolean cleanStart, MessageChannel outputChannel) throws MqttException { + MqttConnectionOptions connectionOptions = new MqttConnectionOptions(); connectionOptions.setServerURIs(new String[] {"tcp://localhost:1883"}); connectionOptions.setCleanStart(cleanStart); @@ -117,7 +154,7 @@ private static Mqttv5PahoMessageDrivenChannelAdapter buildAdapterIn(final IMqttA ReflectionTestUtils.setField(adapter, "mqttClient", client); adapter.setBeanFactory(mock(BeanFactory.class)); adapter.setApplicationEventPublisher(mock(ApplicationEventPublisher.class)); - adapter.setOutputChannel(new NullChannel()); + adapter.setOutputChannel(outputChannel); adapter.afterPropertiesSet(); return adapter; }