Skip to content

Commit 503d001

Browse files
committed
Fix AMQP CF publisher confirms type usage
Related to spring-projects/spring-amqp#1067
1 parent 35dd0c2 commit 503d001

File tree

4 files changed

+35
-42
lines changed

4 files changed

+35
-42
lines changed

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AmqpOutboundEndpointTests2.java

+15-12
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747

4848
/**
4949
* @author Gary Russell
50+
* @author Artem Bilan
51+
*
5052
* @since 5.2
5153
*
5254
*/
@@ -65,15 +67,16 @@ void testConfirmOk(@Autowired IntegrationFlow flow, @Autowired RabbitTemplate te
6567
void testWithReturn(@Autowired IntegrationFlow flow) {
6668
assertThatThrownBy(() -> flow.getInputChannel()
6769
.send(new GenericMessage<>("test", Collections.singletonMap("rk", "junkjunk"))))
68-
.isInstanceOf(MessageHandlingException.class)
69-
.hasCauseInstanceOf(AmqpException.class)
70-
.extracting(ex -> ex.getCause())
71-
.extracting(ex -> ex.getMessage())
72-
.isEqualTo("Message was returned by the broker");
70+
.isInstanceOf(MessageHandlingException.class)
71+
.hasCauseInstanceOf(AmqpException.class)
72+
.extracting(Throwable::getCause)
73+
.extracting(Throwable::getMessage)
74+
.isEqualTo("Message was returned by the broker");
7375
}
7476

7577
@Test
76-
@DisabledIf("#{systemEnvironment['TRAVIS'] ?: false}") // needs RabbitMQ 3.7
78+
@DisabledIf("#{systemEnvironment['TRAVIS'] ?: false}")
79+
// needs RabbitMQ 3.7
7780
void testWithReject(@Autowired IntegrationFlow flow, @Autowired RabbitAdmin admin,
7881
@Autowired RabbitTemplate template) {
7982

@@ -82,10 +85,10 @@ void testWithReject(@Autowired IntegrationFlow flow, @Autowired RabbitAdmin admi
8285
flow.getInputChannel().send(new GenericMessage<>("test", Collections.singletonMap("rk", queue.getName())));
8386
assertThatThrownBy(() -> flow.getInputChannel()
8487
.send(new GenericMessage<>("test", Collections.singletonMap("rk", queue.getName()))))
85-
.hasCauseInstanceOf(AmqpException.class)
86-
.extracting(ex -> ex.getCause())
87-
.extracting(ex -> ex.getMessage())
88-
.matches(msg -> msg.matches("Negative publisher confirm received: .*"));
88+
.hasCauseInstanceOf(AmqpException.class)
89+
.extracting(Throwable::getCause)
90+
.extracting(Throwable::getMessage)
91+
.matches(msg -> msg.matches("Negative publisher confirm received: .*"));
8992
assertThat(template.receive(queue.getName())).isNotNull();
9093
admin.deleteQueue(queue.getName());
9194
}
@@ -97,7 +100,7 @@ public static class Config {
97100
@Bean
98101
public IntegrationFlow flow(RabbitTemplate template) {
99102
return f -> f.handle(Amqp.outboundAdapter(template)
100-
.exchangeName("")
103+
.exchangeName("")
101104
.routingKeyFunction(msg -> msg.getHeaders().get("rk", String.class))
102105
.confirmCorrelationFunction(msg -> msg)
103106
.waitForConfirm(true));
@@ -107,7 +110,7 @@ public IntegrationFlow flow(RabbitTemplate template) {
107110
public CachingConnectionFactory cf() {
108111
CachingConnectionFactory ccf = new CachingConnectionFactory(
109112
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
110-
ccf.setPublisherConfirms(true);
113+
ccf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
111114
ccf.setPublisherReturns(true);
112115
return ccf;
113116
}

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/outbound/AsyncAmqpGatewayTests.java

+9-23
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,14 @@
3131
import java.util.concurrent.TimeUnit;
3232

3333
import org.apache.commons.logging.Log;
34-
import org.junit.AfterClass;
35-
import org.junit.ClassRule;
36-
import org.junit.Rule;
37-
import org.junit.Test;
34+
import org.junit.jupiter.api.Test;
3835

3936
import org.springframework.amqp.core.AmqpReplyTimeoutException;
4037
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
4138
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitMessageFuture;
4239
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
4340
import org.springframework.amqp.rabbit.core.RabbitTemplate;
44-
import org.springframework.amqp.rabbit.junit.BrokerRunning;
41+
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
4542
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
4643
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
4744
import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener;
@@ -54,7 +51,7 @@
5451
import org.springframework.integration.channel.DirectChannel;
5552
import org.springframework.integration.channel.QueueChannel;
5653
import org.springframework.integration.support.MessageBuilder;
57-
import org.springframework.integration.test.rule.Log4j2LevelAdjuster;
54+
import org.springframework.integration.test.condition.LogLevels;
5855
import org.springframework.messaging.Message;
5956
import org.springframework.messaging.MessagingException;
6057
import org.springframework.messaging.support.ErrorMessage;
@@ -67,25 +64,14 @@
6764
* @since 4.3
6865
*
6966
*/
70-
public class AsyncAmqpGatewayTests {
71-
72-
@ClassRule
73-
public static BrokerRunning brokerRunning = BrokerRunning.isRunningWithEmptyQueues("asyncQ1", "asyncRQ1");
74-
75-
@Rule
76-
public Log4j2LevelAdjuster adjuster =
77-
Log4j2LevelAdjuster.trace()
78-
.categories(true, "org.springframework.amqp");
79-
80-
@AfterClass
81-
public static void tearDown() {
82-
brokerRunning.removeTestQueues();
83-
}
67+
@RabbitAvailable(queues = { "asyncQ1", "asyncRQ1" })
68+
@LogLevels(categories = "org.springframework.amqp", level = "trace")
69+
class AsyncAmqpGatewayTests {
8470

8571
@Test
86-
public void testConfirmsAndReturns() throws Exception {
72+
void testConfirmsAndReturns() throws Exception {
8773
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
88-
ccf.setPublisherConfirms(true);
74+
ccf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
8975
ccf.setPublisherReturns(true);
9076
RabbitTemplate template = new RabbitTemplate(ccf);
9177
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
@@ -213,7 +199,7 @@ public void testConfirmsAndReturns() throws Exception {
213199
any(org.springframework.amqp.core.Message.class));
214200
DirectFieldAccessor dfa = new DirectFieldAccessor(future);
215201
dfa.setPropertyValue("nackCause", "nacknack");
216-
SettableListenableFuture<Boolean> confirmFuture = new SettableListenableFuture<Boolean>();
202+
SettableListenableFuture<Boolean> confirmFuture = new SettableListenableFuture<>();
217203
confirmFuture.set(false);
218204
dfa.setPropertyValue("confirm", confirmFuture);
219205
new DirectFieldAccessor(gateway).setPropertyValue("template", asyncTemplate);

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/BoundRabbitChannelAdviceIntegrationTests.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,16 @@
4747

4848
/**
4949
* @author Gary Russell
50+
* @author Artem Bilan
51+
*
5052
* @since 5.1
5153
*
5254
*/
5355
@SpringJUnitConfig
5456
@RabbitAvailable(queues = BoundRabbitChannelAdviceIntegrationTests.QUEUE)
5557
public class BoundRabbitChannelAdviceIntegrationTests {
5658

57-
public static final String QUEUE = "dedicated.advice";
59+
static final String QUEUE = "dedicated.advice";
5860

5961
@Autowired
6062
private Config.Gate gate;
@@ -63,7 +65,7 @@ public class BoundRabbitChannelAdviceIntegrationTests {
6365
private Config config;
6466

6567
@Test
66-
public void testAdvice() throws Exception {
68+
void testAdvice() throws Exception {
6769
BoundRabbitChannelAdvice advice = this.config.advice(this.config.template());
6870
Log logger = spy(TestUtils.getPropertyValue(advice, "logger", Log.class));
6971
new DirectFieldAccessor(advice).setPropertyValue("logger", logger);
@@ -88,14 +90,14 @@ public static class Config {
8890
private final List<String> received = new ArrayList<>();
8991

9092
@Bean
91-
public CachingConnectionFactory cf() throws Exception {
93+
public CachingConnectionFactory cf() {
9294
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
93-
ccf.setSimplePublisherConfirms(true);
95+
ccf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
9496
return ccf;
9597
}
9698

9799
@Bean
98-
public RabbitTemplate template() throws Exception {
100+
public RabbitTemplate template() {
99101
return new RabbitTemplate(cf());
100102
}
101103

spring-integration-amqp/src/test/java/org/springframework/integration/amqp/support/BoundRabbitChannelAdviceTests.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050

5151
/**
5252
* @author Gary Russell
53+
* @author Artem Bilan
54+
*
5355
* @since 5.1
5456
*
5557
*/
@@ -63,7 +65,7 @@ public class BoundRabbitChannelAdviceTests {
6365
private Config config;
6466

6567
@Test
66-
public void testAdvice() throws Exception {
68+
void testAdvice() throws Exception {
6769
this.gate.send("a,b,c");
6870
verify(this.config.connection, times(1)).createChannel();
6971
verify(this.config.channel).confirmSelect();
@@ -97,7 +99,7 @@ public CachingConnectionFactory cf() throws Exception {
9799
}).given(cf).newConnection((ExecutorService) isNull(), anyString());
98100
cf.setAutomaticRecoveryEnabled(false);
99101
CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
100-
ccf.setSimplePublisherConfirms(true);
102+
ccf.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);
101103
return ccf;
102104
}
103105

0 commit comments

Comments
 (0)