Skip to content

Commit 6468ecc

Browse files
artembilangaryrussell
authored andcommitted
GH-3089: Add AmqpInGateway.replyHeadersMappedLast (#3091)
* GH-3089: Add AmqpInGateway.replyHeadersMappedLast Fixes #3089 In some use-case we would like to control when headers from SI message should be populated into an AMQP message. One of the use-case is like a `SimpleMessageConverter` and its `plain/text` for the String reply, meanwhile we know that this content is an `application/json`. So, with a new `replyHeadersMappedLast` we can override the mentioned `content-type` header, populated by the `MessageConverter` with an actual value from the message headers populated in the flow upstream * Introduce an `AmqpInboundGateway.replyHeadersMappedLast`; expose it on the DSL and XML level * Use newly introduced `MappingUtils.mapReplyMessage()` * Optimize `DefaultAmqpHeaderMapper` to not parse JSON headers at all when `JsonHeaders.TYPE_ID` is already present (e.g. `MessageConverter` result) * Also skip `JsonHeaders` when we `populateUserDefinedHeader()` **Cherry-pick to 5.1.x** * * Fix language and package typos * Add missed `@param` in JavaDoc of the `AmqpBaseInboundGatewaySpec.batchingStrategy()` * Extract a `RabbitTemplate` `MessageConverter` to use for reply messages conversion - pursue a backward compatibility
1 parent 977997e commit 6468ecc

File tree

11 files changed

+263
-103
lines changed

11 files changed

+263
-103
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AmqpInboundGatewayParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
* @author Mark Fisher
3131
* @author Gary Russell
3232
* @author Artem Bilan
33+
*
3334
* @since 2.1
3435
*/
3536
public class AmqpInboundGatewayParser extends AbstractAmqpInboundAdapterParser {
@@ -48,6 +49,8 @@ protected void configureChannels(Element element, ParserContext parserContext, B
4849
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "request-channel");
4950
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "reply-channel");
5051
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "default-reply-to");
52+
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "reply-headers-last",
53+
"replyHeadersMappedLast");
5154
}
5255

5356
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpBaseInboundGatewaySpec.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.amqp.dsl;
1818

19+
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
1920
import org.springframework.amqp.support.converter.MessageConverter;
2021
import org.springframework.integration.amqp.inbound.AmqpInboundGateway;
2122
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
@@ -138,4 +139,41 @@ public S recoveryCallback(RecoveryCallback<?> recoveryCallback) {
138139
return _this();
139140
}
140141

142+
/**
143+
* Set a batching strategy to use when de-batching messages.
144+
* @param batchingStrategy the strategy to use.
145+
* @return the spec.
146+
* @since 5.1.9
147+
* @see AmqpInboundGateway#setBatchingStrategy(BatchingStrategy)
148+
*/
149+
public S batchingStrategy(BatchingStrategy batchingStrategy) {
150+
this.target.setBatchingStrategy(batchingStrategy);
151+
return _this();
152+
}
153+
154+
/**
155+
* Set to true to bind the source message in the headers.
156+
* @param bindSourceMessage true to bind.
157+
* @return the spec.
158+
* @since 5.1.9
159+
* @see AmqpInboundGateway#setBindSourceMessage(boolean)
160+
*/
161+
public S bindSourceMessage(boolean bindSourceMessage) {
162+
this.target.setBindSourceMessage(bindSourceMessage);
163+
return _this();
164+
}
165+
166+
/**
167+
* When mapping headers for the outbound (reply) message, determine whether the headers are
168+
* mapped before the message is converted, or afterwards.
169+
* @param replyHeadersMappedLast true if reply headers are mapped after conversion.
170+
* @return the spec.
171+
* @since 5.1.9
172+
* @see AmqpInboundGateway#setReplyHeadersMappedLast(boolean)
173+
*/
174+
public S replyHeadersMappedLast(boolean replyHeadersMappedLast) {
175+
this.target.setReplyHeadersMappedLast(replyHeadersMappedLast);
176+
return _this();
177+
}
178+
141179
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import org.springframework.amqp.core.Address;
2626
import org.springframework.amqp.core.AmqpTemplate;
2727
import org.springframework.amqp.core.Message;
28-
import org.springframework.amqp.core.MessagePostProcessor;
29-
import org.springframework.amqp.core.MessageProperties;
3028
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
3129
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
3230
import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -42,14 +40,14 @@
4240
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
4341
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
4442
import org.springframework.integration.amqp.support.EndpointUtils;
43+
import org.springframework.integration.amqp.support.MappingUtils;
4544
import org.springframework.integration.gateway.MessagingGatewaySupport;
4645
import org.springframework.integration.support.ErrorMessageUtils;
4746
import org.springframework.messaging.MessageChannel;
4847
import org.springframework.retry.RecoveryCallback;
4948
import org.springframework.retry.support.RetrySynchronizationManager;
5049
import org.springframework.retry.support.RetryTemplate;
5150
import org.springframework.util.Assert;
52-
import org.springframework.util.StringUtils;
5351

