Skip to content

Commit 6168e97

Browse files
committed
Shutdown NIO loop ASAP
Fixes #11
1 parent ee94a9e commit 6168e97

File tree

5 files changed

+27
-25
lines changed

5 files changed

+27
-25
lines changed

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,11 @@ public void run() {
6666
boolean writeRegistered = false;
6767

6868
try {
69-
int idlenessCount = 0;
7069
while (true && !Thread.currentThread().isInterrupted()) {
7170

7271
for (SelectionKey selectionKey : selector.keys()) {
7372
SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState) selectionKey.attachment();
74-
if (state.getConnection().getHeartbeat() > 0) {
73+
if (state.getConnection() != null && state.getConnection().getHeartbeat() > 0) {
7574
long now = System.currentTimeMillis();
7675
if ((now - state.getLastActivity()) > state.getConnection().getHeartbeat() * 1000 * 2) {
7776
try {
@@ -89,8 +88,7 @@ public void run() {
8988
if (!writeRegistered && registrations.isEmpty() && writeRegistrations.isEmpty()) {
9089
// we can block, registrations will call Selector.wakeup()
9190
select = selector.select(1000);
92-
idlenessCount++;
93-
if (idlenessCount == 10 && selector.keys().size() == 0) {
91+
if (selector.keys().size() == 0) {
9492
// we haven't been doing anything for a while, shutdown state
9593
boolean clean = context.cleanUp();
9694
if (clean) {
@@ -118,7 +116,6 @@ public void run() {
118116
}
119117

120118
if (select > 0) {
121-
idlenessCount = 0;
122119
Set<SelectionKey> readyKeys = selector.selectedKeys();
123120
Iterator<SelectionKey> iterator = readyKeys.iterator();
124121
while (iterator.hasNext()) {
@@ -246,7 +243,7 @@ public void run() {
246243
}
247244
}
248245
} catch (Exception e) {
249-
LOGGER.error("Error in read loop", e);
246+
LOGGER.error("Error in NIO loop", e);
250247
}
251248
}
252249

src/main/java/com/rabbitmq/client/impl/nio/NioLoopContext.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.nio.channels.Selector;
1010
import java.util.concurrent.ExecutorService;
1111
import java.util.concurrent.ThreadFactory;
12-
import java.util.concurrent.atomic.AtomicLong;
1312

1413
/**
1514
*
@@ -29,8 +28,6 @@ public class NioLoopContext {
2928
SelectorHolder readSelectorState;
3029
SelectorHolder writeSelectorState;
3130

32-
private final AtomicLong nioLoopsConnectionCount = new AtomicLong();
33-
3431
public NioLoopContext(SocketChannelFrameHandlerFactory socketChannelFrameHandlerFactory,
3532
NioParams nioParams) {
3633
this.socketChannelFrameHandlerFactory = socketChannelFrameHandlerFactory;
@@ -40,10 +37,6 @@ public NioLoopContext(SocketChannelFrameHandlerFactory socketChannelFrameHandler
4037
this.writeBuffer = ByteBuffer.allocate(nioParams.getWriteByteBufferSize());
4138
}
4239

43-
void notifyNewConnection() {
44-
nioLoopsConnectionCount.incrementAndGet();
45-
}
46-
4740
void initStateIfNecessary() throws IOException {
4841
if (this.readSelectorState == null) {
4942
this.readSelectorState = new SelectorHolder(Selector.open());
@@ -67,10 +60,13 @@ private void startIoLoops() {
6760
}
6861

6962
protected boolean cleanUp() {
70-
long connectionCountNow = nioLoopsConnectionCount.get();
63+
int readRegistrationsCount = readSelectorState.registrations.size();
64+
if(readRegistrationsCount != 0) {
65+
return false;
66+
}
7167
socketChannelFrameHandlerFactory.lock();
7268
try {
73-
if (connectionCountNow != nioLoopsConnectionCount.get()) {
69+
if (readRegistrationsCount != readSelectorState.registrations.size()) {
7470
// a connection request has come in meanwhile, don't do anything
7571
return false;
7672
}

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ public void sendHeader() throws IOException {
7676
@Override
7777
public void initialize(AMQConnection connection) {
7878
state.setConnection(connection);
79-
state.startReading();
8079
}
8180

8281
@Override

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerFactory.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,20 +98,20 @@ public FrameHandler create(Address addr) throws IOException {
9898
long modulo = globalConnectionCount.getAndIncrement() % nioParams.getNbIoThreads();
9999
nioLoopContext = nioLoopContexts.get((int) modulo);
100100
nioLoopContext.initStateIfNecessary();
101-
nioLoopContext.notifyNewConnection();
101+
SocketChannelFrameHandlerState state = new SocketChannelFrameHandlerState(
102+
channel,
103+
nioLoopContext,
104+
nioParams,
105+
sslEngine
106+
);
107+
state.startReading();
108+
SocketChannelFrameHandler frameHandler = new SocketChannelFrameHandler(state);
109+
return frameHandler;
102110
} finally {
103111
stateLock.unlock();
104112
}
105113

106-
SocketChannelFrameHandlerState state = new SocketChannelFrameHandlerState(
107-
channel,
108-
nioLoopContext,
109-
nioParams,
110-
sslEngine
111-
);
112114

113-
SocketChannelFrameHandler frameHandler = new SocketChannelFrameHandler(state);
114-
return frameHandler;
115115
} catch(IOException e) {
116116
try {
117117
if(sslEngine != null && channel != null) {

src/test/java/com/rabbitmq/client/test/JavaNioTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ public void shutdownCompleted(ShutdownSignalException cause) {
9292
}
9393
}
9494

95+
@Test
96+
public void nioLoopCleaning() throws Exception {
97+
ConnectionFactory connectionFactory = new ConnectionFactory();
98+
connectionFactory.useNio();
99+
for(int i = 0; i < 10; i++) {
100+
Connection connection = connectionFactory.newConnection();
101+
connection.abort();
102+
}
103+
}
104+
95105
private Connection basicGetBasicConsume(ConnectionFactory connectionFactory, String queue, final CountDownLatch latch)
96106
throws IOException, TimeoutException {
97107
Connection connection = connectionFactory.newConnection();

0 commit comments

Comments
 (0)