Skip to content

Commit b50eaf2

Browse files
committed
spring-projectsGH-3089: Add AmqpInGateway.replyHeadersMappedLast
Fixes spring-projects#3089 In some use-case we would like to control when headers from SI message should be populated into an AMQP message. One of the use-case is like a `SimpleMessageConverter` and its `plain/text` for the String reply, meanwhile we know that this content is an `application/json`. So, with a new `replyHeadersMappedLast` we can override the mentioned `content-type` header, populated by the `MessageConverter` with an actual value from the message headers populated in the flow upstream * Introduce an `AmqpInboundGateway.replyHeadersMappedLast`; expose it on the DSL and XML level * Use newly introduced `MappingUtils.mapReplyMessage()` * Optimize `DefaultAmqpHeaderMapper` to not parse JSON headers at all when `JsonHeaders.TYPE_ID` is already present (e.g. `MessageConverter` result) * Also skip `JsonHeaders` when we `populateUserDefinedHeader()` **Cherry-pick to 5.1.x**
1 parent 977997e commit b50eaf2

File tree

11 files changed

+255
-102
lines changed

11 files changed

+255
-102
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AmqpInboundGatewayParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
* @author Mark Fisher
3131
* @author Gary Russell
3232
* @author Artem Bilan
33+
*
3334
* @since 2.1
3435
*/
3536
public class AmqpInboundGatewayParser extends AbstractAmqpInboundAdapterParser {
@@ -48,6 +49,8 @@ protected void configureChannels(Element element, ParserContext parserContext, B
4849
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "request-channel");
4950
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "reply-channel");
5051
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "default-reply-to");
52+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "reply-headers-last",
53+
"replyHeadersMappedLast");
5154
}
5255

5356
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpBaseInboundGatewaySpec.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.amqp.dsl;
1818

19+
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
1920
import org.springframework.amqp.support.converter.MessageConverter;
2021
import org.springframework.integration.amqp.inbound.AmqpInboundGateway;
2122
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
@@ -138,4 +139,40 @@ public S recoveryCallback(RecoveryCallback<?> recoveryCallback) {
138139
return _this();
139140
}
140141

142+
/**
143+
* Set a batching strategy to use when de-batching messages.
144+
* @return the spec.
145+
* @since 5.1.9
146+
* @see AmqpInboundGateway#setBatchingStrategy(BatchingStrategy)
147+
*/
148+
public S batchingStrategy(BatchingStrategy batchingStrategy) {
149+
this.target.setBatchingStrategy(batchingStrategy);
150+
return _this();
151+
}
152+
153+
/**
154+
* Set to true to bind the source message in the headers.
155+
* @param bindSourceMessage true to bind.
156+
* @return the spec.
157+
* @since 5.1.9
158+
* @see AmqpInboundGateway#setBindSourceMessage(boolean)
159+
*/
160+
public S bindSourceMessage(boolean bindSourceMessage) {
161+
this.target.setBindSourceMessage(bindSourceMessage);
162+
return _this();
163+
}
164+
165+
/**
166+
* When mapping headers for the outbound (reply) message, determine whether the headers are
167+
* mapped before the message is converted, or afterwards.
168+
* @param replyHeadersMappedLast true if reply headers are mapped after conversion.
169+
* @return the spec.
170+
* @since 5.1.9
171+
* @see AmqpInboundGateway#setReplyHeadersMappedLast(boolean)
172+
*/
173+
public S replyHeadersMappedLast(boolean replyHeadersMappedLast) {
174+
this.target.setReplyHeadersMappedLast(replyHeadersMappedLast);
175+
return _this();
176+
}
177+
141178
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import org.springframework.amqp.core.Address;
2626
import org.springframework.amqp.core.AmqpTemplate;
2727
import org.springframework.amqp.core.Message;
28-
import org.springframework.amqp.core.MessagePostProcessor;
29-
import org.springframework.amqp.core.MessageProperties;
3028
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
3129
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
3230
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -42,14 +40,14 @@
4240
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
4341
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
4442
import org.springframework.integration.amqp.support.EndpointUtils;
43+
import org.springframework.integration.amqp.support.MappingUtils;
4544
import org.springframework.integration.gateway.MessagingGatewaySupport;
4645
import org.springframework.integration.support.ErrorMessageUtils;
4746
import org.springframework.messaging.MessageChannel;
4847
import org.springframework.retry.RecoveryCallback;
4948
import org.springframework.retry.support.RetrySynchronizationManager;
5049
import org.springframework.retry.support.RetryTemplate;
5150
import org.springframework.util.Assert;
52-
import org.springframework.util.StringUtils;
5351

5452
import com.rabbitmq.client.Channel;
5553