5452
import com.rabbitmq.client.Channel;
5553

@@ -77,6 +75,8 @@ public class AmqpInboundGateway extends MessagingGatewaySupport {
7775

7876
private MessageConverter amqpMessageConverter = new SimpleMessageConverter();
7977

78+
private MessageConverter templateMessageConverter = this.amqpMessageConverter;
79+
8080
private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper();
8181

8282
private Address defaultReplyTo;
@@ -89,6 +89,8 @@ public class AmqpInboundGateway extends MessagingGatewaySupport {
8989

9090
private boolean bindSourceMessage;
9191

92+
private boolean replyHeadersMappedLast;
93+
9294
public AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer) {
9395
this(listenerContainer, new RabbitTemplate(listenerContainer.getConnectionFactory()), false);
9496
}
@@ -116,6 +118,9 @@ private AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer, A
116118
this.messageListenerContainer.setAutoStartup(false);
117119
this.amqpTemplate = amqpTemplate;
118120
this.amqpTemplateExplicitlySet = amqpTemplateExplicitlySet;
121+
if (this.amqpTemplateExplicitlySet && this.amqpTemplate instanceof RabbitTemplate) {
122+
this.templateMessageConverter = ((RabbitTemplate) this.amqpTemplate).getMessageConverter();
123+
}
119124
setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy());
120125
}
121126

@@ -131,6 +136,7 @@ public void setMessageConverter(MessageConverter messageConverter) {
131136
this.amqpMessageConverter = messageConverter;
132137
if (!this.amqpTemplateExplicitlySet) {
133138
((RabbitTemplate) this.amqpTemplate).setMessageConverter(messageConverter);
139+
this.templateMessageConverter = messageConverter;
134140
}
135141
}
136142

@@ -204,6 +210,24 @@ public void setBindSourceMessage(boolean bindSourceMessage) {
204210
this.bindSourceMessage = bindSourceMessage;
205211
}
206212

