Skip to content

Wrong content type in reply message #3089

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
garyrussell opened this issue Oct 27, 2019 · 0 comments · Fixed by #3091
Closed

Wrong content type in reply message #3089

garyrussell opened this issue Oct 27, 2019 · 0 comments · Fixed by #3091

Comments

@garyrussell
Copy link
Contributor

@mvh77 commented on Sun Oct 27 2019

Verisions: Spring AMQP 2.2.0, Spring 5.2.0

I have the following simple test:

package com.test;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.handler.GenericHandler;
import org.springframework.integration.support.json.Jackson2JsonObjectMapper;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template, Jackson2JsonMessageConverter converter) {
        return args -> {
            Object o = template.convertSendAndReceive("myqueue", new TestPojo("john doe"));
            if(o != null) System.err.println("received type: " + o.getClass().getSimpleName());
        };
    }

    @Bean
    public IntegrationFlow readMessages(ConnectionFactory cf, Jackson2JsonObjectMapper mapper, GenericHandler<TestPojo> handler) {

        return IntegrationFlows
            .from(Amqp.inboundGateway(cf, "myqueue"))
            .transform(Transformers.fromJson(TestPojo.class, mapper))
            .handle(handler)
            .transform(Transformers.toJson(mapper))
            .logAndReply();
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter() {
            @Override
            public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
                message.getMessageProperties().setContentType("application/json");
                return super.fromMessage(message, conversionHint);
            }

            // cannot override toMessage as it is final in AbstractMessageConverter
            // public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException{}
        };
    }

    @Bean
    public Jackson2JsonObjectMapper jsonObjectMapper() {
        return new Jackson2JsonObjectMapper();
    }

    @Bean
    public GenericHandler<TestPojo> handler() {
        return (tp, messageHeaders) -> {
            System.err.println("handling test pojo with headers " + messageHeaders);
            return new TestPojo(new StringBuilder(tp.getName()).reverse().toString());
        };
    }

    @Bean
    public Queue myQueue() {
       return new Queue("myqueue");
    }
}

I managed to make this work by overriding Jackson2JsonMessageConverter#fromMessage that sets the content type before processing. However, the content type in the RabbitMQ reply message is text/plain. Here's what the RabbitMQ firehose log shows:

================================================================================
2019-10-27 13:07:34:556: Message published

Node:         rabbit@9aba3d1b01b3
Connection:   172.18.0.1:48784 -> 172.18.0.2:5672
Virtual host: /
User:         guest
Channel:      2
Exchange:     
Routing keys: [<<"myqueue">>]
Routed queues: [<<"myqueue">>]
Properties:   [{<<"reply_to">>,longstr,
                <<"amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==">>},
               {<<"correlation_id">>,longstr,<<"1">>},
               {<<"priority">>,signedint,0},
               {<<"delivery_mode">>,signedint,2},
               {<<"headers">>,table,
                [{<<"__TypeId__">>,longstr,<<"com.test.TestPojo">>}]},
               {<<"content_encoding">>,longstr,<<"UTF-8">>},
               {<<"content_type">>,longstr,<<"application/json">>}]
Payload: 
{"name":"john doe"}

================================================================================
2019-10-27 13:07:34:556: Message received

Node:         rabbit@9aba3d1b01b3
Connection:   172.18.0.1:48784 -> 172.18.0.2:5672
Virtual host: /
User:         guest
Channel:      1
Exchange:     
Routing keys: [<<"myqueue">>]
Queue:        myqueue
Properties:   [{<<"reply_to">>,longstr,
                <<"amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==">>},
               {<<"correlation_id">>,longstr,<<"1">>},
               {<<"priority">>,signedint,0},
               {<<"delivery_mode">>,signedint,2},
               {<<"headers">>,table,
                [{<<"__TypeId__">>,longstr,<<"com.test.TestPojo">>}]},
               {<<"content_encoding">>,longstr,<<"UTF-8">>},
               {<<"content_type">>,longstr,<<"application/json">>}]
Payload: 
{"name":"john doe"}

================================================================================
2019-10-27 13:07:34:618: Message published

