Skip to content

Commit 34c938f

Browse files
garyrussellartembilan
authored andcommitted
INT-4379: JMS OG Shutdown reply container on stop
JIRA: https://jira.spring.io/browse/INT-4379 - shutdown the container when the gateway is stopped Also, improve test suite - at the end of the tests, hundreds of threads are running, some caused by the above but others because `TaskExecutor`s are not shut down - reduce the number of iterations in the JMS pipeline tests to speed things up - change more tests to extend `ActiveMQMultiContextTests`, to keep a single broker up __cherry-pick to 4.3.x__ # Conflicts: # spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsOutboundGateway.java # spring-integration-jms/src/test/java/org/springframework/integration/jms/JmsOutboundGatewayTests.java # spring-integration-jms/src/test/java/org/springframework/integration/jms/OutboundGatewayConnectionTests.java # spring-integration-jms/src/test/java/org/springframework/integration/jms/OutboundGatewayFunctionTests.java # spring-integration-jms/src/test/java/org/springframework/integration/jms/PollableJmsChannelTests.java # spring-integration-jms/src/test/java/org/springframework/integration/jms/SubscribableJmsChannelTests.java
1 parent d2fb79e commit 34c938f

10 files changed

+193
-197
lines changed

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsOutboundGateway.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -156,6 +156,8 @@ public class JmsOutboundGateway extends AbstractReplyProducingMessageHandler imp
156156

157157
private volatile long idleReplyContainerTimeout;
158158

159+
private volatile boolean wasStopped;
160+
159161
private ScheduledFuture<?> idleTask;
160162

