Skip to content

Commit 98cfd3b

Browse files
committed
Use shutdown executor to avoid deadlock
Use the shutdown executor only if it's been set. Fixes #194
1 parent ba7356a commit 98cfd3b

File tree

5 files changed

+75
-33
lines changed

5 files changed

+75
-33
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,7 @@ public MetricsCollector getMetricsCollector() {
642642
}
643643

644644
protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {
645-
return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL());
645+
return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL(), this.shutdownExecutor);
646646
}
647647

648648
/**

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,7 @@ public void start()
393393
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
394394
ChannelManager result = new ChannelManager(this._workService, channelMax, threadFactory, this.metricsCollector);
395395
result.setShutdownExecutor(this.shutdownExecutor);
396+
result.setChannelShutdownTimeout((int) (requestedHeartbeat * 1000.0 + requestedHeartbeat * 1000.0 * (5.0 / 100.0)));
396397
return result;
397398
}
398399

src/main/java/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package com.rabbitmq.client.impl;
1717

18+
import com.rabbitmq.client.ConnectionFactory;
1819
import com.rabbitmq.client.MetricsCollector;
1920
import com.rabbitmq.client.NoOpMetricsCollector;
2021
import com.rabbitmq.client.ShutdownSignalException;
@@ -51,6 +52,8 @@ public class ChannelManager {
5152
private ExecutorService shutdownExecutor;
5253
private final ThreadFactory threadFactory;
5354

55+
private int channelShutdownTimeout = (int) (ConnectionFactory.DEFAULT_HEARTBEAT + ConnectionFactory.DEFAULT_HEARTBEAT * (5.0 / 100.0)) * 1000;
56+
5457
protected final MetricsCollector metricsCollector;
5558

5659
public int getChannelMax(){
@@ -103,28 +106,30 @@ public void handleSignal(final ShutdownSignalException signal) {
103106
synchronized(this.monitor) {
104107
channels = new HashSet<ChannelN>(_channelMap.values());
105108
}
106-
ExecutorService executorService = Executors.newSingleThreadExecutor();
107-
try {
108-
for (final ChannelN channel : channels) {
109-
releaseChannelNumber(channel);
110-
Future<?> channelShutdownTask = executorService.submit(new Runnable() {
111-
@Override
112-
public void run() {
113-
channel.processShutdownSignal(signal, true, true);
114-
shutdownSet.add(channel.getShutdownLatch());
115-
}
116-
});
109+
110+
for (final ChannelN channel : channels) {
111+
releaseChannelNumber(channel);
112+
// async shutdown if possible
113+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
114+
Runnable channelShutdownRunnable = new Runnable() {
115+
@Override
116+
public void run() {
117+
channel.processShutdownSignal(signal, true, true);
118+
}
119+
};
120+
if(this.shutdownExecutor == null) {
121+
channelShutdownRunnable.run();
122+
} else {
123+
Future<?> channelShutdownTask = this.shutdownExecutor.submit(channelShutdownRunnable);
117124
try {
118-
// FIXME make the timeout configurable
119-
channelShutdownTask.get(1, TimeUnit.SECONDS);
125+
channelShutdownTask.get(channelShutdownTimeout, TimeUnit.MILLISECONDS);
120126
} catch (Exception e) {
121-
LOGGER.warn("Couldn't properly close channel {}", channel.getChannelNumber());
122-
} finally {
123-
channel.notifyListeners();
127+
LOGGER.warn("Couldn't properly close channel {} on shutdown after waiting for {} ms", channel.getChannelNumber(), channelShutdownTimeout);
128+
channelShutdownTask.cancel(true);
124129
}
125130
}
126-
} finally {
127-
executorService.shutdownNow();
131+
shutdownSet.add(channel.getShutdownLatch());
132+
channel.notifyListeners();
128133
}
129134
scheduleShutdownProcessing();
130135
}
@@ -243,4 +248,16 @@ public ExecutorService getShutdownExecutor() {
243248
public void setShutdownExecutor(ExecutorService shutdownExecutor) {
244249
this.shutdownExecutor = shutdownExecutor;
245250
}
251+
252+
/**
253+
* Set the shutdown timeout for channels.
254+
* This is the amount of time the manager waits for a channel to
255+
* shutdown before giving up.
256+
* Works only when the {@code shutdownExecutor} property is set.
257+
* Default to {@link com.rabbitmq.client.ConnectionFactory#DEFAULT_HEARTBEAT} + 5 % seconds
258+
* @param channelShutdownTimeout shutdown timeout in milliseconds
259+
*/
260+
public void setChannelShutdownTimeout(int channelShutdownTimeout) {
261+
this.channelShutdownTimeout = channelShutdownTimeout;
262+
}
246263
}

