Skip to content

GH-2931: AMQP De-Batching as List<?> Payload #2932

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@

package org.springframework.integration.amqp.inbound;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.AmqpHeaders;
Expand Down Expand Up @@ -69,6 +73,8 @@ public class AmqpInboundChannelAdapter extends MessageProducerSupport implements

private RecoveryCallback<? extends Object> recoveryCallback;

private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L);

public AmqpInboundChannelAdapter(AbstractMessageListenerContainer listenerContainer) {
Assert.notNull(listenerContainer, "listenerContainer must not be null");
Assert.isNull(listenerContainer.getMessageListener(),
Expand Down Expand Up @@ -115,6 +121,16 @@ public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallb
this.recoveryCallback = recoveryCallback;
}

/**
* Set a batching strategy to use when de-batching messages.
* Default is {@link SimpleBatchingStrategy}.
* @param batchingStrategy the strategy.
* @since 5.2
*/
public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
Assert.notNull(batchingStrategy, "'batchingStrategy' cannot be null");
this.batchingStrategy = batchingStrategy;
}

@Override
public String getComponentType() {
Expand Down Expand Up @@ -239,7 +255,16 @@ private void createAndSend(Message message, Channel channel) {
}

private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
Object payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
Object payload;
if (AmqpInboundChannelAdapter.this.batchingStrategy.canDebatch(message.getMessageProperties())) {
List<Object> payloads = new ArrayList<>();
AmqpInboundChannelAdapter.this.batchingStrategy.deBatch(message, fragment -> payloads
.add(AmqpInboundChannelAdapter.this.messageConverter.fromMessage(fragment)));
payload = payloads;
}
else {
payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
}
Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
.toHeadersFromRequest(message.getMessageProperties());
if (isManualAck()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.integration.amqp.inbound;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -25,6 +27,8 @@
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
Expand Down Expand Up @@ -81,6 +85,8 @@ public class AmqpInboundGateway extends MessagingGatewaySupport {

private RecoveryCallback<? extends Object> recoveryCallback;

private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L);

public AmqpInboundGateway(AbstractMessageListenerContainer listenerContainer) {
this(listenerContainer, new RabbitTemplate(listenerContainer.getConnectionFactory()), false);
}
Expand Down Expand Up @@ -175,6 +181,17 @@ public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallb
this.recoveryCallback = recoveryCallback;
}

/**
* Set a batching strategy to use when de-batching messages.
* Default is {@link SimpleBatchingStrategy}.
* @param batchingStrategy the strategy.
* @since 5.2
*/
public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
Assert.notNull(batchingStrategy, "'batchingStrategy' cannot be null");
this.batchingStrategy = batchingStrategy;
}

