diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java index 0cfa8b0a020..5ba1806b3c7 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java @@ -44,6 +44,7 @@ import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; import org.springframework.util.StringUtils; +import org.springframework.util.concurrent.SettableListenableFuture; /** * @author Gary Russell @@ -479,8 +480,14 @@ protected CorrelationData generateCorrelationData(Message requestMessage) { if (messageId == null) { messageId = NO_ID; } - correlationData = new CorrelationDataWrapper(messageId.toString(), - this.correlationDataGenerator.processMessage(requestMessage), requestMessage); + Object userData = this.correlationDataGenerator.processMessage(requestMessage); + if (userData != null) { + correlationData = new CorrelationDataWrapper(messageId.toString(), userData, requestMessage); + } + else { + this.logger.debug("'confirmCorrelationExpression' resolved to 'null'; " + + "no publisher confirm will be sent to the ack or nack channel"); + } } return correlationData; } @@ -604,6 +611,23 @@ public Message getMessage() { return this.message; } + @Override + public SettableListenableFuture getFuture() { + if (this.userData instanceof CorrelationData) { + return ((CorrelationData) this.userData).getFuture(); + } + else { + return super.getFuture(); + } + } + + @Override + public void setReturnedMessage(org.springframework.amqp.core.Message returnedMessage) { + if (this.userData instanceof CorrelationData) { + ((CorrelationData) this.userData).setReturnedMessage(returnedMessage); + } + super.setReturnedMessage(returnedMessage); + } } } diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests-context.xml b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests-context.xml index 7fef86f1bc0..6e2d6282d18 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests-context.xml +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests-context.xml @@ -70,6 +70,7 @@ routing-key="#{queue.name + queue.name}" mapped-request-headers="foo*" amqp-template="amqpTemplateReturns" + confirm-correlation-expression="headers['corrData']" return-channel="returnChannel" /> diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests.java index f2fc836ee4d..66833b74e3b 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests.java @@ -18,12 +18,16 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.util.concurrent.TimeUnit; + import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.junit.BrokerRunning; import org.springframework.amqp.support.AmqpHeaders; @@ -143,11 +147,18 @@ public void adapterWithPublisherConfirms() throws Exception { @Test public void adapterWithReturns() throws Exception { this.withReturns.setErrorMessageStrategy(null); - Message message = MessageBuilder.withPayload("hello").build(); + CorrelationData corrData = new CorrelationData("adapterWithReturns"); + Message message = MessageBuilder.withPayload("hello") + .setHeader("corrData", corrData) + .build(); this.returnRequestChannel.send(message); Message returned = returnChannel.receive(10000); assertThat(returned).isNotNull(); assertThat(returned.getPayload()).isEqualTo(message.getPayload()); + Confirm confirm = corrData.getFuture().get(10, TimeUnit.SECONDS); + assertThat(confirm).isNotNull(); + assertThat(confirm.isAck()).isTrue(); + assertThat(corrData.getReturnedMessage()).isNotNull(); } @Test