Node:         rabbit@9aba3d1b01b3
Connection:   172.18.0.1:48784 -> 172.18.0.2:5672
Virtual host: /
User:         guest
Channel:      3
Exchange:     
Routing keys: [<<"amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==">>]
Routed queues: []
Properties:   [{<<"timestamp">>,signedint,1572181654},
               {<<"message_id">>,longstr,
                <<"294262a8-ce42-594b-503c-5bdea9e6946c">>},
               {<<"correlation_id">>,longstr,<<"1">>},
               {<<"priority">>,signedint,0},
               {<<"delivery_mode">>,signedint,2},
               {<<"headers">>,table,
                [{<<"_resolvableType">>,longstr,<<"com.test.TestPojo">>},
                 {<<"__TypeId__">>,longstr,<<"com.test.TestPojo">>}]},
               {<<"content_encoding">>,longstr,<<"UTF-8">>},
               {<<"content_type">>,longstr,<<"text/plain">>}]
Payload: 
{"name":"eod nhoj"}

Here's the Spring logs:

15:07:34.503 [main] INFO  com.test.Application - Started Application in 1.894 seconds (JVM running for 2.291)
15:07:34.536 [main] DEBUG o.s.a.r.c.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,2)
15:07:34.537 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$363/620412175 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@56da52a7 Shared Rabbit Connection: SimpleConnection@2eb79cbe [delegate=amqp://[email protected]:5672/, localPort= 33826]
15:07:34.540 [main] INFO  o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
15:07:34.541 [main] DEBUG o.s.a.r.l.DirectReplyToMessageListenerContainer - Starting Rabbit listener container.
15:07:34.545 [main] INFO  o.s.a.r.l.DirectReplyToMessageListenerContainer - Container initialized for queues: [amq.rabbitmq.reply-to]
15:07:34.549 [pool-1-thread-4] DEBUG o.s.a.r.l.DirectReplyToMessageListenerContainer - New SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA identity=30506c0d] consumeOk
15:07:34.549 [main] INFO  o.s.a.r.l.DirectReplyToMessageListenerContainer - SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA identity=30506c0d] started
15:07:34.553 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Sending message with tag 1
15:07:34.554 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message [(Body:'{"name":"john doe"}' MessageProperties [headers={__TypeId__=com.test.TestPojo}, correlationId=1, replyTo=amq.rabbitmq.reply-to, contentType=application/json, contentEncoding=UTF-8, contentLength=19, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [], routingKey = [myqueue]
15:07:34.558 [pool-1-thread-5] DEBUG o.s.a.r.l.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-TrIyJPUcGKjimpcNCkrTaw' with deliveryTag: '1' in Consumer@48c4245d: tags=[[amq.ctag-TrIyJPUcGKjimpcNCkrTaw]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@56da52a7 Shared Rabbit Connection: SimpleConnection@2eb79cbe [delegate=amqp://[email protected]:5672/, localPort= 33826], acknowledgeMode=AUTO local queue size=0
15:07:34.559 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Received message: (Body:'{"name":"john doe"}' MessageProperties [headers={__TypeId__=com.test.TestPojo}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=myqueue, deliveryTag=1, consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, consumerQueue=myqueue])
15:07:34.567 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_receivedDeliveryMode] WILL be mapped, matched pattern=*
15:07:34.567 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=*
15:07:34.567 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_contentEncoding] WILL be mapped, matched pattern=*
15:07:34.567 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=*
15:07:34.567 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=*
15:07:34.567 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[json__TypeId__] WILL be mapped, matched pattern=*
15:07:34.568 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_replyTo] WILL be mapped, matched pattern=*
15:07:34.568 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_correlationId] WILL be mapped, matched pattern=*
15:07:34.568 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_redelivered] WILL be mapped, matched pattern=*
15:07:34.568 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[contentType] WILL be mapped, matched pattern=*
15:07:34.568 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[__TypeId__] WILL be mapped, matched pattern=*
15:07:34.576 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'bean 'readMessages.channel#0'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload=byte[19], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=com.test.TestPojo, amqp_correlationId=1, id=0f20d2df-6a86-d050-0301-6b08b04c1cd3, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654576}]
15:07:34.577 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.t.MessageTransformingHandler - bean 'readMessages.json-to-object-transformer#1' for component 'readMessages.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'com.test.Application'; from source: 'bean method readMessages' received message: GenericMessage [payload=byte[19], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=com.test.TestPojo, amqp_correlationId=1, id=0f20d2df-6a86-d050-0301-6b08b04c1cd3, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654576}]
15:07:34.604 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'bean 'readMessages.channel#1'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload=TestPojo{name='john doe'}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, amqp_correlationId=1, id=cb02f0ce-ff97-65b3-7dcf-fb3f1dea10e6, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654604}]
15:07:34.604 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.h.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.LambdaMessageProcessor@5215cd9a] (readMessages.org.springframework.integration.config.ConsumerEndpointFactoryBean#1) received message: GenericMessage [payload=TestPojo{name='john doe'}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, amqp_correlationId=1, id=cb02f0ce-ff97-65b3-7dcf-fb3f1dea10e6, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654604}]
15:07:34.605 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'bean 'readMessages.channel#2'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload=TestPojo{name='eod nhoj'}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, amqp_correlationId=1, id=81279664-5dbd-1424-ccb0-0f44885f0122, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654604}]
15:07:34.605 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.t.MessageTransformingHandler - bean 'readMessages.object-to-json-transformer#1' for component 'readMessages.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'com.test.Application'; from source: 'bean method readMessages' received message: GenericMessage [payload=TestPojo{name='eod nhoj'}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, amqp_correlationId=1, id=81279664-5dbd-1424-ccb0-0f44885f0122, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654604}]
15:07:34.607 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'bean 'readMessages.channel#4'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload={"name":"eod nhoj"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, json_resolvableType=com.test.TestPojo, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=class com.test.TestPojo, amqp_correlationId=1, id=6245ee85-29fb-02ea-2227-769da870b0ec, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654607}]
15:07:34.607 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.handler.LoggingHandler - bean 'readMessages.logging-channel-adapter#0'; defined in: 'com.test.Application'; from source: 'bean method readMessages' received message: GenericMessage [payload={"name":"eod nhoj"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, json_resolvableType=com.test.TestPojo, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=class com.test.TestPojo, amqp_correlationId=1, id=6245ee85-29fb-02ea-2227-769da870b0ec, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654607}]
15:07:34.607 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] INFO  o.s.i.handler.LoggingHandler - GenericMessage [payload={"name":"eod nhoj"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, json_resolvableType=com.test.TestPojo, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=class com.test.TestPojo, amqp_correlationId=1, id=6245ee85-29fb-02ea-2227-769da870b0ec, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654607}]
15:07:34.608 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.handler.BridgeHandler - bean 'readMessages.bridge#0' for component 'readMessages.org.springframework.integration.config.ConsumerEndpointFactoryBean#3'; defined in: 'com.test.Application'; from source: 'bean method readMessages' received message: GenericMessage [payload={"name":"eod nhoj"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, json_resolvableType=com.test.TestPojo, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=class com.test.TestPojo, amqp_correlationId=1, id=6245ee85-29fb-02ea-2227-769da870b0ec, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654607}]
15:07:34.608 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'bean 'readMessages.channel#4'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload={"name":"eod nhoj"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, json_resolvableType=com.test.TestPojo, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=class com.test.TestPojo, amqp_correlationId=1, id=6245ee85-29fb-02ea-2227-769da870b0ec, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654607}]
15:07:34.608 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'bean 'readMessages.channel#2'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload=TestPojo{name='eod nhoj'}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, amqp_correlationId=1, id=81279664-5dbd-1424-ccb0-0f44885f0122, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654604}]
15:07:34.608 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'bean 'readMessages.channel#1'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload=TestPojo{name='john doe'}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, amqp_correlationId=1, id=cb02f0ce-ff97-65b3-7dcf-fb3f1dea10e6, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654604}]
15:07:34.608 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'bean 'readMessages.channel#0'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload=byte[19], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=com.test.TestPojo, amqp_correlationId=1, id=0f20d2df-6a86-d050-0301-6b08b04c1cd3, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654576}]
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_receivedDeliveryMode] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[json_resolvableType] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_replyTo] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_consumerQueue] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_redelivered] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_contentEncoding] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[json__TypeId__] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_correlationId] WILL be mapped, matched pattern=*
15:07:34.610 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_consumerTag] WILL be mapped, matched pattern=*
15:07:34.610 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[contentType] WILL be mapped, matched pattern=*
15:07:34.610 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[__TypeId__] WILL be mapped, matched pattern=*
15:07:34.614 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.a.r.c.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,3)
15:07:34.614 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$423/1519497130 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,3), conn: Proxy@56da52a7 Shared Rabbit Connection: SimpleConnection@2eb79cbe [delegate=amqp://[email protected]:5672/, localPort= 33826]
15:07:34.617 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message [(Body:'{"name":"eod nhoj"}' MessageProperties [headers={_resolvableType=com.test.TestPojo, __TypeId__=com.test.TestPojo}, timestamp=Sun Oct 27 15:07:34 EET 2019, messageId=294262a8-ce42-594b-503c-5bdea9e6946c, correlationId=1, contentType=text/plain, contentEncoding=UTF-8, contentLength=19, deliveryMode=PERSISTENT, priority=0, redelivered=false, receivedRoutingKey=myqueue, deliveryTag=1])] on exchange [], routingKey = [amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==]
15:07:34.619 [pool-1-thread-6] DEBUG o.s.a.r.l.DirectReplyToMessageListenerContainer - SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA identity=30506c0d] received (Body:'{"name":"eod nhoj"}' MessageProperties [headers={_resolvableType=com.test.TestPojo, __TypeId__=com.test.TestPojo}, timestamp=Sun Oct 27 15:07:34 EET 2019, messageId=294262a8-ce42-594b-503c-5bdea9e6946c, correlationId=1, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, deliveryTag=1, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA, consumerQueue=amq.rabbitmq.reply-to])
15:07:34.619 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Reply: (Body:'{"name":"eod nhoj"}' MessageProperties [headers={_resolvableType=com.test.TestPojo, __TypeId__=com.test.TestPojo}, timestamp=Sun Oct 27 15:07:34 EET 2019, messageId=294262a8-ce42-594b-503c-5bdea9e6946c, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, deliveryTag=1, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA, consumerQueue=amq.rabbitmq.reply-to])
15:08:39.545 [rabbitTemplate#0-consumerMonitor-1] DEBUG o.s.a.r.l.DirectReplyToMessageListenerContainer - Reducing idle consumes by 1
15:08:39.545 [rabbitTemplate#0-consumerMonitor-1] DEBUG o.s.a.r.l.DirectReplyToMessageListenerContainer - Canceling SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA identity=30506c0d]
15:08:39.547 [pool-1-thread-7] DEBUG o.s.a.r.l.DirectReplyToMessageListenerContainer - CancelOk SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA identity=30506c0d]
15:08:39.547 [pool-1-thread-7] DEBUG o.s.a.r.c.CachingConnectionFactory - Closing cached Channel: AMQChannel(amqp://[email protected]:5672/,2)

I feel like there's a problem here where the new MessageProperties have a contentType of text/plain. I don't see how to configure this since the toMessage method of Jackson2JsonMessageConverter cannot be overriden.


@garyrussell commented on Sun Oct 27 2019

This is a Spring Integration issue, not Spring AMQP.

        return IntegrationFlows
            .from(Amqp.inboundGateway(cf, "myqueue"))
            .transform(Transformers.fromJson(TestPojo.class, mapper))
            .handle(handler)
            .transform(Transformers.toJson(mapper))
            .logAndReply();

Your inbound gateway needs to be configured with the jackson converter (and remove the transformers). The gateway has a SimpleMessageConverter by default when it just sees a String it sets the content type to text/plain.

The outbound endpoints have a property headersMappedLast which allows the content type header from the message to be used.

	/**
	 * When mapping headers for the outbound message, determine whether the headers are
	 * mapped before the message is converted, or afterwards. This only affects headers
	 * that might be added by the message converter. When false, the converter's headers
	 * win; when true, any headers added by the converter will be overridden (if the
	 * source message has a header that maps to those headers). You might wish to set this
	 * to true, for example, when using a
	 * {@link org.springframework.amqp.support.converter.SimpleMessageConverter} with a
	 * String payload that contains json; the converter will set the content type to
	 * {@code text/plain} which can be overridden to {@code application/json} by setting
	 * the {@link AmqpHeaders#CONTENT_TYPE} message header. Default: false.
	 * @param headersMappedLast true if headers are mapped after conversion.
	 * @since 5.0
	 */
	public void setHeadersMappedLast(boolean headersMappedLast) {
		this.headersMappedLast = headersMappedLast;
	}

We should have a similar property for the reply handling in the inbound gateway.

Moving this to Spring Integration.

@garyrussell garyrussell added this to the 5.2.1 milestone Oct 27, 2019
@artembilan artembilan self-assigned this Oct 28, 2019
artembilan added a commit to artembilan/spring-integration that referenced this issue Oct 28, 2019
Fixes spring-projects#3089

In some use-case we would like to control when headers from SI message
should be populated into an AMQP message.
One of the use-case is like a `SimpleMessageConverter` and its `plain/text`
for the String reply, meanwhile we know that this content is an
`application/json`.
So, with a new `replyHeadersMappedLast` we can override the mentioned
`content-type` header, populated by the `MessageConverter` with an
actual value from the message headers populated in the flow upstream

* Introduce an `AmqpInboundGateway.replyHeadersMappedLast`; expose it
on the DSL and XML level
* Use newly introduced `MappingUtils.mapReplyMessage()`
* Optimize `DefaultAmqpHeaderMapper` to not parse JSON headers at all
when `JsonHeaders.TYPE_ID` is already present (e.g. `MessageConverter`
result)
* Also skip `JsonHeaders` when we `populateUserDefinedHeader()`

**Cherry-pick to 5.1.x**
artembilan added a commit to artembilan/spring-integration that referenced this issue Oct 31, 2019
Fixes spring-projects#3089

In some use-case we would like to control when headers from SI message
should be populated into an AMQP message.
One of the use-case is like a `SimpleMessageConverter` and its `plain/text`
for the String reply, meanwhile we know that this content is an
`application/json`.
So, with a new `replyHeadersMappedLast` we can override the mentioned
`content-type` header, populated by the `MessageConverter` with an
actual value from the message headers populated in the flow upstream

* Introduce an `AmqpInboundGateway.replyHeadersMappedLast`; expose it
on the DSL and XML level
* Use newly introduced `MappingUtils.mapReplyMessage()`
* Optimize `DefaultAmqpHeaderMapper` to not parse JSON headers at all
when `JsonHeaders.TYPE_ID` is already present (e.g. `MessageConverter`
result)
* Also skip `JsonHeaders` when we `populateUserDefinedHeader()`

**Cherry-pick to 5.1.x**
garyrussell pushed a commit that referenced this issue Oct 31, 2019
* GH-3089: Add AmqpInGateway.replyHeadersMappedLast

Fixes #3089

In some use-case we would like to control when headers from SI message
should be populated into an AMQP message.
One of the use-case is like a `SimpleMessageConverter` and its `plain/text`
for the String reply, meanwhile we know that this content is an
`application/json`.
So, with a new `replyHeadersMappedLast` we can override the mentioned
`content-type` header, populated by the `MessageConverter` with an
actual value from the message headers populated in the flow upstream

* Introduce an `AmqpInboundGateway.replyHeadersMappedLast`; expose it
on the DSL and XML level
* Use newly introduced `MappingUtils.mapReplyMessage()`
* Optimize `DefaultAmqpHeaderMapper` to not parse JSON headers at all
when `JsonHeaders.TYPE_ID` is already present (e.g. `MessageConverter`
result)
* Also skip `JsonHeaders` when we `populateUserDefinedHeader()`

**Cherry-pick to 5.1.x**

* * Fix language and package typos
* Add missed `@param` in JavaDoc of the `AmqpBaseInboundGatewaySpec.batchingStrategy()`
* Extract a `RabbitTemplate` `MessageConverter` to use for reply messages
conversion - pursue a backward compatibility
garyrussell pushed a commit that referenced this issue Oct 31, 2019
* GH-3089: Add AmqpInGateway.replyHeadersMappedLast

Fixes #3089

In some use-case we would like to control when headers from SI message
should be populated into an AMQP message.
One of the use-case is like a `SimpleMessageConverter` and its `plain/text`
for the String reply, meanwhile we know that this content is an
`application/json`.
So, with a new `replyHeadersMappedLast` we can override the mentioned
`content-type` header, populated by the `MessageConverter` with an
actual value from the message headers populated in the flow upstream

* Introduce an `AmqpInboundGateway.replyHeadersMappedLast`; expose it
on the DSL and XML level
* Use newly introduced `MappingUtils.mapReplyMessage()`
* Optimize `DefaultAmqpHeaderMapper` to not parse JSON headers at all
when `JsonHeaders.TYPE_ID` is already present (e.g. `MessageConverter`
result)
* Also skip `JsonHeaders` when we `populateUserDefinedHeader()`

**Cherry-pick to 5.1.x**

* * Fix language and package typos
* Add missed `@param` in JavaDoc of the `AmqpBaseInboundGatewaySpec.batchingStrategy()`
* Extract a `RabbitTemplate` `MessageConverter` to use for reply messages
conversion - pursue a backward compatibility
garyrussell added a commit that referenced this issue Oct 31, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants