Skip to content

Commit f82c716

Browse files
garyrussellartembilan
authored andcommitted
spring-projectsGH-2728: AMQP Manual acks and conversion errors
Resolves spring-projects#2728 Provide access to the channel and delivery tag in the `ErrorMessage` when using `AcknowledgeMode.MANUAL`.
1 parent 65b8899 commit f82c716

File tree

7 files changed

+198
-19
lines changed

7 files changed

+198
-19
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,7 +23,6 @@
2323
import org.springframework.amqp.core.Message;
2424
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
2525
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
26-
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
2726
import org.springframework.amqp.support.AmqpHeaders;
2827
import org.springframework.amqp.support.converter.MessageConversionException;
2928
import org.springframework.amqp.support.converter.MessageConverter;
@@ -34,6 +33,7 @@
3433
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
3534
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
3635
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
36+
import org.springframework.integration.amqp.support.EndpointUtils;
3737
import org.springframework.integration.context.OrderlyShutdownCapable;
3838
import org.springframework.integration.endpoint.MessageProducerSupport;
3939
import org.springframework.integration.support.ErrorMessageUtils;
@@ -217,8 +217,9 @@ public void onMessage(final Message message, final Channel channel) throws Excep
217217
catch (MessageConversionException e) {
218218
if (getErrorChannel() != null) {
219219
setAttributesIfNecessary(message, null);
220-
getMessagingTemplate().send(getErrorChannel(), buildErrorMessage(null,
221-
new ListenerExecutionFailedException("Message conversion failed", e, message)));
220+
getMessagingTemplate()
221+
.send(getErrorChannel(), buildErrorMessage(null,
222+
EndpointUtils.errorMessagePayload(message, channel, isManualAck(), e)));
222223
}
223224
else {
224225
throw e;
@@ -241,8 +242,7 @@ private org.springframework.messaging.Message<Object> createMessage(Message mess
241242
Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
242243
Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
243244
.toHeadersFromRequest(message.getMessageProperties());
244-
if (AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
245-
== AcknowledgeMode.MANUAL) {
245+
if (isManualAck()) {
246246
headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
247247
headers.put(AmqpHeaders.CHANNEL, channel);
248248
}
@@ -256,6 +256,11 @@ private org.springframework.messaging.Message<Object> createMessage(Message mess
256256
return messagingMessage;
257257
}
258258

259+
private boolean isManualAck() {
260+
return AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
261+
== AcknowledgeMode.MANUAL;
262+
}
263+
259264
}
260265

261266
}

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,7 +28,6 @@
2828
import org.springframework.amqp.rabbit.core.RabbitTemplate;
2929
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
3030
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
31-
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
3231
import org.springframework.amqp.support.AmqpHeaders;
3332
import org.springframework.amqp.support.converter.MessageConverter;
3433
import org.springframework.amqp.support.converter.SimpleMessageConverter;
@@ -38,6 +37,7 @@
3837
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
3938
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
4039
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
40+
import org.springframework.integration.amqp.support.EndpointUtils;
4141
import org.springframework.integration.gateway.MessagingGatewaySupport;
4242
import org.springframework.integration.support.ErrorMessageUtils;
4343
import org.springframework.messaging.MessageChannel;
@@ -281,10 +281,12 @@ public void onMessage(final Message message, final Channel channel) throws Excep
281281
private org.springframework.messaging.Message<Object> convert(Message message, Channel channel) {
282282
Map<String, Object> headers = null;
283283
Object payload = null;
284+
boolean isManualAck = AmqpInboundGateway.this.messageListenerContainer
285+
.getAcknowledgeMode() == AcknowledgeMode.MANUAL;
284286
try {
285287
payload = AmqpInboundGateway.this.amqpMessageConverter.fromMessage(message);
286288
headers = AmqpInboundGateway.this.headerMapper.toHeadersFromRequest(message.getMessageProperties());
287-
if (AmqpInboundGateway.this.messageListenerContainer.getAcknowledgeMode() == AcknowledgeMode.MANUAL) {
289+
if (isManualAck) {
288290
headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
289291
headers.put(AmqpHeaders.CHANNEL, channel);
290292
}
@@ -297,7 +299,7 @@ private org.springframework.messaging.Message<Object> convert(Message message, C
297299
if (errorChannel != null) {
298300
setAttributesIfNecessary(message, null);
299301
AmqpInboundGateway.this.messagingTemplate.send(errorChannel, buildErrorMessage(null,
300-
new ListenerExecutionFailedException("Message conversion failed", e, message)));
302+
EndpointUtils.errorMessagePayload(message, channel, isManualAck, e)));
301303
}
302304
else {
303305
throw e;
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.support;
18+
19+
import org.springframework.amqp.core.Message;
20+
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
21+
22+
import com.rabbitmq.client.Channel;
23+
24+
/**
25+
* Utility methods for messaging endpoints.
26+
*
27+
* @author Gary Russell
28+
* @since 5.1.3
29+
*
30+
*/
31+
public final class EndpointUtils {
32+
33+
private static final String LEFE_MESSAGE = "Message conversion failed";
34+
35+
private EndpointUtils() {
36+
super();
37+
}
38+
39+
/**
40+
* Return an {@link ListenerExecutionFailedException} or a {@link ManualAckListenerExecutionFailedException}
41+
* depending on whether isManualAck is false or true.
42+
* @param message the failed message.
43+
* @param channel the channel.
44+
* @param isManualAck true if the container uses manual acknowledgment.
45+
* @param e the exception.
46+
* @return the exception.
47+
*/
48+
public static ListenerExecutionFailedException errorMessagePayload(final Message message,
49+
Channel channel, boolean isManualAck, Exception e) {
50+
51+
return isManualAck
52+
? new ManualAckListenerExecutionFailedException(LEFE_MESSAGE, e, message, channel,
53+
message.getMessageProperties().getDeliveryTag())
54+
: new ListenerExecutionFailedException(LEFE_MESSAGE, e, message);
55+
}
56+
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.support;
18+
19+
import org.springframework.amqp.core.Message;
20+
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
21+
22+
import com.rabbitmq.client.Channel;
23+
24+
/**
25+
* A {@link ListenerExecutionFailedException} enhanced with the channel and delivery tag.
26+
* Used for conversion errors when using manual acks.
27+
*
28+
* @author Gary Russell
29+
* @since 5.1.3
30+
*
31+
*/
32+
public class ManualAckListenerExecutionFailedException extends ListenerExecutionFailedException {
33+
34+
private static final long serialVersionUID = 1L;
35+
36+
private final Channel channel;
37+
38+
private final long deliveryTag;
39+
40+
public ManualAckListenerExecutionFailedException(String msg, Throwable cause, Message failedMessage,
41+
Channel channel, long deliveryTag) {
42+
43+
super(msg, cause, failedMessage);
44+
this.channel = channel;
45+
this.deliveryTag = deliveryTag;
46+
}
47+
48+
public Channel getChannel() {
49+
return this.channel;
50+
}
51+
52+
public long getDeliveryTag() {
53+
return this.deliveryTag;
54+
}
55+
56+
}

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java

Lines changed: 50 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2018 the original author or authors.
2+
* Copyright 2013-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.amqp.inbound;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
1920
import static org.hamcrest.CoreMatchers.containsString;
2021
import static org.hamcrest.CoreMatchers.instanceOf;
2122
import static org.hamcrest.CoreMatchers.notNullValue;
@@ -32,6 +33,7 @@
3233
import static org.mockito.ArgumentMatchers.anyBoolean;
3334
import static org.mockito.ArgumentMatchers.anyString;
3435
import static org.mockito.ArgumentMatchers.isNull;
36+
import static org.mockito.BDDMockito.given;
3537
import static org.mockito.Mockito.doAnswer;
3638
import static org.mockito.Mockito.mock;
3739
import static org.mockito.Mockito.spy;
@@ -52,6 +54,7 @@
5254
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
5355
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
5456
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
57+
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
5558
import org.springframework.amqp.support.AmqpHeaders;
5659
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
5760
import org.springframework.amqp.support.converter.MessageConversionException;
@@ -61,6 +64,7 @@
6164
import org.springframework.integration.StaticMessageHeaderAccessor;
6265
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
6366
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
67+
import org.springframework.integration.amqp.support.ManualAckListenerExecutionFailedException;
6468
import org.springframework.integration.channel.DirectChannel;
6569
import org.springframework.integration.channel.QueueChannel;
6670
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
@@ -248,12 +252,31 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws
248252

249253
});
250254
adapter.afterPropertiesSet();
255+
org.springframework.amqp.core.Message message = mock(org.springframework.amqp.core.Message.class);
256+
MessageProperties props = new MessageProperties();
257+
props.setDeliveryTag(42L);
258+
given(message.getMessageProperties()).willReturn(props);
251259
((ChannelAwareMessageListener) container.getMessageListener())
252-
.onMessage(mock(org.springframework.amqp.core.Message.class), null);
253-
assertNull(outputChannel.receive(0));
260+
.onMessage(message, null);
261+
assertThat(outputChannel.receive(0)).isNull();
254262
Message<?> received = errorChannel.receive(0);
255-
assertNotNull(received);
256-
assertNotNull(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE));
263+
assertThat(received).isNotNull();
264+
assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull();
265+
assertThat(received.getPayload().getClass()).isEqualTo(ListenerExecutionFailedException.class);
266+
267+
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
268+
Channel channel = mock(Channel.class);
269+
((ChannelAwareMessageListener) container.getMessageListener())
270+
.onMessage(message, channel);
271+
assertThat(outputChannel.receive(0)).isNull();
272+
received = errorChannel.receive(0);
273+
assertThat(received).isNotNull();
274+
assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull();
275+
assertThat(received.getPayload()).isInstanceOf(ManualAckListenerExecutionFailedException.class);
276+
ManualAckListenerExecutionFailedException ex = (ManualAckListenerExecutionFailedException) received
277+
.getPayload();
278+
assertThat(ex.getChannel()).isEqualTo(channel);
279+
assertThat(ex.getDeliveryTag()).isEqualTo(props.getDeliveryTag());
257280
}
258281

259282
@Test
@@ -284,12 +307,30 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws
284307

285308
});
286309
adapter.afterPropertiesSet();
310+
org.springframework.amqp.core.Message message = mock(org.springframework.amqp.core.Message.class);
311+
MessageProperties props = new MessageProperties();
312+
props.setDeliveryTag(42L);
313+
given(message.getMessageProperties()).willReturn(props);
287314
((ChannelAwareMessageListener) container.getMessageListener())
288-
.onMessage(mock(org.springframework.amqp.core.Message.class), null);
289-
assertNull(outputChannel.receive(0));
315+
.onMessage(message, null);
316+
assertThat(outputChannel.receive(0)).isNull();
290317
Message<?> received = errorChannel.receive(0);
291-
assertNotNull(received);
292-
assertNotNull(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE));
318+
assertThat(received).isNotNull();
319+
assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull();
320+
321+
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
322+
Channel channel = mock(Channel.class);
323+
((ChannelAwareMessageListener) container.getMessageListener())
324+
.onMessage(message, channel);
325+
assertThat(outputChannel.receive(0)).isNull();
326+
received = errorChannel.receive(0);
327+
assertThat(received).isNotNull();
328+
assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull();
329+
assertThat(received.getPayload()).isInstanceOf(ManualAckListenerExecutionFailedException.class);
330+
ManualAckListenerExecutionFailedException ex = (ManualAckListenerExecutionFailedException) received
331+
.getPayload();
332+
assertThat(ex.getChannel()).isEqualTo(channel);
333+
assertThat(ex.getDeliveryTag()).isEqualTo(props.getDeliveryTag());
293334
}
294335

