Skip to content

Commit 2f4394d

Browse files
authored
Populate a JsonHeaders.RESOLVABLE_TYPE on reply (#3163)
* Populate a JsonHeaders.RESOLVABLE_TYPE on reply Fixes spring-projects/spring-integration-samples#277 In the `AbstractAmqpOutboundEndpoint` the `JsonHeaders.RESOLVABLE_TYPE` from request message is copied to reply message making inconsistency downstream. The `JsonToObjectTransformer` consults first a `JsonHeaders.RESOLVABLE_TYPE` and deserialize payload to wrong type * Fix `DefaultAmqpHeaderMapper` to populate a `JsonHeaders.RESOLVABLE_TYPE` alongside with other `JsonHeaders` populated from the reply AMQP message. This way a `JsonHeaders.RESOLVABLE_TYPE` from request message won't have effect * To get access to classes, supply `AbstractHeaderMapper` with a bean factory `ClassLoader` * Introduce a couple utility methods into `JsonHeaders` for building a `ResolvableType` **Cherry-pick to 5.2.x** * * Fix code formatting for arguments wrapping
1 parent 94c8cf7 commit 2f4394d

File tree

7 files changed

+225
-138
lines changed

7 files changed

+225
-138
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/AbstractAmqpOutboundEndpoint.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@
4141
import org.springframework.integration.expression.ValueExpression;
4242
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
4343
import org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor;
44+
import org.springframework.integration.mapping.AbstractHeaderMapper;
4445
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
4546
import org.springframework.integration.support.DefaultErrorMessageStrategy;
4647
import org.springframework.integration.support.ErrorMessageStrategy;
@@ -416,6 +417,10 @@ protected final void doInit() {
416417
configureDelayGenerator(beanFactory);
417418

418419
endpointInit();
420+
421+
if (this.headerMapper instanceof AbstractHeaderMapper) {
422+
((AbstractHeaderMapper<?>) this.headerMapper).setBeanClassLoader(getBeanClassLoader());
423+
}
419424
}
420425

421426
private void configureExchangeNameGenerator(BeanFactory beanFactory) {

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/DefaultAmqpHeaderMapper.java

+80-67
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -109,40 +109,41 @@ protected Map<String, Object> extractStandardHeaders(MessageProperties amqpMessa
109109
Map<String, Object> headers = new HashMap<>();
110110
try {
111111
JavaUtils.INSTANCE
112-
.acceptIfNotNull(AmqpHeaders.APP_ID, amqpMessageProperties.getAppId(), headers::put)
113-
.acceptIfNotNull(AmqpHeaders.CLUSTER_ID, amqpMessageProperties.getClusterId(), headers::put)
114-
.acceptIfNotNull(AmqpHeaders.CONTENT_ENCODING, amqpMessageProperties.getContentEncoding(),
115-
headers::put);
112+
.acceptIfNotNull(AmqpHeaders.APP_ID, amqpMessageProperties.getAppId(), headers::put)
113+
.acceptIfNotNull(AmqpHeaders.CLUSTER_ID, amqpMessageProperties.getClusterId(), headers::put)
114+
.acceptIfNotNull(AmqpHeaders.CONTENT_ENCODING, amqpMessageProperties.getContentEncoding(),
115+
headers::put);
116116
long contentLength = amqpMessageProperties.getContentLength();
117117
JavaUtils.INSTANCE
118-
.acceptIfCondition(contentLength > 0, AmqpHeaders.CONTENT_LENGTH, contentLength, headers::put)
119-
.acceptIfHasText(AmqpHeaders.CONTENT_TYPE, amqpMessageProperties.getContentType(), headers::put)
120-
.acceptIfHasText(AmqpHeaders.CORRELATION_ID, amqpMessageProperties.getCorrelationId(), headers::put)
121-
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELIVERY_MODE, amqpMessageProperties.getReceivedDeliveryMode(),
122-
headers::put);
118+
.acceptIfCondition(contentLength > 0, AmqpHeaders.CONTENT_LENGTH, contentLength, headers::put)
119+
.acceptIfHasText(AmqpHeaders.CONTENT_TYPE, amqpMessageProperties.getContentType(), headers::put)
120+
.acceptIfHasText(AmqpHeaders.CORRELATION_ID, amqpMessageProperties.getCorrelationId(), headers::put)
121+
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELIVERY_MODE,
122+
amqpMessageProperties.getReceivedDeliveryMode(), headers::put);
123123
long deliveryTag = amqpMessageProperties.getDeliveryTag();
124124
JavaUtils.INSTANCE
125-
.acceptIfCondition(deliveryTag > 0, AmqpHeaders.DELIVERY_TAG, deliveryTag, headers::put)
126-
.acceptIfHasText(AmqpHeaders.EXPIRATION, amqpMessageProperties.getExpiration(), headers::put);
125+
.acceptIfCondition(deliveryTag > 0, AmqpHeaders.DELIVERY_TAG, deliveryTag, headers::put)
126+
.acceptIfHasText(AmqpHeaders.EXPIRATION, amqpMessageProperties.getExpiration(), headers::put);
127127
Integer messageCount = amqpMessageProperties.getMessageCount();
128128
JavaUtils.INSTANCE
129-
.acceptIfCondition(messageCount != null && messageCount > 0, AmqpHeaders.MESSAGE_COUNT, messageCount,
130-
headers::put)
131-
.acceptIfHasText(AmqpHeaders.MESSAGE_ID, amqpMessageProperties.getMessageId(), headers::put);
129+
.acceptIfCondition(messageCount != null && messageCount > 0, AmqpHeaders.MESSAGE_COUNT,
130+
messageCount, headers::put)
131+
.acceptIfHasText(AmqpHeaders.MESSAGE_ID, amqpMessageProperties.getMessageId(), headers::put);
132132
Integer priority = amqpMessageProperties.getPriority();
133133
JavaUtils.INSTANCE
134-
.acceptIfCondition(priority != null && priority > 0, IntegrationMessageHeaderAccessor.PRIORITY,
135-
priority, headers::put)
136-
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), headers::put)
137-
.acceptIfNotNull(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(),
138-
headers::put)
139-
.acceptIfHasText(AmqpHeaders.RECEIVED_ROUTING_KEY, amqpMessageProperties.getReceivedRoutingKey(),
140-
headers::put)
141-
.acceptIfNotNull(AmqpHeaders.REDELIVERED, amqpMessageProperties.isRedelivered(), headers::put)
142-
.acceptIfNotNull(AmqpHeaders.REPLY_TO, amqpMessageProperties.getReplyTo(), headers::put)
143-
.acceptIfNotNull(AmqpHeaders.TIMESTAMP, amqpMessageProperties.getTimestamp(), headers::put)
144-
.acceptIfHasText(AmqpHeaders.TYPE, amqpMessageProperties.getType(), headers::put)
145-
.acceptIfHasText(AmqpHeaders.RECEIVED_USER_ID, amqpMessageProperties.getReceivedUserId(), headers::put);
134+
.acceptIfCondition(priority != null && priority > 0, IntegrationMessageHeaderAccessor.PRIORITY,
135+
priority, headers::put)
136+
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), headers::put)
137+
.acceptIfNotNull(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(),
138+
headers::put)
139+
.acceptIfHasText(AmqpHeaders.RECEIVED_ROUTING_KEY, amqpMessageProperties.getReceivedRoutingKey(),
140+
headers::put)
141+
.acceptIfNotNull(AmqpHeaders.REDELIVERED, amqpMessageProperties.isRedelivered(), headers::put)
142+
.acceptIfNotNull(AmqpHeaders.REPLY_TO, amqpMessageProperties.getReplyTo(), headers::put)
143+
.acceptIfNotNull(AmqpHeaders.TIMESTAMP, amqpMessageProperties.getTimestamp(), headers::put)
144+
.acceptIfHasText(AmqpHeaders.TYPE, amqpMessageProperties.getType(), headers::put)
145+
.acceptIfHasText(AmqpHeaders.RECEIVED_USER_ID,
146+
amqpMessageProperties.getReceivedUserId(), headers::put);
146147

