From ec43b63ef1da6d401073935d9f83e9d691fa4e7a Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Fri, 1 Feb 2019 11:41:37 -0500 Subject: [PATCH] GH-2728: AMQP Manual acks and conversion errors Resolves https://github.com/spring-projects/spring-integration/issues/2728 Provide access to the channel and delivery tag in the `ErrorMessage` when using `AcknowledgeMode.MANUAL`. --- .../inbound/AmqpInboundChannelAdapter.java | 17 ++++-- .../amqp/inbound/AmqpInboundGateway.java | 10 ++-- .../amqp/support/EndpointUtils.java | 57 ++++++++++++++++++ ...alAckListenerExecutionFailedException.java | 56 ++++++++++++++++++ .../amqp/inbound/InboundEndpointTests.java | 59 ++++++++++++++++--- src/reference/asciidoc/amqp.adoc | 14 +++++ src/reference/asciidoc/whats-new.adoc | 4 ++ 7 files changed, 198 insertions(+), 19 deletions(-) create mode 100644 spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/EndpointUtils.java create mode 100644 spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/ManualAckListenerExecutionFailedException.java diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java index 90186c403c1..8682f5e69d7 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,7 +23,6 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; -import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; @@ -34,6 +33,7 @@ import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; +import org.springframework.integration.amqp.support.EndpointUtils; import org.springframework.integration.context.OrderlyShutdownCapable; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.support.ErrorMessageUtils; @@ -217,8 +217,9 @@ public void onMessage(final Message message, final Channel channel) throws Excep catch (MessageConversionException e) { if (getErrorChannel() != null) { setAttributesIfNecessary(message, null); - getMessagingTemplate().send(getErrorChannel(), buildErrorMessage(null, - new ListenerExecutionFailedException("Message conversion failed", e, message))); + getMessagingTemplate() + .send(getErrorChannel(), buildErrorMessage(null, + EndpointUtils.errorMessagePayload(message, channel, isManualAck(), e))); } else { throw e; @@ -241,8 +242,7 @@ private org.springframework.messaging.Message createMessage(Message mess Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message); Map headers = AmqpInboundChannelAdapter.this.headerMapper .toHeadersFromRequest(message.getMessageProperties()); - if (AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode() - == AcknowledgeMode.MANUAL) { + if (isManualAck()) { headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag()); headers.put(AmqpHeaders.CHANNEL, channel); } @@ -256,6 +256,11 @@ private org.springframework.messaging.Message createMessage(Message mess return messagingMessage; } + private boolean isManualAck() { + return AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode() + == AcknowledgeMode.MANUAL; + } + } } diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java index 7a3bdef1d37..38e2c01bb89 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,7 +28,6 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; -import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SimpleMessageConverter; @@ -38,6 +37,7 @@ import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; +import org.springframework.integration.amqp.support.EndpointUtils; import org.springframework.integration.gateway.MessagingGatewaySupport; import org.springframework.integration.support.ErrorMessageUtils; import org.springframework.messaging.MessageChannel; @@ -281,10 +281,12 @@ public void onMessage(final Message message, final Channel channel) throws Excep private org.springframework.messaging.Message convert(Message message, Channel channel) { Map headers = null; Object payload = null; + boolean isManualAck = AmqpInboundGateway.this.messageListenerContainer + .getAcknowledgeMode() == AcknowledgeMode.MANUAL; try { payload = AmqpInboundGateway.this.amqpMessageConverter.fromMessage(message); headers = AmqpInboundGateway.this.headerMapper.toHeadersFromRequest(message.getMessageProperties()); - if (AmqpInboundGateway.this.messageListenerContainer.getAcknowledgeMode() == AcknowledgeMode.MANUAL) { + if (isManualAck) { headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag()); headers.put(AmqpHeaders.CHANNEL, channel); } @@ -297,7 +299,7 @@ private org.springframework.messaging.Message convert(Message message, C if (errorChannel != null) { setAttributesIfNecessary(message, null); AmqpInboundGateway.this.messagingTemplate.send(errorChannel, buildErrorMessage(null, - new ListenerExecutionFailedException("Message conversion failed", e, message))); + EndpointUtils.errorMessagePayload(message, channel, isManualAck, e))); } else { throw e; diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/EndpointUtils.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/EndpointUtils.java new file mode 100644 index 00000000000..7bd25ec8160 --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/EndpointUtils.java @@ -0,0 +1,57 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.support; + +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; + +import com.rabbitmq.client.Channel; + +/** + * Utility methods for messaging endpoints. + * + * @author Gary Russell + * @since 5.1.3 + * + */ +public final class EndpointUtils { + + private static final String LEFE_MESSAGE = "Message conversion failed"; + + private EndpointUtils() { + super(); + } + + /** + * Return an {@link ListenerExecutionFailedException} or a {@link ManualAckListenerExecutionFailedException} + * depending on whether isManualAck is false or true. + * @param message the failed message. + * @param channel the channel. + * @param isManualAck true if the container uses manual acknowledgment. + * @param e the exception. + * @return the exception. + */ + public static ListenerExecutionFailedException errorMessagePayload(final Message message, + Channel channel, boolean isManualAck, Exception e) { + + return isManualAck + ? new ManualAckListenerExecutionFailedException(LEFE_MESSAGE, e, message, channel, + message.getMessageProperties().getDeliveryTag()) + : new ListenerExecutionFailedException(LEFE_MESSAGE, e, message); + } + +} diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/ManualAckListenerExecutionFailedException.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/ManualAckListenerExecutionFailedException.java new file mode 100644 index 00000000000..b2d7b8e31ee --- /dev/null +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/ManualAckListenerExecutionFailedException.java @@ -0,0 +1,56 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.amqp.support; + +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; + +import com.rabbitmq.client.Channel; + +/** + * A {@link ListenerExecutionFailedException} enhanced with the channel and delivery tag. + * Used for conversion errors when using manual acks. + * + * @author Gary Russell + * @since 5.1.3 + * + */ +public class ManualAckListenerExecutionFailedException extends ListenerExecutionFailedException { + + private static final long serialVersionUID = 1L; + + private final Channel channel; + + private final long deliveryTag; + + public ManualAckListenerExecutionFailedException(String msg, Throwable cause, Message failedMessage, + Channel channel, long deliveryTag) { + + super(msg, cause, failedMessage); + this.channel = channel; + this.deliveryTag = deliveryTag; + } + + public Channel getChannel() { + return this.channel; + } + + public long getDeliveryTag() { + return this.deliveryTag; + } + +} diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java index 145b587c994..1affb5c2e0a 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2018 the original author or authors. + * Copyright 2013-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.integration.amqp.inbound; +import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.notNullValue; @@ -32,6 +33,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -52,6 +54,7 @@ import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConversionException; @@ -61,6 +64,7 @@ import org.springframework.integration.StaticMessageHeaderAccessor; import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; +import org.springframework.integration.amqp.support.ManualAckListenerExecutionFailedException; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer; @@ -248,12 +252,31 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws }); adapter.afterPropertiesSet(); + org.springframework.amqp.core.Message message = mock(org.springframework.amqp.core.Message.class); + MessageProperties props = new MessageProperties(); + props.setDeliveryTag(42L); + given(message.getMessageProperties()).willReturn(props); ((ChannelAwareMessageListener) container.getMessageListener()) - .onMessage(mock(org.springframework.amqp.core.Message.class), null); - assertNull(outputChannel.receive(0)); + .onMessage(message, null); + assertThat(outputChannel.receive(0)).isNull(); Message received = errorChannel.receive(0); - assertNotNull(received); - assertNotNull(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)); + assertThat(received).isNotNull(); + assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull(); + assertThat(received.getPayload().getClass()).isEqualTo(ListenerExecutionFailedException.class); + + container.setAcknowledgeMode(AcknowledgeMode.MANUAL); + Channel channel = mock(Channel.class); + ((ChannelAwareMessageListener) container.getMessageListener()) + .onMessage(message, channel); + assertThat(outputChannel.receive(0)).isNull(); + received = errorChannel.receive(0); + assertThat(received).isNotNull(); + assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull(); + assertThat(received.getPayload()).isInstanceOf(ManualAckListenerExecutionFailedException.class); + ManualAckListenerExecutionFailedException ex = (ManualAckListenerExecutionFailedException) received + .getPayload(); + assertThat(ex.getChannel()).isEqualTo(channel); + assertThat(ex.getDeliveryTag()).isEqualTo(props.getDeliveryTag()); } @Test @@ -284,12 +307,30 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws }); adapter.afterPropertiesSet(); + org.springframework.amqp.core.Message message = mock(org.springframework.amqp.core.Message.class); + MessageProperties props = new MessageProperties(); + props.setDeliveryTag(42L); + given(message.getMessageProperties()).willReturn(props); ((ChannelAwareMessageListener) container.getMessageListener()) - .onMessage(mock(org.springframework.amqp.core.Message.class), null); - assertNull(outputChannel.receive(0)); + .onMessage(message, null); + assertThat(outputChannel.receive(0)).isNull(); Message received = errorChannel.receive(0); - assertNotNull(received); - assertNotNull(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)); + assertThat(received).isNotNull(); + assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull(); + + container.setAcknowledgeMode(AcknowledgeMode.MANUAL); + Channel channel = mock(Channel.class); + ((ChannelAwareMessageListener) container.getMessageListener()) + .onMessage(message, channel); + assertThat(outputChannel.receive(0)).isNull(); + received = errorChannel.receive(0); + assertThat(received).isNotNull(); + assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull(); + assertThat(received.getPayload()).isInstanceOf(ManualAckListenerExecutionFailedException.class); + ManualAckListenerExecutionFailedException ex = (ManualAckListenerExecutionFailedException) received + .getPayload(); + assertThat(ex.getChannel()).isEqualTo(channel); + assertThat(ex.getDeliveryTag()).isEqualTo(props.getDeliveryTag()); } @Test diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 907f8e5ee93..25d6125b1e6 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -1081,6 +1081,20 @@ public class AmqpAsyncApplication { ---- ==== +[[amqp-conversion-inbound]] +=== Inbound Message Conversion + +Inbound messages, arriving at the channel adapter or gateway, are converted to the `spring-messaging` `Message` payload using a message converter. +By default, a `SimpleMessageConverter` is used, which handles java serialization and text. +Headers are mapped using the `DefaultHeaderMapper.inboundMapper()` by default. +If a conversion error occurs, and there is no error channel defined, the exception is thrown to the container and handled by the listener container's error handler. +The default error handler treats conversion errors as fatal and the message will be rejected (and routed to a dead-letter exchange, if the queue is so configured). +If an error channel is defined, the `ErrorMessage` payload is a `ListenerExecutionFailedException` with properties `failedMessage` (the Spring AMQP message that could not be converted) and the `cause`. +If the container `AcknowledgeMode` is `AUTO` (the default) and the error flow consumes the error without throwing an exception, the original message will be acknowledged. +If the error flow throws an exception, the exception type, in conjunction with the container's error handler, will determine whether or not the message is requeued. +If the container is configured with `AcknowledgeMode.MANUAL`, the payload is a `ManualAckListenerExecutionFailedException` with additional properties `channel` and `deliveryTag`. +This enables the error flow to call `basicAck` or `basicNack` (or `basicReject`) for the message, to control its disposition. + [[content-type-conversion-outbound]] === Outbound Message Conversion diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 3cb83cb55a2..9b48aaf70c3 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -144,6 +144,10 @@ See the note near the bottom of <> for more information. The `contentType` header is now correctly mapped as an entry in the general headers map. See <> for more information. +Starting with version 5.1.3, if a message conversion exception occurs when using manual acknowledgments, and an error channel is defined, the payload is a `ManualAckListenerExecutionFailedException` with additional `channel` and `deliveryTag` properties. +This enables the error flow to ack/nack the original message. +See <> for more information. + [[x5.1-jdbc]] === JDBC Changes