Skip to content

Commit 6f3fd09

Browse files
garyrussellartembilan
authored andcommitted
GH-2837: TCP NIO Prioritize accept
See #2837 Give priority to accepting new connections over reading data from existing connections. Add a flag to allow users to revert to the previous behavior of giving reads priority. **cherry-pick to 5.1.x** # Conflicts: # spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioServerConnectionFactory.java # src/reference/asciidoc/whats-new.adoc # TcpNioConnectionTests.java
1 parent 6e9bb60 commit 6f3fd09

File tree

4 files changed

+121
-38
lines changed

4 files changed

+121
-38
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,7 @@ public void setSslHandshakeTimeout(int sslHandshakeTimeout) {
476476
* @see #setSslHandshakeTimeout(int)
477477
* @since 4.3.6
478478
*/
479+
@Nullable
479480
protected Integer getSslHandshakeTimeout() {
480481
return this.sslHandshakeTimeout;
481482
}
@@ -724,7 +725,7 @@ else if (key.isAcceptable()) {
724725
doAccept(selector, server, now);
725726
}
726727
catch (Exception e) {
727-
logger.error("Exception accepting new connection", e);
728+
logger.error("Exception accepting new connection(s)", e);
728729
}
729730
}
730731
else {

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioServerConnectionFactory.java

Lines changed: 61 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.ip.tcp.connection;
1818

1919
import java.io.IOException;
20+
import java.io.UncheckedIOException;
2021
import java.net.InetAddress;
2122
import java.net.InetSocketAddress;
2223
import java.net.Socket;
@@ -34,7 +35,7 @@
3435
import org.springframework.util.Assert;
3536

3637
/**
37-
/**
38+
/**
3839
* Implements a server connection factory that produces {@link TcpNioConnection}s using
3940
* a {@link ServerSocketChannel}. Must have a {@link TcpListener} registered.
4041
*
@@ -45,6 +46,8 @@
4546
*/
4647
public class TcpNioServerConnectionFactory extends AbstractServerConnectionFactory {
4748

49+
private boolean multiAccept = true;
50+
4851
private volatile ServerSocketChannel serverChannel;
4952

5053
private volatile boolean usingDirectBuffers;
@@ -63,6 +66,18 @@ public TcpNioServerConnectionFactory(int port) {
6366
super(port);
6467
}
6568

69+
/**
70+
* Set to false to only accept one connection per iteration over the
71+
* selector keys. This might be necessary to avoid accepts overwhelming
72+
* reads of existing sockets. By default when the {@code OP_ACCEPT} operation
73+
* is ready, we will keep accepting connections in a loop until no more arrive.
74+
* @param multiAccept false to accept connections one-at-a-time.
75+
* @since 5.1.4
76+
*/
77+
public void setMultiAccept(boolean multiAccept) {
78+
this.multiAccept = multiAccept;
79+
}
80+
6681
@Override
6782
public String getComponentType() {
6883
return "tcp-nio-server-connection-factory";
@@ -193,55 +208,65 @@ private void doSelect(ServerSocketChannel server, final Selector selector) throw
193208
}
194209

195210
/**
196-
* @param selector The selector.
211+
* @param selectorForNewSocket The selector.
197212
* @param server The server socket channel.
198213
* @param now The current time.
199-
* @throws IOException Any IOException.
200214
*/
201215
@Override
202-
protected void doAccept(final Selector selector, ServerSocketChannel server, long now) throws IOException {
216+
protected void doAccept(final Selector selectorForNewSocket, ServerSocketChannel server, long now) {
203217
logger.debug("New accept");
204-
SocketChannel channel = server.accept();
205-
if (isShuttingDown()) {
206-
if (logger.isInfoEnabled()) {
207-
logger.info("New connection from " + channel.socket().getInetAddress().getHostAddress()
208-
+ ":" + channel.socket().getPort()
209-
+ " rejected; the server is in the process of shutting down.");
210-
}
211-
channel.close();
212-
}
213-
else {
214-
try {
215-
channel.configureBlocking(false);
216-
Socket socket = channel.socket();
217-
setSocketAttributes(socket);
218-
TcpNioConnection connection = createTcpNioConnection(channel);
219-
if (connection == null) {
220-
return;
221-
}
222-
connection.setTaskExecutor(getTaskExecutor());
223-
connection.setLastRead(now);
224-
if (getSslHandshakeTimeout() != null && connection instanceof TcpNioSSLConnection) {
225-
((TcpNioSSLConnection) connection).setHandshakeTimeout(getSslHandshakeTimeout());
218+
try {
219+
SocketChannel channel = null;
220+
do {
221+
channel = server.accept();
222+
if (channel != null) {
223+
if (isShuttingDown()) {
224+
if (logger.isInfoEnabled()) {
225+
logger.info("New connection from " + channel.socket().getInetAddress().getHostAddress()
226+
+ ":" + channel.socket().getPort()
227+
+ " rejected; the server is in the process of shutting down.");
228+
}
229+
channel.close();
230+
}
231+
else {
232+
try {
233+
channel.configureBlocking(false);
234+
Socket socket = channel.socket();
235+
setSocketAttributes(socket);
236+
TcpNioConnection connection = createTcpNioConnection(channel);
237+
if (connection == null) {
238+
return;
239+
}
240+
connection.setTaskExecutor(getTaskExecutor());
241+
connection.setLastRead(now);
242+
if (getSslHandshakeTimeout() != null && connection instanceof TcpNioSSLConnection) {
243+
((TcpNioSSLConnection) connection).setHandshakeTimeout(getSslHandshakeTimeout());
244+
}
245+
this.channelMap.put(channel, connection);
246+
channel.register(selectorForNewSocket, SelectionKey.OP_READ, connection);
247+
connection.publishConnectionOpenEvent();
248+
}
249+
catch (IOException e) {
250+
logger.error("Exception accepting new connection from "
251+
+ channel.socket().getInetAddress().getHostAddress()
252+
+ ":" + channel.socket().getPort(), e);
253+
channel.close();
254+
}
255+
}
226256
}
227-
this.channelMap.put(channel, connection);
228-
channel.register(selector, SelectionKey.OP_READ, connection);
229-
connection.publishConnectionOpenEvent();
230-
}
231-
catch (Exception e) {
232-
logger.error("Exception accepting new connection from "
233-
+ channel.socket().getInetAddress().getHostAddress()
234-
+ ":" + channel.socket().getPort(), e);
235-
channel.close();
236257
}
258+
while (this.multiAccept && channel != null);
259+
}
260+
catch (IOException e) {
261+
throw new UncheckedIOException(e);
237262
}
238263
}
239264

240265
@Nullable
241266
private TcpNioConnection createTcpNioConnection(SocketChannel socketChannel) {
242267
try {
243268
TcpNioConnection connection = this.tcpNioConnectionSupport.createNewConnection(socketChannel, true,
244-
isLookupHost(), getApplicationEventPublisher(), getComponentName());
269+
isLookupHost(), getApplicationEventPublisher(), getComponentName());
245270
connection.setUsingDirectBuffers(this.usingDirectBuffers);
246271
TcpConnectionSupport wrappedConnection = wrapConnection(connection);
247272
initializeConnection(wrappedConnection, socketChannel.socket());

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.nio.channels.SocketChannel;
4848
import java.util.ArrayList;
4949
import java.util.Arrays;
50+
import java.util.Collections;
5051
import java.util.HashMap;
5152
import java.util.HashSet;
5253
import java.util.List;
@@ -79,6 +80,7 @@
7980
import org.springframework.context.ApplicationEventPublisher;
8081
import org.springframework.core.task.AsyncTaskExecutor;
8182
import org.springframework.core.task.SimpleAsyncTaskExecutor;
83+
import org.springframework.integration.ip.event.IpIntegrationEvent;
8284
import org.springframework.integration.ip.tcp.connection.TcpNioConnection.ChannelInputStream;
8385
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
8486
import org.springframework.integration.ip.tcp.serializer.MapJsonSerializer;
@@ -510,7 +512,7 @@ public void transferHeaders() throws Exception {
510512
doAnswer(new Answer<Integer>() {
511513

512514
@Override
513-
public Integer answer(InvocationOnMock invocation) throws Throwable {
515+
public Integer answer(InvocationOnMock invocation) {
514516
ByteBuffer buff = invocation.getArgument(0);
515517
byte[] bytes = written.toByteArray();
516518
buff.put(bytes);
@@ -840,6 +842,57 @@ public void testNoDelayOnClose() throws Exception {
840842
assertThat(watch.getLastTaskTimeMillis(), lessThan(950L));
841843
}
842844

845+
@Test
846+
public void testMultiAccept() throws InterruptedException, IOException {
847+
testMulti(true);
848+
}
849+
850+
@Test
851+
public void testNoMultiAccept() throws InterruptedException, IOException {
852+
testMulti(false);
853+
}
854+
855+
private void testMulti(boolean multiAccept) throws InterruptedException, IOException {
856+
CountDownLatch serverReadyLatch = new CountDownLatch(1);
857+
CountDownLatch latch = new CountDownLatch(21);
858+
List<Socket> sockets = new ArrayList<>();
859+
TcpNioServerConnectionFactory server = new TcpNioServerConnectionFactory(0);
860+
try {
861+
List<IpIntegrationEvent> events = Collections.synchronizedList(new ArrayList<>());
862+
List<Message<?>> messages = Collections.synchronizedList(new ArrayList<>());
863+
server.setMultiAccept(multiAccept);
864+
server.setApplicationEventPublisher(e -> {
865+
if (e instanceof TcpConnectionServerListeningEvent) {
866+
serverReadyLatch.countDown();
867+
}
868+
events.add((IpIntegrationEvent) e);
869+
latch.countDown();
870+
});
871+
server.registerListener(m -> {
872+
messages.add(m);
873+
latch.countDown();
874+
return false;
875+
});
876+
server.afterPropertiesSet();
877+
server.start();
878+
assertTrue(serverReadyLatch.await(10, TimeUnit.SECONDS));
879+
for (int i = 0; i < 10; i++) {
880+
Socket socket = SocketFactory.getDefault().createSocket("localhost", server.getPort());
881+
socket.getOutputStream().write("foo\r\n".getBytes());
882+
sockets.add(socket);
883+
}
884+
assertTrue(latch.await(10, TimeUnit.SECONDS));
885+
assertEquals(11, events.size()); // server ready + 10 opens
886+
assertEquals(10, messages.size());
887+
}
888+
finally {
889+
for (Socket socket : sockets) {
890+
socket.close();
891+
}
892+
server.stop();
893+
}
894+
}
895+
843896
private void readFully(InputStream is, byte[] buff) throws IOException {
844897
for (int i = 0; i < buff.length; i++) {
845898
buff[i] = (byte) is.read();

src/reference/asciidoc/ip.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,10 @@ Alternatively, you can insert a resequencer downstream of the inbound endpoint t
971971
If you set `apply-sequence` to `true` on the connection factory, messages arriving on a TCP connection have `sequenceNumber` and `correlationId` headers set.
972972
The resequencer uses these headers to return the messages to their proper sequence.
973973

974+
IMPORTANT: Starting with version 5.1.4, priority is given to accepting new connections over reading from existing connections.
975+
This should, generally, have little impact unless you have a very high rate of new incoming connections.
976+
If you wish to revert to the previous behavior of giving reads priority, set the `multiAccept` property on the `TcpNioServerConnectionFactory` to `false`.
977+
974978
==== Pool Size
975979

976980
The pool size attribute is no longer used.

0 commit comments

Comments
 (0)