Skip to content

Mqttv5PahoMessageDrivenChannelAdapter ignores payloadType due to incorrect MessageConverter.toMessage usage #10990

@d0v0c

Description

@d0v0c

In what version(s) of Spring Integration are you seeing this issue?

7.0.4

Describe the bug

I ran into an issue with Mqttv5PahoMessageDrivenChannelAdapter when receiving messages. When payloadType is set to a class other than byte[] or MqttMessage (e.g., setPayloadType(String.class)), it causes two weird behaviors:

Symptom 1: setPayloadType(String.class) is silently ignored

If I configure setPayloadType(String.class), the Message payload is still byte[]. I receive no error and no warning; it just silently ignores the type hint I set.

Symptom 2: payload is silently Base64-encoded when contentType=application/json

When the inbound MQTT v5 message has PayloadFormatIndicator=1 and Content-Type=application/json, the payload doesn't come out as raw bytes or parsed JSON. Instead, it gets Base64-encoded and wrapped in JSON quotes.

For example, if the broker sends payload {"temperature":25}, my application receives "eyJ0ZW1wZXJhdHVyZSI6MjV9" (wrapped in JSON quotes, emitted as bytes). This is pretty confusing because it looks like a normal string at first glance, so you don't realize something is broken until your downstream JSON parser fails on the Base64 text.

To Reproduce

In Spring Boot

Mqttv5PahoMessageDrivenChannelAdapter adapter = new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "device");
adapter.setPayloadType(String.class);

return IntegrationFlow
    .from(adapter)
    .log(LoggingHandler.Level.WARN, "MQTT-String-Log", m -> "String after conversion: " + m.getPayload() + "\nHeader: " + m.getHeaders())
    .get();

Receive a standard text message from the MQTT broker. Observe the console log: message.getPayload().getClass() returns [B (byte array) rather than java.lang.String.

In JUnit

@SpringJUnitConfig
public class MqttBugReproducerTest {
    @Autowired
    private ApplicationContext applicationContext;

    @Test
    public void testPayloadConversion() {
        // Construct a raw MQTT v5 message
        String originalJson = "{\"temperature\":25}";
        MqttMessage mqttMessage = new MqttMessage(originalJson.getBytes(StandardCharsets.UTF_8));

        MqttProperties properties = new MqttProperties();
        properties.setContentType("application/json");
        properties.setPayloadFormat(true); // PayloadFormatIndicator=1
        mqttMessage.setProperties(properties);

        //  Initialize the Adapter (bypass actual broker connection)
        Mqttv5PahoMessageDrivenChannelAdapter adapter =
                new Mqttv5PahoMessageDrivenChannelAdapter(
                        "tcp://localhost:1883",
                        "testClient",
                        "test/topic");
		
		// Trigger the bug
        adapter.setPayloadType(String.class);

        QueueChannel outputChannel = new QueueChannel();
        adapter.setOutputChannel(outputChannel);
        adapter.setBeanFactory(applicationContext);
        adapter.afterPropertiesSet();

        // Trigger messageArrived to simulate message inbound
        adapter.messageArrived("test/topic", mqttMessage);

        // Retrieve result and inspect the bug symptom
        Message<?> resultMessage = outputChannel.receive(1000);
        assertNotNull(resultMessage, "No message received on outputChannel");

        Object payload = resultMessage.getPayload();
        System.out.println("Expected Payload Type: String.class");
        System.out.println("Actual Payload Type: " + payload.getClass().getName());
        System.out.println("Expected Payload Content: " + originalJson);
        if (payload instanceof byte[] bytes) {
            System.out.println("Actual Payload Content (byte[] -> UTF8): " + new String(bytes, StandardCharsets.UTF_8));
        } else {
            System.out.println("Actual Payload Content: " + payload);
        }
    }

    @Configuration
    static class TestConfig {
    	/**
    	 * Mock SmartMessageConverter chain,
         * including JacksonJsonMessageConverter 
         * to reproduce Base64 encoding symptoms.
         */
        @Bean(name = "integrationArgumentResolverMessageConverter")
        public SmartMessageConverter integrationArgumentResolverMessageConverter() {
            return new CompositeMessageConverter(List.of(
                    new ByteArrayMessageConverter(),
                    new StringMessageConverter(),
                    new JacksonJsonMessageConverter()	// Jackson 3
            ));
        }
    }
}

Cause Analysis

Both symptoms originate in:

Mqttv5PahoMessageDrivenChannelAdapter#messageArrived (lines ~444–455):

Object payload =
      MqttMessage.class.isAssignableFrom(this.payloadType)
              ? mqttMessage
              : mqttMessage.getPayload();

Message<?> message;
if (MqttMessage.class.isAssignableFrom(this.payloadType) || byte[].class.isAssignableFrom(this.payloadType)) {
  message = new GenericMessage<>(payload, headers);
}
else {
  message = this.messageConverter.toMessage(payload, new MessageHeaders(headers), this.payloadType);
}

When this.payloadType is set to String.class, the code executes:

this.messageConverter.toMessage( <byte[] instance>, headers, String.class )

I think the issue is a mismatch in API semantics. In the Spring Messaging contract:

  • toMessage is for outbound serialization: It converts a Java object into a serialized format (like byte[]).
  • fromMessage is for inbound deserialization: It decodes raw network byte[] into a typed Java object.

Since Mqttv5PahoMessageDrivenChannelAdapter is an inbound component handling raw byte[] from the broker, using toMessage here reverses the expected workflow. The default converter chain sees the byte[] input, assumes serialization is already done, and simply envelopes it into a Message<byte[]>, completely ignoring the String.class hint. What's worse, if the converter sees the contentType=application/json, it assumes the byte[] input is binary data (e.g., image, audio), then encodes the data in base64 and wraps it in JSON quotes.

Possible Solutions

Here are a few ways we could fix this:

1. Replace toMessage with fromMessage

This cleanly fixes the problem:

// Conceptual approach:
Message<?> rawMessage = new GenericMessage<>(payload, headers);
Object decodedPayload = this.messageConverter.fromMessage(rawMessage, this.payloadType);
message = new GenericMessage<>(decodedPayload, headers);

But it is a behavior change. It could affect users who rely on the current byte[] output (e.g., users who call setPayloadType(String.class) but consume byte[] downstream).

2. Add an opt-in flag

Add a new setter, e.g., setDecodePayload(boolean), defaulting to false. When true, it uses fromMessage. This preserves backward compatibility.

3. Restrict setPayloadType and update docs

Simply document the legacy behavior and restrict setPayloadType to only accept byte[]/MqttMessage.

I am happy to draft a PR if needed.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions