Skip to content

Commit d6ac866

Browse files
garyrussellartembilan
authored andcommitted
GH-2759: Fix CorrelationData.future
* GH-2759: Fix CorrelationData.future Fixes #2759 The outbound endpoints wrap user correlation data in a wrapper. If the user data is a `CorrelationData`, we must delegate methods involving the `Future<?>` and `returnedMessage` to the user data. **cherry-pick to 5.1 and switch AMQP to snapshots** * Polishing - remove redundant override. * Add debug log with null correlation data
1 parent 221393e commit d6ac866

File tree

3 files changed

+39
-3
lines changed

3 files changed

+39
-3
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.springframework.messaging.MessageChannel;
4545
import org.springframework.util.Assert;
4646
import org.springframework.util.StringUtils;
47+
import org.springframework.util.concurrent.SettableListenableFuture;
4748

4849
/**
4950
* @author Gary Russell
@@ -479,8 +480,14 @@ protected CorrelationData generateCorrelationData(Message<?> requestMessage) {
479480
if (messageId == null) {
480481
messageId = NO_ID;
481482
}
482-
correlationData = new CorrelationDataWrapper(messageId.toString(),
483-
this.correlationDataGenerator.processMessage(requestMessage), requestMessage);
483+
Object userData = this.correlationDataGenerator.processMessage(requestMessage);
484+
if (userData != null) {
485+
correlationData = new CorrelationDataWrapper(messageId.toString(), userData, requestMessage);
486+
}
487+
else {
488+
this.logger.debug("'confirmCorrelationExpression' resolved to 'null'; "
489+
+ "no publisher confirm will be sent to the ack or nack channel");
490+
}
484491
}
485492
return correlationData;
486493
}
@@ -604,6 +611,23 @@ public Message<?> getMessage() {
604611
return this.message;
605612
}
606613

614+
@Override
615+
public SettableListenableFuture<Confirm> getFuture() {
616+
if (this.userData instanceof CorrelationData) {
617+
return ((CorrelationData) this.userData).getFuture();
618+
}
619+
else {
620+
return super.getFuture();
621+
}
622+
}
623+
624+
@Override
625+
public void setReturnedMessage(org.springframework.amqp.core.Message returnedMessage) {
626+
if (this.userData instanceof CorrelationData) {
627+
((CorrelationData) this.userData).setReturnedMessage(returnedMessage);
628+
}
629+
super.setReturnedMessage(returnedMessage);
630+
}
607631
}
608632

609633
}

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests-context.xml

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
routing-key="#{queue.name + queue.name}"
7171
mapped-request-headers="foo*"
7272
amqp-template="amqpTemplateReturns"
73+
confirm-correlation-expression="headers['corrData']"
7374
return-channel="returnChannel" />
7475

7576
<int:channel id="returnRequestChannel"/>

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.util.concurrent.TimeUnit;
22+
2123
import org.junit.Rule;
2224
import org.junit.Test;
2325
import org.junit.runner.RunWith;
2426

2527
import org.springframework.amqp.core.Queue;
2628
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
29+
import org.springframework.amqp.rabbit.connection.CorrelationData;
30+
import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm;
2731
import org.springframework.amqp.rabbit.core.RabbitTemplate;
2832
import org.springframework.amqp.rabbit.junit.BrokerRunning;
2933
import org.springframework.amqp.support.AmqpHeaders;
@@ -143,11 +147,18 @@ public void adapterWithPublisherConfirms() throws Exception {
143147
@Test
144148
public void adapterWithReturns() throws Exception {
145149
this.withReturns.setErrorMessageStrategy(null);
146-
Message<?> message = MessageBuilder.withPayload("hello").build();
150+
CorrelationData corrData = new CorrelationData("adapterWithReturns");
151+
Message<?> message = MessageBuilder.withPayload("hello")
152+
.setHeader("corrData", corrData)
153+
.build();
147154
this.returnRequestChannel.send(message);
148155
Message<?> returned = returnChannel.receive(10000);
149156
assertThat(returned).isNotNull();
150157
assertThat(returned.getPayload()).isEqualTo(message.getPayload());
158+
Confirm confirm = corrData.getFuture().get(10, TimeUnit.SECONDS);
159+
assertThat(confirm).isNotNull();
160+
assertThat(confirm.isAck()).isTrue();
161+
assertThat(corrData.getReturnedMessage()).isNotNull();
151162
}
152163

153164
@Test

0 commit comments

Comments
 (0)