diff --git a/build.gradle b/build.gradle index ca2e8a1d36e..60e92716bf8 100644 --- a/build.gradle +++ b/build.gradle @@ -90,7 +90,7 @@ ext { rsocketVersion = '1.0.0-RC6' servletApiVersion = '4.0.1' smackVersion = '4.3.4' - springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.2.3.RELEASE' + springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.2.4.BUILD-SNAPSHOT' springDataVersion = project.hasProperty('springDataVersion') ? project.springDataVersion : 'Neumann-BUILD-SNAPSHOT' springSecurityVersion = project.hasProperty('springSecurityVersion') ? project.springSecurityVersion : '5.3.0.RC1' springRetryVersion = '1.2.5.RELEASE' diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java index 8a00576ce41..033bb9c36d6 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/config/AbstractAmqpInboundAdapterParser.java @@ -119,6 +119,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel"); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-startup"); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "phase"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "batch-mode"); configureChannels(element, parserContext, builder); AbstractBeanDefinition adapterBeanDefinition = builder.getRawBeanDefinition(); adapterBeanDefinition.setResource(parserContext.getReaderContext().getResource()); diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSMLCSpec.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSMLCSpec.java index 85465546d53..ae235d42577 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSMLCSpec.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpInboundChannelAdapterSMLCSpec.java @@ -19,6 +19,7 @@ import java.util.function.Consumer; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.BatchMode; /** * Spec for an inbound channel adapter with a {@link SimpleMessageListenerContainer}. @@ -41,4 +42,15 @@ public AmqpInboundChannelAdapterSMLCSpec configureContainer(Consumer>} where each element is a message is + * converted from the Spring AMQP Message. + */ + MESSAGES, + + /** + * Payload is a {@code List} where each element is the converted body of the + * Spring AMQP Message. + */ + EXTRACT_PAYLOADS + + } + private static final ThreadLocal ATTRIBUTES_HOLDER = new ThreadLocal<>(); private final AbstractMessageListenerContainer messageListenerContainer; @@ -77,6 +99,8 @@ public class AmqpInboundChannelAdapter extends MessageProducerSupport implements private boolean bindSourceMessage; + private BatchMode batchMode = BatchMode.MESSAGES; + public AmqpInboundChannelAdapter(AbstractMessageListenerContainer listenerContainer) { Assert.notNull(listenerContainer, "listenerContainer must not be null"); Assert.isNull(listenerContainer.getMessageListener(), @@ -124,8 +148,9 @@ public void setRecoveryCallback(RecoveryCallback recoveryCallback) { } /** - * Set a batching strategy to use when de-batching messages. - * Default is {@link SimpleBatchingStrategy}. + * Set a batching strategy to use when de-batching messages created by a batching + * producer (such as the BatchingRabbitTemplate). Default is + * {@link SimpleBatchingStrategy}. * @param batchingStrategy the strategy. * @since 5.2 */ @@ -144,6 +169,17 @@ public void setBindSourceMessage(boolean bindSourceMessage) { this.bindSourceMessage = bindSourceMessage; } + /** + * When the listener container is configured with consumerBatchEnabled, set the payload + * type for messages generated for the batches. Default is {@link BatchMode#MESSAGES}. + * @param batchMode the batch mode. + * @since 5.3 + */ + public void setBatchMode(BatchMode batchMode) { + Assert.notNull(batchMode, "'batchMode' cannot be null"); + this.batchMode = batchMode; + } + @Override public String getComponentType() { return "amqp:inbound-channel-adapter"; @@ -156,7 +192,13 @@ protected void onInit() { + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to " + "send an error message when retries are exhausted"); } - Listener messageListener = new Listener(); + Listener messageListener; + if (this.messageListenerContainer.isConsumerBatchEnabled()) { + messageListener = new BatchListener(); + } + else { + messageListener = new Listener(); + } this.messageListenerContainer.setMessageListener(messageListener); this.messageListenerContainer.afterPropertiesSet(); super.onInit(); @@ -193,7 +235,7 @@ public int afterShutdown() { * @param message the Spring Messaging message to use. * @since 4.3.10 */ - private void setAttributesIfNecessary(Message amqpMessage, org.springframework.messaging.Message message) { + private void setAttributesIfNecessary(Object amqpMessage, org.springframework.messaging.Message message) { boolean needHolder = getErrorChannel() != null && this.retryTemplate == null; boolean needAttributes = needHolder || this.retryTemplate != null; if (needHolder) { @@ -223,22 +265,33 @@ protected AttributeAccessor getErrorMessageAttributes(org.springframework.messag protected class Listener implements ChannelAwareMessageListener { + protected final MessageConverter converter = AmqpInboundChannelAdapter.this.messageConverter; // NOSONAR + + protected final boolean manualAcks = AcknowledgeMode.MANUAL == + AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode(); // NNOSONAR + + protected final RetryOperations retryOps = AmqpInboundChannelAdapter.this.retryTemplate; // NOSONAR + + protected final RecoveryCallback recoverer = AmqpInboundChannelAdapter.this.recoveryCallback; // NOSONAR + + protected Listener() { + } + @Override public void onMessage(final Message message, final Channel channel) { - boolean retryDisabled = AmqpInboundChannelAdapter.this.retryTemplate == null; try { - if (retryDisabled) { + if (this.retryOps == null) { createAndSend(message, channel); } else { final org.springframework.messaging.Message toSend = createMessage(message, channel); - AmqpInboundChannelAdapter.this.retryTemplate.execute( + this.retryOps.execute( context -> { StaticMessageHeaderAccessor.getDeliveryAttempt(toSend).incrementAndGet(); setAttributesIfNecessary(message, toSend); sendMessage(toSend); return null; - }, AmqpInboundChannelAdapter.this.recoveryCallback); + }, this.recoverer); } } catch (MessageConversionException e) { @@ -246,14 +299,14 @@ public void onMessage(final Message message, final Channel channel) { setAttributesIfNecessary(message, null); getMessagingTemplate() .send(getErrorChannel(), buildErrorMessage(null, - EndpointUtils.errorMessagePayload(message, channel, isManualAck(), e))); + EndpointUtils.errorMessagePayload(message, channel, this.manualAcks, e))); } else { throw e; } } finally { - if (retryDisabled) { + if (this.retryOps == null) { ATTRIBUTES_HOLDER.remove(); } } @@ -265,38 +318,139 @@ private void createAndSend(Message message, Channel channel) { sendMessage(messagingMessage); } - private org.springframework.messaging.Message createMessage(Message message, Channel channel) { + protected org.springframework.messaging.Message createMessage(Message message, Channel channel) { + Object payload = convertPayload(message); + Map headers = AmqpInboundChannelAdapter.this.headerMapper + .toHeadersFromRequest(message.getMessageProperties()); + if (AmqpInboundChannelAdapter.this.bindSourceMessage) { + headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message); + } + long deliveryTag = message.getMessageProperties().getDeliveryTag(); + return finalize(channel, payload, headers, deliveryTag); + } + + protected Object convertPayload(Message message) { Object payload; if (AmqpInboundChannelAdapter.this.batchingStrategy.canDebatch(message.getMessageProperties())) { List payloads = new ArrayList<>(); AmqpInboundChannelAdapter.this.batchingStrategy.deBatch(message, fragment -> payloads - .add(AmqpInboundChannelAdapter.this.messageConverter.fromMessage(fragment))); + .add(this.converter.fromMessage(fragment))); payload = payloads; } else { - payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message); + payload = this.converter.fromMessage(message); } - Map headers = AmqpInboundChannelAdapter.this.headerMapper - .toHeadersFromRequest(message.getMessageProperties()); - if (isManualAck()) { - headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag()); + return payload; + } + + protected org.springframework.messaging.Message finalize(Channel channel, Object payload, + Map headers, long deliveryTag) { + + if (this.manualAcks) { + headers.put(AmqpHeaders.DELIVERY_TAG, deliveryTag); headers.put(AmqpHeaders.CHANNEL, channel); } if (AmqpInboundChannelAdapter.this.retryTemplate != null) { headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger()); } - if (AmqpInboundChannelAdapter.this.bindSourceMessage) { - headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message); - } return getMessageBuilderFactory() .withPayload(payload) .copyHeaders(headers) .build(); } - private boolean isManualAck() { - return AcknowledgeMode.MANUAL == - AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode(); + } + + protected class BatchListener extends Listener implements ChannelAwareBatchMessageListener { + + private final boolean batchModeMessages = BatchMode.MESSAGES.equals(AmqpInboundChannelAdapter.this.batchMode); + + @Override + public void onMessageBatch(List messages, Channel channel) { + List converted; + if (this.batchModeMessages) { + converted = convertMessages(messages, channel); + } + else { + converted = convertPayloads(messages, channel); + } + if (converted != null) { + org.springframework.messaging.Message message = finalize(channel, converted, new HashMap<>(), + messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag()); + try { + if (this.retryOps == null) { + setAttributesIfNecessary(messages, message); + sendMessage(message); + } + else { + this.retryOps.execute( + context -> { + StaticMessageHeaderAccessor.getDeliveryAttempt(message).incrementAndGet(); + if (this.batchModeMessages) { + @SuppressWarnings("unchecked") + List> payloads = + (List>) message.getPayload(); + payloads.forEach(payload -> StaticMessageHeaderAccessor + .getDeliveryAttempt(payload).incrementAndGet()); + } + setAttributesIfNecessary(messages, message); + sendMessage(message); + return null; + }, this.recoverer); + } + } + finally { + if (this.retryOps == null) { + ATTRIBUTES_HOLDER.remove(); + } + } + } + } + + private List> convertMessages(List messages, + Channel channel) { + + List> converted = new ArrayList<>(); + try { + messages.forEach(message -> { + converted.add(createMessage(message, channel)); + }); + return converted; + } + catch (MessageConversionException e) { + if (getErrorChannel() != null) { + setAttributesIfNecessary(messages, null); + getMessagingTemplate() + .send(getErrorChannel(), buildErrorMessage(null, + EndpointUtils.errorMessagePayload(messages, channel, this.manualAcks, e))); + } + else { + throw e; + } + } + return null; + } + + private List convertPayloads(List messages, Channel channel) { + List converted = new ArrayList<>(); + try { + messages.forEach(message -> { + converted.add(this.converter.fromMessage(message)); + }); + return converted; + } + catch (MessageConversionException e) { + if (getErrorChannel() != null) { + setAttributesIfNecessary(messages, null); + getMessagingTemplate() + .send(getErrorChannel(), buildErrorMessage(null, + EndpointUtils.errorMessagePayload(messages, channel, this.manualAcks, e))); + } + else { + throw e; + } + } + return null; } } diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/EndpointUtils.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/EndpointUtils.java index 57d93bf6a12..02574651019 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/EndpointUtils.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/EndpointUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.integration.amqp.support; +import java.util.List; + import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; @@ -50,9 +52,29 @@ public static ListenerExecutionFailedException errorMessagePayload(Message messa Channel channel, boolean isManualAck, Exception ex) { return isManualAck - ? new ManualAckListenerExecutionFailedException(LEFE_MESSAGE, ex, message, channel, - message.getMessageProperties().getDeliveryTag()) + ? new ManualAckListenerExecutionFailedException(LEFE_MESSAGE, ex, channel, + message.getMessageProperties().getDeliveryTag(), message) : new ListenerExecutionFailedException(LEFE_MESSAGE, ex, message); } + /** + * Return an {@link ListenerExecutionFailedException} or a {@link ManualAckListenerExecutionFailedException} + * depending on whether isManualAck is false or true. + * @param messages the failed messages. + * @param channel the channel. + * @param isManualAck true if the container uses manual acknowledgment. + * @param ex the exception. + * @return the exception. + * @since 5.3 + */ + public static ListenerExecutionFailedException errorMessagePayload(List messages, + Channel channel, boolean isManualAck, Exception ex) { + + return isManualAck + ? new ManualAckListenerExecutionFailedException(LEFE_MESSAGE, ex, channel, + messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag(), + messages.toArray(new Message[0])) + : new ListenerExecutionFailedException(LEFE_MESSAGE, ex, messages.toArray(new Message[0])); + } + } diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/ManualAckListenerExecutionFailedException.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/ManualAckListenerExecutionFailedException.java index f565c7835ae..74854ffc07c 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/ManualAckListenerExecutionFailedException.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/support/ManualAckListenerExecutionFailedException.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,17 @@ public class ManualAckListenerExecutionFailedException extends ListenerExecution private final long deliveryTag; + /** + * Construct an instance with the provided properties. + * @param msg the exception message. + * @param cause the cause. + * @param failedMessage the failed message. + * @param channel the channel. + * @param deliveryTag the delivery tag for the message. + * @deprecated in favor of + * {@link #ManualAckListenerExecutionFailedException(String, Throwable, Channel, long, Message...)}. + */ + @Deprecated public ManualAckListenerExecutionFailedException(String msg, Throwable cause, Message failedMessage, Channel channel, long deliveryTag) { @@ -46,10 +57,35 @@ public ManualAckListenerExecutionFailedException(String msg, Throwable cause, Me this.deliveryTag = deliveryTag; } + /** + * Construct an instance with the provided properties. + * @param msg the exception message. + * @param cause the cause. + * @param channel the channel. + * @param deliveryTag the delivery tag for the last message. + * @param failedMessages the failed message(s). + * @since 5.3 + */ + public ManualAckListenerExecutionFailedException(String msg, Throwable cause, + Channel channel, long deliveryTag, Message... failedMessages) { + + super(msg, cause, failedMessages); + this.channel = channel; + this.deliveryTag = deliveryTag; + } + + /** + * Return the channel. + * @return the channel. + */ public Channel getChannel() { return this.channel; } + /** + * Return the delivery tag for the last failed message. + * @return the tag. + */ public long getDeliveryTag() { return this.deliveryTag; } diff --git a/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd b/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd index 7df60e4f73b..770522f3a6a 100644 --- a/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd +++ b/spring-integration-amqp/src/main/resources/org/springframework/integration/amqp/config/spring-integration-amqp.xsd @@ -124,6 +124,21 @@ + + + + + When the listener container's 'consumerBatchEnabled' property is true, + this attribute determines the payload type. 'MESSAGES' (default) means + the payload will be a list of messages; 'EXTRACT_PAYLOADS' means the payload + will be a list of converted payloads. + + + + + + + @@ -1157,4 +1172,11 @@ standard headers to also be mapped. To map all non-standard headers the 'NON_STA + + + + + + + diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests-context.xml b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests-context.xml index e05f83d4022..dc39073d6f6 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests-context.xml +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests-context.xml @@ -19,7 +19,8 @@ NONE - + diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests.java index 1e011bc907e..0bedb2dd5b5 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundChannelAdapterParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,6 +34,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter; +import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.BatchMode; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.test.util.TestUtils; @@ -68,6 +69,8 @@ public void verifyIdAsChannel() { .isTrue(); assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer")) .isInstanceOf(SimpleMessageListenerContainer.class); + assertThat(TestUtils.getPropertyValue(adapter, "batchMode", BatchMode.class)) + .isEqualTo(BatchMode.EXTRACT_PAYLOADS); } @Test @@ -79,6 +82,8 @@ public void verifyDMCC() { assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer")) .isInstanceOf(DirectMessageListenerContainer.class); assertThat(TestUtils.getPropertyValue(adapter, "messageListenerContainer.consumersPerQueue")).isEqualTo(2); + assertThat(TestUtils.getPropertyValue(adapter, "batchMode", BatchMode.class)) + .isEqualTo(BatchMode.MESSAGES); } @Test diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java index 3347ee7e5d6..8109278375a 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/dsl/AmqpTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2019 the original author or authors. + * Copyright 2014-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,6 +50,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.amqp.channel.AbstractAmqpChannel; +import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.BatchMode; import org.springframework.integration.amqp.inbound.AmqpInboundGateway; import org.springframework.integration.amqp.support.AmqpHeaderMapper; import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy; @@ -145,12 +146,15 @@ public void testAmqpInboundGatewayFlow() { @Test public void testAmqpOutboundFlow() { - this.amqpOutboundInput.send(MessageBuilder.withPayload("hello through the amqp") + this.amqpOutboundInput.send(MessageBuilder.withPayload("one") + .setHeader("routingKey", "si.dsl.test") + .build()); + this.amqpOutboundInput.send(MessageBuilder.withPayload("two") .setHeader("routingKey", "si.dsl.test") .build()); Message receive = this.amqpReplyChannel.receive(10000); assertThat(receive).isNotNull(); - assertThat(receive.getPayload()).isEqualTo("HELLO THROUGH THE AMQP"); + assertThat(receive.getPayload()).isEqualTo("[ONE, TWO]"); ((Lifecycle) this.amqpOutboundInput).stop(); } @@ -343,7 +347,10 @@ public Queue amqpReplyChannel() { @Bean public IntegrationFlow amqpInboundFlow(ConnectionFactory rabbitConnectionFactory) { return IntegrationFlows.from(Amqp.inboundAdapter(rabbitConnectionFactory, fooQueue()) - .id("amqpInboundFlowAdapter")) + .configureContainer(container -> container.consumerBatchEnabled(true) + .batchSize(2)) + .batchMode(BatchMode.EXTRACT_PAYLOADS) + .id("amqpInboundFlowAdapter")) .transform(String.class, String::toUpperCase) .channel(Amqp.pollableChannel(rabbitConnectionFactory) .queueName("amqpReplyChannel") diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java index 2dacfc76276..d28235902f2 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2019 the original author or authors. + * Copyright 2013-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -44,6 +45,7 @@ import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException; import org.springframework.amqp.support.AmqpHeaders; @@ -53,6 +55,7 @@ import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.beans.factory.BeanFactory; import org.springframework.integration.StaticMessageHeaderAccessor; +import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.BatchMode; import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy; import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper; import org.springframework.integration.amqp.support.ManualAckListenerExecutionFailedException; @@ -262,6 +265,7 @@ public Object fromMessage(org.springframework.amqp.core.Message message) throws assertThat(received.getPayload().getClass()).isEqualTo(ListenerExecutionFailedException.class); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); + adapter.afterPropertiesSet(); // ack mode is now captured during init Channel channel = mock(Channel.class); ((ChannelAwareMessageListener) container.getMessageListener()) .onMessage(message, channel); @@ -388,7 +392,7 @@ public void testRetryWithinOnMessageGateway() throws Exception { @SuppressWarnings({ "unchecked" }) @Test - public void testBatchdAdapter() throws Exception { + public void testBatchAdapter() throws Exception { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class)); container.setDeBatchingEnabled(false); AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container); @@ -405,7 +409,7 @@ public void testBatchdAdapter() throws Exception { message = new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties); MessageBatch batched = bs.addToBatch("foo", "bar", message); listener.onMessage(batched.getMessage(), null); - Message received = out.receive(); + Message received = out.receive(0); assertThat(received).isNotNull(); assertThat(((List) received.getPayload())).contains("test1", "test2"); } @@ -430,13 +434,217 @@ public void testBatchGateway() throws Exception { message = new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties); MessageBatch batched = bs.addToBatch("foo", "bar", message); listener.onMessage(batched.getMessage(), null); - Message received = out.receive(); + Message received = out.receive(0); assertThat(received).isNotNull(); assertThat(((List) received.getPayload())).contains("test1", "test2"); org.springframework.amqp.core.Message sourceData = StaticMessageHeaderAccessor.getSourceData(received); assertThat(sourceData).isSameAs(batched.getMessage()); } + @SuppressWarnings({ "unchecked" }) + @Test + public void testConsumerBatchExtract() throws Exception { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class)); + container.setConsumerBatchEnabled(true); + AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container); + QueueChannel out = new QueueChannel(); + adapter.setOutputChannel(out); + adapter.setBatchMode(BatchMode.EXTRACT_PAYLOADS); + adapter.afterPropertiesSet(); + ChannelAwareBatchMessageListener listener = (ChannelAwareBatchMessageListener) container.getMessageListener(); + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setContentType("text/plain"); + List messages = new ArrayList<>(); + messages.add(new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties)); + messages.add(new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties)); + listener.onMessageBatch(messages, null); + Message received = out.receive(0); + assertThat(received).isNotNull(); + assertThat(((List) received.getPayload())).contains("test1", "test2"); + } + + @SuppressWarnings({ "unchecked" }) + @Test + public void testConsumerBatch() throws Exception { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class)); + container.setConsumerBatchEnabled(true); + AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container); + QueueChannel out = new QueueChannel(); + adapter.setOutputChannel(out); + adapter.afterPropertiesSet(); + ChannelAwareBatchMessageListener listener = (ChannelAwareBatchMessageListener) container.getMessageListener(); + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setContentType("text/plain"); + List messages = new ArrayList<>(); + messages.add(new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties)); + messages.add(new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties)); + listener.onMessageBatch(messages, null); + Message received = out.receive(0); + assertThat(received).isNotNull(); + assertThat(((List>) received.getPayload())) + .extracting(message -> message.getPayload()) + .contains("test1", "test2"); + } + + @Test + public void testAdapterConversionErrorConsumerBatchExtract() throws Exception { + Connection connection = mock(Connection.class); + doAnswer(invocation -> mock(Channel.class)).when(connection).createChannel(anyBoolean()); + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + when(connectionFactory.createConnection()).thenReturn(connection); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.setConsumerBatchEnabled(true); + AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container); + QueueChannel outputChannel = new QueueChannel(); + adapter.setOutputChannel(outputChannel); + QueueChannel errorChannel = new QueueChannel(); + adapter.setErrorChannel(errorChannel); + adapter.setMessageConverter(new SimpleMessageConverter() { + + @Override + public Object fromMessage(org.springframework.amqp.core.Message message) throws MessageConversionException { + throw new MessageConversionException("intended"); + } + + }); + adapter.setBatchMode(BatchMode.EXTRACT_PAYLOADS); + adapter.afterPropertiesSet(); + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setContentType("text/plain"); + messageProperties.setDeliveryTag(42L); + List messages = new ArrayList<>(); + messages.add(new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties)); + messageProperties = new MessageProperties(); + messageProperties.setContentType("text/plain"); + messageProperties.setDeliveryTag(43L); + messages.add(new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties)); + ((ChannelAwareBatchMessageListener) container.getMessageListener()) + .onMessageBatch(messages, null); + assertThat(outputChannel.receive(0)).isNull(); + Message received = errorChannel.receive(0); + assertThat(received).isNotNull(); + assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull(); + assertThat(received.getPayload().getClass()).isEqualTo(ListenerExecutionFailedException.class); + + container.setAcknowledgeMode(AcknowledgeMode.MANUAL); + adapter.afterPropertiesSet(); // ack mode is now captured during init + Channel channel = mock(Channel.class); + ((ChannelAwareBatchMessageListener) container.getMessageListener()) + .onMessageBatch(messages, channel); + assertThat(outputChannel.receive(0)).isNull(); + received = errorChannel.receive(0); + assertThat(received).isNotNull(); + assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull(); + assertThat(received.getPayload()).isInstanceOf(ManualAckListenerExecutionFailedException.class); + ManualAckListenerExecutionFailedException ex = (ManualAckListenerExecutionFailedException) received + .getPayload(); + assertThat(ex.getChannel()).isEqualTo(channel); + assertThat(ex.getDeliveryTag()).isEqualTo(43L); + } + + @Test + public void testAdapterConversionErrorConsumerBatch() throws Exception { + Connection connection = mock(Connection.class); + doAnswer(invocation -> mock(Channel.class)).when(connection).createChannel(anyBoolean()); + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + when(connectionFactory.createConnection()).thenReturn(connection); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.setConsumerBatchEnabled(true); + AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container); + QueueChannel outputChannel = new QueueChannel(); + adapter.setOutputChannel(outputChannel); + QueueChannel errorChannel = new QueueChannel(); + adapter.setErrorChannel(errorChannel); + adapter.setMessageConverter(new SimpleMessageConverter() { + + @Override + public Object fromMessage(org.springframework.amqp.core.Message message) throws MessageConversionException { + throw new MessageConversionException("intended"); + } + + }); + adapter.afterPropertiesSet(); + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setContentType("text/plain"); + messageProperties.setDeliveryTag(42L); + List messages = new ArrayList<>(); + messages.add(new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties)); + messageProperties = new MessageProperties(); + messageProperties.setContentType("text/plain"); + messageProperties.setDeliveryTag(43L); + messages.add(new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties)); + ((ChannelAwareBatchMessageListener) container.getMessageListener()) + .onMessageBatch(messages, null); + assertThat(outputChannel.receive(0)).isNull(); + Message received = errorChannel.receive(0); + assertThat(received).isNotNull(); + assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull(); + assertThat(received.getPayload().getClass()).isEqualTo(ListenerExecutionFailedException.class); + + container.setAcknowledgeMode(AcknowledgeMode.MANUAL); + adapter.afterPropertiesSet(); // ack mode is now captured during init + Channel channel = mock(Channel.class); + ((ChannelAwareBatchMessageListener) container.getMessageListener()) + .onMessageBatch(messages, channel); + assertThat(outputChannel.receive(0)).isNull(); + received = errorChannel.receive(0); + assertThat(received).isNotNull(); + assertThat(received.getHeaders().get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE)).isNotNull(); + assertThat(received.getPayload()).isInstanceOf(ManualAckListenerExecutionFailedException.class); + ManualAckListenerExecutionFailedException ex = (ManualAckListenerExecutionFailedException) received + .getPayload(); + assertThat(ex.getChannel()).isEqualTo(channel); + assertThat(ex.getDeliveryTag()).isEqualTo(43L); + } + + @Test + public void testRetryWithinOnMessageAdapterConsumerBatch() throws Exception { + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); + container.setConsumerBatchEnabled(true); + AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container); + adapter.setOutputChannel(new DirectChannel()); + adapter.setRetryTemplate(new RetryTemplate()); + QueueChannel errors = new QueueChannel(); + ErrorMessageSendingRecoverer recoveryCallback = new ErrorMessageSendingRecoverer(errors); + recoveryCallback.setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy()); + adapter.setRecoveryCallback(recoveryCallback); + adapter.afterPropertiesSet(); + ChannelAwareBatchMessageListener listener = (ChannelAwareBatchMessageListener) container.getMessageListener(); + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setContentType("text/plain"); + messageProperties.setDeliveryTag(42L); + List messages = new ArrayList<>(); + messages.add(new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties)); + messageProperties = new MessageProperties(); + messageProperties.setContentType("text/plain"); + messageProperties.setDeliveryTag(43L); + messages.add(new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties)); + listener.onMessageBatch(messages, null); + Message errorMessage = errors.receive(0); + assertThat(errorMessage).isNotNull(); + assertThat(errorMessage.getPayload()).isInstanceOf(MessagingException.class); + MessagingException payload = (MessagingException) errorMessage.getPayload(); + assertThat(payload.getMessage()).contains("Dispatcher has no"); + assertThat(StaticMessageHeaderAccessor.getDeliveryAttempt(payload.getFailedMessage()).get()).isEqualTo(3); + @SuppressWarnings("unchecked") + List amqpMessages = errorMessage.getHeaders() + .get(AmqpMessageHeaderErrorMessageStrategy.AMQP_RAW_MESSAGE, + List.class); + assertThat(amqpMessages).isNotNull(); + assertThat(amqpMessages).hasSize(2); + @SuppressWarnings("unchecked") + List> msgs = (List>) payload.getFailedMessage().getPayload(); + assertThat(msgs).hasSize(2); + assertThat(msgs).extracting(msg -> StaticMessageHeaderAccessor.getDeliveryAttempt(msg).get()) + .contains(3, 3); + assertThat(msgs).extracting(msg -> msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class)) + .contains(42L, 43L); + assertThat(errors.receive(0)).isNull(); + } + public static class Foo { private String bar; diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 679a76f4bfa..cb4bba12813 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -76,7 +76,8 @@ The following listing shows the possible configuration options for an AMQP Inbou transaction-attribute="" <23> transaction-manager="" <24> tx-size="" <25> - consumers-per-queue /> <26> + consumers-per-queue <26> + batch-mode="MESSAGES"/> <27> ---- <1> The unique ID for this adapter. @@ -167,6 +168,9 @@ Not allowed when 'consumers-per-queue' is set. Optional (defaults to `1`). <26> Indicates that the underlying listener container should be a `DirectMessageListenerContainer` instead of the default `SimpleMessageListenerContainer`. See the https://docs.spring.io/spring-amqp/reference/html/[Spring AMQP Reference Manual] for more information. +<27> When the container's `consumerBatchEnabled` is `true`, determines how the adapter presents the batch of messages in the message payload. +When set to `MESSAGES` (default), the payload is a `List>` where each message has headers mapped from the incoming AMQP `Message` and the payload is the converted `body`. +When set to `EXTRACT_PAYLOADS`, the payload is a `List` where the elements are converted from the AMQP `Message` body. ==== [NOTE] diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index be8a4600f63..9b2382da316 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -59,6 +59,9 @@ See <<./aggregator.adoc#aggregator-api,Aggregator Programming Model>> for more i The outbound channel adapter has a new property `multiSend` allowing multiple messages to be sent within the scope of one `RabbitTemplate` invocation. See <<./amqp.adoc/amqp-outbound-channel-adapter,AMQP Outbound Channel Adapter>> for more information. +The inbound channel adapter now supports a listener container with the `consumerBatchEnabled` property set to `true`. +See <<./amqp.adoc/amqp-inbound-channel-adapter,AMQP Inbound Channel Adapter>> + [[x5.3-http]] === HTTP Changes