Wrong content type in reply message #1112

Closed
mvh77 opened this issue Oct 27, 2019 · 5 comments

mvh77 commented Oct 27, 2019

Versions: Spring AMQP 2.2.0, Spring 5.2.0

I have the following simple test:

package com.test;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.handler.GenericHandler;
import org.springframework.integration.support.json.Jackson2JsonObjectMapper;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template, Jackson2JsonMessageConverter converter) {
        return args -> {
            Object o = template.convertSendAndReceive("myqueue", new TestPojo("john doe"));
            if(o != null) System.err.println("received type: " + o.getClass().getSimpleName());
        };
    }

    @Bean
    public IntegrationFlow readMessages(ConnectionFactory cf, Jackson2JsonObjectMapper mapper, GenericHandler<TestPojo> handler) {

        return IntegrationFlows
            .from(Amqp.inboundGateway(cf, "myqueue"))
            .transform(Transformers.fromJson(TestPojo.class, mapper))
            .handle(handler)
            .transform(Transformers.toJson(mapper))
            .logAndReply();
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter() {
            @Override
            public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
                message.getMessageProperties().setContentType("application/json");
                return super.fromMessage(message, conversionHint);
            }

            // cannot override toMessage as it is final in AbstractMessageConverter
            // public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException{}
        };
    }

    @Bean
    public Jackson2JsonObjectMapper jsonObjectMapper() {
        return new Jackson2JsonObjectMapper();
    }

    @Bean
    public GenericHandler<TestPojo> handler() {
        return (tp, messageHeaders) -> {
            System.err.println("handling test pojo with headers " + messageHeaders);
            return new TestPojo(new StringBuilder(tp.getName()).reverse().toString());
        };
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }
}

I managed to make this work by overriding Jackson2JsonMessageConverter#fromMessage so that it sets the content type before processing. However, the content type of the RabbitMQ reply message is text/plain. Here is what the RabbitMQ firehose log shows:

================================================================================
2019-10-27 13:07:34:556: Message published

Node:         rabbit@9aba3d1b01b3
Connection:   172.18.0.1:48784 -> 172.18.0.2:5672
Virtual host: /
User:         guest
Channel:      2
Exchange:     
Routing keys: [<<"myqueue">>]
Routed queues: [<<"myqueue">>]
Properties:   [{<<"reply_to">>,longstr,
                <<"amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==">>},
               {<<"correlation_id">>,longstr,<<"1">>},
               {<<"priority">>,signedint,0},
               {<<"delivery_mode">>,signedint,2},
               {<<"headers">>,table,
                [{<<"__TypeId__">>,longstr,<<"com.test.TestPojo">>}]},
               {<<"content_encoding">>,longstr,<<"UTF-8">>},
               {<<"content_type">>,longstr,<<"application/json">>}]
Payload: 
{"name":"john doe"}

================================================================================
2019-10-27 13:07:34:556: Message received

Node:         rabbit@9aba3d1b01b3
Connection:   172.18.0.1:48784 -> 172.18.0.2:5672
Virtual host: /
User:         guest
Channel:      1
Exchange:     
Routing keys: [<<"myqueue">>]
Queue:        myqueue
Properties:   [{<<"reply_to">>,longstr,
                <<"amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==">>},
               {<<"correlation_id">>,longstr,<<"1">>},
               {<<"priority">>,signedint,0},
               {<<"delivery_mode">>,signedint,2},
               {<<"headers">>,table,
                [{<<"__TypeId__">>,longstr,<<"com.test.TestPojo">>}]},
               {<<"content_encoding">>,longstr,<<"UTF-8">>},
               {<<"content_type">>,longstr,<<"application/json">>}]
Payload: 
{"name":"john doe"}

================================================================================
2019-10-27 13:07:34:618: Message published

Node:         rabbit@9aba3d1b01b3
Connection:   172.18.0.1:48784 -> 172.18.0.2:5672
Virtual host: /
User:         guest
Channel:      3
Exchange:     
Routing keys: [<<"amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==">>]
Routed queues: []
Properties:   [{<<"timestamp">>,signedint,1572181654},
               {<<"message_id">>,longstr,
                <<"294262a8-ce42-594b-503c-5bdea9e6946c">>},
               {<<"correlation_id">>,longstr,<<"1">>},
               {<<"priority">>,signedint,0},
               {<<"delivery_mode">>,signedint,2},
               {<<"headers">>,table,
                [{<<"_resolvableType">>,longstr,<<"com.test.TestPojo">>},
                 {<<"__TypeId__">>,longstr,<<"com.test.TestPojo">>}]},
               {<<"content_encoding">>,longstr,<<"UTF-8">>},
               {<<"content_type">>,longstr,<<"text/plain">>}]
Payload: 
{"name":"eod nhoj"}

Here are the Spring logs:

15:07:34.503 [main] INFO  com.test.Application - Started Application in 1.894 seconds (JVM running for 2.291)
15:07:34.536 [main] DEBUG o.s.a.r.c.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,2)
15:07:34.537 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$363/620412175 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,2), conn: Proxy@56da52a7 Shared Rabbit Connection: SimpleConnection@2eb79cbe [delegate=amqp://[email protected]:5672/, localPort= 33826]
15:07:34.540 [main] INFO  o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService
15:07:34.541 [main] DEBUG o.s.a.r.l.DirectReplyToMessageListenerContainer - Starting Rabbit listener container.
15:07:34.545 [main] INFO  o.s.a.r.l.DirectReplyToMessageListenerContainer - Container initialized for queues: [amq.rabbitmq.reply-to]
15:07:34.549 [pool-1-thread-4] DEBUG o.s.a.r.l.DirectReplyToMessageListenerContainer - New SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA identity=30506c0d] consumeOk
15:07:34.549 [main] INFO  o.s.a.r.l.DirectReplyToMessageListenerContainer - SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA identity=30506c0d] started
15:07:34.553 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Sending message with tag 1
15:07:34.554 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message [(Body:'{"name":"john doe"}' MessageProperties [headers={__TypeId__=com.test.TestPojo}, correlationId=1, replyTo=amq.rabbitmq.reply-to, contentType=application/json, contentEncoding=UTF-8, contentLength=19, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [], routingKey = [myqueue]
15:07:34.558 [pool-1-thread-5] DEBUG o.s.a.r.l.BlockingQueueConsumer - Storing delivery for consumerTag: 'amq.ctag-TrIyJPUcGKjimpcNCkrTaw' with deliveryTag: '1' in Consumer@48c4245d: tags=[[amq.ctag-TrIyJPUcGKjimpcNCkrTaw]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@56da52a7 Shared Rabbit Connection: SimpleConnection@2eb79cbe [delegate=amqp://[email protected]:5672/, localPort= 33826], acknowledgeMode=AUTO local queue size=0
15:07:34.559 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Received message: (Body:'{"name":"john doe"}' MessageProperties [headers={__TypeId__=com.test.TestPojo}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=myqueue, deliveryTag=1, consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, consumerQueue=myqueue])
15:07:34.567 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_receivedDeliveryMode] WILL be mapped, matched pattern=*
15:07:34.567 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=*
15:07:34.567 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_contentEncoding] WILL be mapped, matched pattern=*
15:07:34.567 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=*
15:07:34.567 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=*
15:07:34.567 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[json__TypeId__] WILL be mapped, matched pattern=*
15:07:34.568 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_replyTo] WILL be mapped, matched pattern=*
15:07:34.568 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_correlationId] WILL be mapped, matched pattern=*
15:07:34.568 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_redelivered] WILL be mapped, matched pattern=*
15:07:34.568 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[contentType] WILL be mapped, matched pattern=*
15:07:34.568 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[__TypeId__] WILL be mapped, matched pattern=*
15:07:34.576 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'bean 'readMessages.channel#0'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload=byte[19], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=com.test.TestPojo, amqp_correlationId=1, id=0f20d2df-6a86-d050-0301-6b08b04c1cd3, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654576}]
15:07:34.577 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.t.MessageTransformingHandler - bean 'readMessages.json-to-object-transformer#1' for component 'readMessages.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'com.test.Application'; from source: 'bean method readMessages' received message: GenericMessage [payload=byte[19], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=com.test.TestPojo, amqp_correlationId=1, id=0f20d2df-6a86-d050-0301-6b08b04c1cd3, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654576}]
15:07:34.604 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'bean 'readMessages.channel#1'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload=TestPojo{name='john doe'}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, amqp_correlationId=1, id=cb02f0ce-ff97-65b3-7dcf-fb3f1dea10e6, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654604}]
15:07:34.604 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.h.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.LambdaMessageProcessor@5215cd9a] (readMessages.org.springframework.integration.config.ConsumerEndpointFactoryBean#1) received message: GenericMessage [payload=TestPojo{name='john doe'}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, amqp_correlationId=1, id=cb02f0ce-ff97-65b3-7dcf-fb3f1dea10e6, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654604}]
15:07:34.605 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'bean 'readMessages.channel#2'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload=TestPojo{name='eod nhoj'}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, amqp_correlationId=1, id=81279664-5dbd-1424-ccb0-0f44885f0122, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654604}]
15:07:34.605 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.t.MessageTransformingHandler - bean 'readMessages.object-to-json-transformer#1' for component 'readMessages.org.springframework.integration.config.ConsumerEndpointFactoryBean#2'; defined in: 'com.test.Application'; from source: 'bean method readMessages' received message: GenericMessage [payload=TestPojo{name='eod nhoj'}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, amqp_correlationId=1, id=81279664-5dbd-1424-ccb0-0f44885f0122, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654604}]
15:07:34.607 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'bean 'readMessages.channel#4'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload={"name":"eod nhoj"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, json_resolvableType=com.test.TestPojo, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=class com.test.TestPojo, amqp_correlationId=1, id=6245ee85-29fb-02ea-2227-769da870b0ec, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654607}]
15:07:34.607 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.handler.LoggingHandler - bean 'readMessages.logging-channel-adapter#0'; defined in: 'com.test.Application'; from source: 'bean method readMessages' received message: GenericMessage [payload={"name":"eod nhoj"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, json_resolvableType=com.test.TestPojo, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=class com.test.TestPojo, amqp_correlationId=1, id=6245ee85-29fb-02ea-2227-769da870b0ec, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654607}]
15:07:34.607 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] INFO  o.s.i.handler.LoggingHandler - GenericMessage [payload={"name":"eod nhoj"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, json_resolvableType=com.test.TestPojo, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=class com.test.TestPojo, amqp_correlationId=1, id=6245ee85-29fb-02ea-2227-769da870b0ec, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654607}]
15:07:34.608 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.handler.BridgeHandler - bean 'readMessages.bridge#0' for component 'readMessages.org.springframework.integration.config.ConsumerEndpointFactoryBean#3'; defined in: 'com.test.Application'; from source: 'bean method readMessages' received message: GenericMessage [payload={"name":"eod nhoj"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, json_resolvableType=com.test.TestPojo, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=class com.test.TestPojo, amqp_correlationId=1, id=6245ee85-29fb-02ea-2227-769da870b0ec, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654607}]
15:07:34.608 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'bean 'readMessages.channel#4'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload={"name":"eod nhoj"}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, json_resolvableType=com.test.TestPojo, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=class com.test.TestPojo, amqp_correlationId=1, id=6245ee85-29fb-02ea-2227-769da870b0ec, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654607}]
15:07:34.608 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'bean 'readMessages.channel#2'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload=TestPojo{name='eod nhoj'}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, amqp_correlationId=1, id=81279664-5dbd-1424-ccb0-0f44885f0122, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654604}]
15:07:34.608 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'bean 'readMessages.channel#1'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload=TestPojo{name='john doe'}, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, amqp_correlationId=1, id=cb02f0ce-ff97-65b3-7dcf-fb3f1dea10e6, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654604}]
15:07:34.608 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'bean 'readMessages.channel#0'; defined in: 'com.test.Application'; from source: 'bean method readMessages'', message: GenericMessage [payload=byte[19], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, amqp_consumerQueue=myqueue, amqp_redelivered=false, amqp_receivedRoutingKey=myqueue, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@ec4ffed, amqp_contentEncoding=UTF-8, json__TypeId__=com.test.TestPojo, amqp_correlationId=1, id=0f20d2df-6a86-d050-0301-6b08b04c1cd3, amqp_consumerTag=amq.ctag-TrIyJPUcGKjimpcNCkrTaw, contentType=application/json, __TypeId__=com.test.TestPojo, timestamp=1572181654576}]
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_receivedDeliveryMode] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[json_resolvableType] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_replyTo] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_consumerQueue] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_redelivered] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_contentEncoding] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[json__TypeId__] WILL be mapped, matched pattern=*
15:07:34.609 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_correlationId] WILL be mapped, matched pattern=*
15:07:34.610 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[amqp_consumerTag] WILL be mapped, matched pattern=*
15:07:34.610 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[contentType] WILL be mapped, matched pattern=*
15:07:34.610 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.i.m.AbstractHeaderMapper$HeaderMatcher - headerName=[__TypeId__] WILL be mapped, matched pattern=*
15:07:34.614 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.a.r.c.CachingConnectionFactory - Creating cached Rabbit Channel from AMQChannel(amqp://[email protected]:5672/,3)
15:07:34.614 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$423/1519497130 on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,3), conn: Proxy@56da52a7 Shared Rabbit Connection: SimpleConnection@2eb79cbe [delegate=amqp://[email protected]:5672/, localPort= 33826]
15:07:34.617 [readMessages.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message [(Body:'{"name":"eod nhoj"}' MessageProperties [headers={_resolvableType=com.test.TestPojo, __TypeId__=com.test.TestPojo}, timestamp=Sun Oct 27 15:07:34 EET 2019, messageId=294262a8-ce42-594b-503c-5bdea9e6946c, correlationId=1, contentType=text/plain, contentEncoding=UTF-8, contentLength=19, deliveryMode=PERSISTENT, priority=0, redelivered=false, receivedRoutingKey=myqueue, deliveryTag=1])] on exchange [], routingKey = [amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==]
15:07:34.619 [pool-1-thread-6] DEBUG o.s.a.r.l.DirectReplyToMessageListenerContainer - SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA identity=30506c0d] received (Body:'{"name":"eod nhoj"}' MessageProperties [headers={_resolvableType=com.test.TestPojo, __TypeId__=com.test.TestPojo}, timestamp=Sun Oct 27 15:07:34 EET 2019, messageId=294262a8-ce42-594b-503c-5bdea9e6946c, correlationId=1, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, deliveryTag=1, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA, consumerQueue=amq.rabbitmq.reply-to])
15:07:34.619 [main] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Reply: (Body:'{"name":"eod nhoj"}' MessageProperties [headers={_resolvableType=com.test.TestPojo, __TypeId__=com.test.TestPojo}, timestamp=Sun Oct 27 15:07:34 EET 2019, messageId=294262a8-ce42-594b-503c-5bdea9e6946c, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWFiYTNkMWIwMWIzAABN9AAAAAEC.JUCIxsoFwwCcQ6U3DuenxQ==, deliveryTag=1, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA, consumerQueue=amq.rabbitmq.reply-to])
15:08:39.545 [rabbitTemplate#0-consumerMonitor-1] DEBUG o.s.a.r.l.DirectReplyToMessageListenerContainer - Reducing idle consumes by 1
15:08:39.545 [rabbitTemplate#0-consumerMonitor-1] DEBUG o.s.a.r.l.DirectReplyToMessageListenerContainer - Canceling SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA identity=30506c0d]
15:08:39.547 [pool-1-thread-7] DEBUG o.s.a.r.l.DirectReplyToMessageListenerContainer - CancelOk SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-OtvEK6nzPWedFapunG5eIA identity=30506c0d]
15:08:39.547 [pool-1-thread-7] DEBUG o.s.a.r.c.CachingConnectionFactory - Closing cached Channel: AMQChannel(amqp://[email protected]:5672/,2)

The problem seems to be that the new MessageProperties have a contentType of text/plain. I don't see how to configure this, since the toMessage method of Jackson2JsonMessageConverter cannot be overridden.

@garyrussell
Contributor

This is a Spring Integration issue, not Spring AMQP.

        return IntegrationFlows
            .from(Amqp.inboundGateway(cf, "myqueue"))
            .transform(Transformers.fromJson(TestPojo.class, mapper))
            .handle(handler)
            .transform(Transformers.toJson(mapper))
            .logAndReply();

Your inbound gateway needs to be configured with the Jackson converter (and the transformers removed). By default the gateway uses a SimpleMessageConverter; when it sees only a String payload, it sets the content type to text/plain.
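A minimal sketch of that suggestion, assuming the flow from the issue and that the sending RabbitTemplate also uses the Jackson converter (so the request carries a JSON content type and a `__TypeId__` header). The class name `GatewayFlowConfig` and the injected `GenericHandler<TestPojo>` bean are assumptions made for illustration; `TestPojo` is the type from the original test.

```java
package com.test;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.GenericHandler;

@Configuration
public class GatewayFlowConfig {

    // Hypothetical wiring: the handler bean is assumed to accept and return TestPojo.
    @Bean
    public IntegrationFlow flow(ConnectionFactory cf, GenericHandler<TestPojo> handler) {
        return IntegrationFlows
            // The gateway converts the request from JSON and the reply back to JSON,
            // so the fromJson/toJson transformers are no longer needed.
            .from(Amqp.inboundGateway(cf, "myqueue")
                .messageConverter(new Jackson2JsonMessageConverter()))
            .handle(handler)
            .logAndReply();
    }
}
```

With this wiring the reply payload reaches the gateway as an object rather than a String, so the Jackson converter, not SimpleMessageConverter, decides the reply's content type.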

The outbound endpoints have a property headersMappedLast which allows the content type header from the message to be used.

	/**
	 * When mapping headers for the outbound message, determine whether the headers are
	 * mapped before the message is converted, or afterwards. This only affects headers
	 * that might be added by the message converter. When false, the converter's headers
	 * win; when true, any headers added by the converter will be overridden (if the
	 * source message has a header that maps to those headers). You might wish to set this
	 * to true, for example, when using a
	 * {@link org.springframework.amqp.support.converter.SimpleMessageConverter} with a
	 * String payload that contains json; the converter will set the content type to
	 * {@code text/plain} which can be overridden to {@code application/json} by setting
	 * the {@link AmqpHeaders#CONTENT_TYPE} message header. Default: false.
	 * @param headersMappedLast true if headers are mapped after conversion.
	 * @since 5.0
	 */
	public void setHeadersMappedLast(boolean headersMappedLast) {
		this.headersMappedLast = headersMappedLast;
	}
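The ordering described in that Javadoc can be modeled with plain maps. This is only an illustrative sketch of the "who wins" rule, not Spring Integration's implementation; the class `HeaderMappingOrder` and its `merge` method are hypothetical names.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

class HeaderMappingOrder {

    // Models which side wins when both the converter and the source message
    // supply the same header. Whichever map is applied last overrides the other.
    static Map<String, Object> merge(Map<String, Object> converterHeaders,
                                     Map<String, Object> sourceHeaders,
                                     boolean headersMappedLast) {
        Map<String, Object> result = new LinkedHashMap<>();
        if (headersMappedLast) {
            result.putAll(converterHeaders);
            result.putAll(sourceHeaders);    // mapped after conversion: source headers win
        } else {
            result.putAll(sourceHeaders);
            result.putAll(converterHeaders); // mapped before conversion: converter headers win
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> converter = Collections.singletonMap("contentType", "text/plain");
        Map<String, Object> source = Collections.singletonMap("contentType", "application/json");

        System.out.println(merge(converter, source, false).get("contentType")); // prints text/plain
        System.out.println(merge(converter, source, true).get("contentType"));  // prints application/json
    }
}
```

The `false` case corresponds to the behavior reported in this issue: a String payload containing JSON goes out as text/plain because the converter's header is applied last.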

We should have a similar property for the reply handling in the inbound gateway.

Moving this to Spring Integration.

@garyrussell
Contributor

Issue moved to spring-projects/spring-integration #3089 via ZenHub

@garyrussell
Contributor

Ignore my last comment (deleted); the converter goes on the gateway, not the listener container.

@mvh77
Author

mvh77 commented Oct 28, 2019

Thank you for your answer! Is it possible to create a Jackson2JsonMessageConverter that handles a specific type for a generic type? Say I have a type Container, and I want to create a Jackson2JsonMessageConverter that handles instances of Container, because I have a queue that contains only those.

@garyrussell
Contributor

There are several ways to influence the types created by the Jackson converter.

  1. Tell Jackson on the sending side to add type information to the JSON itself.
  2. Add a custom Jackson2JavaTypeMapper to the converter.
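As a rough sketch of the second option, assuming `Container` is a generic class and the queue only ever holds one parameterization of it: a type mapper can ignore the message headers and always return a fixed `JavaType`. The classes `Container<T>`, `Item`, and `ContainerConverterConfig` below are hypothetical stand-ins, not types from the issue.

```java
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

public class ContainerConverterConfig {

    // Hypothetical payload types, for illustration only.
    public static class Item {
        public String name;
    }

    public static class Container<T> {
        public T content;
    }

    public static Jackson2JsonMessageConverter containerConverter() {
        // Describes Container<Item> to Jackson, including the type argument.
        JavaType containerOfItem = TypeFactory.defaultInstance()
            .constructParametricType(Container.class, Item.class);

        DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper() {
            @Override
            public JavaType toJavaType(MessageProperties properties) {
                // Ignore any type headers: every message on this queue is a Container<Item>.
                return containerOfItem;
            }
        };

        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setJavaTypeMapper(typeMapper);
        return converter;
    }
}
```

The converter still only deserializes messages whose content type indicates JSON, so the sender needs to set that header correctly for the mapper to be consulted.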

Please don't use GitHub issues to ask questions; we prefer Stack Overflow (tagged with spring-amqp) because it makes it easier for others to find answers.
