Skip to content

Commit 9e45402

Browse files
committed
Prevent deadlock/hanging on JDK socket write
References #11
1 parent 844a258 commit 9e45402

File tree

2 files changed

+50
-17
lines changed

2 files changed

+50
-17
lines changed

src/main/java/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,27 @@
1515

1616
package com.rabbitmq.client.impl;
1717

18+
import com.rabbitmq.client.MetricsCollector;
19+
import com.rabbitmq.client.NoOpMetricsCollector;
20+
import com.rabbitmq.client.ShutdownSignalException;
21+
import com.rabbitmq.utility.IntAllocator;
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
1825
import java.io.IOException;
1926
import java.util.HashMap;
2027
import java.util.HashSet;
2128
import java.util.Map;
2229
import java.util.Set;
23-
import java.util.concurrent.CountDownLatch;
24-
import java.util.concurrent.ExecutorService;
25-
import java.util.concurrent.Executors;
26-
import java.util.concurrent.ThreadFactory;
27-
import java.util.concurrent.TimeUnit;
28-
29-
import com.rabbitmq.client.NoOpMetricsCollector;
30-
import com.rabbitmq.client.ShutdownSignalException;
31-
import com.rabbitmq.client.MetricsCollector;
32-
import com.rabbitmq.utility.IntAllocator;
30+
import java.util.concurrent.*;
3331

3432
/**
3533
* Manages a set of channels, indexed by channel number (<code><b>1.._channelMax</b></code>).
3634
*/
3735
public class ChannelManager {
36+
37+
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class);
38+
3839
/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
3940
private final Object monitor = new Object();
4041
/** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */
@@ -97,16 +98,33 @@ public ChannelN getChannel(int channelNumber) {
9798
* Handle shutdown. All the managed {@link com.rabbitmq.client.Channel Channel}s are shutdown.
9899
* @param signal reason for shutdown
99100
*/
100-
public void handleSignal(ShutdownSignalException signal) {
101+
public void handleSignal(final ShutdownSignalException signal) {
101102
Set<ChannelN> channels;
102103
synchronized(this.monitor) {
103104
channels = new HashSet<ChannelN>(_channelMap.values());
104105
}
105-
for (ChannelN channel : channels) {
106-
releaseChannelNumber(channel);
107-
channel.processShutdownSignal(signal, true, true);
108-
shutdownSet.add(channel.getShutdownLatch());
109-
channel.notifyListeners();
106+
ExecutorService executorService = Executors.newSingleThreadExecutor();
107+
try {
108+
for (final ChannelN channel : channels) {
109+
releaseChannelNumber(channel);
110+
Future<?> channelShutdownTask = executorService.submit(new Runnable() {
111+
@Override
112+
public void run() {
113+
channel.processShutdownSignal(signal, true, true);
114+
shutdownSet.add(channel.getShutdownLatch());
115+
}
116+
});
117+
try {
118+
// FIXME make the timeout configurable
119+
channelShutdownTask.get(1, TimeUnit.SECONDS);
120+
} catch (Exception e) {
121+
LOGGER.warn("Couldn't properly close channel {}", channel.getChannelNumber());
122+
} finally {
123+
channel.notifyListeners();
124+
}
125+
}
126+
} finally {
127+
executorService.shutdownNow();
110128
}
111129
scheduleShutdownProcessing();
112130
}

src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.net.InetAddress;
2424
import java.net.Socket;
2525
import java.net.SocketException;
26+
import java.util.concurrent.*;
2627

2728
import com.rabbitmq.client.AMQP;
2829

@@ -152,7 +153,21 @@ public void flush() throws IOException {
152153
@SuppressWarnings("unused")
153154
public void close() {
154155
try { _socket.setSoLinger(true, SOCKET_CLOSING_TIMEOUT); } catch (Exception _e) {}
155-
try { flush(); } catch (Exception _e) {}
156+
ExecutorService executorService = Executors.newSingleThreadExecutor();
157+
try {
158+
Future<Void> flushTask = executorService.submit(new Callable<Void>() {
159+
@Override
160+
public Void call() throws Exception {
161+
flush();
162+
return null;
163+
}
164+
});
165+
flushTask.get(SOCKET_CLOSING_TIMEOUT, TimeUnit.SECONDS);
166+
} catch (Exception _e) {
167+
168+
} finally {
169+
executorService.shutdownNow();
170+
}
156171
try { _socket.close(); } catch (Exception _e) {}
157172
}
158173
}

0 commit comments

Comments
 (0)