Skip to content

Commit be90fdb

Browse files
garyrussellartembilan
authored andcommitted
AMQP: Add bindSourceMessage property (inbound)
Resolves #2958 # Conflicts: # spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java # spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java # spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java # spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java # spring-integration-core/src/main/java/org/springframework/integration/IntegrationMessageHeaderAccessor.java
1 parent b73b720 commit be90fdb

File tree

7 files changed

+70
-4
lines changed

7 files changed

+70
-4
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java

+15
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ public class AmqpInboundChannelAdapter extends MessageProducerSupport implements
6969

7070
private RecoveryCallback<? extends Object> recoveryCallback;
7171

72+
private boolean bindSourceMessage;
73+
7274
public AmqpInboundChannelAdapter(AbstractMessageListenerContainer listenerContainer) {
7375
Assert.notNull(listenerContainer, "listenerContainer must not be null");
7476
Assert.isNull(listenerContainer.getMessageListener(),
@@ -116,6 +118,16 @@ public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallb
116118
}
117119

118120

121+
/**
122+
* Set to true to bind the source message in the header named
123+
* {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}.
124+
* @param bindSourceMessage true to bind.
125+
* @since 5.1.6
126+
*/
127+
public void setBindSourceMessage(boolean bindSourceMessage) {
128+
this.bindSourceMessage = bindSourceMessage;
129+
}
130+
119131
@Override
120132
public String getComponentType() {
121133
return "amqp:inbound-channel-adapter";
@@ -249,6 +261,9 @@ private org.springframework.messaging.Message<Object> createMessage(Message mess
249261
if (AmqpInboundChannelAdapter.this.retryTemplate != null) {
250262
headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
251263
}
264+
if (AmqpInboundChannelAdapter.this.bindSourceMessage) {
265+
headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message);
266+
}
252267
final org.springframework.messaging.Message<Object> messagingMessage = getMessageBuilderFactory()
253268
.withPayload(payload)
254269
.copyHeaders(headers)

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java

+15
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ public class AmqpInboundGateway extends MessagingGatewaySupport {
8181

8282
private RecoveryCallback<? extends Object> recoveryCallback;
8383

84+
private boolean bindSourceMessage;
85+
8486
public AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer) {
8587
this(listenerContainer, new RabbitTemplate(listenerContainer.getConnectionFactory()), false);
8688
}
@@ -175,6 +177,16 @@ public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallb
175177
this.recoveryCallback = recoveryCallback;
176178
}
177179

180+
/**
181+
* Set to true to bind the source message in the header named
182+
* {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}.
183+
* @param bindSourceMessage true to bind.
184+
* @since 5.1.6
185+
*/
186+
public void setBindSourceMessage(boolean bindSourceMessage) {
187+
this.bindSourceMessage = bindSourceMessage;
188+
}
189+
178190
@Override
179191
public String getComponentType() {
180192
return "amqp:inbound-gateway";
@@ -295,6 +307,9 @@ private org.springframework.messaging.Message<Object> convert(Message message, C
295307
if (AmqpInboundGateway.this.retryTemplate != null) {
296308
headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
297309
}
310+
if (AmqpInboundGateway.this.bindSourceMessage) {
311+
headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message);
312+
}
298313
}
299314
catch (RuntimeException e) {
300315
MessageChannel errorChannel = getErrorChannel();

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,11 @@ protected boolean isRawMessageHeader() {
142142
}
143143

144144
/**
145-
* Set to true to include the raw spring-amqp message as a header
146-
* with key {@link AmqpMessageHeaderErrorMessageStrategy#AMQP_RAW_MESSAGE},
147-
* enabling callers to have access to the message to process errors.
148-
* @param rawMessageHeader true to include the header.
145+
* Set to true to include the raw spring-amqp message as a header with key
146+
* {@link AmqpMessageHeaderErrorMessageStrategy#AMQP_RAW_MESSAGE}, enabling callers to
147+
* have access to the message to process errors. The raw message is also added to the
148+
* common header {@link IntegrationMessageHeaderAccessor#SOURCE_DATA}.
149+
* @param rawMessageHeader true to include the headers.
149150
*/
150151
public void setRawMessageHeader(boolean rawMessageHeader) {
151152
this.rawMessageHeader = rawMessageHeader;
@@ -180,6 +181,7 @@ protected AbstractIntegrationMessageBuilder<Object> doReceive() {
180181
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, callback);
181182
if (this.rawMessageHeader) {
182183
builder.setHeader(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, amqpMessage);
184+
builder.setHeader(IntegrationMessageHeaderAccessor.SOURCE_DATA, amqpMessage);
183185
}
184186
return builder;
185187
}

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java

+5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.hamcrest.Matchers.equalTo;
2020
import static org.hamcrest.Matchers.instanceOf;
21+
import static org.hamcrest.Matchers.sameInstance;
2122
import static org.junit.Assert.assertThat;
2223
import static org.mockito.ArgumentMatchers.anyString;
2324
import static org.mockito.ArgumentMatchers.isNull;
@@ -32,6 +33,7 @@
3233

3334
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
3435
import org.springframework.amqp.support.AmqpHeaders;
36+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3537
import org.springframework.integration.StaticMessageHeaderAccessor;
3638
import org.springframework.integration.acks.AcknowledgmentCallback.Status;
3739
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
@@ -46,6 +48,7 @@
4648

4749
/**
4850
* @author Gary Russell
51+
* @author Artem Bilan
4952
*
5053
* @since 5.0.1
5154
*
@@ -73,6 +76,8 @@ public void testAck() throws Exception {
7376
assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE),
7477
instanceOf(org.springframework.amqp.core.Message.class));
7578
assertThat(received.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE), equalTo("foo"));
79+
assertThat(received.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA),
80+
sameInstance(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)));
7681
// make sure channel is not cached
7782
org.springframework.amqp.rabbit.connection.Connection conn = ccf.createConnection();
7883
Channel notCached = conn.createChannel(false); // should not have been "closed"

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java

+5
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public void testInt2809JavaTypePropertiesToAmqp() throws Exception {
107107

108108
adapter.setOutputChannel(channel);
109109
adapter.setBeanFactory(mock(BeanFactory.class));
110+
adapter.setBindSourceMessage(true);
110111
adapter.afterPropertiesSet();
111112

112113
Object payload = new Foo("bar1");
@@ -129,6 +130,8 @@ public void testInt2809JavaTypePropertiesToAmqp() throws Exception {
129130

130131
assertSame(rabbitChannel, result.getHeaders().get(AmqpHeaders.CHANNEL));
131132
assertEquals(123L, result.getHeaders().get(AmqpHeaders.DELIVERY_TAG));
133+
org.springframework.amqp.core.Message sourceData = StaticMessageHeaderAccessor.getSourceData(result);
134+
assertThat(sourceData).isSameAs(amqpMessage);
132135
}
133136

134137
@Test
@@ -162,6 +165,8 @@ public void testInt2809JavaTypePropertiesFromAmqp() throws Exception {
162165
Message<?> result = new JsonToObjectTransformer().transform(receive);
163166

164167
assertEquals(payload, result.getPayload());
168+
org.springframework.amqp.core.Message sourceData = StaticMessageHeaderAccessor.getSourceData(result);
169+
assertThat(sourceData).isNull();
165170
}
166171

167172
@Test

spring-integration-core/src/main/java/org/springframework/integration/IntegrationMessageHeaderAccessor.java

+18
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,12 @@ public class IntegrationMessageHeaderAccessor extends MessageHeaderAccessor {
6666

6767
public static final String ACKNOWLEDGMENT_CALLBACK = "acknowledgmentCallback";
6868

69+
/**
70+
* Raw source message.
71+
*/
72+
public static final String SOURCE_DATA = "sourceData";
73+
74+
6975
private Set<String> readOnlyHeaders = new HashSet<>();
7076

7177
public IntegrationMessageHeaderAccessor(@Nullable Message<?> message) {
@@ -149,6 +155,18 @@ public AtomicInteger getDeliveryAttempt() {
149155
return getHeader(DELIVERY_ATTEMPT, AtomicInteger.class);
150156
}
151157

158+
/**
159+
* Get the source data header, if present.
160+
* @param <T> the data type.
161+
* @return the source header.
162+
* @since 5.1.6
163+
*/
164+
@SuppressWarnings("unchecked")
165+
@Nullable
166+
public <T> T getSourceData() {
167+
return (T) getHeader(SOURCE_DATA);
168+
}
169+
152170
@SuppressWarnings("unchecked")
153171
@Nullable
154172
public <T> T getHeader(String key, Class<T> type) {

spring-integration-core/src/main/java/org/springframework/integration/StaticMessageHeaderAccessor.java

+6
Original file line numberDiff line numberDiff line change
@@ -107,4 +107,10 @@ public static AcknowledgmentCallback getAcknowledgmentCallback(Message<?> messag
107107
AcknowledgmentCallback.class);
108108
}
109109

110+
@SuppressWarnings("unchecked")
111+
@Nullable
112+
public static <T> T getSourceData(Message<?> message) {
113+
return (T) message.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA);
114+
}
115+
110116
}

0 commit comments

Comments
 (0)