147148
for (String jsonHeader : JsonHeaders.HEADERS) {
148149
Object value = amqpMessageProperties.getHeaders().get(jsonHeader.replaceFirst(JsonHeaders.PREFIX, ""));
@@ -151,6 +152,7 @@ protected Map<String, Object> extractStandardHeaders(MessageProperties amqpMessa
151152
}
152153
}
153154

155+
createJsonResolvableTypHeaderInAny(headers);
154156
}
155157
catch (Exception e) {
156158
if (logger.isWarnEnabled()) {
@@ -160,6 +162,15 @@ protected Map<String, Object> extractStandardHeaders(MessageProperties amqpMessa
160162
return headers;
161163
}
162164

165+
private void createJsonResolvableTypHeaderInAny(Map<String, Object> headers) {
166+
Object typeIdHeader = headers.get(JsonHeaders.TYPE_ID);
167+
if (typeIdHeader != null) {
168+
headers.put(JsonHeaders.RESOLVABLE_TYPE,
169+
JsonHeaders.buildResolvableType(getClassLoader(), typeIdHeader,
170+
headers.get(JsonHeaders.CONTENT_TYPE_ID), headers.get(JsonHeaders.KEY_TYPE_ID)));
171+
}
172+
}
173+
163174
/**
164175
* Extract user-defined headers from an AMQP MessageProperties instance.
165176
*/
@@ -186,28 +197,28 @@ protected void populateStandardHeaders(@Nullable Map<String, Object> allHeaders,
186197
MessageProperties amqpMessageProperties) {
187198

188199
JavaUtils.INSTANCE
189-
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.APP_ID, String.class),
190-
amqpMessageProperties::setAppId)
191-
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CLUSTER_ID, String.class),
192-
amqpMessageProperties::setClusterId)
193-
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CONTENT_ENCODING, String.class),
194-
amqpMessageProperties::setContentEncoding)
195-
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.CONTENT_LENGTH, Long.class),
196-
amqpMessageProperties::setContentLength)
197-
.acceptIfHasText(this.extractContentTypeAsString(headers),
198-
amqpMessageProperties::setContentType)
199-
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CORRELATION_ID, String.class),
200-
amqpMessageProperties::setCorrelationId)
201-
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class),
202-
amqpMessageProperties::setDelay)
203-
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class),
204-
amqpMessageProperties::setDeliveryMode)
205-
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_TAG, Long.class),
206-
amqpMessageProperties::setDeliveryTag)
207-
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.EXPIRATION, String.class),
208-
amqpMessageProperties::setExpiration)
209-
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.MESSAGE_COUNT, Integer.class),
210-
amqpMessageProperties::setMessageCount);
200+
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.APP_ID, String.class),
201+
amqpMessageProperties::setAppId)
202+
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CLUSTER_ID, String.class),
203+
amqpMessageProperties::setClusterId)
204+
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CONTENT_ENCODING, String.class),
205+
amqpMessageProperties::setContentEncoding)
206+
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.CONTENT_LENGTH, Long.class),
207+
amqpMessageProperties::setContentLength)
208+
.acceptIfHasText(this.extractContentTypeAsString(headers),
209+
amqpMessageProperties::setContentType)
210+
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.CORRELATION_ID, String.class),
211+
amqpMessageProperties::setCorrelationId)
212+
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class),
213+
amqpMessageProperties::setDelay)
214+
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class),
215+
amqpMessageProperties::setDeliveryMode)
216+
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_TAG, Long.class),
217+
amqpMessageProperties::setDeliveryTag)
218+
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.EXPIRATION, String.class),
219+
amqpMessageProperties::setExpiration)
220+
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.MESSAGE_COUNT, Integer.class),
221+
amqpMessageProperties::setMessageCount);
211222
String messageId = getHeaderIfAvailable(headers, AmqpHeaders.MESSAGE_ID, String.class);
212223
if (StringUtils.hasText(messageId)) {
213224
amqpMessageProperties.setMessageId(messageId);
@@ -219,16 +230,17 @@ else if (allHeaders != null) {
219230
}
220231
}
221232
JavaUtils.INSTANCE
222-
.acceptIfNotNull(getHeaderIfAvailable(headers, IntegrationMessageHeaderAccessor.PRIORITY, Integer.class),
223-
amqpMessageProperties::setPriority)
224-
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.RECEIVED_EXCHANGE, String.class),
225-
amqpMessageProperties::setReceivedExchange)
226-
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.RECEIVED_ROUTING_KEY, String.class),
227-
amqpMessageProperties::setReceivedRoutingKey)
228-
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.REDELIVERED, Boolean.class),
229-
amqpMessageProperties::setRedelivered)
230-
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.REPLY_TO, String.class),
231-
amqpMessageProperties::setReplyTo);
233+
.acceptIfNotNull(getHeaderIfAvailable(headers, IntegrationMessageHeaderAccessor.PRIORITY,
234+
Integer.class),
235+
amqpMessageProperties::setPriority)
236+
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.RECEIVED_EXCHANGE, String.class),
237+
amqpMessageProperties::setReceivedExchange)
238+
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.RECEIVED_ROUTING_KEY, String.class),
239+
amqpMessageProperties::setReceivedRoutingKey)
240+
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.REDELIVERED, Boolean.class),
241+
amqpMessageProperties::setRedelivered)
242+
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.REPLY_TO, String.class),
243+
amqpMessageProperties::setReplyTo);
232244
Date timestamp = getHeaderIfAvailable(headers, AmqpHeaders.TIMESTAMP, Date.class);
233245
if (timestamp != null) {
234246
amqpMessageProperties.setTimestamp(timestamp);
@@ -240,18 +252,19 @@ else if (allHeaders != null) {
240252
}
241253
}
242254
JavaUtils.INSTANCE
243-
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.TYPE, String.class),
244-
amqpMessageProperties::setType)
245-
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.USER_ID, String.class),
246-
amqpMessageProperties::setUserId);
255+
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.TYPE, String.class),
256+
amqpMessageProperties::setType)
257+
.acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.USER_ID, String.class),
258+
amqpMessageProperties::setUserId);
247259

