Skip to content

Commit 344b9bc

Browse files
committed
Align with changes in latest Spring AMQP snapshots
1 parent e5ca9df commit 344b9bc

File tree

4 files changed

+68
-7
lines changed

4 files changed

+68
-7
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/RabbitProperties.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.springframework.amqp.core.AcknowledgeMode;
2525
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
2626
import org.springframework.boot.context.properties.ConfigurationProperties;
27+
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
2728
import org.springframework.boot.convert.DurationUnit;
2829
import org.springframework.util.CollectionUtils;
2930
import org.springframework.util.StringUtils;
@@ -651,6 +652,12 @@ public ListenerRetry getRetry() {
651652
*/
652653
public static class SimpleContainer extends AmqpContainer {
653654

655+
/**
656+
* Batch size, expressed as the number of physical messages, to be used by the
657+
* container.
658+
*/
659+
private Integer batchSize;
660+
654661
/**
655662
* Minimum number of listener invoker threads.
656663
*/
@@ -674,6 +681,14 @@ public static class SimpleContainer extends AmqpContainer {
674681
*/
675682
private boolean missingQueuesFatal = true;
676683

684+
public Integer getBatchSize() {
685+
return this.batchSize;
686+
}
687+
688+
public void setBatchSize(Integer batchSize) {
689+
this.batchSize = batchSize;
690+
}
691+
677692
public Integer getConcurrency() {
678693
return this.concurrency;
679694
}
@@ -690,10 +705,13 @@ public void setMaxConcurrency(Integer maxConcurrency) {
690705
this.maxConcurrency = maxConcurrency;
691706
}
692707

708+
@Deprecated
709+
@DeprecatedConfigurationProperty(replacement = "batchSize")
693710
public Integer getTransactionSize() {
694711
return this.transactionSize;
695712
}
696713

714+
@Deprecated
697715
public void setTransactionSize(Integer transactionSize) {
698716
this.transactionSize = transactionSize;
699717
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/amqp/SimpleRabbitListenerContainerFactoryConfigurer.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,22 @@ public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFa
3838
configure(factory, connectionFactory, config);
3939
map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
4040
map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
41-
map.from(config::getTransactionSize).whenNonNull().to(factory::setTxSize);
41+
map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
42+
map.from(() -> batchOrTransactionSize(config)).whenNonNull().to(factory::setBatchSize);
43+
}
44+
45+
@SuppressWarnings("deprecation")
46+
private Integer batchOrTransactionSize(RabbitProperties.SimpleContainer config) {
47+
if (config.getBatchSize() != null && config.getTransactionSize() != null) {
48+
String prefix = "spring.rabbit.listener.simple.";
49+
throw new IllegalStateException(prefix + "batch-size and " + prefix
50+
+ "transaction-size cannot not be used together and transaction-size is deprecated. Please use "
51+
+ "batch-size alone instead.");
52+
}
53+
if (config.getBatchSize() == null) {
54+
return config.getTransactionSize();
55+
}
56+
return config.getBatchSize();
4257
}
4358

4459
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/web/servlet/DispatcherServletAutoConfiguration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public DispatcherServlet dispatcherServlet(HttpProperties httpProperties, WebMvc
9292
dispatcherServlet.setDispatchOptionsRequest(webMvcProperties.isDispatchOptionsRequest());
9393
dispatcherServlet.setDispatchTraceRequest(webMvcProperties.isDispatchTraceRequest());
9494
dispatcherServlet.setThrowExceptionIfNoHandlerFound(webMvcProperties.isThrowExceptionIfNoHandlerFound());
95+
dispatcherServlet.setPublishEvents(webMvcProperties.isPublishEvents());
9596
dispatcherServlet.setEnableLoggingRequestDetails(httpProperties.isLogRequestDetails());
9697
return dispatcherServlet;
9798
}

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/amqp/RabbitAutoConfigurationTests.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.boot.autoconfigure.amqp;
1818

1919
import java.security.NoSuchAlgorithmException;
20+
import java.util.List;
2021
import java.util.concurrent.atomic.AtomicInteger;
2122

2223
import javax.net.ssl.SSLContext;
@@ -125,6 +126,7 @@ void testDefaultConnectionFactoryConfiguration() {
125126
}
126127

127128
@Test
129+
@SuppressWarnings("unchecked")
128130
void testConnectionFactoryWithOverrides() {
129131
this.contextRunner.withUserConfiguration(TestConfiguration.class)
130132
.withPropertyValues("spring.rabbitmq.host:remote-server", "spring.rabbitmq.port:9000",
@@ -137,15 +139,16 @@ void testConnectionFactoryWithOverrides() {
137139
assertThat(connectionFactory.getVirtualHost()).isEqualTo("/vhost");
138140
com.rabbitmq.client.ConnectionFactory rcf = connectionFactory.getRabbitConnectionFactory();
139141
assertThat(rcf.getConnectionTimeout()).isEqualTo(123);
140-
assertThat((Address[]) ReflectionTestUtils.getField(connectionFactory, "addresses")).hasSize(1);
142+
assertThat((List<Address>) ReflectionTestUtils.getField(connectionFactory, "addresses")).hasSize(1);
141143
});
142144
}
143145

144146
@Test
147+
@SuppressWarnings("unchecked")
145148
void testConnectionFactoryWithCustomConnectionNameStrategy() {
146149
this.contextRunner.withUserConfiguration(ConnectionNameStrategyConfiguration.class).run((context) -> {
147150
CachingConnectionFactory connectionFactory = context.getBean(CachingConnectionFactory.class);
148-
Address[] addresses = (Address[]) ReflectionTestUtils.getField(connectionFactory, "addresses");
151+
List<Address> addresses = (List<Address>) ReflectionTestUtils.getField(connectionFactory, "addresses");
149152
assertThat(addresses).hasSize(1);
150153
com.rabbitmq.client.ConnectionFactory rcf = mock(com.rabbitmq.client.ConnectionFactory.class);
151154
given(rcf.newConnection(isNull(), eq(addresses), anyString())).willReturn(mock(Connection.class));
@@ -363,8 +366,8 @@ void testRabbitListenerContainerFactoryBackOff() {
363366
this.contextRunner.withUserConfiguration(TestConfiguration5.class).run((context) -> {
364367
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
365368
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
366-
rabbitListenerContainerFactory.setTxSize(10);
367-
verify(rabbitListenerContainerFactory).setTxSize(10);
369+
rabbitListenerContainerFactory.setBatchSize(10);
370+
verify(rabbitListenerContainerFactory).setBatchSize(10);
368371
assertThat(rabbitListenerContainerFactory.getAdviceChain()).isNull();
369372
});
370373
}
@@ -385,15 +388,15 @@ void testSimpleRabbitListenerContainerFactoryWithCustomSettings() {
385388
"spring.rabbitmq.listener.simple.prefetch:40",
386389
"spring.rabbitmq.listener.simple.defaultRequeueRejected:false",
387390
"spring.rabbitmq.listener.simple.idleEventInterval:5",
388-
"spring.rabbitmq.listener.simple.transactionSize:20",
391+
"spring.rabbitmq.listener.simple.batchSize:20",
389392
"spring.rabbitmq.listener.simple.missingQueuesFatal:false")
390393
.run((context) -> {
391394
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
392395
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
393396
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("concurrentConsumers", 5);
394397
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("maxConcurrentConsumers",
395398
10);
396-
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("txSize", 20);
399+
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("batchSize", 20);
397400
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("missingQueuesFatal", false);
398401
checkCommonProps(context, rabbitListenerContainerFactory);
399402
});
@@ -425,6 +428,30 @@ void testDirectRabbitListenerContainerFactoryWithCustomSettings() {
425428
});
426429
}
427430

431+
@Test
432+
void transactionSizeIsMappedToContainerBatchSize() {
433+
this.contextRunner
434+
.withUserConfiguration(MessageConvertersConfiguration.class, MessageRecoverersConfiguration.class)
435+
.withPropertyValues("spring.rabbitmq.listener.simple.transactionSize:20").run((context) -> {
436+
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
437+
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
438+
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("batchSize", 20);
439+
});
440+
}
441+
442+
@Test
443+
void transactionSizeAndBatchSizeCannotBeUsedTogether() {
444+
this.contextRunner
445+
.withUserConfiguration(MessageConvertersConfiguration.class, MessageRecoverersConfiguration.class)
446+
.withPropertyValues("spring.rabbitmq.listener.simple.transactionSize:20",
447+
"spring.rabbitmq.listener.simple.batchSize:50")
448+
.run((context) -> {
449+
assertThat(context).hasFailed();
450+
assertThat(context.getStartupFailure()).hasRootCauseInstanceOf(IllegalStateException.class)
451+
.hasMessageContaining("Please use batch-size alone instead.");
452+
});
453+
}
454+
428455
@Test
429456
void testSimpleRabbitListenerContainerFactoryRetryWithCustomizer() {
430457
this.contextRunner.withUserConfiguration(RabbitRetryTemplateCustomizerConfiguration.class)

0 commit comments

Comments
 (0)