Skip to content

GH-2728: AMQP Manual acks and conversion errors #2729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,6 @@
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
Expand All @@ -34,6 +33,7 @@
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.amqp.support.EndpointUtils;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.ErrorMessageUtils;
Expand Down Expand Up @@ -217,8 +217,9 @@ public void onMessage(final Message message, final Channel channel) throws Excep
catch (MessageConversionException e) {
if (getErrorChannel() != null) {
setAttributesIfNecessary(message, null);
getMessagingTemplate().send(getErrorChannel(), buildErrorMessage(null,
new ListenerExecutionFailedException("Message conversion failed", e, message)));
getMessagingTemplate()
.send(getErrorChannel(), buildErrorMessage(null,
EndpointUtils.errorMessagePayload(message, channel, isManualAck(), e)));
}
else {
throw e;
Expand All @@ -241,8 +242,7 @@ private org.springframework.messaging.Message<Object> createMessage(Message mess
Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
.toHeadersFromRequest(message.getMessageProperties());
if (AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
== AcknowledgeMode.MANUAL) {
if (isManualAck()) {
headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
headers.put(AmqpHeaders.CHANNEL, channel);
}
Expand All @@ -256,6 +256,11 @@ private org.springframework.messaging.Message<Object> createMessage(Message mess
return messagingMessage;
}

private boolean isManualAck() {
return AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode()
== AcknowledgeMode.MANUAL;
}

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,7 +28,6 @@
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
Expand All @@ -38,6 +37,7 @@
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.amqp.support.EndpointUtils;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.messaging.MessageChannel;
Expand Down Expand Up @@ -281,10 +281,12 @@ public void onMessage(final Message message, final Channel channel) throws Excep
private org.springframework.messaging.Message<Object> convert(Message message, Channel channel) {
Map<String, Object> headers = null;
Object payload = null;
boolean isManualAck = AmqpInboundGateway.this.messageListenerContainer
.getAcknowledgeMode() == AcknowledgeMode.MANUAL;
try {
payload = AmqpInboundGateway.this.amqpMessageConverter.fromMessage(message);
headers = AmqpInboundGateway.this.headerMapper.toHeadersFromRequest(message.getMessageProperties());
if (AmqpInboundGateway.this.messageListenerContainer.getAcknowledgeMode() == AcknowledgeMode.MANUAL) {
if (isManualAck) {
headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
headers.put(AmqpHeaders.CHANNEL, channel);
}
Expand All @@ -297,7 +299,7 @@ private org.springframework.messaging.Message<Object> convert(Message message, C
if (errorChannel != null) {
setAttributesIfNecessary(message, null);
AmqpInboundGateway.this.messagingTemplate.send(errorChannel, buildErrorMessage(null,
new ListenerExecutionFailedException("Message conversion failed", e, message)));
EndpointUtils.errorMessagePayload(message, channel, isManualAck, e)));
}
else {
throw e;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.amqp.support;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;

import com.rabbitmq.client.Channel;

/**
* Utility methods for messaging endpoints.
*
* @author Gary Russell
* @since 5.1.3
*
*/
public final class EndpointUtils {

private static final String LEFE_MESSAGE = "Message conversion failed";

private EndpointUtils() {
super();
}

/**
* Return an {@link ListenerExecutionFailedException} or a {@link ManualAckListenerExecutionFailedException}
* depending on whether isManualAck is false or true.
* @param message the failed message.
* @param channel the channel.
* @param isManualAck true if the container uses manual acknowledgment.
* @param e the exception.
* @return the exception.
*/
public static ListenerExecutionFailedException errorMessagePayload(final Message message,
Channel channel, boolean isManualAck, Exception e) {

return isManualAck
? new ManualAckListenerExecutionFailedException(LEFE_MESSAGE, e, message, channel,
message.getMessageProperties().getDeliveryTag())
: new ListenerExecutionFailedException(LEFE_MESSAGE, e, message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.amqp.support;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;

import com.rabbitmq.client.Channel;

/**
* A {@link ListenerExecutionFailedException} enhanced with the channel and delivery tag.
* Used for conversion errors when using manual acks.
*
* @author Gary Russell
* @since 5.1.3
*
*/
public class ManualAckListenerExecutionFailedException extends ListenerExecutionFailedException {

private static final long serialVersionUID = 1L;

private final Channel channel;

private final long deliveryTag;

public ManualAckListenerExecutionFailedException(String msg, Throwable cause, Message failedMessage,
Channel channel, long deliveryTag) {

super(msg, cause, failedMessage);
this.channel = channel;
this.deliveryTag = deliveryTag;
}

public Channel getChannel() {
return this.channel;
}

public long getDeliveryTag() {
return this.deliveryTag;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2018 the original author or authors.
* Copyright 2013-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.integration.amqp.inbound;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
Expand All @@ -32,6 +33,7 @@
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand All @@ -52,6 +54,7 @@
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;
Expand All @@ -61,6 +64,7 @@
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.amqp.support.ManualAckListenerExecutionFailedException;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
Expand Down Expand Up @@ -248,12 +252,31 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws

});
adapter.afterPropertiesSet();
org.springframework.amqp.core.Message message = mock(org.springframework.amqp.core.Message.class);
MessageProperties props = new MessageProperties();
props.setDeliveryTag(42L);
given(message.getMessageProperties()).willReturn(props);
((ChannelAwareMessageListener) container.getMessageListener())
.onMessage(mock(org.springframework.amqp.core.Message.class), null);
assertNull(outputChannel.receive(0));
.onMessage(message, null);
assertThat(outputChannel.receive(0)).isNull();
Message<?> received = errorChannel.receive(0);
assertNotNull(received);
assertNotNull(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE));
assertThat(received).isNotNull();
assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull();
assertThat(received.getPayload().getClass()).isEqualTo(ListenerExecutionFailedException.class);

container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
Channel channel = mock(Channel.class);
((ChannelAwareMessageListener) container.getMessageListener())
.onMessage(message, channel);
assertThat(outputChannel.receive(0)).isNull();
received = errorChannel.receive(0);
assertThat(received).isNotNull();
assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull();
assertThat(received.getPayload()).isInstanceOf(ManualAckListenerExecutionFailedException.class);
ManualAckListenerExecutionFailedException ex = (ManualAckListenerExecutionFailedException) received
.getPayload();
assertThat(ex.getChannel()).isEqualTo(channel);
assertThat(ex.getDeliveryTag()).isEqualTo(props.getDeliveryTag());
}

@Test
Expand Down Expand Up @@ -284,12 +307,30 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws

});
adapter.afterPropertiesSet();
org.springframework.amqp.core.Message message = mock(org.springframework.amqp.core.Message.class);
MessageProperties props = new MessageProperties();
props.setDeliveryTag(42L);
given(message.getMessageProperties()).willReturn(props);
((ChannelAwareMessageListener) container.getMessageListener())
.onMessage(mock(org.springframework.amqp.core.Message.class), null);
assertNull(outputChannel.receive(0));
.onMessage(message, null);
assertThat(outputChannel.receive(0)).isNull();
Message<?> received = errorChannel.receive(0);
assertNotNull(received);
assertNotNull(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE));
assertThat(received).isNotNull();
assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull();

container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
Channel channel = mock(Channel.class);
((ChannelAwareMessageListener) container.getMessageListener())
.onMessage(message, channel);
assertThat(outputChannel.receive(0)).isNull();
received = errorChannel.receive(0);
assertThat(received).isNotNull();
assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull();
assertThat(received.getPayload()).isInstanceOf(ManualAckListenerExecutionFailedException.class);
ManualAckListenerExecutionFailedException ex = (ManualAckListenerExecutionFailedException) received
.getPayload();
assertThat(ex.getChannel()).isEqualTo(channel);
assertThat(ex.getDeliveryTag()).isEqualTo(props.getDeliveryTag());
}

@Test
Expand Down
14 changes: 14 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1081,6 +1081,20 @@ public class AmqpAsyncApplication {
----
====

[[amqp-conversion-inbound]]
=== Inbound Message Conversion

Inbound messages, arriving at the channel adapter or gateway, are converted to the `spring-messaging` `Message<?>` payload using a message converter.
By default, a `SimpleMessageConverter` is used, which handles java serialization and text.
Headers are mapped using the `DefaultHeaderMapper.inboundMapper()` by default.
If a conversion error occurs, and there is no error channel defined, the exception is thrown to the container and handled by the listener container's error handler.
The default error handler treats conversion errors as fatal and the message will be rejected (and routed to a dead-letter exchange, if the queue is so configured).
If an error channel is defined, the `ErrorMessage` payload is a `ListenerExecutionFailedException` with properties `failedMessage` (the Spring AMQP message that could not be converted) and the `cause`.
If the container `AcknowledgeMode` is `AUTO` (the default) and the error flow consumes the error without throwing an exception, the original message will be acknowledged.
If the error flow throws an exception, the exception type, in conjunction with the container's error handler, will determine whether or not the message is requeued.
If the container is configured with `AcknowledgeMode.MANUAL`, the payload is a `ManualAckListenerExecutionFailedException` with additional properties `channel` and `deliveryTag`.
This enables the error flow to call `basicAck` or `basicNack` (or `basicReject`) for the message, to control its disposition.

[[content-type-conversion-outbound]]
=== Outbound Message Conversion

Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ See the note near the bottom of <<amqp-message-headers>> for more information.
The `contentType` header is now correctly mapped as an entry in the general headers map.
See <<amqp-content-type>> for more information.

Starting with version 5.1.3, if a message conversion exception occurs when using manual acknowledgments, and an error channel is defined, the payload is a `ManualAckListenerExecutionFailedException` with additional `channel` and `deliveryTag` properties.
This enables the error flow to ack/nack the original message.
See <<amqp-conversion-inbound>> for more information.

[[x5.1-jdbc]]
=== JDBC Changes

Expand Down