@@ -89,6 +87,8 @@ public class AmqpInboundGateway extends MessagingGatewaySupport {
8987

9088
private boolean bindSourceMessage;
9189

90+
private boolean replyHeadersMappedLast;
91+
9292
public AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer) {
9393
this(listenerContainer, new RabbitTemplate(listenerContainer.getConnectionFactory()), false);
9494
}
@@ -204,6 +204,24 @@ public void setBindSourceMessage(boolean bindSourceMessage) {
204204
this.bindSourceMessage = bindSourceMessage;
205205
}
206206

207+
/**
208+
* When mapping headers for the outbound (reply) message, determine whether the headers are
209+
* mapped before the message is converted, or afterwards. This only affects headers
210+
* that might be added by the message converter. When false, the converter's headers
211+
* win; when true, any headers added by the converter will be overridden (if the
212+
* source message has a header that maps to those headers). You might wish to set this
213+
* to true, for example, when using a
214+
* {@link org.springframework.amqp.support.converter.SimpleMessageConverter} with a
215+
* String payload that contains json; the converter will set the content type to
216+
* {@code text/plain} which can be overridden to {@code application/json} by setting
217+
* the {@link AmqpHeaders#CONTENT_TYPE} message header. Default: false.
218+
* @param replyHeadersMappedLast true if reply headers are mapped after conversion.
219+
* @since 5.1.9
220+
*/
221+
public void setReplyHeadersMappedLast(boolean replyHeadersMappedLast) {
222+
this.replyHeadersMappedLast = replyHeadersMappedLast;
223+
}
224+
207225
@Override
208226
public String getComponentType() {
209227
return "amqp:inbound-gateway";
@@ -357,7 +375,7 @@ private org.springframework.messaging.Message<Object> convert(Message message, C
357375

358376
private void process(Message message, org.springframework.messaging.Message<Object> messagingMessage) {
359377
setAttributesIfNecessary(message, messagingMessage);
360-
final org.springframework.messaging.Message<?> reply = sendAndReceiveMessage(messagingMessage);
378+
org.springframework.messaging.Message<?> reply = sendAndReceiveMessage(messagingMessage);
361379
if (reply != null) {
362380
Address replyTo;
363381
String replyToProperty = message.getMessageProperties().getReplyTo();
@@ -368,38 +386,23 @@ private void process(Message message, org.springframework.messaging.Message<Obje
368386
replyTo = AmqpInboundGateway.this.defaultReplyTo;
369387
}
370388

371-
MessagePostProcessor messagePostProcessor =
372-
message1 -> {
373-
MessageProperties messageProperties = message1.getMessageProperties();
374-
String contentEncoding = messageProperties.getContentEncoding();
375-
long contentLength = messageProperties.getContentLength();
376-
String contentType = messageProperties.getContentType();
377-
AmqpInboundGateway.this.headerMapper.fromHeadersToReply(reply.getHeaders(),
378-
messageProperties);
379-
// clear the replyTo from the original message since we are using it now
380-
messageProperties.setReplyTo(null);
381-
// reset the content-* properties as determined by the MessageConverter
382-
if (StringUtils.hasText(contentEncoding)) {
383-
messageProperties.setContentEncoding(contentEncoding);
384-
}
385-
messageProperties.setContentLength(contentLength);
386-
if (contentType != null) {
387-
messageProperties.setContentType(contentType);
388-
}
389-
return message1;
390-
};
389+
org.springframework.amqp.core.Message amqpMessage =
390+
MappingUtils.mapReplyMessage(reply, AmqpInboundGateway.this.amqpMessageConverter,
391+
AmqpInboundGateway.this.headerMapper,
392+
message.getMessageProperties().getReceivedDeliveryMode(),
393+
AmqpInboundGateway.this.replyHeadersMappedLast);
391394

392395
if (replyTo != null) {
393-
AmqpInboundGateway.this.amqpTemplate.convertAndSend(replyTo.getExchangeName(),
394-
replyTo.getRoutingKey(), reply.getPayload(), messagePostProcessor);
396+
AmqpInboundGateway.this.amqpTemplate.send(replyTo.getExchangeName(), replyTo.getRoutingKey(),
397+
amqpMessage);
395398
}
396399
else {
397400
if (!AmqpInboundGateway.this.amqpTemplateExplicitlySet) {
398401
throw new IllegalStateException("There is no 'replyTo' message property " +
399402
"and the `defaultReplyTo` hasn't been configured.");
400403
}
401404
else {
402-
AmqpInboundGateway.this.amqpTemplate.convertAndSend(reply.getPayload(), messagePostProcessor);
405+
AmqpInboundGateway.this.amqpTemplate.send(amqpMessage);
403406
}
404407
}
405408
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/DefaultAmqpHeaderMapper.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ protected DefaultAmqpHeaderMapper(String[] requestHeaderNames, String[] replyHea
106106
*/
107107
@Override
108108
protected Map<String, Object> extractStandardHeaders(MessageProperties amqpMessageProperties) {
109-
Map<String, Object> headers = new HashMap<String, Object>();
109+
Map<String, Object> headers = new HashMap<>();
110110
try {
111111
JavaUtils.INSTANCE
112112
.acceptIfNotNull(AmqpHeaders.APP_ID, amqpMessageProperties.getAppId(), headers::put)
@@ -132,7 +132,7 @@ protected Map<String, Object> extractStandardHeaders(MessageProperties amqpMessa
132132
Integer priority = amqpMessageProperties.getPriority();
133133
JavaUtils.INSTANCE
134134
.acceptIfCondition(priority != null && priority > 0, IntegrationMessageHeaderAccessor.PRIORITY,
135-
priority, (key, value) -> headers.put(key, value))
135+
priority, headers::put)
136136
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), headers::put)
137137
.acceptIfNotNull(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(),
138138
headers::put)
@@ -255,24 +255,23 @@ else if (allHeaders != null) {
255255
}
256256

257257
private void mapJsonHeaders(Map<String, Object> headers, MessageProperties amqpMessageProperties) {
258-
Map<String, String> jsonHeaders = new HashMap<String, String>();
259-
260-
for (String jsonHeader : JsonHeaders.HEADERS) {
261-
Object value = getHeaderIfAvailable(headers, jsonHeader, Object.class);
262-
if (value != null) {
263-
headers.remove(jsonHeader);
264-
if (value instanceof Class<?>) {
265-
value = ((Class<?>) value).getName();
266-
}
267-
jsonHeaders.put(jsonHeader.replaceFirst(JsonHeaders.PREFIX, ""), value.toString());
268-
}
269-
}
270-
271258
/*
272259
* If the MessageProperties already contains JsonHeaders, don't overwrite them here because they were
273260
* set up by a message converter.
274261
*/
275262
if (!amqpMessageProperties.getHeaders().containsKey(JsonHeaders.TYPE_ID.replaceFirst(JsonHeaders.PREFIX, ""))) {
263+
Map<String, String> jsonHeaders = new HashMap<>();
264+
265+
for (String jsonHeader : JsonHeaders.HEADERS) {
266+
Object value = getHeaderIfAvailable(headers, jsonHeader, Object.class);
267+
if (value != null) {
268+
headers.remove(jsonHeader);
269+
if (value instanceof Class<?>) {
270+
value = ((Class<?>) value).getName();
271+
}
272+
jsonHeaders.put(jsonHeader.replaceFirst(JsonHeaders.PREFIX, ""), value.toString());
273+
}
274+
}
276275
amqpMessageProperties.getHeaders().putAll(jsonHeaders);
277276
}
278277
}
@@ -282,8 +281,9 @@ protected void populateUserDefinedHeader(String headerName, Object headerValue,
282281
MessageProperties amqpMessageProperties) {
283282
// do not overwrite an existing header with the same key
284283
// TODO: do we need to expose a boolean 'overwrite' flag?
285-
if (!amqpMessageProperties.getHeaders().containsKey(headerName)
286-
&& !AmqpHeaders.CONTENT_TYPE.equals(headerName)) {
284+
if (!amqpMessageProperties.getHeaders().containsKey(headerName) &&
285+
!AmqpHeaders.CONTENT_TYPE.equals(headerName) &&
286+
!headerName.startsWith(JsonHeaders.PREFIX)) {
287287
amqpMessageProperties.setHeader(headerName, headerValue);
288288
}
289289
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/MappingUtils.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
* Utility methods used during message mapping.
3131
*
3232
* @author Gary Russell
33+
* @author Artem Bilan
34+
*
3335
* @since 4.3
3436
*
3537
*/
@@ -54,25 +56,64 @@ private MappingUtils() {
5456
public static org.springframework.amqp.core.Message mapMessage(Message<?> requestMessage,
5557
MessageConverter converter, AmqpHeaderMapper headerMapper, MessageDeliveryMode defaultDeliveryMode,
5658
boolean headersMappedLast) {
59+
60+
return doMapMessage(requestMessage, converter, headerMapper, defaultDeliveryMode, headersMappedLast, false);
61+
}
62+
63+
/**
64+
* Map a reply o.s.Message to an o.s.a.core.Message. When using a
65+
* {@link ContentTypeDelegatingMessageConverter}, {@link AmqpHeaders#CONTENT_TYPE} and
66+
* {@link MessageHeaders#CONTENT_TYPE} will be used for the selection, with the AMQP
67+
* header taking precedence.
68+
* @param replyMessage the reply message.
69+
* @param converter the message converter to use.
70+
* @param headerMapper the header mapper to use.
71+
* @param defaultDeliveryMode the default delivery mode.
72+
* @param headersMappedLast true if headers are mapped after conversion.
73+
* @return the mapped Message.
74+
* @since 5.1.9
75+
*/
76+
public static org.springframework.amqp.core.Message mapReplyMessage(Message<?> replyMessage,
77+
MessageConverter converter, AmqpHeaderMapper headerMapper, MessageDeliveryMode defaultDeliveryMode,
78+
boolean headersMappedLast) {
79+
80+
return doMapMessage(replyMessage, converter, headerMapper, defaultDeliveryMode, headersMappedLast, true);
81+
}
82+
83+
private static org.springframework.amqp.core.Message doMapMessage(Message<?> message,
84+
MessageConverter converter, AmqpHeaderMapper headerMapper, MessageDeliveryMode defaultDeliveryMode,
85+
boolean headersMappedLast, boolean reply) {
86+
5787
MessageProperties amqpMessageProperties = new MessageProperties();
5888
org.springframework.amqp.core.Message amqpMessage;
5989
if (!headersMappedLast) {
60-
headerMapper.fromHeadersToRequest(requestMessage.getHeaders(), amqpMessageProperties);
90+
mapHeaders(message.getHeaders(), amqpMessageProperties, headerMapper, reply);
6191
}
6292
if (converter instanceof ContentTypeDelegatingMessageConverter && headersMappedLast) {
63-
String contentType = contentTypeAsString(requestMessage.getHeaders());
93+
String contentType = contentTypeAsString(message.getHeaders());
6494
if (contentType != null) {
6595
amqpMessageProperties.setContentType(contentType);
6696
}
6797
}
68-
amqpMessage = converter.toMessage(requestMessage.getPayload(), amqpMessageProperties);
98+
amqpMessage = converter.toMessage(message.getPayload(), amqpMessageProperties);
6999
if (headersMappedLast) {
70-
headerMapper.fromHeadersToRequest(requestMessage.getHeaders(), amqpMessageProperties);
100+
mapHeaders(message.getHeaders(), amqpMessageProperties, headerMapper, reply);
71101
}
72-
checkDeliveryMode(requestMessage, amqpMessageProperties, defaultDeliveryMode);
102+
checkDeliveryMode(message, amqpMessageProperties, defaultDeliveryMode);
73103
return amqpMessage;
74104
}
75105

106+
private static void mapHeaders(MessageHeaders messageHeaders, MessageProperties amqpMessageProperties,
107+
AmqpHeaderMapper headerMapper, boolean reply) {
108+
109+
if (reply) {
110+
headerMapper.fromHeadersToReply(messageHeaders, amqpMessageProperties);
111+
}
112+
else {
113+
headerMapper.fromHeadersToRequest(messageHeaders, amqpMessageProperties);
114+
}
115+
}
116+
76117
private static String contentTypeAsString(MessageHeaders headers) {
77118
Object contentType = headers.get(AmqpHeaders.CONTENT_TYPE);
78119
if (contentType instanceof MimeType) {

spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp-5.2.xsd

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@
6767
received within the confirm timeout or a negative acknowledgment or returned
6868
message is received, an exception will be thrown.
6969
</xsd:documentation>
70-
<xsd:simpleType>
71-
<xsd:union memberTypes="xsd:boolean xsd:string" />
72-
</xsd:simpleType>
7370
</xsd:appinfo>
7471
</xsd:annotation>
72+
<xsd:simpleType>
73+
<xsd:union memberTypes="xsd:boolean xsd:string" />
74+
</xsd:simpleType>
7575
</xsd:attribute>
7676
</xsd:extension>
7777
</xsd:complexContent>
@@ -254,6 +254,18 @@
254254
</xsd:documentation>
255255
</xsd:annotation>
256256
</xsd:attribute>
257+
<xsd:attribute name="reply-headers-last">
258+
<xsd:annotation>
259+
<xsd:documentation>
260+
Whether reply headers are mapped before or after conversion from a messaging Message to
261+
a spring amqp Message. Set to true, for example, if you wish to override the
262+
contentType header set by the converter.
263+
</xsd:documentation>
264+
</xsd:annotation>
265+
<xsd:simpleType>
266+
<xsd:union memberTypes="xsd:boolean xsd:string" />
267+
</xsd:simpleType>
268+
</xsd:attribute>
257269
</xsd:extension>
258270
</xsd:complexContent>
259271
</xsd:complexType>

0 commit comments

Comments
 (0)