Skip to content

Commit cb1b1c8

Browse files
garyrussellartembilan
authored andcommitted
GH-2759: Fix CorrelationData.future
* GH-2759: Fix CorrelationData.future Fixes #2759 The outbound endpoints wrap user correlation data in a wrapper. If the user data is a `CorrelationData`, we must delegate methods involving the `Future<?>` and `returnedMessage` to the user data. **cherry-pick to 5.1 and switch AMQP to snapshots** * Polishing - remove redundant override. * Add debug log with null correlation data # Conflicts: # spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests.java * Updated Spring AMQP dependency to `2.1.5.BUILD-SNAPSHOT` * Moved `AmqpOutboundEndpointTests` assertions to `AssertJ` to avoid conflicts with `master`
1 parent 40fd8d3 commit cb1b1c8

File tree

4 files changed

+70
-38
lines changed

4 files changed

+70
-38
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ subprojects { subproject ->
134134
romeToolsVersion = '1.9.0'
135135
servletApiVersion = '4.0.0'
136136
smackVersion = '4.3.1'
137-
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.1.4.RELEASE'
137+
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.1.5.BUILD-SNAPSHOT'
138138
springDataJpaVersion = '2.1.5.RELEASE'
139139
springDataMongoVersion = '2.1.5.RELEASE'
140140
springDataRedisVersion = '2.1.5.RELEASE'

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.springframework.messaging.MessageChannel;
4545
import org.springframework.util.Assert;
4646
import org.springframework.util.StringUtils;
47+
import org.springframework.util.concurrent.SettableListenableFuture;
4748

4849
/**
4950
* @author Gary Russell
@@ -479,8 +480,14 @@ protected CorrelationData generateCorrelationData(Message<?> requestMessage) {
479480
if (messageId == null) {
480481
messageId = NO_ID;
481482
}
482-
correlationData = new CorrelationDataWrapper(messageId.toString(),
483-
this.correlationDataGenerator.processMessage(requestMessage), requestMessage);
483+
Object userData = this.correlationDataGenerator.processMessage(requestMessage);
484+
if (userData != null) {
485+
correlationData = new CorrelationDataWrapper(messageId.toString(), userData, requestMessage);
486+
}
487+
else {
488+
this.logger.debug("'confirmCorrelationExpression' resolved to 'null'; "
489+
+ "no publisher confirm will be sent to the ack or nack channel");
490+
}
484491
}
485492
return correlationData;
486493
}
@@ -604,6 +611,23 @@ public Message<?> getMessage() {
604611
return this.message;
605612
}
606613

614+
@Override
615+
public SettableListenableFuture<Confirm> getFuture() {
616+
if (this.userData instanceof CorrelationData) {
617+
return ((CorrelationData) this.userData).getFuture();
618+
}
619+
else {
620+
return super.getFuture();
621+
}
622+
}
623+
624+
@Override
625+
public void setReturnedMessage(org.springframework.amqp.core.Message returnedMessage) {
626+
if (this.userData instanceof CorrelationData) {
627+
((CorrelationData) this.userData).setReturnedMessage(returnedMessage);
628+
}
629+
super.setReturnedMessage(returnedMessage);
630+
}
607631
}
608632

609633
}

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests-context.xml

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
routing-key="#{queue.name + queue.name}"
7171
mapped-request-headers="foo*"
7272
amqp-template="amqpTemplateReturns"
73+
confirm-correlation-expression="headers['corrData']"
7374
return-channel="returnChannel" />
7475

7576
<int:channel id="returnRequestChannel"/>

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests.java

+42-35
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,18 +16,18 @@
1616

1717
package org.springframework.integration.amqp.outbound;
1818

19-
import static org.hamcrest.Matchers.instanceOf;
20-
import static org.junit.Assert.assertEquals;
21-
import static org.junit.Assert.assertNotNull;
22-
import static org.junit.Assert.assertSame;
23-
import static org.junit.Assert.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.concurrent.TimeUnit;
2422

2523
import org.junit.Rule;
2624
import org.junit.Test;
2725
import org.junit.runner.RunWith;
2826

2927
import org.springframework.amqp.core.Queue;
3028
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
29+
import org.springframework.amqp.rabbit.connection.CorrelationData;
30+
import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm;
3131
import org.springframework.amqp.rabbit.core.RabbitTemplate;
3232
import org.springframework.amqp.rabbit.junit.BrokerRunning;
3333
import org.springframework.amqp.support.AmqpHeaders;
@@ -108,24 +108,24 @@ public void testGatewayPublisherConfirms() throws Exception {
108108
.build();
109109
this.pcRequestChannel.send(message);
110110
Message<?> ack = this.ackChannel.receive(10000);
111-
assertNotNull(ack);
112-
assertEquals("foo", ack.getPayload());
113-
assertEquals(Boolean.TRUE, ack.getHeaders().get(AmqpHeaders.PUBLISH_CONFIRM));
111+
assertThat(ack).isNotNull();
112+
assertThat(ack.getPayload()).isEqualTo("foo");
113+
assertThat(ack.getHeaders().get(AmqpHeaders.PUBLISH_CONFIRM)).isEqualTo(Boolean.TRUE);
114114

115115
org.springframework.amqp.core.Message received = this.amqpTemplateConfirms.receive(this.queue.getName());
116-
assertEquals("\"hello\"", new String(received.getBody(), "UTF-8"));
117-
assertEquals("application/json", received.getMessageProperties().getContentType());
118-
assertEquals("java.lang.String", received.getMessageProperties().getHeaders()
119-
.get(JsonHeaders.TYPE_ID.replaceFirst(JsonHeaders.PREFIX, "")));
116+
assertThat(new String(received.getBody(), "UTF-8")).isEqualTo("\"hello\"");
117+
assertThat(received.getMessageProperties().getContentType()).isEqualTo("application/json");
118+
assertThat(received.getMessageProperties().getHeaders()
119+
.get(JsonHeaders.TYPE_ID.replaceFirst(JsonHeaders.PREFIX, ""))).isEqualTo("java.lang.String");
120120

121121
// test whole message is correlation
122122
message = MessageBuilder.withPayload("hello")
123123
.build();
124124
this.pcMessageCorrelationRequestChannel.send(message);
125125
ack = ackChannel.receive(10000);
126-
assertNotNull(ack);
127-
assertSame(message.getPayload(), ack.getPayload());
128-
assertEquals(Boolean.TRUE, ack.getHeaders().get(AmqpHeaders.PUBLISH_CONFIRM));
126+
assertThat(ack).isNotNull();
127+
assertThat(ack.getPayload()).isSameAs(message.getPayload());
128+
assertThat(ack.getHeaders().get(AmqpHeaders.PUBLISH_CONFIRM)).isEqualTo(Boolean.TRUE);
129129

130130
while (this.amqpTemplateConfirms.receive(this.queue.getName()) != null) {
131131
// drain
@@ -139,31 +139,38 @@ public void adapterWithPublisherConfirms() throws Exception {
139139
.build();
140140
this.pcRequestChannelForAdapter.send(message);
141141
Message<?> ack = this.ackChannel.receive(10000);
142-
assertNotNull(ack);
143-
assertEquals("foo", ack.getPayload());
144-
assertEquals(Boolean.TRUE, ack.getHeaders().get(AmqpHeaders.PUBLISH_CONFIRM));
142+
assertThat(ack).isNotNull();
143+
assertThat(ack.getPayload()).isEqualTo("foo");
144+
assertThat(ack.getHeaders().get(AmqpHeaders.PUBLISH_CONFIRM)).isEqualTo(Boolean.TRUE);
145145
}
146146

147147
@Test
148148
public void adapterWithReturns() throws Exception {
149149
this.withReturns.setErrorMessageStrategy(null);
150-
Message<?> message = MessageBuilder.withPayload("hello").build();
150+
CorrelationData corrData = new CorrelationData("adapterWithReturns");
151+
Message<?> message = MessageBuilder.withPayload("hello")
152+
.setHeader("corrData", corrData)
153+
.build();
151154
this.returnRequestChannel.send(message);
152155
Message<?> returned = returnChannel.receive(10000);
153-
assertNotNull(returned);
154-
assertEquals(message.getPayload(), returned.getPayload());
156+
assertThat(returned).isNotNull();
157+
assertThat(returned.getPayload()).isEqualTo(message.getPayload());
158+
Confirm confirm = corrData.getFuture().get(10, TimeUnit.SECONDS);
159+
assertThat(confirm).isNotNull();
160+
assertThat(confirm.isAck()).isTrue();
161+
assertThat(corrData.getReturnedMessage()).isNotNull();
155162
}
156163

157164
@Test
158165
public void adapterWithReturnsAndErrorMessageStrategy() throws Exception {
159166
Message<?> message = MessageBuilder.withPayload("hello").build();
160167
this.returnRequestChannel.send(message);
161168
Message<?> returned = returnChannel.receive(10000);
162-
assertNotNull(returned);
163-
assertThat(returned, instanceOf(ErrorMessage.class));
164-
assertThat(returned.getPayload(), instanceOf(ReturnedAmqpMessageException.class));
169+
assertThat(returned).isNotNull();
170+
assertThat(returned).isInstanceOf(ErrorMessage.class);
171+
assertThat(returned.getPayload()).isInstanceOf(ReturnedAmqpMessageException.class);
165172
ReturnedAmqpMessageException payload = (ReturnedAmqpMessageException) returned.getPayload();
166-
assertEquals(message.getPayload(), payload.getFailedMessage().getPayload());
173+
assertThat(payload.getFailedMessage().getPayload()).isEqualTo(message.getPayload());
167174
}
168175

169176
@Test
@@ -178,18 +185,18 @@ public void adapterWithContentType() throws Exception {
178185
.build();
179186
this.ctRequestChannel.send(message);
180187
org.springframework.amqp.core.Message m = receive(template);
181-
assertNotNull(m);
182-
assertEquals("\"hello\"", new String(m.getBody(), "UTF-8"));
183-
assertEquals("application/json", m.getMessageProperties().getContentType());
184-
assertEquals("java.lang.String",
185-
m.getMessageProperties().getHeaders().get(JsonHeaders.TYPE_ID.replaceFirst(JsonHeaders.PREFIX, "")));
188+
assertThat(m).isNotNull();
189+
assertThat(new String(m.getBody(), "UTF-8")).isEqualTo("\"hello\"");
190+
assertThat(m.getMessageProperties().getContentType()).isEqualTo("application/json");
191+
assertThat(m.getMessageProperties().getHeaders().get(JsonHeaders.TYPE_ID.replaceFirst(JsonHeaders.PREFIX, "")))
192+
.isEqualTo("java.lang.String");
186193
message = MessageBuilder.withPayload("hello")
187194
.build();
188195
this.ctRequestChannel.send(message);
189196
m = receive(template);
190-
assertNotNull(m);
191-
assertEquals("hello", new String(m.getBody(), "UTF-8"));
192-
assertEquals("text/plain", m.getMessageProperties().getContentType());
197+
assertThat(m).isNotNull();
198+
assertThat(new String(m.getBody(), "UTF-8")).isEqualTo("hello");
199+
assertThat(m.getMessageProperties().getContentType()).isEqualTo("text/plain");
193200
while (template.receive() != null) {
194201
// drain
195202
}
@@ -202,7 +209,7 @@ private org.springframework.amqp.core.Message receive(RabbitTemplate template) t
202209
Thread.sleep(100);
203210
message = template.receive();
204211
}
205-
assertNotNull(message);
212+
assertThat(message).isNotNull();
206213
return message;
207214
}
208215

0 commit comments

Comments
 (0)