248260
mapJsonHeaders(headers, amqpMessageProperties);
249261

250262
JavaUtils.INSTANCE
251-
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.SPRING_REPLY_CORRELATION, String.class),
252-
replyCorrelation -> amqpMessageProperties.setHeader("spring_reply_correlation", replyCorrelation))
253-
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.SPRING_REPLY_TO_STACK, String.class),
254-
replyToStack -> amqpMessageProperties.setHeader("spring_reply_to", replyToStack));
263+
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.SPRING_REPLY_CORRELATION, String.class),
264+
replyCorrelation -> amqpMessageProperties
265+
.setHeader("spring_reply_correlation", replyCorrelation))
266+
.acceptIfHasText(getHeaderIfAvailable(headers, AmqpHeaders.SPRING_REPLY_TO_STACK, String.class),
267+
replyToStack -> amqpMessageProperties.setHeader("spring_reply_to", replyToStack));
255268
}
256269

257270
private void mapJsonHeaders(Map<String, Object> headers, MessageProperties amqpMessageProperties) {

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/OutboundEndpointTests.java

+37-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,9 +27,10 @@
2727
import static org.mockito.Mockito.spy;
2828
import static org.mockito.Mockito.verify;
2929

30+
import java.util.Date;
3031
import java.util.concurrent.atomic.AtomicReference;
3132

32-
import org.junit.Test;
33+
import org.junit.jupiter.api.Test;
3334
import org.mockito.ArgumentCaptor;
3435

3536
import org.springframework.amqp.core.Message;
@@ -38,10 +39,13 @@
3839
import org.springframework.amqp.rabbit.connection.CorrelationData;
3940
import org.springframework.amqp.rabbit.core.RabbitTemplate;
4041
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
42+
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
4143
import org.springframework.beans.factory.BeanFactory;
44+
import org.springframework.core.ResolvableType;
4245
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
4346
import org.springframework.integration.channel.NullChannel;
4447
import org.springframework.integration.channel.QueueChannel;
48+
import org.springframework.integration.mapping.support.JsonHeaders;
4549
import org.springframework.integration.support.MessageBuilder;
4650
import org.springframework.messaging.MessageHeaders;
4751
import org.springframework.messaging.support.GenericMessage;
@@ -50,6 +54,7 @@
5054
/**
5155
* @author Gary Russell
5256
* @author Artem Bilan
57+
*
5358
* @since 3.0
5459
*/
5560
public class OutboundEndpointTests {
@@ -94,9 +99,9 @@ public void testAsyncDelayExpression() {
9499
new SimpleMessageListenerContainer(connectionFactory), "replyTo"));
95100
amqpTemplate.setTaskScheduler(mock(TaskScheduler.class));
96101
AsyncAmqpOutboundGateway gateway = new AsyncAmqpOutboundGateway(amqpTemplate);
97-
willAnswer(
98-
invocation -> amqpTemplate.new RabbitMessageFuture("foo", invocation.getArgument(2)))
99-
.given(amqpTemplate).sendAndReceive(anyString(), anyString(), any(Message.class));
102+
willAnswer(invocation -> amqpTemplate.new RabbitMessageFuture("foo", invocation.getArgument(2)))
103+
.given(amqpTemplate)
104+
.sendAndReceive(anyString(), anyString(), any(Message.class));
100105
gateway.setExchangeName("foo");
101106
gateway.setRoutingKey("bar");
102107
gateway.setDelayExpressionString("42");
@@ -158,6 +163,33 @@ public void testHeaderMapperWinsGateway() {
158163
assertThat(amqpMessage.get().getMessageProperties().getHeaders().get(MessageHeaders.REPLY_CHANNEL)).isNull();
159164
}
160165

166+
@Test
167+
public void testReplyHeadersWin() {
168+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
169+
TestRabbitTemplate amqpTemplate = spy(new TestRabbitTemplate(connectionFactory));
170+
amqpTemplate.setUseTemporaryReplyQueues(true);
171+
AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(amqpTemplate);
172+
endpoint.setExpectReply(true);
173+
willAnswer(invocation ->
174+
org.springframework.amqp.core.MessageBuilder.withBody(new byte[0])
175+
.setHeader(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, String.class.getName())
176+
.build()
177+
).given(amqpTemplate)
178+
.doSendAndReceiveWithTemporary(isNull(), isNull(), any(Message.class), isNull());
179+
QueueChannel replyChannel = new QueueChannel();
180+
org.springframework.messaging.Message<?> message = MessageBuilder.withPayload("foo")
181+
.setHeader(JsonHeaders.RESOLVABLE_TYPE, ResolvableType.forClass(Date.class))
182+
.setReplyChannel(replyChannel)
183+
.build();
184+
endpoint.handleMessage(message);
185+
org.springframework.messaging.Message<?> receive = replyChannel.receive(10_000);
186+
assertThat(receive).isNotNull();
187+
assertThat(receive.getHeaders())
188+
.containsEntry(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, String.class.getName())
189+
.containsEntry(JsonHeaders.TYPE_ID, String.class.getName())
190+
.containsEntry(JsonHeaders.RESOLVABLE_TYPE, ResolvableType.forClass(String.class));
191+
}
192+
161193
/**
162194
* Increase method visibility
163195
*/

0 commit comments

Comments
 (0)