Skip to content

GH-3172: Support consumer-side batching #3173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ ext {
rsocketVersion = '1.0.0-RC6'
servletApiVersion = '4.0.1'
smackVersion = '4.3.4'
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.2.3.RELEASE'
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '2.2.4.BUILD-SNAPSHOT'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or you will move Spring AMQP 2.2.4 release to February 12th: https://github.com/spring-projects/spring-integration/milestone/23.

Or this PR will be placed on hold until February 26th.

But we can't release the next Spring Integration milestone without proper dependencies.

Thanks for understanding.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the release to 2/12.

springDataVersion = project.hasProperty('springDataVersion') ? project.springDataVersion : 'Neumann-BUILD-SNAPSHOT'
springSecurityVersion = project.hasProperty('springSecurityVersion') ? project.springSecurityVersion : '5.3.0.RC1'
springRetryVersion = '1.2.5.RELEASE'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-channel");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-startup");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "phase");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "batch-mode");
configureChannels(element, parserContext, builder);
AbstractBeanDefinition adapterBeanDefinition = builder.getRawBeanDefinition();
adapterBeanDefinition.setResource(parserContext.getReaderContext().getResource());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.function.Consumer;

import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.BatchMode;

/**
* Spec for an inbound channel adapter with a {@link SimpleMessageListenerContainer}.
Expand All @@ -41,4 +42,15 @@ public AmqpInboundChannelAdapterSMLCSpec configureContainer(Consumer<SimpleMessa
return this;
}

/**
* Set the {@link BatchMode} to use when the container is configured to support
* batching consumed records.
* @param batchMode the batch mode.
* @return the spec.
* @since 5.3
*/
public AmqpInboundChannelAdapterSMLCSpec batchMode(BatchMode batchMode) {
this.target.setBatchMode(batchMode);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,15 @@ public SimpleMessageListenerContainerSpec batchSize(int batchSize) {
return this;
}

/**
* Set to true to enable batching of consumed messages.
* @param enabled true to enable.
* @return the spec.
* @since 5.3
*/
public SimpleMessageListenerContainerSpec consumerBatchEnabled(boolean enabled) {
this.listenerContainer.setConsumerBatchEnabled(enabled);
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.integration.amqp.inbound;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -26,6 +27,7 @@
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.MessageConversionException;
Expand All @@ -42,6 +44,7 @@
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
Expand All @@ -61,6 +64,25 @@
public class AmqpInboundChannelAdapter extends MessageProducerSupport implements
OrderlyShutdownCapable {

/**
* Defines the payload type when the listener container is configured with consumerBatchEnabled.
*/
public enum BatchMode {

/**
* Payload is a {@code List<Message<?>>} where each element is a message is
* converted from the Spring AMQP Message.
*/
MESSAGES,

/**
* Payload is a {@code List<?>} where each element is the converted body of the
* Spring AMQP Message.
*/
EXTRACT_PAYLOADS

}

private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();

private final AbstractMessageListenerContainer messageListenerContainer;
Expand All @@ -77,6 +99,8 @@ public class AmqpInboundChannelAdapter extends MessageProducerSupport implements

private boolean bindSourceMessage;

private BatchMode batchMode = BatchMode.MESSAGES;

public AmqpInboundChannelAdapter(AbstractMessageListenerContainer listenerContainer) {
Assert.notNull(listenerContainer, "listenerContainer must not be null");
Assert.isNull(listenerContainer.getMessageListener(),
Expand Down Expand Up @@ -124,8 +148,9 @@ public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
}

/**
* Set a batching strategy to use when de-batching messages.
* Default is {@link SimpleBatchingStrategy}.
* Set a batching strategy to use when de-batching messages created by a batching
* producer (such as the BatchingRabbitTemplate). Default is
* {@link SimpleBatchingStrategy}.
* @param batchingStrategy the strategy.
* @since 5.2
*/
Expand All @@ -144,6 +169,17 @@ public void setBindSourceMessage(boolean bindSourceMessage) {
this.bindSourceMessage = bindSourceMessage;
}

/**
* When the listener container is configured with consumerBatchEnabled, set the payload
* type for messages generated for the batches. Default is {@link BatchMode#MESSAGES}.
* @param batchMode the batch mode.
* @since 5.3
*/
public void setBatchMode(BatchMode batchMode) {
Assert.notNull(batchMode, "'batchMode' cannot be null");
this.batchMode = batchMode;
}

@Override
public String getComponentType() {
return "amqp:inbound-channel-adapter";
Expand All @@ -156,7 +192,13 @@ protected void onInit() {
+ "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
+ "send an error message when retries are exhausted");
}
Listener messageListener = new Listener();
Listener messageListener;
if (this.messageListenerContainer.isConsumerBatchEnabled()) {
messageListener = new BatchListener();
}
else {
messageListener = new Listener();
}
this.messageListenerContainer.setMessageListener(messageListener);
this.messageListenerContainer.afterPropertiesSet();
super.onInit();
Expand Down Expand Up @@ -193,7 +235,7 @@ public int afterShutdown() {
* @param message the Spring Messaging message to use.
* @since 4.3.10
*/
private void setAttributesIfNecessary(Message amqpMessage, org.springframework.messaging.Message<?> message) {
private void setAttributesIfNecessary(Object amqpMessage, org.springframework.messaging.Message<?> message) {
boolean needHolder = getErrorChannel() != null && this.retryTemplate == null;
boolean needAttributes = needHolder || this.retryTemplate != null;
if (needHolder) {
Expand Down Expand Up @@ -223,37 +265,48 @@ protected AttributeAccessor getErrorMessageAttributes(org.springframework.messag

protected class Listener implements ChannelAwareMessageListener {

protected final MessageConverter converter = AmqpInboundChannelAdapter.this.messageConverter; // NOSONAR

protected final boolean manualAcks = AcknowledgeMode.MANUAL ==
AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode(); // NNOSONAR

protected final RetryOperations retryOps = AmqpInboundChannelAdapter.this.retryTemplate; // NOSONAR

protected final RecoveryCallback<?> recoverer = AmqpInboundChannelAdapter.this.recoveryCallback; // NOSONAR

protected Listener() {
}

@Override
public void onMessage(final Message message, final Channel channel) {
boolean retryDisabled = AmqpInboundChannelAdapter.this.retryTemplate == null;
try {
if (retryDisabled) {
if (this.retryOps == null) {
createAndSend(message, channel);
}
else {
final org.springframework.messaging.Message<Object> toSend = createMessage(message, channel);
AmqpInboundChannelAdapter.this.retryTemplate.execute(
this.retryOps.execute(
context -> {
StaticMessageHeaderAccessor.getDeliveryAttempt(toSend).incrementAndGet();
setAttributesIfNecessary(message, toSend);
sendMessage(toSend);
return null;
}, AmqpInboundChannelAdapter.this.recoveryCallback);
}, this.recoverer);
}
}
catch (MessageConversionException e) {
if (getErrorChannel() != null) {
setAttributesIfNecessary(message, null);
getMessagingTemplate()
.send(getErrorChannel(), buildErrorMessage(null,
EndpointUtils.errorMessagePayload(message, channel, isManualAck(), e)));
EndpointUtils.errorMessagePayload(message, channel, this.manualAcks, e)));
}
else {
throw e;
}
}
finally {
if (retryDisabled) {
if (this.retryOps == null) {
ATTRIBUTES_HOLDER.remove();
}
}
Expand All @@ -265,38 +318,139 @@ private void createAndSend(Message message, Channel channel) {
sendMessage(messagingMessage);
}

private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
protected org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
Object payload = convertPayload(message);
Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
.toHeadersFromRequest(message.getMessageProperties());
if (AmqpInboundChannelAdapter.this.bindSourceMessage) {
headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message);
}
long deliveryTag = message.getMessageProperties().getDeliveryTag();
return finalize(channel, payload, headers, deliveryTag);
}

protected Object convertPayload(Message message) {
Object payload;
if (AmqpInboundChannelAdapter.this.batchingStrategy.canDebatch(message.getMessageProperties())) {
List<Object> payloads = new ArrayList<>();
AmqpInboundChannelAdapter.this.batchingStrategy.deBatch(message, fragment -> payloads
.add(AmqpInboundChannelAdapter.this.messageConverter.fromMessage(fragment)));
.add(this.converter.fromMessage(fragment)));
payload = payloads;
}
else {
payload = AmqpInboundChannelAdapter.this.messageConverter.fromMessage(message);
payload = this.converter.fromMessage(message);
}
Map<String, Object> headers = AmqpInboundChannelAdapter.this.headerMapper
.toHeadersFromRequest(message.getMessageProperties());
if (isManualAck()) {
headers.put(AmqpHeaders.DELIVERY_TAG, message.getMessageProperties().getDeliveryTag());
return payload;
}

protected org.springframework.messaging.Message<Object> finalize(Channel channel, Object payload,
Map<String, Object> headers, long deliveryTag) {

if (this.manualAcks) {
headers.put(AmqpHeaders.DELIVERY_TAG, deliveryTag);
headers.put(AmqpHeaders.CHANNEL, channel);
}
if (AmqpInboundChannelAdapter.this.retryTemplate != null) {
headers.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
}
if (AmqpInboundChannelAdapter.this.bindSourceMessage) {
headers.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message);
}
return getMessageBuilderFactory()
.withPayload(payload)
.copyHeaders(headers)
.build();
}

private boolean isManualAck() {
return AcknowledgeMode.MANUAL ==
AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode();
}

protected class BatchListener extends Listener implements ChannelAwareBatchMessageListener {

private final boolean batchModeMessages = BatchMode.MESSAGES.equals(AmqpInboundChannelAdapter.this.batchMode);

@Override
public void onMessageBatch(List<Message> messages, Channel channel) {
List<?> converted;
if (this.batchModeMessages) {
converted = convertMessages(messages, channel);
}
else {
converted = convertPayloads(messages, channel);
}
if (converted != null) {
org.springframework.messaging.Message<?> message = finalize(channel, converted, new HashMap<>(),
messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag());
try {
if (this.retryOps == null) {
setAttributesIfNecessary(messages, message);
sendMessage(message);
}
else {
this.retryOps.execute(
context -> {
StaticMessageHeaderAccessor.getDeliveryAttempt(message).incrementAndGet();
if (this.batchModeMessages) {
@SuppressWarnings("unchecked")
List<org.springframework.messaging.Message<?>> payloads =
(List<org.springframework.messaging.Message<?>>) message.getPayload();
payloads.forEach(payload -> StaticMessageHeaderAccessor
.getDeliveryAttempt(payload).incrementAndGet());
}
setAttributesIfNecessary(messages, message);
sendMessage(message);
return null;
}, this.recoverer);
}
}
finally {
if (this.retryOps == null) {
ATTRIBUTES_HOLDER.remove();
}
}
}
}

private List<org.springframework.messaging.Message<?>> convertMessages(List<Message> messages,
Channel channel) {

List<org.springframework.messaging.Message<?>> converted = new ArrayList<>();
try {
messages.forEach(message -> {
converted.add(createMessage(message, channel));
});
return converted;
}
catch (MessageConversionException e) {
if (getErrorChannel() != null) {
setAttributesIfNecessary(messages, null);
getMessagingTemplate()
.send(getErrorChannel(), buildErrorMessage(null,
EndpointUtils.errorMessagePayload(messages, channel, this.manualAcks, e)));
}
else {
throw e;
}
}
return null;
}

private List<?> convertPayloads(List<Message> messages, Channel channel) {
List<Object> converted = new ArrayList<>();
try {
messages.forEach(message -> {
converted.add(this.converter.fromMessage(message));
});
return converted;
}
catch (MessageConversionException e) {
if (getErrorChannel() != null) {
setAttributesIfNecessary(messages, null);
getMessagingTemplate()
.send(getErrorChannel(), buildErrorMessage(null,
EndpointUtils.errorMessagePayload(messages, channel, this.manualAcks, e)));
}
else {
throw e;
}
}
return null;
}

}
Expand Down
Loading