3636
3737/**
3838 * A client connection factory that creates {@link TcpNioConnection}s.
39+ *
3940 * @author Gary Russell
4041 * @author Artem Bilan
42+ *
4143 * @since 2.0
4244 *
4345 */
4446public class TcpNioClientConnectionFactory extends
4547 AbstractClientConnectionFactory implements SchedulingAwareRunnable {
4648
47- private volatile boolean usingDirectBuffers ;
49+ private final Map < SocketChannel , TcpNioConnection > channelMap = new ConcurrentHashMap <>() ;
4850
49- private volatile Selector selector ;
51+ private final BlockingQueue < SocketChannel > newChannels = new LinkedBlockingQueue <>() ;
5052
51- private final Map < SocketChannel , TcpNioConnection > channelMap = new ConcurrentHashMap < SocketChannel , TcpNioConnection >() ;
53+ private boolean usingDirectBuffers ;
5254
53- private final BlockingQueue < SocketChannel > newChannels = new LinkedBlockingQueue < SocketChannel > ();
55+ private TcpNioConnectionSupport tcpNioConnectionSupport = new DefaultTcpNioConnectionSupport ();
5456
55- private volatile TcpNioConnectionSupport tcpNioConnectionSupport = new DefaultTcpNioConnectionSupport () ;
57+ private volatile Selector selector ;
5658
5759 /**
5860 * Creates a TcpNioClientConnectionFactory for connections to the host and port.
@@ -69,12 +71,12 @@ protected void checkActive() {
6971 int n = 0 ;
7072 while (this .selector == null ) {
7173 try {
72- Thread .sleep (100 );
74+ Thread .sleep (100 ); // NOSONAR magic number
7375 }
7476 catch (@ SuppressWarnings ("unused" ) InterruptedException e ) {
7577 Thread .currentThread ().interrupt ();
7678 }
77- if (n ++ > 600 ) {
79+ if (n ++ > 600 ) { // NOSONAR magic number
7880 throw new UncheckedIOException (new IOException ("Factory failed to start" ));
7981 }
8082 }
@@ -85,17 +87,19 @@ protected TcpConnectionSupport buildNewConnection() {
8587 try {
8688 SocketChannel socketChannel = SocketChannel .open (new InetSocketAddress (getHost (), getPort ()));
8789 setSocketAttributes (socketChannel .socket ());
88- TcpNioConnection connection = this .tcpNioConnectionSupport .createNewConnection (
89- socketChannel , false , this .isLookupHost (), this .getApplicationEventPublisher (), getComponentName ());
90+ TcpNioConnection connection =
91+ this .tcpNioConnectionSupport .createNewConnection (socketChannel , false , isLookupHost (),
92+ getApplicationEventPublisher (), getComponentName ());
9093 connection .setUsingDirectBuffers (this .usingDirectBuffers );
91- connection .setTaskExecutor (this .getTaskExecutor ());
92- if (getSslHandshakeTimeout () != null && connection instanceof TcpNioSSLConnection ) {
93- ((TcpNioSSLConnection ) connection ).setHandshakeTimeout (getSslHandshakeTimeout ());
94+ connection .setTaskExecutor (getTaskExecutor ());
95+ Integer sslHandshakeTimeout = getSslHandshakeTimeout ();
96+ if (sslHandshakeTimeout != null && connection instanceof TcpNioSSLConnection ) {
97+ ((TcpNioSSLConnection ) connection ).setHandshakeTimeout (sslHandshakeTimeout );
9498 }
9599 TcpConnectionSupport wrappedConnection = wrapConnection (connection );
96100 initializeConnection (wrappedConnection , socketChannel .socket ());
97101 socketChannel .configureBlocking (false );
98- if (this . getSoTimeout () > 0 ) {
102+ if (getSoTimeout () > 0 ) {
99103 connection .setLastRead (System .currentTimeMillis ());
100104 }
101105 this .channelMap .put (socketChannel , connection );
@@ -144,9 +148,9 @@ public void stop() {
144148 @ Override
145149 public void start () {
146150 synchronized (this .lifecycleMonitor ) {
147- if (!this . isActive ()) {
148- this . setActive (true );
149- this . getTaskExecutor ().execute (this );
151+ if (!isActive ()) {
152+ setActive (true );
153+ getTaskExecutor ().execute (this );
150154 }
151155 }
152156 super .start ();
@@ -155,51 +159,51 @@ public void start() {
155159 @ Override
156160 public void run () {
157161 if (logger .isDebugEnabled ()) {
158- logger .debug ("Read selector running for connections to " + this . getHost () + ":" + this . getPort ());
162+ logger .debug ("Read selector running for connections to " + getHost () + ":" + getPort ());
159163 }
160164 try {
161165 this .selector = Selector .open ();
162- while (this .isActive ()) {
163- SocketChannel newChannel ;
164- int soTimeout = this .getSoTimeout ();
165- int selectionCount = 0 ;
166- try {
167- long timeout = soTimeout < 0 ? 0 : soTimeout ;
168- if (getDelayedReads ().size () > 0 && (timeout == 0 || getReadDelay () < timeout )) {
169- timeout = getReadDelay ();
170- }
171- selectionCount = this .selector .select (timeout );
172- }
173- catch (@ SuppressWarnings ("unused" ) CancelledKeyException cke ) {
174- if (logger .isDebugEnabled ()) {
175- logger .debug ("CancelledKeyException during Selector.select()" );
176- }
177- }
178- while ((newChannel = this .newChannels .poll ()) != null ) {
179- try {
180- newChannel .register (this .selector , SelectionKey .OP_READ , this .channelMap .get (newChannel ));
181- }
182- catch (@ SuppressWarnings ("unused" ) ClosedChannelException cce ) {
183- if (logger .isDebugEnabled ()) {
184- logger .debug ("Channel closed before registering with selector for reading" );
185- }
186- }
187- }
188- processNioSelections (selectionCount , this .selector , null , this .channelMap );
166+ while (isActive ()) {
167+ processSelectorWhileActive ();
189168 }
190169 }
191170 catch (ClosedSelectorException cse ) {
192- if (this . isActive ()) {
171+ if (isActive ()) {
193172 logger .error ("Selector closed" , cse );
194173 }
195174 }
196175 catch (Exception e ) {
197176 logger .error ("Exception in read selector thread" , e );
198- this . setActive (false );
177+ setActive (false );
199178 }
200179 if (logger .isDebugEnabled ()) {
201- logger .debug ("Read selector exiting for connections to " + this .getHost () + ":" + this .getPort ());
180+ logger .debug ("Read selector exiting for connections to " + getHost () + ":" + getPort ());
181+ }
182+ }
183+
184+ private void processSelectorWhileActive () throws IOException {
185+ SocketChannel newChannel ;
186+ int soTimeout = getSoTimeout ();
187+ int selectionCount = 0 ;
188+ try {
189+ long timeout = soTimeout < 0 ? 0 : soTimeout ;
190+ if (getDelayedReads ().size () > 0 && (timeout == 0 || getReadDelay () < timeout )) {
191+ timeout = getReadDelay ();
192+ }
193+ selectionCount = this .selector .select (timeout );
194+ }
195+ catch (@ SuppressWarnings ("unused" ) CancelledKeyException cke ) {
196+ logger .debug ("CancelledKeyException during Selector.select()" );
197+ }
198+ while ((newChannel = this .newChannels .poll ()) != null ) {
199+ try {
200+ newChannel .register (this .selector , SelectionKey .OP_READ , this .channelMap .get (newChannel ));
201+ }
202+ catch (@ SuppressWarnings ("unused" ) ClosedChannelException cce ) {
203+ logger .debug ("Channel closed before registering with selector for reading" );
204+ }
202205 }
206+ processNioSelections (selectionCount , this .selector , null , this .channelMap );
203207 }
204208
205209 /**
0 commit comments