diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java index e911f9fa4f8..e258360c3fb 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java @@ -75,6 +75,8 @@ public class AmqpInboundChannelAdapter extends MessageProducerSupport implements private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L); + private boolean bindSourceMessage; + public AmqpInboundChannelAdapter(AbstractMessageListenerContainer listenerContainer) { Assert.notNull(listenerContainer, "listenerContainer must not be null"); Assert.isNull(listenerContainer.getMessageListener(), @@ -132,6 +134,16 @@ public void setBatchingStrategy(BatchingStrategy batchingStrategy) { this.batchingStrategy = batchingStrategy; } + /** + * Set to true to bind the source message in the header named + * {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}. + * @param bindSourceMessage true to bind. + * @since 5.1.6 + */ + public void setBindSourceMessage(boolean bindSourceMessage) { + this.bindSourceMessage = bindSourceMessage; + } + @Override public String getComponentType() { return "amqp:inbound-channel-adapter"; @@ -274,6 +286,9 @@ private org.springframework.messaging.Message createMessage(Message mess if (AmqpInboundChannelAdapter.this.retryTemplate != null) { headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger()); } + if (AmqpInboundChannelAdapter.this.bindSourceMessage) { + headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message); + } final org.springframework.messaging.Message messagingMessage = getMessageBuilderFactory() .withPayload(payload) .copyHeaders(headers) diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java index 1cefa6beb06..edaf47c7b41 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java @@ -87,6 +87,8 @@ public class AmqpInboundGateway extends MessagingGatewaySupport { private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L); + private boolean bindSourceMessage; + public AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer) { this(listenerContainer, new RabbitTemplate(listenerContainer.getConnectionFactory()), false); } @@ -192,6 +194,16 @@ public void setBatchingStrategy(BatchingStrategy batchingStrategy) { this.batchingStrategy = batchingStrategy; } + /** + * Set to true to bind the source message in the header named + * {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}. + * @param bindSourceMessage true to bind. + * @since 5.1.6 + */ + public void setBindSourceMessage(boolean bindSourceMessage) { + this.bindSourceMessage = bindSourceMessage; + } + @Override public String getComponentType() { return "amqp:inbound-gateway"; @@ -320,6 +332,9 @@ private org.springframework.messaging.Message convert(Message message, C if (AmqpInboundGateway.this.retryTemplate != null) { headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger()); } + if (AmqpInboundGateway.this.bindSourceMessage) { + headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message); + } } catch (RuntimeException e) { MessageChannel errorChannel = getErrorChannel(); diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java index 7c009f66431..d2ee4f62e6d 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java @@ -148,10 +148,11 @@ protected boolean isRawMessageHeader() { } /** - * Set to true to include the raw spring-amqp message as a header - * with key {@link AmqpMessageHeaderErrorMessageStrategy#AMQP_RAW_MESSAGE}, - * enabling callers to have access to the message to process errors. - * @param rawMessageHeader true to include the header. + * Set to true to include the raw spring-amqp message as a header with key + * {@link AmqpMessageHeaderErrorMessageStrategy#AMQP_RAW_MESSAGE}, enabling callers to + * have access to the message to process errors. The raw message is also added to the + * common header {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}. + * @param rawMessageHeader true to include the headers. */ public void setRawMessageHeader(boolean rawMessageHeader) { this.rawMessageHeader = rawMessageHeader; @@ -210,6 +211,7 @@ protected AbstractIntegrationMessageBuilder doReceive() { .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, callback); if (this.rawMessageHeader) { builder.setHeader(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, amqpMessage); + builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, amqpMessage); } return builder; } diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java index ba7b0272675..a63115813dd 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java @@ -34,6 +34,7 @@ import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.integration.IntegrationMessageHeaderAccessor; import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.acks.AcknowledgmentCallback.Status; import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy; @@ -74,6 +75,8 @@ public void testAck() throws Exception { Message received = source.receive(); assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)) .isInstanceOf(org.springframework.amqp.core.Message.class); + assertThat(received.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA)) + .isSameAs(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)); assertThat(received.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE)).isEqualTo("foo"); // make sure channel is not cached org.springframework.amqp.rabbit.connection.Connection conn = ccf.createConnection(); diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java index 36083dc2266..553689a9341 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java @@ -98,6 +98,7 @@ public void testInt2809JavaTypePropertiesToAmqp() throws Exception { adapter.setOutputChannel(channel); adapter.setBeanFactory(mock(BeanFactory.class)); + adapter.setBindSourceMessage(true); adapter.afterPropertiesSet(); Object payload = new Foo("bar1"); @@ -120,6 +121,8 @@ public void testInt2809JavaTypePropertiesToAmqp() throws Exception { assertThat(result.getHeaders().get(AmqpHeaders.CHANNEL)).isSameAs(rabbitChannel); assertThat(result.getHeaders().get(AmqpHeaders.DELIVERY_TAG)).isEqualTo(123L); + org.springframework.amqp.core.Message sourceData = StaticMessageHeaderAccessor.getSourceData(result); + assertThat(sourceData).isSameAs(amqpMessage); } @Test @@ -153,6 +156,8 @@ public void testInt2809JavaTypePropertiesFromAmqp() throws Exception { Message result = new JsonToObjectTransformer().transform(receive); assertThat(result.getPayload()).isEqualTo(payload); + org.springframework.amqp.core.Message sourceData = StaticMessageHeaderAccessor.getSourceData(result); + assertThat(sourceData).isNull(); } @Test @@ -409,10 +414,11 @@ public void testBatchdAdapter() throws Exception { public void testBatchGateway() throws Exception { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class)); container.setDeBatchingEnabled(false); - AmqpInboundGateway adapter = new AmqpInboundGateway(container); + AmqpInboundGateway gateway = new AmqpInboundGateway(container); QueueChannel out = new QueueChannel(); - adapter.setRequestChannel(out); - adapter.afterPropertiesSet(); + gateway.setRequestChannel(out); + gateway.setBindSourceMessage(true); + gateway.afterPropertiesSet(); ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener(); SimpleBatchingStrategy bs = new SimpleBatchingStrategy(2, 10_000, 10_000L); MessageProperties messageProperties = new MessageProperties(); @@ -426,6 +432,8 @@ public void testBatchGateway() throws Exception { Message received = out.receive(); assertThat(received).isNotNull(); assertThat(((List) received.getPayload())).contains("test1", "test2"); + org.springframework.amqp.core.Message sourceData = StaticMessageHeaderAccessor.getSourceData(received); + assertThat(sourceData).isSameAs(batched.getMessage()); } public static class Foo { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/IntegrationMessageHeaderAccessor.java b/spring-integration-core/src/main/java/org/springframework/integration/IntegrationMessageHeaderAccessor.java index 86bab5119f5..103d6782f4b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/IntegrationMessageHeaderAccessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/IntegrationMessageHeaderAccessor.java @@ -67,6 +67,11 @@ public class IntegrationMessageHeaderAccessor extends MessageHeaderAccessor { public static final String ACKNOWLEDGMENT_CALLBACK = "acknowledgmentCallback"; + /** + * Raw source message. + */ + public static final String SOURCE_DATA = "sourceData"; + private static final BiFunction TYPE_VERIFY_MESSAGE_FUNCTION = (name, trailer) -> "The '" + name + trailer; @@ -153,6 +158,18 @@ public AtomicInteger getDeliveryAttempt() { return getHeader(DELIVERY_ATTEMPT, AtomicInteger.class); } + /** + * Get the source data header, if present. + * @param the data type. + * @return the source header. + * @since 5.1.6 + */ + @SuppressWarnings("unchecked") + @Nullable + public T getSourceData() { + return (T) getHeader(SOURCE_DATA); + } + @SuppressWarnings("unchecked") @Nullable public T getHeader(String key, Class type) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/StaticMessageHeaderAccessor.java b/spring-integration-core/src/main/java/org/springframework/integration/StaticMessageHeaderAccessor.java index 717868565b0..fa77f6de678 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/StaticMessageHeaderAccessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/StaticMessageHeaderAccessor.java @@ -107,4 +107,10 @@ public static AcknowledgmentCallback getAcknowledgmentCallback(Message messag AcknowledgmentCallback.class); } + @SuppressWarnings("unchecked") + @Nullable + public static T getSourceData(Message message) { + return (T) message.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA); + } + }