161163
/**
@@ -676,6 +678,10 @@ public void start() {
676678
if (this.replyContainer != null) {
677679
TaskScheduler taskScheduler = getTaskScheduler();
678680
if (this.idleReplyContainerTimeout <= 0) {
681+
if (this.wasStopped) {
682+
this.replyContainer.initialize();
683+
this.wasStopped = false;
684+
}
679685
this.replyContainer.start();
680686
}
681687
else {
@@ -695,7 +701,8 @@ public void start() {
695701
public void stop() {
696702
synchronized (this.lifeCycleMonitor) {
697703
if (this.replyContainer != null) {
698-
this.replyContainer.stop();
704+
this.replyContainer.shutdown();
705+
this.wasStopped = true;
699706
this.deleteDestinationIfTemporary(this.replyContainer.getDestination());
700707
if (this.reaper != null) {
701708
this.reaper.cancel(false);

spring-integration-jms/src/test/java/org/springframework/integration/jms/JmsOutboundGatewayTests.java

+14-27
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.util.ArrayList;
3030
import java.util.List;
31+
import java.util.concurrent.ExecutorService;
3132
import java.util.concurrent.Executors;
3233
import java.util.concurrent.atomic.AtomicInteger;
3334

@@ -69,13 +70,11 @@
6970
* @author Artem Bilan
7071
* @since 2.2.4
7172
*/
72-
public class JmsOutboundGatewayTests extends LogAdjustingTestSupport {
73+
public class JmsOutboundGatewayTests extends ActiveMQMultiContextTests {
7374

7475
private final Log logger = LogFactory.getLog(this.getClass());
7576

76-
public JmsOutboundGatewayTests() {
77-
super("org.springframework.integration", "org.springframework.jms", "org.apache");
78-
}
77+
7978

8079
@Test
8180
public void testContainerBeanNameWhenNoGatewayBeanName() {
@@ -100,16 +99,11 @@ public void testReplyContainerRecovery() throws Exception {
10099
gateway.setUseReplyContainer(true);
101100
ReplyContainerProperties replyContainerProperties = new ReplyContainerProperties();
102101
final List<Throwable> errors = new ArrayList<Throwable>();
102+
ExecutorService exec = Executors.newFixedThreadPool(10);
103103
ErrorHandlingTaskExecutor errorHandlingTaskExecutor =
104-
new ErrorHandlingTaskExecutor(Executors.newFixedThreadPool(10), new ErrorHandler() {
105-
106-
@Override
107-
public void handleError(Throwable t) {
108-
logger.info("Error:", t);
109-
errors.add(t);
110-
throw new RuntimeException(t);
111-
}
112-
104+
new ErrorHandlingTaskExecutor(exec, t -> {
105+
errors.add(t);
106+
throw new RuntimeException(t);
113107
});
114108
replyContainerProperties.setTaskExecutor(errorHandlingTaskExecutor);
115109
replyContainerProperties.setRecoveryInterval(100L);
@@ -173,6 +167,7 @@ public Message answer(InvocationOnMock invocation) throws Throwable {
173167
}
174168
finally {
175169
gateway.stop();
170+
exec.shutdownNow();
176171
}
177172
}
178173

@@ -192,13 +187,8 @@ public void testConnectionBreakOnReplyMessageIdCorrelation() throws Exception {
192187
gateway.setReceiveTimeout(60000);
193188
gateway.afterPropertiesSet();
194189
gateway.start();
195-
Executors.newSingleThreadExecutor().execute(new Runnable() {
196-
197-
@Override
198-
public void run() {
199-
gateway.handleMessage(new GenericMessage<String>("foo"));
200-
}
201-
});
190+
ExecutorService exec = Executors.newSingleThreadExecutor();
191+
exec.execute(() -> gateway.handleMessage(new GenericMessage<String>("foo")));
202192
CachingConnectionFactory connectionFactory2 = new CachingConnectionFactory(
203193
new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"));
204194
JmsTemplate template = new JmsTemplate(connectionFactory2);
@@ -224,6 +214,7 @@ public Message createMessage(Session session) throws JMSException {
224214
gateway.stop();
225215
connectionFactory1.destroy();
226216
connectionFactory2.destroy();
217+
exec.shutdownNow();
227218
}
228219

229220
@Test
@@ -243,13 +234,8 @@ public void testConnectionBreakOnReplyCustomCorrelation() throws Exception {
243234
gateway.setCorrelationKey("JMSCorrelationID");
244235
gateway.afterPropertiesSet();
245236
gateway.start();
246-
Executors.newSingleThreadExecutor().execute(new Runnable() {
247-
248-
@Override
249-
public void run() {
250-
gateway.handleMessage(new GenericMessage<String>("foo"));
251-
}
252-
});
237+
ExecutorService exec = Executors.newSingleThreadExecutor();
238+
exec.execute(() -> gateway.handleMessage(new GenericMessage<String>("foo")));
253239
CachingConnectionFactory connectionFactory2 = new CachingConnectionFactory(
254240
new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"));
255241
JmsTemplate template = new JmsTemplate(connectionFactory2);
@@ -276,6 +262,7 @@ public Message createMessage(Session session) throws JMSException {
276262
gateway.stop();
277263
connectionFactory1.destroy();
278264
connectionFactory2.destroy();
265+
exec.shutdownNow();
279266
}
280267

281268
}

spring-integration-jms/src/test/java/org/springframework/integration/jms/JmsOutboundInsideChainTests.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,11 +36,12 @@
3636
* //INT-2275
3737
*
3838
* @author Artem Bilan
39+
* @author Gary Russell
3940
*/
4041
@RunWith(SpringJUnit4ClassRunner.class)
4142
@ContextConfiguration
4243
@DirtiesContext
43-
public class JmsOutboundInsideChainTests {
44+
public class JmsOutboundInsideChainTests extends ActiveMQMultiContextTests {
4445

4546
@Autowired
4647
private MessageChannel outboundChainChannel;

spring-integration-jms/src/test/java/org/springframework/integration/jms/OutboundGatewayConnectionTests.java

+23-22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import static org.mockito.Mockito.when;
2323

2424
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutorService;
2526
import java.util.concurrent.Executors;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.atomic.AtomicReference;
@@ -37,12 +38,13 @@
3738
import org.apache.activemq.command.ActiveMQQueue;
3839
import org.junit.Ignore;
3940
import org.junit.Test;
41+
4042
import org.springframework.beans.factory.BeanFactory;
4143
import org.springframework.integration.context.IntegrationContextUtils;
42-
import org.springframework.messaging.support.GenericMessage;
4344
import org.springframework.jms.connection.CachingConnectionFactory;
4445
import org.springframework.jms.core.JmsTemplate;
4546
import org.springframework.jms.core.MessageCreator;
47+
import org.springframework.messaging.support.GenericMessage;
4648
import org.springframework.scheduling.TaskScheduler;
4749
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
4850

@@ -57,14 +59,15 @@ public class OutboundGatewayConnectionTests {
5759

5860
private Destination replyQueue1 = new ActiveMQQueue("reply1");
5961

60-
@Test @Ignore // need a more reliable stop/start for AMQ
62+
@Test
63+
@Ignore // need a more reliable stop/start for AMQ
6164
public void testContainerWithDestBrokenConnection() throws Exception {
6265
BeanFactory beanFactory = mock(BeanFactory.class);
6366
when(beanFactory.containsBean(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME)).thenReturn(true);
6467
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
6568
scheduler.initialize();
6669
when(beanFactory.getBean(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class))
67-
.thenReturn(scheduler);
70+
.thenReturn(scheduler);
6871
final JmsOutboundGateway gateway = new JmsOutboundGateway();
6972
gateway.setBeanFactory(beanFactory);
7073
BrokerService broker = new BrokerService();
@@ -82,15 +85,14 @@ public void testContainerWithDestBrokenConnection() throws Exception {
8285
final AtomicReference<Object> reply = new AtomicReference<Object>();
8386
final CountDownLatch latch1 = new CountDownLatch(1);
8487
final CountDownLatch latch2 = new CountDownLatch(1);
85-
Executors.newSingleThreadExecutor().execute(new Runnable() {
86-
public void run() {
87-
latch1.countDown();
88-
try {
89-
reply.set(gateway.handleRequestMessage(new GenericMessage<String>("foo")));
90-
}
91-
finally {
92-
latch2.countDown();
93-
}
88+
ExecutorService exec = Executors.newSingleThreadExecutor();
89+
exec.execute(() -> {
90+
latch1.countDown();
91+
try {
92+
reply.set(gateway.handleRequestMessage(new GenericMessage<String>("foo")));
93+
}
94+
finally {
95+
latch2.countDown();
9496
}
9597
});
9698
assertTrue(latch1.await(10, TimeUnit.SECONDS));
@@ -116,15 +118,13 @@ public Message createMessage(Session session) throws JMSException {
116118

117119
final CountDownLatch latch3 = new CountDownLatch(1);
118120
final CountDownLatch latch4 = new CountDownLatch(1);
119-
Executors.newSingleThreadExecutor().execute(new Runnable() {
120-
public void run() {
121-
latch3.countDown();
122-
try {
123-
reply.set(gateway.handleRequestMessage(new GenericMessage<String>("foo")));
124-
}
125-
finally {
126-
latch4.countDown();
127-
}
121+
exec.execute(() -> {
122+
latch3.countDown();
123+
try {
124+
reply.set(gateway.handleRequestMessage(new GenericMessage<String>("foo")));
125+
}
126+
finally {
127+
latch4.countDown();
128128
}
129129
});
130130
assertTrue(latch3.await(10, TimeUnit.SECONDS));
@@ -147,6 +147,7 @@ public Message createMessage(Session session) throws JMSException {
147147
broker.stop();
148148

149149
scheduler.destroy();
150+
exec.shutdownNow();
150151
}
151152

152153
}

0 commit comments

Comments
 (0)