32
32
import java .util .Iterator ;
33
33
import java .util .Queue ;
34
34
import java .util .Set ;
35
- import java .util .concurrent .*;
35
+ import java .util .concurrent .ExecutorService ;
36
+ import java .util .concurrent .Future ;
37
+ import java .util .concurrent .LinkedBlockingQueue ;
38
+ import java .util .concurrent .ThreadFactory ;
36
39
import java .util .concurrent .atomic .AtomicLong ;
37
40
import java .util .concurrent .locks .Lock ;
38
41
import java .util .concurrent .locks .ReentrantLock ;
@@ -57,7 +60,12 @@ public class SocketChannelFrameHandlerFactory extends AbstractFrameHandlerFactor
57
60
58
61
private Thread readThread , writeThread ;
59
62
60
- private Future <?> readTask , writeTask ;
63
+ private Future <?> writeTask ;
64
+
65
+ // FIXME make the following configuration settings
66
+ // size of byte buffers
67
+ // nb of NIO threads (should be even, 1 thread for read, 1 thread for write to scale IO
68
+ // SocketChannelFrameHandlerState.write timeout
61
69
62
70
public SocketChannelFrameHandlerFactory (int connectionTimeout , SocketConfigurator configurator , boolean ssl , ExecutorService executorService ) throws IOException {
63
71
super (connectionTimeout , configurator , ssl );
@@ -100,10 +108,6 @@ public FrameHandler create(Address addr) throws IOException {
100
108
}
101
109
102
110
protected boolean cleanUp () {
103
- // get connection count
104
- // lock
105
- // if connection count has changed, do nothing
106
- // if connection count hasn't changed, clean
107
111
long connectionCountNow = connectionCount .get ();
108
112
stateLock .lock ();
109
113
try {
@@ -143,11 +147,9 @@ protected boolean cleanUp() {
143
147
144
148
protected void initStateIfNecessary () throws IOException {
145
149
if (this .readSelectorState == null ) {
146
- // create selectors
147
150
this .readSelectorState = new SelectorState (Selector .open ());
148
151
this .writeSelectorState = new SelectorState (Selector .open ());
149
152
150
- // create threads/tasks
151
153
startIoLoops ();
152
154
}
153
155
}
@@ -159,7 +161,7 @@ protected void startIoLoops() {
159
161
readThread .start ();
160
162
writeThread .start ();
161
163
} else {
162
- this .readTask = this . executorService .submit (new ReadLoop (this .readSelectorState ));
164
+ this .executorService .submit (new ReadLoop (this .readSelectorState ));
163
165
this .writeTask = this .executorService .submit (new WriteLoop (this .writeSelectorState ));
164
166
}
165
167
}
@@ -183,7 +185,6 @@ public void run() {
183
185
184
186
for (SelectionKey selectionKey : selector .keys ()) {
185
187
SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState ) selectionKey .attachment ();
186
- // FIXME connection should always be here
187
188
if (state .getConnection ().getHeartbeat () > 0 ) {
188
189
long now = System .currentTimeMillis ();
189
190
if ((now - state .getLastActivity ()) > state .getConnection ().getHeartbeat () * 1000 * 2 ) {
@@ -196,14 +197,6 @@ public void run() {
196
197
}
197
198
}
198
199
}
199
-
200
- // FIXME really necessary? key are supposed to be removed when channel is closed
201
- /*
202
- if(!selectionKey.channel().isOpen()) {
203
- LOGGER.warn("Channel for connection {} closed, removing it from IO thread", state.getConnection());
204
- selectionKey.cancel();
205
- }
206
- */
207
200
}
208
201
209
202
int select ;
@@ -354,7 +347,7 @@ public void run() {
354
347
buffer .put ((byte ) AMQP .PROTOCOL .MINOR );
355
348
buffer .put ((byte ) AMQP .PROTOCOL .REVISION );
356
349
buffer .flip ();
357
- while (buffer .hasRemaining () && channel .write (buffer ) != 0 );
350
+ while (buffer .hasRemaining () && channel .write (buffer ) != - 1 );
358
351
buffer .clear ();
359
352
state .setSendHeader (false );
360
353
}
0 commit comments