Skip to content

Commit 6d912dd

Browse files
committed
Handle EOF properly in NIO
On MacOS, SocketChannel.read() can return EOF when the SocketChannel is closed. On Linux, it would throw an exception. This could lead to spinning threads on MacOS, because the NIO loop was never closed. Fixes #11
1 parent d74b125 commit 6d912dd

File tree

6 files changed

+35
-7
lines changed

6 files changed

+35
-7
lines changed

src/main/java/com/rabbitmq/client/impl/nio/ByteBufferInputStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ private int readFromBuffer(ByteBuffer buffer) {
4747
private static void readFromNetworkIfNecessary(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
4848
if(!buffer.hasRemaining()) {
4949
buffer.clear();
50-
int read = channel.read(buffer);
50+
int read = NioHelper.read(channel, buffer);
5151
if(read <= 0) {
5252
int attempt = 0;
5353
while(attempt < 3) {
@@ -56,7 +56,7 @@ private static void readFromNetworkIfNecessary(ReadableByteChannel channel, Byte
5656
} catch (InterruptedException e) {
5757
// ignore
5858
}
59-
read = channel.read(buffer);
59+
read = NioHelper.read(channel, buffer);
6060
if(read > 0) {
6161
break;
6262
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.rabbitmq.client.impl.nio;
2+
3+
import java.io.IOException;
4+
import java.nio.ByteBuffer;
5+
import java.nio.channels.ReadableByteChannel;
6+
import java.nio.channels.SocketChannel;
7+
8+
/**
9+
* Created by acogoluegnes on 04/10/2016.
10+
*/
11+
public class NioHelper {
12+
13+
static int read(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
14+
int read = channel.read(buffer);
15+
if(read < 0) {
16+
throw new IOException("Channel has reached EOF");
17+
}
18+
return read;
19+
}
20+
21+
}

src/main/java/com/rabbitmq/client/impl/nio/NioLoop.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import java.io.DataInputStream;
2424
import java.io.DataOutputStream;
25+
import java.io.IOException;
2526
import java.nio.ByteBuffer;
2627
import java.nio.channels.SelectionKey;
2728
import java.nio.channels.Selector;
@@ -255,6 +256,12 @@ public void run() {
255256
protected void handleIoError(SocketChannelFrameHandlerState state, Throwable ex) {
256257
if (needToDispatchIoError(state)) {
257258
dispatchIoErrorToConnection(state, ex);
259+
} else {
260+
try {
261+
state.close();
262+
} catch (IOException e) {
263+
264+
}
258265
}
259266
}
260267

src/main/java/com/rabbitmq/client/impl/nio/SocketChannelFrameHandlerState.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ void prepareForReadSequence() throws IOException {
186186
cipherIn.flip();
187187
plainIn.flip();
188188
} else {
189-
channel.read(plainIn);
189+
NioHelper.read(channel, plainIn);
190190
plainIn.flip();
191191
}
192192
}
@@ -196,7 +196,7 @@ boolean continueReading() throws IOException {
196196
if (!plainIn.hasRemaining() && !cipherIn.hasRemaining()) {
197197
// need to try to read something
198198
cipherIn.clear();
199-
int bytesRead = channel.read(cipherIn);
199+
int bytesRead = NioHelper.read(channel, cipherIn);
200200
if (bytesRead <= 0) {
201201
return false;
202202
} else {
@@ -209,7 +209,7 @@ boolean continueReading() throws IOException {
209209
} else {
210210
if (!plainIn.hasRemaining()) {
211211
plainIn.clear();
212-
channel.read(plainIn);
212+
NioHelper.read(channel, plainIn);
213213
plainIn.flip();
214214
}
215215
return plainIn.hasRemaining();

src/main/java/com/rabbitmq/client/impl/nio/SslEngineByteBufferInputStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public int read() throws IOException {
6565
cipherIn.clear();
6666
}
6767

68-
int bytesRead = channel.read(cipherIn);
68+
int bytesRead = NioHelper.read(channel, cipherIn);
6969
if (bytesRead > 0) {
7070
cipherIn.flip();
7171
} else {

src/main/java/com/rabbitmq/client/impl/nio/SslEngineHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ private static SSLEngineResult.HandshakeStatus unwrap(ByteBuffer cipherIn, ByteB
8787
throw new SSLException("Buffer overflow during handshake");
8888
case BUFFER_UNDERFLOW:
8989
cipherIn.compact();
90-
channel.read(cipherIn);
90+
NioHelper.read(channel, cipherIn);
9191
cipherIn.flip();
9292
break;
9393
case CLOSED:

0 commit comments

Comments
 (0)