diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java index 4ba878c6b97..e911f9fa4f8 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.java @@ -16,11 +16,15 @@ package org.springframework.integration.amqp.inbound; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.batch.BatchingStrategy; +import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy; import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.support.AmqpHeaders; @@ -69,6 +73,8 @@ public class AmqpInboundChannelAdapter extends MessageProducerSupport implements private RecoveryCallback recoveryCallback; + private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L); + public AmqpInboundChannelAdapter(AbstractMessageListenerContainer listenerContainer) { Assert.notNull(listenerContainer, "listenerContainer must not be null"); Assert.isNull(listenerContainer.getMessageListener(), @@ -115,6 +121,16 @@ public void setRecoveryCallback(RecoveryCallback recoveryCallb this.recoveryCallback = recoveryCallback; } + /** + * Set a batching strategy to use when de-batching messages. + * Default is {@link SimpleBatchingStrategy}. + * @param batchingStrategy the strategy. + * @since 5.2 + */ + public void setBatchingStrategy(BatchingStrategy batchingStrategy) { + Assert.notNull(batchingStrategy, "'batchingStrategy' cannot be null"); + this.batchingStrategy = batchingStrategy; + } @Override public String getComponentType() { @@ -239,7 +255,16 @@ private void createAndSend(Message message, Channel channel) { } private org.springframework.messaging.Message createMessage(Message message, Channel channel) { - Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message); + Object payload; + if (AmqpInboundChannelAdapter.this.batchingStrategy.canDebatch(message.getMessageProperties())) { + List payloads = new ArrayList<>(); + AmqpInboundChannelAdapter.this.batchingStrategy.deBatch(message, fragment -> payloads + .add(AmqpInboundChannelAdapter.this.messageConverter.fromMessage(fragment))); + payload = payloads; + } + else { + payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message); + } Map headers = AmqpInboundChannelAdapter.this.headerMapper .toHeadersFromRequest(message.getMessageProperties()); if (isManualAck()) { diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java index bfcc0658419..42848afcf44 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpInboundGateway.java @@ -16,6 +16,8 @@ package org.springframework.integration.amqp.inbound; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -25,6 +27,8 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.batch.BatchingStrategy; +import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; @@ -81,6 +85,8 @@ public class AmqpInboundGateway extends MessagingGatewaySupport { private RecoveryCallback recoveryCallback; + private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L); + public AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer) { this(listenerContainer, new RabbitTemplate(listenerContainer.getConnectionFactory()), false); } @@ -175,6 +181,17 @@ public void setRecoveryCallback(RecoveryCallback recoveryCallb this.recoveryCallback = recoveryCallback; } + /** + * Set a batching strategy to use when de-batching messages. + * Default is {@link SimpleBatchingStrategy}. + * @param batchingStrategy the strategy. + * @since 5.2 + */ + public void setBatchingStrategy(BatchingStrategy batchingStrategy) { + Assert.notNull(batchingStrategy, "'batchingStrategy' cannot be null"); + this.batchingStrategy = batchingStrategy; + } + @Override public String getComponentType() { return "amqp:inbound-gateway"; @@ -286,7 +303,15 @@ private org.springframework.messaging.Message convert(Message message, C boolean isManualAck = AmqpInboundGateway.this.messageListenerContainer .getAcknowledgeMode() == AcknowledgeMode.MANUAL; try { - payload = AmqpInboundGateway.this.amqpMessageConverter.fromMessage(message); + if (AmqpInboundGateway.this.batchingStrategy.canDebatch(message.getMessageProperties())) { + List payloads = new ArrayList<>(); + AmqpInboundGateway.this.batchingStrategy.deBatch(message, fragment -> payloads + .add(AmqpInboundGateway.this.amqpMessageConverter.fromMessage(fragment))); + payload = payloads; + } + else { + payload = AmqpInboundGateway.this.amqpMessageConverter.fromMessage(message); + } headers = AmqpInboundGateway.this.headerMapper.toHeadersFromRequest(message.getMessageProperties()); if (isManualAck) { headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag()); diff --git a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java index dcdcff28452..7c009f66431 100644 --- a/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java +++ b/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/inbound/AmqpMessageSource.java @@ -18,12 +18,16 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.batch.BatchingStrategy; +import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.RabbitUtils; @@ -71,6 +75,8 @@ public class AmqpMessageSource extends AbstractMessageSource { private boolean rawMessageHeader; + private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L); + public AmqpMessageSource(ConnectionFactory connectionFactory, String queue) { this(connectionFactory, new AmqpAckCallbackFactory(), queue); } @@ -151,6 +157,21 @@ public void setRawMessageHeader(boolean rawMessageHeader) { this.rawMessageHeader = rawMessageHeader; } + protected BatchingStrategy getBatchingStrategy() { + return this.batchingStrategy; + } + + /** + * Set a batching strategy to use when de-batching messages. + * Default is {@link SimpleBatchingStrategy}. + * @param batchingStrategy the strategy. + * @since 5.2 + */ + public void setBatchingStrategy(BatchingStrategy batchingStrategy) { + Assert.notNull(batchingStrategy, "'batchingStrategy' cannot be null"); + this.batchingStrategy = batchingStrategy; + } + @Override public String getComponentType() { return "amqp:message-source"; @@ -174,7 +195,16 @@ protected AbstractIntegrationMessageBuilder doReceive() { messageProperties.setConsumerQueue(this.queue); Map headers = this.headerMapper.toHeadersFromRequest(messageProperties); org.springframework.amqp.core.Message amqpMessage = new org.springframework.amqp.core.Message(resp.getBody(), messageProperties); - Object payload = this.messageConverter.fromMessage(amqpMessage); + Object payload; + if (this.batchingStrategy.canDebatch(messageProperties)) { + List payloads = new ArrayList<>(); + this.batchingStrategy.deBatch(amqpMessage, fragment -> payloads + .add(this.messageConverter.fromMessage(fragment))); + payload = payloads; + } + else { + payload = this.messageConverter.fromMessage(amqpMessage); + } AbstractIntegrationMessageBuilder builder = getMessageBuilderFactory().withPayload(payload) .copyHeaders(headers) .setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, callback); diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java index cc54da9cb9b..ba7b0272675 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/AmqpMessageSourceTests.java @@ -24,10 +24,14 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.List; import java.util.concurrent.ExecutorService; import org.junit.Test; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.batch.MessageBatch; +import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.integration.StaticMessageHeaderAccessor; @@ -123,4 +127,39 @@ private void testNackOrRequeue(boolean requeue) throws Exception { verify(connection).close(30000); } + @SuppressWarnings({ "unchecked" }) + @Test + public void testBatch() throws Exception { + SimpleBatchingStrategy bs = new SimpleBatchingStrategy(2, 10_000, 10_000L); + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setContentType("text/plain"); + org.springframework.amqp.core.Message message = + new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties); + bs.addToBatch("foo", "bar", message); + message = new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties); + MessageBatch batched = bs.addToBatch("foo", "bar", message); + + Channel channel = mock(Channel.class); + willReturn(true).given(channel).isOpen(); + Envelope envelope = new Envelope(123L, false, "ex", "rk"); + BasicProperties props = new BasicProperties.Builder() + .headers(batched.getMessage().getMessageProperties().getHeaders()) + .contentType("text/plain") + .build(); + GetResponse getResponse = new GetResponse(envelope, props, batched.getMessage().getBody(), 0); + willReturn(getResponse).given(channel).basicGet("foo", false); + Connection connection = mock(Connection.class); + willReturn(true).given(connection).isOpen(); + willReturn(channel).given(connection).createChannel(); + ConnectionFactory connectionFactory = mock(ConnectionFactory.class); + willReturn(connection).given(connectionFactory).newConnection((ExecutorService) isNull(), anyString()); + + CachingConnectionFactory ccf = new CachingConnectionFactory(connectionFactory); + AmqpMessageSource source = new AmqpMessageSource(ccf, "foo"); + Message received = source.receive(); + assertThat(received).isNotNull(); + assertThat(((List) received.getPayload())).contains("test1", "test2"); + } + + } diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java index 3ea415c355b..36083dc2266 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/inbound/InboundEndpointTests.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -36,6 +37,8 @@ import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.batch.MessageBatch; +import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -377,6 +380,54 @@ public void testRetryWithinOnMessageGateway() throws Exception { assertThat(errors.receive(0)).isNull(); } + @SuppressWarnings({ "unchecked" }) + @Test + public void testBatchdAdapter() throws Exception { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class)); + container.setDeBatchingEnabled(false); + AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container); + QueueChannel out = new QueueChannel(); + adapter.setOutputChannel(out); + adapter.afterPropertiesSet(); + ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener(); + SimpleBatchingStrategy bs = new SimpleBatchingStrategy(2, 10_000, 10_000L); + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setContentType("text/plain"); + org.springframework.amqp.core.Message message = + new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties); + bs.addToBatch("foo", "bar", message); + message = new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties); + MessageBatch batched = bs.addToBatch("foo", "bar", message); + listener.onMessage(batched.getMessage(), null); + Message received = out.receive(); + assertThat(received).isNotNull(); + assertThat(((List) received.getPayload())).contains("test1", "test2"); + } + + @SuppressWarnings({ "unchecked" }) + @Test + public void testBatchGateway() throws Exception { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class)); + container.setDeBatchingEnabled(false); + AmqpInboundGateway adapter = new AmqpInboundGateway(container); + QueueChannel out = new QueueChannel(); + adapter.setRequestChannel(out); + adapter.afterPropertiesSet(); + ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener(); + SimpleBatchingStrategy bs = new SimpleBatchingStrategy(2, 10_000, 10_000L); + MessageProperties messageProperties = new MessageProperties(); + messageProperties.setContentType("text/plain"); + org.springframework.amqp.core.Message message = + new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties); + bs.addToBatch("foo", "bar", message); + message = new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties); + MessageBatch batched = bs.addToBatch("foo", "bar", message); + listener.onMessage(batched.getMessage(), null); + Message received = out.receive(); + assertThat(received).isNotNull(); + assertThat(((List) received.getPayload())).contains("test1", "test2"); + } + public static class Foo { private String bar; diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 4e501a36cb1..c61923a0dd0 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -277,8 +277,22 @@ public class AmqpJavaApplication { ---- ==== +[[amqp-debatching]] +==== Batched Messages + +See https://docs.spring.io/spring-amqp/docs/current/reference/html/#template-batching[the Spring AMQP Documentation] for more information about batched messages. + +To produce batched messages with Spring Integration, simply configure the outbound endpoint with a `BatchingRabbitTemplate`. + +When receiving batched messages, by default, the listener containers extract each fragment message and the adapter will produce a `Message` for each fragment. +Starting with version 5.2, if the container's `deBatchingEnabled` property is set to `false`, the de-batching is performed by the adapter instead, and a single `Message>` is produced with the payload being a list of the fragment payloads (after conversion if appropriate). + +The default `BatchingStrategy` is the `SimpleBatchingStrategy`, but this can be overridden on the adapter. + === Polled Inbound Channel Adapter +==== Overview + Version 5.0.1 introduced a polled channel adapter, letting you fetch individual messages on demand -- for example, with a `MessageSourcePollingTemplate` or a poller. See <> for more information. @@ -315,6 +329,13 @@ public IntegrationFlow flow() { ---- ==== +[[amqp-polled-debatching]] +==== Batched Messages + +See <>. + +For the polled adapter, there is no listener container, batched messages are always debatched (if the `BatchingStrategy` supports doing so). + [[amqp-inbound-gateway]] === Inbound Gateway @@ -453,6 +474,11 @@ public class AmqpJavaApplication { ---- ==== +[[amqp-gatewway-debatching]] +==== Batched Messages + +See <>. + [[amqp-inbound-ack]] === Inbound Endpoint Acknowledge Mode diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 0dcb33cbcff..8e572988614 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -40,6 +40,9 @@ See <> for more information. The outbound endpoints can now be configured to synthesize a "nack" if no publisher confirm is received within a timeout. See <> for more information. +The inbound channel adapter can now receive batched messages as a `List` payload instead of receiving a discrete message for each batch fragment. +See <> for more information. + [[x5.2-file]] ==== File Changes