Skip to content

Commit 1269dc4

Browse files
committed
GH-922: Add Batch-mode @RabbitListener
Resolves #922 - remove unnecessary back-ticks from literal table columns
1 parent 45ec57f commit 1269dc4

File tree

16 files changed

+770
-106
lines changed

16 files changed

+770
-106
lines changed

spring-amqp/src/main/java/org/springframework/amqp/support/converter/MessagingMessageConverter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ public void setHeaderMapper(AmqpHeaderMapper headerMapper) {
8888
this.headerMapper = headerMapper;
8989
}
9090

91+
public AmqpHeaderMapper getHeaderMapper() {
92+
return this.headerMapper;
93+
}
94+
9195
@Override
9296
public void afterPropertiesSet() {
9397
Assert.notNull(this.payloadConverter, "Property 'payloadConverter' is required");

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/AbstractRabbitListenerContainerFactory.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.amqp.core.AcknowledgeMode;
3030
import org.springframework.amqp.core.MessagePostProcessor;
3131
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
32+
import org.springframework.amqp.rabbit.core.support.BatchingStrategy;
3233
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
3334
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
3435
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
@@ -119,6 +120,10 @@ public abstract class AbstractRabbitListenerContainerFactory<C extends AbstractM
119120

120121
private Consumer<C> containerConfigurer;
121122

123+
private boolean batchListener;
124+
125+
private BatchingStrategy batchingStrategy;
126+
122127
/**
123128
* @param connectionFactory The connection factory.
124129
* @see AbstractMessageListenerContainer#setConnectionFactory(ConnectionFactory)
@@ -344,6 +349,27 @@ public void setContainerConfigurer(Consumer<C> configurer) {
344349
this.containerConfigurer = configurer;
345350
}
346351

352+
/**
353+
* Set to true to receive a list of debatched messages that were created by a
354+
* {@link org.springframework.amqp.rabbit.core.BatchingRabbitTemplate}.
355+
* @param isBatch true for a batch listener.
356+
* @since 2.2
357+
* @see #setBatchingStrategy(BatchingStrategy)
358+
*/
359+
public void setBatchListener(boolean isBatch) {
360+
this.batchListener = isBatch;
361+
}
362+
363+
/**
364+
* Set a {@link BatchingStrategy} to use when debatching messages.
365+
* @param batchingStrategy the batching strategy.
366+
* @since 2.2
367+
* @see #setBatchListener(boolean)
368+
*/
369+
public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
370+
this.batchingStrategy = batchingStrategy;
371+
}
372+
347373
@SuppressWarnings("deprecation")
348374
@Override
349375
public C createListenerContainer(RabbitListenerEndpoint endpoint) {
@@ -381,12 +407,15 @@ public C createListenerContainer(RabbitListenerEndpoint endpoint) {
381407
.acceptIfNotNull(this.autoStartup, instance::setAutoStartup)
382408
.acceptIfNotNull(this.phase, instance::setPhase)
383409
.acceptIfNotNull(this.afterReceivePostProcessors, instance::setAfterReceivePostProcessors);
410+
instance.setDeBatchingEnabled(!this.batchListener);
384411
if (endpoint != null) { // endpoint settings overriding default factory settings
385412
javaUtils
386413
.acceptIfNotNull(endpoint.getAutoStartup(), instance::setAutoStartup)
387414
.acceptIfNotNull(endpoint.getTaskExecutor(), instance::setTaskExecutor);
415+
javaUtils
416+
.acceptIfNotNull(this.batchingStrategy, endpoint::setBatchingStrategy);
388417
instance.setListenerId(endpoint.getId());
389-
418+
endpoint.setBatchListener(this.batchListener);
390419
endpoint.setupListenerContainer(instance);
391420
}
392421
if (instance.getMessageListener() instanceof AbstractAdaptableMessageListener) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/RabbitUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ public static int getMaxFrame(ConnectionFactory connectionFactory) {
363363
return rcon.getFrameMax();
364364
}
365365
}
366-
catch (RuntimeException e) {
366+
catch (@SuppressWarnings("unused") RuntimeException e) {
367367
// NOSONAR
368368
}
369369
return -1;

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplate.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121

2222
import org.springframework.amqp.AmqpException;
2323
import org.springframework.amqp.core.Message;
24+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
2425
import org.springframework.amqp.rabbit.connection.CorrelationData;
2526
import org.springframework.amqp.rabbit.core.support.BatchingStrategy;
2627
import org.springframework.amqp.rabbit.core.support.MessageBatch;
28+
import org.springframework.lang.Nullable;
2729
import org.springframework.scheduling.TaskScheduler;
2830

2931
/**
@@ -48,10 +50,25 @@ public class BatchingRabbitTemplate extends RabbitTemplate {
4850
private volatile ScheduledFuture<?> scheduledTask;
4951

5052
/**
53+
* Create an instance with the supplied parameters.
5154
* @param batchingStrategy the batching strategy.
5255
* @param scheduler the scheduler.
5356
*/
5457
public BatchingRabbitTemplate(BatchingStrategy batchingStrategy, TaskScheduler scheduler) {
58+
this(null, batchingStrategy, scheduler);
59+
}
60+
61+
/**
62+
* Create an instance with the supplied parameters.
63+
* @param connectionFactory the connection factory.
64+
* @param batchingStrategy the batching strategy.
65+
* @param scheduler the scheduler.
66+
* @since 2.2
67+
*/
68+
public BatchingRabbitTemplate(@Nullable ConnectionFactory connectionFactory, BatchingStrategy batchingStrategy,
69+
TaskScheduler scheduler) {
70+
71+
super(connectionFactory);
5572
this.batchingStrategy = batchingStrategy;
5673
this.scheduler = scheduler;
5774
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/support/BatchingStrategy.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
import java.util.Collection;
2020
import java.util.Date;
21+
import java.util.function.Consumer;
2122

2223
import org.springframework.amqp.core.Message;
24+
import org.springframework.amqp.core.MessageProperties;
2325

2426
/**
2527
* Strategy for batching messages. The methods will never be called concurrently.
@@ -52,4 +54,27 @@ public interface BatchingStrategy {
5254
*/
5355
Collection<MessageBatch> releaseBatches();
5456

57+
/**
58+
* Return true if this strategy can decode a batch of messages from a message body.
59+
* Returning true means you must override {@link #deBatch(Message, Consumer)}.
60+
* @param properties the message properties.
61+
* @return true if we can decode the message.
62+
* @since 2.2
63+
* @see #deBatch(Message, Consumer)
64+
*/
65+
default boolean canDebatch(MessageProperties properties) {
66+
return false;
67+
}
68+
69+
/**
70+
* Decode a message into fragments.
71+
* @param message the message.
72+
* @param fragmentConsumer a consumer for fragments.
73+
* @since 2.2
74+
* @see #canDebatch(MessageProperties)
75+
*/
76+
default void deBatch(Message message, Consumer<Message> fragmentConsumer) {
77+
throw new UnsupportedOperationException("Cannot debatch this message");
78+
}
79+
5580
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/support/SimpleBatchingStrategy.java

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,12 @@
2222
import java.util.Collections;
2323
import java.util.Date;
2424
import java.util.List;
25+
import java.util.function.Consumer;
2526

2627
import org.springframework.amqp.core.Message;
2728
import org.springframework.amqp.core.MessageProperties;
29+
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
30+
import org.springframework.amqp.support.converter.MessageConversionException;
2831
import org.springframework.util.Assert;
2932

3033
/**
@@ -66,25 +69,26 @@ public SimpleBatchingStrategy(int batchSize, int bufferLimit, long timeout) {
6669
}
6770

6871
@Override
69-
public MessageBatch addToBatch(String exchange, String routingKey, Message message) {
72+
public MessageBatch addToBatch(String exch, String routKey, Message message) {
7073
if (this.exchange != null) {
71-
Assert.isTrue(this.exchange.equals(exchange), "Cannot send to different exchanges in the same batch");
74+
Assert.isTrue(this.exchange.equals(exch), "Cannot send to different exchanges in the same batch");
7275
}
7376
else {
74-
this.exchange = exchange;
77+
this.exchange = exch;
7578
}
7679
if (this.routingKey != null) {
77-
Assert.isTrue(this.routingKey.equals(routingKey), "Cannot send with different routing keys in the same batch");
80+
Assert.isTrue(this.routingKey.equals(routKey),
81+
"Cannot send with different routing keys in the same batch");
7882
}
7983
else {
80-
this.routingKey = routingKey;
84+
this.routingKey = routKey;
8185
}
8286
int bufferUse = Integer.BYTES + message.getBody().length;
8387
MessageBatch batch = null;
8488
if (this.messages.size() > 0 && this.currentSize + bufferUse > this.bufferLimit) {
8589
batch = doReleaseBatch();
86-
this.exchange = exchange;
87-
this.routingKey = routingKey;
90+
this.exchange = exch;
91+
this.routingKey = routKey;
8892
}
8993
this.currentSize += bufferUse;
9094
this.messages.add(message);
@@ -144,8 +148,45 @@ private Message assembleMessage() {
144148
bytes.putInt(message.getBody().length);
145149
bytes.put(message.getBody());
146150
}
147-
messageProperties.getHeaders().put(MessageProperties.SPRING_BATCH_FORMAT, MessageProperties.BATCH_FORMAT_LENGTH_HEADER4);
151+
messageProperties.getHeaders().put(MessageProperties.SPRING_BATCH_FORMAT,
152+
MessageProperties.BATCH_FORMAT_LENGTH_HEADER4);
148153
return new Message(body, messageProperties);
149154
}
150155

156+
157+
@Override
158+
public boolean canDebatch(MessageProperties properties) {
159+
return MessageProperties.BATCH_FORMAT_LENGTH_HEADER4.equals(properties
160+
.getHeaders()
161+
.get(MessageProperties.SPRING_BATCH_FORMAT));
162+
}
163+
164+
/**
165+
* Debatch a message that has a header with {@link MessageProperties#SPRING_BATCH_FORMAT}
166+
* set to {@link MessageProperties#BATCH_FORMAT_LENGTH_HEADER4}.
167+
* @param message the batched message.
168+
* @param fragmentConsumer a consumer for each fragment.
169+
* @since 2.2
170+
*/
171+
@Override
172+
public void deBatch(Message message, Consumer<Message> fragmentConsumer) {
173+
ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBody());
174+
MessageProperties messageProperties = message.getMessageProperties();
175+
messageProperties.getHeaders().remove(MessageProperties.SPRING_BATCH_FORMAT);
176+
while (byteBuffer.hasRemaining()) {
177+
int length = byteBuffer.getInt();
178+
if (length < 0 || length > byteBuffer.remaining()) {
179+
throw new ListenerExecutionFailedException("Bad batched message received",
180+
new MessageConversionException("Insufficient batch data at offset " + byteBuffer.position()),
181+
message);
182+
}
183+
byte[] body = new byte[length];
184+
byteBuffer.get(body);
185+
messageProperties.setContentLength(length);
186+
// Caveat - shared MessageProperties.
187+
Message fragment = new Message(body, messageProperties);
188+
fragmentConsumer.accept(fragment);
189+
}
190+
}
191+
151192
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package org.springframework.amqp.rabbit.listener;
1818

19-
import java.nio.ByteBuffer;
2019
import java.util.ArrayList;
2120
import java.util.Arrays;
2221
import java.util.Collection;
@@ -44,7 +43,6 @@
4443
import org.springframework.amqp.core.Message;
4544
import org.springframework.amqp.core.MessageListener;
4645
import org.springframework.amqp.core.MessagePostProcessor;
47-
import org.springframework.amqp.core.MessageProperties;
4846
import org.springframework.amqp.core.Queue;
4947
import org.springframework.amqp.rabbit.connection.Connection;
5048
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -53,6 +51,8 @@
5351
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
5452
import org.springframework.amqp.rabbit.connection.RabbitUtils;
5553
import org.springframework.amqp.rabbit.connection.RoutingConnectionFactory;
54+
import org.springframework.amqp.rabbit.core.support.BatchingStrategy;
55+
import org.springframework.amqp.rabbit.core.support.SimpleBatchingStrategy;
5656
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
5757
import org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException;
5858
import org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException;
@@ -61,7 +61,6 @@
6161
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
6262
import org.springframework.amqp.support.ConditionalExceptionLogger;
6363
import org.springframework.amqp.support.ConsumerTagStrategy;
64-
import org.springframework.amqp.support.converter.MessageConversionException;
6564
import org.springframework.amqp.support.converter.MessageConverter;
6665
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
6766
import org.springframework.aop.framework.ProxyFactory;
@@ -222,6 +221,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor
222221

223222
private String errorHandlerLoggerName = getClass().getName();
224223

224+
private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L);
225+
225226
private volatile boolean lazyLoad;
226227

227228
@Override
@@ -495,6 +496,7 @@ public MessageConverter getMessageConverter() {
495496
* Determine whether or not the container should de-batch batched
496497
* messages (true) or call the listener with the batch (false). Default: true.
497498
* @param deBatchingEnabled the deBatchingEnabled to set.
499+
* @see #setBatchingStrategy(BatchingStrategy)
498500
*/
499501
public void setDeBatchingEnabled(boolean deBatchingEnabled) {
500502
this.deBatchingEnabled = deBatchingEnabled;
@@ -1125,6 +1127,18 @@ public void setErrorHandlerLoggerName(String errorHandlerLoggerName) {
11251127
this.errorHandlerLoggerName = errorHandlerLoggerName;
11261128
}
11271129

1130+
/**
1131+
* Set a batching strategy to use when de-batching messages.
1132+
* Default is {@link SimpleBatchingStrategy}.
1133+
* @param batchingStrategy the strategy.
1134+
* @since 2.2
1135+
* @see #setDeBatchingEnabled(boolean)
1136+
*/
1137+
public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
1138+
Assert.notNull(batchingStrategy, "'batchingStrategy' cannot be null");
1139+
this.batchingStrategy = batchingStrategy;
1140+
}
1141+
11281142
/**
11291143
* Delegates to {@link #validateConfiguration()} and {@link #initialize()}.
11301144
*/
@@ -1437,25 +1451,8 @@ private void doExecuteListener(Channel channel, Message messageIn) {
14371451
}
14381452
}
14391453
}
1440-
Object batchFormat = message.getMessageProperties().getHeaders().get(MessageProperties.SPRING_BATCH_FORMAT);
1441-
if (MessageProperties.BATCH_FORMAT_LENGTH_HEADER4.equals(batchFormat) && this.deBatchingEnabled) {
1442-
ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBody());
1443-
MessageProperties messageProperties = message.getMessageProperties();
1444-
messageProperties.getHeaders().remove(MessageProperties.SPRING_BATCH_FORMAT);
1445-
while (byteBuffer.hasRemaining()) {
1446-
int length = byteBuffer.getInt();
1447-
if (length < 0 || length > byteBuffer.remaining()) {
1448-
throw new ListenerExecutionFailedException("Bad batched message received",
1449-
new MessageConversionException("Insufficient batch data at offset " + byteBuffer.position()),
1450-
message);
1451-
}
1452-
byte[] body = new byte[length];
1453-
byteBuffer.get(body);
1454-
messageProperties.setContentLength(length);
1455-
// Caveat - shared MessageProperties.
1456-
Message fragment = new Message(body, messageProperties);
1457-
invokeListener(channel, fragment);
1458-
}
1454+
if (this.deBatchingEnabled && this.batchingStrategy.canDebatch(message.getMessageProperties())) {
1455+
this.batchingStrategy.deBatch(message, fragment -> invokeListener(channel, fragment));
14591456
}
14601457
else {
14611458
invokeListener(channel, message);

0 commit comments

Comments
 (0)