Skip to content

Adjust to changes in Spring AMQP 2.2 snapshots #17587

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.CacheMode;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
import org.springframework.boot.convert.DurationUnit;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -665,7 +666,7 @@ public static class SimpleContainer extends AmqpContainer {
* Number of messages to be processed between acks when the acknowledge mode is
* AUTO. If larger than prefetch, prefetch will be increased to this value.
*/
private Integer transactionSize;
private Integer batchSize;

/**
* Whether to fail if the queues declared by the container are not available on
Expand All @@ -690,12 +691,34 @@ public void setMaxConcurrency(Integer maxConcurrency) {
this.maxConcurrency = maxConcurrency;
}

/**
* Get the number of messages processed in one transaction.
* @return number of messages
* @deprecated since 2.2.0 in favor of {@link SimpleContainer#getBatchSize()}
*/
@DeprecatedConfigurationProperty(replacement = "spring.rabbitmq.listener.simple.batchSize")
@Deprecated
public Integer getTransactionSize() {
return this.transactionSize;
return getBatchSize();
}

/**
* Set the number of messages processed in one transaction.
* @param transactionSize number of messages
* @deprecated since 2.2.0 in favor of
* {@link SimpleContainer#setBatchSize(Integer)}
*/
@Deprecated
public void setTransactionSize(Integer transactionSize) {
this.transactionSize = transactionSize;
setBatchSize(transactionSize);
}

public Integer getBatchSize() {
return this.batchSize;
}

public void setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFa
configure(factory, connectionFactory, config);
map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
map.from(config::getTransactionSize).whenNonNull().to(factory::setTxSize);
map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.amqp;

import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -125,6 +126,7 @@ void testDefaultConnectionFactoryConfiguration() {
}

@Test
@SuppressWarnings("unchecked")
void testConnectionFactoryWithOverrides() {
this.contextRunner.withUserConfiguration(TestConfiguration.class)
.withPropertyValues("spring.rabbitmq.host:remote-server", "spring.rabbitmq.port:9000",
Expand All @@ -137,15 +139,16 @@ void testConnectionFactoryWithOverrides() {
assertThat(connectionFactory.getVirtualHost()).isEqualTo("/vhost");
com.rabbitmq.client.ConnectionFactory rcf = connectionFactory.getRabbitConnectionFactory();
assertThat(rcf.getConnectionTimeout()).isEqualTo(123);
assertThat((Address[]) ReflectionTestUtils.getField(connectionFactory, "addresses")).hasSize(1);
assertThat((List<Address>) ReflectionTestUtils.getField(connectionFactory, "addresses")).hasSize(1);
});
}

@Test
@SuppressWarnings("unchecked")
void testConnectionFactoryWithCustomConnectionNameStrategy() {
this.contextRunner.withUserConfiguration(ConnectionNameStrategyConfiguration.class).run((context) -> {
CachingConnectionFactory connectionFactory = context.getBean(CachingConnectionFactory.class);
Address[] addresses = (Address[]) ReflectionTestUtils.getField(connectionFactory, "addresses");
List<Address> addresses = (List<Address>) ReflectionTestUtils.getField(connectionFactory, "addresses");
assertThat(addresses).hasSize(1);
com.rabbitmq.client.ConnectionFactory rcf = mock(com.rabbitmq.client.ConnectionFactory.class);
given(rcf.newConnection(isNull(), eq(addresses), anyString())).willReturn(mock(Connection.class));
Expand Down Expand Up @@ -363,8 +366,8 @@ void testRabbitListenerContainerFactoryBackOff() {
this.contextRunner.withUserConfiguration(TestConfiguration5.class).run((context) -> {
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
rabbitListenerContainerFactory.setTxSize(10);
verify(rabbitListenerContainerFactory).setTxSize(10);
rabbitListenerContainerFactory.setBatchSize(10);
verify(rabbitListenerContainerFactory).setBatchSize(10);
assertThat(rabbitListenerContainerFactory.getAdviceChain()).isNull();
});
}
Expand All @@ -385,20 +388,31 @@ void testSimpleRabbitListenerContainerFactoryWithCustomSettings() {
"spring.rabbitmq.listener.simple.prefetch:40",
"spring.rabbitmq.listener.simple.defaultRequeueRejected:false",
"spring.rabbitmq.listener.simple.idleEventInterval:5",
"spring.rabbitmq.listener.simple.transactionSize:20",
"spring.rabbitmq.listener.simple.batchSize:20",
"spring.rabbitmq.listener.simple.missingQueuesFatal:false")
.run((context) -> {
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("concurrentConsumers", 5);
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("maxConcurrentConsumers",
10);
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("txSize", 20);
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("batchSize", 20);
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("missingQueuesFatal", false);
checkCommonProps(context, rabbitListenerContainerFactory);
});
}

@Test
void testRabbitListenerContainerFactoryWithDeprecatedTransactionSizeStillWorks() {
this.contextRunner
.withUserConfiguration(MessageConvertersConfiguration.class, MessageRecoverersConfiguration.class)
.withPropertyValues("spring.rabbitmq.listener.simple.transactionSize:20").run((context) -> {
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory = context
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
assertThat(rabbitListenerContainerFactory).hasFieldOrPropertyWithValue("batchSize", 20);
});
}

@Test
void testDirectRabbitListenerContainerFactoryWithCustomSettings() {
this.contextRunner
Expand Down