src/main/java/com/rabbitmq/client/impl/FrameHandlerFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,25 @@
2323
import java.io.IOException;
2424
import java.net.InetSocketAddress;
2525
import java.net.Socket;
26+
import java.util.concurrent.ExecutorService;
2627

2728
public class FrameHandlerFactory {
2829
private final int connectionTimeout;
2930
private final SocketFactory factory;
3031
private final SocketConfigurator configurator;
32+
private final ExecutorService shutdownExecutor;
3133
private final boolean ssl;
3234

3335
public FrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator, boolean ssl) {
36+
this(connectionTimeout, factory, configurator, ssl, null);
37+
}
38+
39+
public FrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator, boolean ssl, ExecutorService shutdownExecutor) {
3440
this.connectionTimeout = connectionTimeout;
3541
this.factory = factory;
3642
this.configurator = configurator;
3743
this.ssl = ssl;
44+
this.shutdownExecutor = shutdownExecutor;
3845
}
3946

4047
public FrameHandler create(Address addr) throws IOException {
@@ -55,7 +62,7 @@ public FrameHandler create(Address addr) throws IOException {
5562

5663
public FrameHandler create(Socket sock) throws IOException
5764
{
58-
return new SocketFrameHandler(sock);
65+
return new SocketFrameHandler(sock, this.shutdownExecutor);
5966
}
6067

6168
private static void quietTrySocketClose(Socket socket) {

src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ public class SocketFrameHandler implements FrameHandler {
3535
/** The underlying socket */
3636
private final Socket _socket;
3737

38+
/**
39+
* Optional {@link ExecutorService} for final flush.
40+
*/
41+
private final ExecutorService _shutdownExecutor;
42+
3843
/** Socket's inputstream - data from the broker - synchronized on */
3944
private final DataInputStream _inputStream;
4045

@@ -48,7 +53,15 @@ public class SocketFrameHandler implements FrameHandler {
4853
* @param socket the socket to use
4954
*/
5055
public SocketFrameHandler(Socket socket) throws IOException {
56+
this(socket, null);
57+
}
58+
59+
/**
60+
* @param socket the socket to use
61+
*/
62+
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor) throws IOException {
5163
_socket = socket;
64+
_shutdownExecutor = shutdownExecutor;
5265

5366
_inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
5467
_outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
@@ -153,20 +166,24 @@ public void flush() throws IOException {
153166
@SuppressWarnings("unused")
154167
public void close() {
155168
try { _socket.setSoLinger(true, SOCKET_CLOSING_TIMEOUT); } catch (Exception _e) {}
156-
ExecutorService executorService = Executors.newSingleThreadExecutor();
169+
// async flush if possible
170+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
171+
Callable<Void> flushCallable = new Callable<Void>() {
172+
@Override
173+
public Void call() throws Exception {
174+
flush();
175+
return null;
176+
}
177+
};
157178
try {
158-
Future<Void> flushTask = executorService.submit(new Callable<Void>() {
159-
@Override
160-
public Void call() throws Exception {
161-
flush();
162-
return null;
163-
}
164-
});
165-
flushTask.get(SOCKET_CLOSING_TIMEOUT, TimeUnit.SECONDS);
166-
} catch (Exception _e) {
167-
168-
} finally {
169-
executorService.shutdownNow();
179+
if(this._shutdownExecutor == null) {
180+
flushCallable.call();
181+
} else {
182+
Future<Void> flushTask = this._shutdownExecutor.submit(flushCallable);
183+
flushTask.get(SOCKET_CLOSING_TIMEOUT, TimeUnit.SECONDS);
184+
}
185+
} catch(Exception e) {
186+
170187
}
171188
try { _socket.close(); } catch (Exception _e) {}
172189
}

0 commit comments

Comments
 (0)