213+
/**
214+
* When mapping headers for the outbound (reply) message, determine whether the headers are
215+
* mapped before the message is converted, or afterwards. This only affects headers
216+
* that might be added by the message converter. When false, the converter's headers
217+
* win; when true, any headers added by the converter will be overridden (if the
218+
* source message has a header that maps to those headers). You might wish to set this
219+
* to true, for example, when using a
220+
* {@link org.springframework.amqp.support.converter.SimpleMessageConverter} with a
221+
* String payload that contains json; the converter will set the content type to
222+
* {@code text/plain} which can be overridden to {@code application/json} by setting
223+
* the {@link AmqpHeaders#CONTENT_TYPE} message header. Default: false.
224+
* @param replyHeadersMappedLast true if reply headers are mapped after conversion.
225+
* @since 5.1.9
226+
*/
227+
public void setReplyHeadersMappedLast(boolean replyHeadersMappedLast) {
228+
this.replyHeadersMappedLast = replyHeadersMappedLast;
229+
}
230+
207231
@Override
208232
public String getComponentType() {
209233
return "amqp:inbound-gateway";
@@ -357,7 +381,7 @@ private org.springframework.messaging.Message<Object> convert(Message message, C
357381

358382
private void process(Message message, org.springframework.messaging.Message<Object> messagingMessage) {
359383
setAttributesIfNecessary(message, messagingMessage);
360-
final org.springframework.messaging.Message<?> reply = sendAndReceiveMessage(messagingMessage);
384+
org.springframework.messaging.Message<?> reply = sendAndReceiveMessage(messagingMessage);
361385
if (reply != null) {
362386
Address replyTo;
363387
String replyToProperty = message.getMessageProperties().getReplyTo();
@@ -368,38 +392,23 @@ private void process(Message message, org.springframework.messaging.Message<Obje
368392
replyTo = AmqpInboundGateway.this.defaultReplyTo;
369393
}
370394

371-
MessagePostProcessor messagePostProcessor =
372-
message1 -> {
373-
MessageProperties messageProperties = message1.getMessageProperties();
374-
String contentEncoding = messageProperties.getContentEncoding();
375-
long contentLength = messageProperties.getContentLength();
376-
String contentType = messageProperties.getContentType();
377-
AmqpInboundGateway.this.headerMapper.fromHeadersToReply(reply.getHeaders(),
378-
messageProperties);
379-
// clear the replyTo from the original message since we are using it now
380-
messageProperties.setReplyTo(null);
381-
// reset the content-* properties as determined by the MessageConverter
382-
if (StringUtils.hasText(contentEncoding)) {
383-
messageProperties.setContentEncoding(contentEncoding);
384-
}
385-
messageProperties.setContentLength(contentLength);
386-
if (contentType != null) {
387-
messageProperties.setContentType(contentType);
388-
}
389-
return message1;
390-
};
395+
org.springframework.amqp.core.Message amqpMessage =
396+
MappingUtils.mapReplyMessage(reply, AmqpInboundGateway.this.templateMessageConverter,
397+
AmqpInboundGateway.this.headerMapper,
398+
message.getMessageProperties().getReceivedDeliveryMode(),
399+
AmqpInboundGateway.this.replyHeadersMappedLast);
391400

392401
if (replyTo != null) {
393-
AmqpInboundGateway.this.amqpTemplate.convertAndSend(replyTo.getExchangeName(),
394-
replyTo.getRoutingKey(), reply.getPayload(), messagePostProcessor);
402+
AmqpInboundGateway.this.amqpTemplate.send(replyTo.getExchangeName(), replyTo.getRoutingKey(),
403+
amqpMessage);
395404
}
396405
else {
397406
if (!AmqpInboundGateway.this.amqpTemplateExplicitlySet) {
398407
throw new IllegalStateException("There is no 'replyTo' message property " +
399408
"and the `defaultReplyTo` hasn't been configured.");
400409
}
401410
else {
402-
AmqpInboundGateway.this.amqpTemplate.convertAndSend(reply.getPayload(), messagePostProcessor);
411+
AmqpInboundGateway.this.amqpTemplate.send(amqpMessage);
403412
}
404413
}
405414
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/DefaultAmqpHeaderMapper.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ protected DefaultAmqpHeaderMapper(String[] requestHeaderNames, String[] replyHea
106106
*/
107107
@Override
108108
protected Map<String, Object> extractStandardHeaders(MessageProperties amqpMessageProperties) {
109-
Map<String, Object> headers = new HashMap<String, Object>();
109+
Map<String, Object> headers = new HashMap<>();
110110
try {
111111
JavaUtils.INSTANCE
112112
.acceptIfNotNull(AmqpHeaders.APP_ID, amqpMessageProperties.getAppId(), headers::put)
@@ -132,7 +132,7 @@ protected Map<String, Object> extractStandardHeaders(MessageProperties amqpMessa
132132
Integer priority = amqpMessageProperties.getPriority();
133133
JavaUtils.INSTANCE
134134
.acceptIfCondition(priority != null && priority > 0, IntegrationMessageHeaderAccessor.PRIORITY,
135-
priority, (key, value) -> headers.put(key, value))
135+
priority, headers::put)
136136
.acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), headers::put)
137137
.acceptIfNotNull(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(),
138138
headers::put)
@@ -255,24 +255,23 @@ else if (allHeaders != null) {
255255
}
256256

257257
private void mapJsonHeaders(Map<String, Object> headers, MessageProperties amqpMessageProperties) {
258-
Map<String, String> jsonHeaders = new HashMap<String, String>();
259-
260-
for (String jsonHeader : JsonHeaders.HEADERS) {
261-
Object value = getHeaderIfAvailable(headers, jsonHeader, Object.class);
262-
if (value != null) {
263-
headers.remove(jsonHeader);
264-
if (value instanceof Class<?>) {
265-
value = ((Class<?>) value).getName();
266-
}
267-
jsonHeaders.put(jsonHeader.replaceFirst(JsonHeaders.PREFIX, ""), value.toString());
268-
}
269-
}
270-
271258
/*
272259
* If the MessageProperties already contains JsonHeaders, don't overwrite them here because they were
273260
* set up by a message converter.
274261
*/
275262
if (!amqpMessageProperties.getHeaders().containsKey(JsonHeaders.TYPE_ID.replaceFirst(JsonHeaders.PREFIX, ""))) {
263+
Map<String, String> jsonHeaders = new HashMap<>();
264+
265+
for (String jsonHeader : JsonHeaders.HEADERS) {
266+
Object value = getHeaderIfAvailable(headers, jsonHeader, Object.class);
267+
if (value != null) {
268+
headers.remove(jsonHeader);
269+
if (value instanceof Class<?>) {
270+
value = ((Class<?>) value).getName();
271+
}
272+
jsonHeaders.put(jsonHeader.replaceFirst(JsonHeaders.PREFIX, ""), value.toString());
273+
}
274+
}
276275
amqpMessageProperties.getHeaders().putAll(jsonHeaders);
277276
}
278277
}
@@ -282,8 +281,9 @@ protected void populateUserDefinedHeader(String headerName, Object headerValue,
282281
MessageProperties amqpMessageProperties) {
283282
// do not overwrite an existing header with the same key
284283
// TODO: do we need to expose a boolean 'overwrite' flag?
285-
if (!amqpMessageProperties.getHeaders().containsKey(headerName)
286-
&& !AmqpHeaders.CONTENT_TYPE.equals(headerName)) {
284+
if (!amqpMessageProperties.getHeaders().containsKey(headerName) &&
285+
!AmqpHeaders.CONTENT_TYPE.equals(headerName) &&
286+
!headerName.startsWith(JsonHeaders.PREFIX)) {
287287
amqpMessageProperties.setHeader(headerName, headerValue);
288288
}
289289
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/MappingUtils.java

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
* Utility methods used during message mapping.
3131
*
3232
* @author Gary Russell
33+
* @author Artem Bilan
34+
*
3335
* @since 4.3
3436
*
3537
*/
@@ -40,7 +42,7 @@ private MappingUtils() {
4042
}
4143

4244
/**
43-
* Map an o.s.Message to an o.s.a.core.Message. When using a
45+
* Map an o.s.m.Message to an o.s.a.core.Message. When using a
4446
* {@link ContentTypeDelegatingMessageConverter}, {@link AmqpHeaders#CONTENT_TYPE} and
4547
* {@link MessageHeaders#CONTENT_TYPE} will be used for the selection, with the AMQP
4648
* header taking precedence.
@@ -54,25 +56,64 @@ private MappingUtils() {
5456
public static org.springframework.amqp.core.Message mapMessage(Message<?> requestMessage,
5557
MessageConverter converter, AmqpHeaderMapper headerMapper, MessageDeliveryMode defaultDeliveryMode,
5658
boolean headersMappedLast) {
59+
60+
return doMapMessage(requestMessage, converter, headerMapper, defaultDeliveryMode, headersMappedLast, false);
61+
}
62+
63+
/**
64+
* Map a reply o.s.m.Message to an o.s.a.core.Message. When using a
65+
* {@link ContentTypeDelegatingMessageConverter}, {@link AmqpHeaders#CONTENT_TYPE} and
66+
* {@link MessageHeaders#CONTENT_TYPE} will be used for the selection, with the AMQP
67+
* header taking precedence.
68+
* @param replyMessage the reply message.
69+
* @param converter the message converter to use.
70+
* @param headerMapper the header mapper to use.
71+
* @param defaultDeliveryMode the default delivery mode.
72+
* @param headersMappedLast true if headers are mapped after conversion.
73+
* @return the mapped Message.
74+
* @since 5.1.9
75+
*/
76+
public static org.springframework.amqp.core.Message mapReplyMessage(Message<?> replyMessage,
77+
MessageConverter converter, AmqpHeaderMapper headerMapper, MessageDeliveryMode defaultDeliveryMode,
78+
boolean headersMappedLast) {
79+
80+
return doMapMessage(replyMessage, converter, headerMapper, defaultDeliveryMode, headersMappedLast, true);
81+
}
82+
83+
private static org.springframework.amqp.core.Message doMapMessage(Message<?> message,
84+
MessageConverter converter, AmqpHeaderMapper headerMapper, MessageDeliveryMode defaultDeliveryMode,
85+
boolean headersMappedLast, boolean reply) {
86+
5787
MessageProperties amqpMessageProperties = new MessageProperties();
5888
org.springframework.amqp.core.Message amqpMessage;
5989
if (!headersMappedLast) {
60-
headerMapper.fromHeadersToRequest(requestMessage.getHeaders(), amqpMessageProperties);
90+
mapHeaders(message.getHeaders(), amqpMessageProperties, headerMapper, reply);
6191
}
6292
if (converter instanceof ContentTypeDelegatingMessageConverter && headersMappedLast) {
63-
String contentType = contentTypeAsString(requestMessage.getHeaders());
93+
String contentType = contentTypeAsString(message.getHeaders());
6494
if (contentType != null) {
6595
amqpMessageProperties.setContentType(contentType);
6696
}
6797
}
68-
amqpMessage = converter.toMessage(requestMessage.getPayload(), amqpMessageProperties);
98+
amqpMessage = converter.toMessage(message.getPayload(), amqpMessageProperties);
6999
if (headersMappedLast) {
70-
headerMapper.fromHeadersToRequest(requestMessage.getHeaders(), amqpMessageProperties);
100+
mapHeaders(message.getHeaders(), amqpMessageProperties, headerMapper, reply);
71101
}
72-
checkDeliveryMode(requestMessage, amqpMessageProperties, defaultDeliveryMode);
102+
checkDeliveryMode(message, amqpMessageProperties, defaultDeliveryMode);
73103
return amqpMessage;
74104
}
75105

106+
private static void mapHeaders(MessageHeaders messageHeaders, MessageProperties amqpMessageProperties,
107+
AmqpHeaderMapper headerMapper, boolean reply) {
108+
109+
if (reply) {
110+
headerMapper.fromHeadersToReply(messageHeaders, amqpMessageProperties);
111+
}
112+
else {
113+
headerMapper.fromHeadersToRequest(messageHeaders, amqpMessageProperties);
114+
}
115+
}
116+
76117
private static String contentTypeAsString(MessageHeaders headers) {
77118
Object contentType = headers.get(AmqpHeaders.CONTENT_TYPE);
78119
if (contentType instanceof MimeType) {

spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp-5.2.xsd

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@
6767
received within the confirm timeout or a negative acknowledgment or returned
6868
message is received, an exception will be thrown.
6969
</xsd:documentation>
70-
<xsd:simpleType>
71-
<xsd:union memberTypes="xsd:boolean xsd:string" />
72-
</xsd:simpleType>
7370
</xsd:appinfo>
7471
</xsd:annotation>
72+
<xsd:simpleType>
73+
<xsd:union memberTypes="xsd:boolean xsd:string" />
74+
</xsd:simpleType>
7575
</xsd:attribute>
7676
</xsd:extension>
7777
</xsd:complexContent>
@@ -254,6 +254,18 @@
254254
</xsd:documentation>
255255
</xsd:annotation>
256256
</xsd:attribute>
257+
<xsd:attribute name="reply-headers-last">
258+
<xsd:annotation>
259+
<xsd:documentation>
260+
Whether reply headers are mapped before or after conversion from a messaging Message to
261+
a spring amqp Message. Set to true, for example, if you wish to override the
262+
contentType header set by the converter.
263+
</xsd:documentation>
264+
</xsd:annotation>
265+
<xsd:simpleType>
266+
<xsd:union memberTypes="xsd:boolean xsd:string" />
267+
</xsd:simpleType>
268+
</xsd:attribute>
257269
</xsd:extension>
258270
</xsd:complexContent>
259271
</xsd:complexType>

0 commit comments

Comments
 (0)