@Override
public String getComponentType() {
return "amqp:inbound-gateway";
Expand Down Expand Up @@ -286,7 +303,15 @@ private org.springframework.messaging.Message<Object> convert(Message message, C
boolean isManualAck = AmqpInboundGateway.this.messageListenerContainer
.getAcknowledgeMode() == AcknowledgeMode.MANUAL;
try {
payload = AmqpInboundGateway.this.amqpMessageConverter.fromMessage(message);
if (AmqpInboundGateway.this.batchingStrategy.canDebatch(message.getMessageProperties())) {
List<Object> payloads = new ArrayList<>();
AmqpInboundGateway.this.batchingStrategy.deBatch(message, fragment -> payloads
.add(AmqpInboundGateway.this.amqpMessageConverter.fromMessage(fragment)));
payload = payloads;
}
else {
payload = AmqpInboundGateway.this.amqpMessageConverter.fromMessage(message);
}
headers = AmqpInboundGateway.this.headerMapper.toHeadersFromRequest(message.getMessageProperties());
if (isManualAck) {
headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.RabbitUtils;
Expand Down Expand Up @@ -71,6 +75,8 @@ public class AmqpMessageSource extends AbstractMessageSource<Object> {

private boolean rawMessageHeader;

private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L);

public AmqpMessageSource(ConnectionFactory connectionFactory, String queue) {
this(connectionFactory, new AmqpAckCallbackFactory(), queue);
}
Expand Down Expand Up @@ -151,6 +157,21 @@ public void setRawMessageHeader(boolean rawMessageHeader) {
this.rawMessageHeader = rawMessageHeader;
}

protected BatchingStrategy getBatchingStrategy() {
return this.batchingStrategy;
}

/**
* Set a batching strategy to use when de-batching messages.
* Default is {@link SimpleBatchingStrategy}.
* @param batchingStrategy the strategy.
* @since 5.2
*/
public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
Assert.notNull(batchingStrategy, "'batchingStrategy' cannot be null");
this.batchingStrategy = batchingStrategy;
}

@Override
public String getComponentType() {
return "amqp:message-source";
Expand All @@ -174,7 +195,16 @@ protected AbstractIntegrationMessageBuilder<Object> doReceive() {
messageProperties.setConsumerQueue(this.queue);
Map<String, Object> headers = this.headerMapper.toHeadersFromRequest(messageProperties);
org.springframework.amqp.core.Message amqpMessage = new org.springframework.amqp.core.Message(resp.getBody(), messageProperties);
Object payload = this.messageConverter.fromMessage(amqpMessage);
Object payload;
if (this.batchingStrategy.canDebatch(messageProperties)) {
List<Object> payloads = new ArrayList<>();
this.batchingStrategy.deBatch(amqpMessage, fragment -> payloads
.add(this.messageConverter.fromMessage(fragment)));
payload = payloads;
}
else {
payload = this.messageConverter.fromMessage(amqpMessage);
}
AbstractIntegrationMessageBuilder<Object> builder = getMessageBuilderFactory().withPayload(payload)
.copyHeaders(headers)
.setHeader(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.List;
import java.util.concurrent.ExecutorService;

import org.junit.Test;

import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.batch.MessageBatch;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.integration.StaticMessageHeaderAccessor;
Expand Down Expand Up @@ -123,4 +127,39 @@ private void testNackOrRequeue(boolean requeue) throws Exception {
verify(connection).close(30000);
}

@SuppressWarnings({ "unchecked" })
@Test
public void testBatch() throws Exception {
SimpleBatchingStrategy bs = new SimpleBatchingStrategy(2, 10_000, 10_000L);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
org.springframework.amqp.core.Message message =
new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties);
bs.addToBatch("foo", "bar", message);
message = new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties);
MessageBatch batched = bs.addToBatch("foo", "bar", message);

Channel channel = mock(Channel.class);
willReturn(true).given(channel).isOpen();
Envelope envelope = new Envelope(123L, false, "ex", "rk");
BasicProperties props = new BasicProperties.Builder()
.headers(batched.getMessage().getMessageProperties().getHeaders())
.contentType("text/plain")
.build();
GetResponse getResponse = new GetResponse(envelope, props, batched.getMessage().getBody(), 0);
willReturn(getResponse).given(channel).basicGet("foo", false);
Connection connection = mock(Connection.class);
willReturn(true).given(connection).isOpen();
willReturn(channel).given(connection).createChannel();
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
willReturn(connection).given(connectionFactory).newConnection((ExecutorService) isNull(), anyString());

CachingConnectionFactory ccf = new CachingConnectionFactory(connectionFactory);
AmqpMessageSource source = new AmqpMessageSource(ccf, "foo");
Message<?> received = source.receive();
assertThat(received).isNotNull();
assertThat(((List<String>) received.getPayload())).contains("test1", "test2");
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,6 +37,8 @@

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.batch.MessageBatch;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
Expand Down Expand Up @@ -377,6 +380,54 @@ public void testRetryWithinOnMessageGateway() throws Exception {
assertThat(errors.receive(0)).isNull();
}

@SuppressWarnings({ "unchecked" })
@Test
public void testBatchdAdapter() throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class));
container.setDeBatchingEnabled(false);
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
QueueChannel out = new QueueChannel();
adapter.setOutputChannel(out);
adapter.afterPropertiesSet();
ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener();
SimpleBatchingStrategy bs = new SimpleBatchingStrategy(2, 10_000, 10_000L);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
org.springframework.amqp.core.Message message =
new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties);
bs.addToBatch("foo", "bar", message);
message = new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties);
MessageBatch batched = bs.addToBatch("foo", "bar", message);
listener.onMessage(batched.getMessage(), null);
Message<?> received = out.receive();
assertThat(received).isNotNull();
assertThat(((List<String>) received.getPayload())).contains("test1", "test2");
}