295336
@Test

src/reference/asciidoc/amqp.adoc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,6 +1081,20 @@ public class AmqpAsyncApplication {
10811081
----
10821082
====
10831083

1084+
[[amqp-conversion-inbound]]
1085+
=== Inbound Message Conversion
1086+
1087+
Inbound messages, arriving at the channel adapter or gateway, are converted to the `spring-messaging` `Message<?>` payload using a message converter.
1088+
By default, a `SimpleMessageConverter` is used, which handles java serialization and text.
1089+
Headers are mapped using the `DefaultHeaderMapper.inboundMapper()` by default.
1090+
If a conversion error occurs, and there is no error channel defined, the exception is thrown to the container and handled by the listener container's error handler.
1091+
The default error handler treats conversion errors as fatal and the message will be rejected (and routed to a dead-letter exchange, if the queue is so configured).
1092+
If an error channel is defined, the `ErrorMessage` payload is a `ListenerExecutionFailedException` with properties `failedMessage` (the Spring AMQP message that could not be converted) and the `cause`.
1093+
If the container `AcknowledgeMode` is `AUTO` (the default) and the error flow consumes the error without throwing an exception, the original message will be acknowledged.
1094+
If the error flow throws an exception, the exception type, in conjunction with the container's error handler, will determine whether or not the message is requeued.
1095+
If the container is configured with `AcknowledgeMode.MANUAL`, the payload is a `ManualAckListenerExecutionFailedException` with additional properties `channel` and `deliveryTag`.
1096+
This enables the error flow to call `basicAck` or `basicNack` (or `basicReject`) for the message, to control its disposition.
1097+
10841098
[[content-type-conversion-outbound]]
10851099
=== Outbound Message Conversion
10861100

src/reference/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ See the note near the bottom of <<amqp-message-headers>> for more information.
144144
The `contentType` header is now correctly mapped as an entry in the general headers map.
145145
See <<amqp-content-type>> for more information.
146146

147+
Starting with version 5.1.3, if a message conversion exception occurs when using manual acknowledgments, and an error channel is defined, the payload is a `ManualAckListenerExecutionFailedException` with additional `channel` and `deliveryTag` properties.
148+
This enables the error flow to ack/nack the original message.
149+
See <<amqp-conversion-inbound>> for more information.
150+
147151
[[x5.1-jdbc]]
148152
=== JDBC Changes
149153

0 commit comments

Comments
 (0)