3232import java .util .Iterator ;
3333import java .util .Queue ;
3434import java .util .Set ;
35- import java .util .concurrent .ExecutorService ;
36- import java .util .concurrent .Executors ;
37- import java .util .concurrent .LinkedBlockingQueue ;
38- import java .util .concurrent .ThreadFactory ;
35+ import java .util .concurrent .* ;
36+ import java .util .concurrent .atomic . AtomicLong ;
37+ import java .util .concurrent .locks . Lock ;
38+ import java .util .concurrent .locks . ReentrantLock ;
3939
4040/**
4141 *
@@ -44,19 +44,31 @@ public class SocketChannelFrameHandlerFactory extends AbstractFrameHandlerFactor
4444
4545 private static final Logger LOGGER = LoggerFactory .getLogger (SocketChannelFrameHandlerFactory .class );
4646
47- private final SelectorState readSelectorState ;
48- private final SelectorState writeSelectorState ;
47+ private SelectorState readSelectorState ;
48+ private SelectorState writeSelectorState ;
4949
50- // FIXME provide constructor with executorservice
51- private final ExecutorService executorService = null ;
50+ private final ExecutorService executorService ;
5251
53- private final ThreadFactory threadFactory = Executors . defaultThreadFactory () ;
52+ private final ThreadFactory threadFactory ;
5453
55- public SocketChannelFrameHandlerFactory (int connectionTimeout , SocketConfigurator configurator , boolean ssl ) throws IOException {
54+ private final Lock stateLock = new ReentrantLock ();
55+
56+ private final AtomicLong connectionCount = new AtomicLong ();
57+
58+ private Thread readThread , writeThread ;
59+
60+ private Future <?> readTask , writeTask ;
61+
62+ public SocketChannelFrameHandlerFactory (int connectionTimeout , SocketConfigurator configurator , boolean ssl , ExecutorService executorService ) throws IOException {
63+ super (connectionTimeout , configurator , ssl );
64+ this .executorService = executorService ;
65+ this .threadFactory = null ;
66+ }
67+
68+ public SocketChannelFrameHandlerFactory (int connectionTimeout , SocketConfigurator configurator , boolean ssl , ThreadFactory threadFactory ) throws IOException {
5669 super (connectionTimeout , configurator , ssl );
57- this .readSelectorState = new SelectorState (Selector .open ());
58- this .writeSelectorState = new SelectorState (Selector .open ());
59- startIoLoops ();
70+ this .executorService = null ;
71+ this .threadFactory = threadFactory ;
6072 }
6173
6274 @ Override
@@ -67,6 +79,15 @@ public FrameHandler create(Address addr) throws IOException {
6779 SocketChannel channel = SocketChannel .open ();
6880 configurator .configure (channel .socket ());
6981
82+ // lock
83+ stateLock .lock ();
84+ try {
85+ connectionCount .incrementAndGet ();
86+ initStateIfNecessary ();
87+ } finally {
88+ stateLock .unlock ();
89+ }
90+
7091 // FIXME handle connection failure
7192 channel .connect (address );
7293
@@ -78,15 +99,68 @@ public FrameHandler create(Address addr) throws IOException {
7899 return frameHandler ;
79100 }
80101
102+ protected boolean cleanUp () {
103+ // get connection count
104+ // lock
105+ // if connection count has changed, do nothing
106+ // if connection count hasn't changed, clean
107+ long connectionCountNow = connectionCount .get ();
108+ stateLock .lock ();
109+ try {
110+ if (connectionCountNow != connectionCount .get ()) {
111+ // a connection request has come in meanwhile, don't do anything
112+ return false ;
113+ }
114+
115+ if (this .executorService == null ) {
116+ this .writeThread .interrupt ();
117+ } else {
118+ boolean canceled = this .writeTask .cancel (true );
119+ if (!canceled ) {
120+ LOGGER .info ("Could not stop write NIO task" );
121+ }
122+ }
123+
124+ try {
125+ readSelectorState .selector .close ();
126+ } catch (IOException e ) {
127+ LOGGER .warn ("Could not close read selector: {}" , e .getMessage ());
128+ }
129+ try {
130+ writeSelectorState .selector .close ();
131+ } catch (IOException e ) {
132+ LOGGER .warn ("Could not close write selector: {}" , e .getMessage ());
133+ }
134+
135+ this .readSelectorState = null ;
136+ this .writeSelectorState = null ;
137+
138+ } finally {
139+ stateLock .unlock ();
140+ }
141+ return true ;
142+ }
143+
144+ protected void initStateIfNecessary () throws IOException {
145+ if (this .readSelectorState == null ) {
146+ // create selectors
147+ this .readSelectorState = new SelectorState (Selector .open ());
148+ this .writeSelectorState = new SelectorState (Selector .open ());
149+
150+ // create threads/tasks
151+ startIoLoops ();
152+ }
153+ }
154+
81155 protected void startIoLoops () {
82156 if (executorService == null ) {
83- Thread readThread = Environment .newThread (threadFactory , new ReadLoop (readSelectorState ), "rabbitmq-nio-read" );
84- Thread writeThread = Environment .newThread (threadFactory , new WriteLoop (writeSelectorState ), "rabbitmq-nio-write" );
157+ this . readThread = Environment .newThread (threadFactory , new ReadLoop (this . readSelectorState ), "rabbitmq-nio-read" );
158+ this . writeThread = Environment .newThread (threadFactory , new WriteLoop (this . writeSelectorState ), "rabbitmq-nio-write" );
85159 readThread .start ();
86160 writeThread .start ();
87161 } else {
88- this .executorService .submit (new ReadLoop (readSelectorState ));
89- this .executorService .submit (new WriteLoop (writeSelectorState ));
162+ this .readTask = this . executorService .submit (new ReadLoop (this . readSelectorState ));
163+ this .writeTask = this . executorService .submit (new WriteLoop (this . writeSelectorState ));
90164 }
91165 }
92166
@@ -104,9 +178,9 @@ public void run() {
104178 // FIXME find a better default?
105179 ByteBuffer buffer = ByteBuffer .allocate (8192 );
106180 try {
181+ int idlenessCount = 0 ;
107182 while (true && !Thread .currentThread ().isInterrupted ()) {
108183
109- // if there's no read key anymore
110184 for (SelectionKey selectionKey : selector .keys ()) {
111185 SocketChannelFrameHandlerState state = (SocketChannelFrameHandlerState ) selectionKey .attachment ();
112186 // FIXME connection should always be here
@@ -123,18 +197,30 @@ public void run() {
123197 }
124198 }
125199
200+ // FIXME really necessary? key are supposed to be removed when channel is closed
201+ /*
126202 if(!selectionKey.channel().isOpen()) {
127- // FIXME maybe better to wait for an exception and to trigger AMQConnection shutdown?
128- // or check if AMQConnection is closed and init shutdown if appropriate
129203 LOGGER.warn("Channel for connection {} closed, removing it from IO thread", state.getConnection());
130204 selectionKey.cancel();
131205 }
206+ */
132207 }
133208
134209 int select ;
135210 if (state .statesToBeRegistered .isEmpty ()) {
136211 // we can block, registration will call Selector.wakeup()
137212 select = selector .select (1000 );
213+ idlenessCount ++;
214+ if (idlenessCount == 10 && selector .keys ().size () == 0 ) {
215+ //if(false) {
216+ // we haven't been doing anything for a while, shutdown state
217+ boolean clean = cleanUp ();
218+ if (clean ) {
219+ // we stop this thread
220+ return ;
221+ }
222+ // there may be incoming connections, keep going
223+ }
138224 } else {
139225 // we don't have to block, we need to select and clean cancelled keys before registration
140226 select = selector .selectNow ();
@@ -149,6 +235,7 @@ public void run() {
149235 }
150236
151237 if (select > 0 ) {
238+ idlenessCount = 0 ;
152239 Set <SelectionKey > readyKeys = selector .selectedKeys ();
153240 Iterator <SelectionKey > iterator = readyKeys .iterator ();
154241 while (iterator .hasNext ()) {
@@ -165,11 +252,19 @@ public void run() {
165252 Frame frame = Frame .readFrom (channel , buffer );
166253
167254 try {
168- state .getConnection ().handleReadFrame (frame );
255+ boolean noProblem = state .getConnection ().handleReadFrame (frame );
256+ if (noProblem && (!state .getConnection ().isRunning () || state .getConnection ().hasBrokerInitiatedShutdown ())) {
257+ // looks like the frame was Close-Ok or Close
258+ dispatchShutdownToConnection (state );
259+ key .cancel ();
260+ break ;
261+ }
262+
169263 } catch (Throwable ex ) {
170264 // problem during frame processing, tell connection, and
171265 // we can stop for this channel
172266 handleIoError (state , ex );
267+ key .cancel ();
173268 break ;
174269 }
175270
@@ -231,7 +326,13 @@ public void run() {
231326 RegistrationState registration ;
232327 while ((registration = state .statesToBeRegistered .poll ()) != null ) {
233328 int operations = registration .operations ;
234- registration .state .getChannel ().register (selector , operations , registration .state );
329+ try {
330+ registration .state .getChannel ().register (selector , operations , registration .state );
331+ } catch (Exception e ) {
332+ // can happen if the channel has been closed since the operation has been enqueued
333+ LOGGER .info ("Error while registering socket channel for write: {}" , e .getMessage ());
334+ }
335+
235336 }
236337
237338 if (select > 0 ) {
@@ -264,10 +365,11 @@ public void run() {
264365 frame .writeTo (channel , buffer );
265366 written ++;
266367 }
368+ Frame .drain (channel , buffer );
267369 } catch (Exception e ) {
268370 handleIoError (state , e );
269371 } finally {
270- Frame . drain ( channel , buffer );
372+ buffer . clear ( );
271373 key .cancel ();
272374 }
273375 }
@@ -310,6 +412,22 @@ public void run() {
310412 }
311413 }
312414
415+ protected void dispatchShutdownToConnection (final SocketChannelFrameHandlerState state ) {
416+ Runnable shutdown = new Runnable () {
417+ @ Override
418+ public void run () {
419+ state .getConnection ().doFinalShutdown ();
420+ }
421+ };
422+ if (this .executorService == null ) {
423+ String name = "rabbitmq-connection-shutdown-" + state .getConnection ();
424+ Thread shutdownThread = Environment .newThread (threadFactory , shutdown , name );
425+ shutdownThread .start ();
426+ } else {
427+ this .executorService .submit (shutdown );
428+ }
429+ }
430+
313431 public static class RegistrationState {
314432
315433 private final SocketChannelFrameHandlerState state ;
0 commit comments