@SuppressWarnings({ "unchecked" })
@Test
public void testBatchGateway() throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(mock(ConnectionFactory.class));
container.setDeBatchingEnabled(false);
AmqpInboundGateway adapter = new AmqpInboundGateway(container);
QueueChannel out = new QueueChannel();
adapter.setRequestChannel(out);
adapter.afterPropertiesSet();
ChannelAwareMessageListener listener = (ChannelAwareMessageListener) container.getMessageListener();
SimpleBatchingStrategy bs = new SimpleBatchingStrategy(2, 10_000, 10_000L);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
org.springframework.amqp.core.Message message =
new org.springframework.amqp.core.Message("test1".getBytes(), messageProperties);
bs.addToBatch("foo", "bar", message);
message = new org.springframework.amqp.core.Message("test2".getBytes(), messageProperties);
MessageBatch batched = bs.addToBatch("foo", "bar", message);
listener.onMessage(batched.getMessage(), null);
Message<?> received = out.receive();
assertThat(received).isNotNull();
assertThat(((List<String>) received.getPayload())).contains("test1", "test2");
}

public static class Foo {

private String bar;
Expand Down
26 changes: 26 additions & 0 deletions src/reference/asciidoc/amqp.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,22 @@ public class AmqpJavaApplication {
----
====

[[amqp-debatching]]
==== Batched Messages

See https://docs.spring.io/spring-amqp/docs/current/reference/html/#template-batching[the Spring AMQP Documentation] for more information about batched messages.

To produce batched messages with Spring Integration, simply configure the outbound endpoint with a `BatchingRabbitTemplate`.

When receiving batched messages, by default, the listener containers extract each fragment message and the adapter will produce a `Message<?>` for each fragment.
Starting with version 5.2, if the container's `deBatchingEnabled` property is set to `false`, the de-batching is performed by the adapter instead, and a single `Message<List<?>>` is produced with the payload being a list of the fragment payloads (after conversion if appropriate).

The default `BatchingStrategy` is the `SimpleBatchingStrategy`, but this can be overridden on the adapter.

=== Polled Inbound Channel Adapter

==== Overview

Version 5.0.1 introduced a polled channel adapter, letting you fetch individual messages on demand -- for example, with a `MessageSourcePollingTemplate` or a poller.
See <<deferred-acks-message-source>> for more information.

Expand Down Expand Up @@ -315,6 +329,13 @@ public IntegrationFlow flow() {
----
====

[[amqp-polled-debatching]]
==== Batched Messages

See <<amqp-debatching>>.

For the polled adapter, there is no listener container, batched messages are always debatched (if the `BatchingStrategy` supports doing so).

[[amqp-inbound-gateway]]
=== Inbound Gateway

Expand Down Expand Up @@ -453,6 +474,11 @@ public class AmqpJavaApplication {
----
====

[[amqp-gatewway-debatching]]
==== Batched Messages

See <<amqp-debatching>>.

[[amqp-inbound-ack]]
=== Inbound Endpoint Acknowledge Mode

Expand Down
3 changes: 3 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ See <<splitter>> for more information.
The outbound endpoints can now be configured to synthesize a "nack" if no publisher confirm is received within a timeout.
See <<amqp-outbound-endpoints>> for more information.

The inbound channel adapter can now receive batched messages as a `List<?>` payload instead of receiving a discrete message for each batch fragment.
See <<amqp-debatching>> for more information.

[[x5.2-file]]
==== File Changes

Expand Down