Skip to content

Commit 9428e34

Browse files
garyrussellartembilan
authored andcommitted
GH-2837: TCP NIO Prioritize accept
See #2837 Give priority to accepting new connections over reading data from existing connections. Add a flag to allow users to revert to the previous behavior of giving reads priority. **cherry-pick to 5.1.x**
1 parent 56c4458 commit 9428e34

File tree

5 files changed

+112
-33
lines changed

5 files changed

+112
-33
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ else if (key.isAcceptable()) {
726726
doAccept(selector, server, now);
727727
}
728728
catch (Exception e) {
729-
logger.error("Exception accepting new connection", e);
729+
logger.error("Exception accepting new connection(s)", e);
730730
}
731731
}
732732
else {

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioServerConnectionFactory.java

+50-31
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
*/
4747
public class TcpNioServerConnectionFactory extends AbstractServerConnectionFactory {
4848

49+
private boolean multiAccept = true;
50+
4951
private volatile ServerSocketChannel serverChannel;
5052

5153
private volatile boolean usingDirectBuffers;
@@ -64,6 +66,18 @@ public TcpNioServerConnectionFactory(int port) {
6466
super(port);
6567
}
6668

69+
/**
70+
* Set to false to only accept one connection per iteration over the
71+
* selector keys. This might be necessary to avoid accepts overwhelming
72+
* reads of existing sockets. By default when the {@code OP_ACCEPT} operation
73+
* is ready, we will keep accepting connections in a loop until no more arrive.
74+
* @param multiAccept false to accept connections one-at-a-time.
75+
* @since 5.1.4
76+
*/
77+
public void setMultiAccept(boolean multiAccept) {
78+
this.multiAccept = multiAccept;
79+
}
80+
6781
@Override
6882
public String getComponentType() {
6983
return "tcp-nio-server-connection-factory";
@@ -204,40 +218,45 @@ private void doSelect(ServerSocketChannel server, final Selector selectorToSelec
204218
protected void doAccept(final Selector selectorForNewSocket, ServerSocketChannel server, long now) {
205219
logger.debug("New accept");
206220
try {
207-
SocketChannel channel = server.accept();
208-
if (isShuttingDown()) {
209-
if (logger.isInfoEnabled()) {
210-
logger.info("New connection from " + channel.socket().getInetAddress().getHostAddress()
211-
+ ":" + channel.socket().getPort()
212-
+ " rejected; the server is in the process of shutting down.");
213-
}
214-
channel.close();
215-
}
216-
else {
217-
try {
218-
channel.configureBlocking(false);
219-
Socket socket = channel.socket();
220-
setSocketAttributes(socket);
221-
TcpNioConnection connection = createTcpNioConnection(channel);
222-
if (connection == null) {
223-
return;
221+
SocketChannel channel = null;
222+
do {
223+
channel = server.accept();
224+
if (channel != null) {
225+
if (isShuttingDown()) {
226+
if (logger.isInfoEnabled()) {
227+
logger.info("New connection from " + channel.socket().getInetAddress().getHostAddress()
228+
+ ":" + channel.socket().getPort()
229+
+ " rejected; the server is in the process of shutting down.");
230+
}
231+
channel.close();
224232
}
225-
connection.setTaskExecutor(getTaskExecutor());
226-
connection.setLastRead(now);
227-
if (getSslHandshakeTimeout() != null && connection instanceof TcpNioSSLConnection) {
228-
((TcpNioSSLConnection) connection).setHandshakeTimeout(getSslHandshakeTimeout());
233+
else {
234+
try {
235+
channel.configureBlocking(false);
236+
Socket socket = channel.socket();
237+
setSocketAttributes(socket);
238+
TcpNioConnection connection = createTcpNioConnection(channel);
239+
if (connection == null) {
240+
return;
241+
}
242+
connection.setTaskExecutor(getTaskExecutor());
243+
connection.setLastRead(now);
244+
if (getSslHandshakeTimeout() != null && connection instanceof TcpNioSSLConnection) {
245+
((TcpNioSSLConnection) connection).setHandshakeTimeout(getSslHandshakeTimeout());
246+
}
247+
this.channelMap.put(channel, connection);
248+
channel.register(selectorForNewSocket, SelectionKey.OP_READ, connection);
249+
connection.publishConnectionOpenEvent();
250+
}
251+
catch (IOException e) {
252+
logger.error("Exception accepting new connection from "
253+
+ channel.socket().getInetAddress().getHostAddress()
254+
+ ":" + channel.socket().getPort(), e);
255+
channel.close();
256+
}
229257
}
230-
this.channelMap.put(channel, connection);
231-
channel.register(selectorForNewSocket, SelectionKey.OP_READ, connection);
232-
connection.publishConnectionOpenEvent();
233258
}
234-
catch (IOException e) {
235-
logger.error("Exception accepting new connection from "
236-
+ channel.socket().getInetAddress().getHostAddress()
237-
+ ":" + channel.socket().getPort(), e);
238-
channel.close();
239-
}
240-
}
259+
} while (this.multiAccept && channel != null);
241260
}
242261
catch (IOException e) {
243262
throw new UncheckedIOException(e);

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java

+54-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.nio.channels.SocketChannel;
4242
import java.util.ArrayList;
4343
import java.util.Arrays;
44+
import java.util.Collections;
4445
import java.util.HashMap;
4546
import java.util.HashSet;
4647
import java.util.List;
@@ -73,6 +74,7 @@
7374
import org.springframework.context.ApplicationEventPublisher;
7475
import org.springframework.core.task.AsyncTaskExecutor;
7576
import org.springframework.core.task.SimpleAsyncTaskExecutor;
77+
import org.springframework.integration.ip.event.IpIntegrationEvent;
7678
import org.springframework.integration.ip.tcp.connection.TcpNioConnection.ChannelInputStream;
7779
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
7880
import org.springframework.integration.ip.tcp.serializer.MapJsonSerializer;
@@ -506,7 +508,7 @@ public void transferHeaders() throws Exception {
506508
doAnswer(new Answer<Integer>() {
507509

508510
@Override
509-
public Integer answer(InvocationOnMock invocation) throws Throwable {
511+
public Integer answer(InvocationOnMock invocation) {
510512
ByteBuffer buff = invocation.getArgument(0);
511513
byte[] bytes = written.toByteArray();
512514
buff.put(bytes);
@@ -837,6 +839,57 @@ public void testNoDelayOnClose() throws Exception {
837839
assertThat(watch.getLastTaskTimeMillis()).isLessThan(950L);
838840
}
839841

842+
@Test
843+
public void testMultiAccept() throws InterruptedException, IOException {
844+
testMulti(true);
845+
}
846+
847+
@Test
848+
public void testNoMultiAccept() throws InterruptedException, IOException {
849+
testMulti(false);
850+
}
851+
852+
private void testMulti(boolean multiAccept) throws InterruptedException, IOException {
853+
CountDownLatch serverReadyLatch = new CountDownLatch(1);
854+
CountDownLatch latch = new CountDownLatch(21);
855+
List<Socket> sockets = new ArrayList<>();
856+
TcpNioServerConnectionFactory server = new TcpNioServerConnectionFactory(0);
857+
try {
858+
List<IpIntegrationEvent> events = Collections.synchronizedList(new ArrayList<>());
859+
List<Message<?>> messages = Collections.synchronizedList(new ArrayList<>());
860+
server.setMultiAccept(multiAccept);
861+
server.setApplicationEventPublisher(e -> {
862+
if (e instanceof TcpConnectionServerListeningEvent) {
863+
serverReadyLatch.countDown();
864+
}
865+
events.add((IpIntegrationEvent) e);
866+
latch.countDown();
867+
});
868+
server.registerListener(m -> {
869+
messages.add(m);
870+
latch.countDown();
871+
return false;
872+
});
873+
server.afterPropertiesSet();
874+
server.start();
875+
assertThat(serverReadyLatch.await(10, TimeUnit.SECONDS)).isTrue();
876+
for (int i = 0; i < 10; i++) {
877+
Socket socket = SocketFactory.getDefault().createSocket("localhost", server.getPort());
878+
socket.getOutputStream().write("foo\r\n".getBytes());
879+
sockets.add(socket);
880+
}
881+
assertThat(latch.await(10, TimeUnit.SECONDS));
882+
assertThat(events).hasSize(11); // server ready + 10 opens
883+
assertThat(messages).hasSize(10);
884+
}
885+
finally {
886+
for (Socket socket : sockets) {
887+
socket.close();
888+
}
889+
server.stop();
890+
}
891+
}
892+
840893
private void readFully(InputStream is, byte[] buff) throws IOException {
841894
for (int i = 0; i < buff.length; i++) {
842895
buff[i] = (byte) is.read();

src/reference/asciidoc/ip.adoc

+4
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,10 @@ Alternatively, you can insert a resequencer downstream of the inbound endpoint t
974974
If you set `apply-sequence` to `true` on the connection factory, messages arriving on a TCP connection have `sequenceNumber` and `correlationId` headers set.
975975
The resequencer uses these headers to return the messages to their proper sequence.
976976

977+
IMPORTANT: Starting with version 5.1.4, priority is given to accepting new connections over reading from existing connections.
978+
This should, generally, have little impact unless you have a very high rate of new incoming connections.
979+
If you wish to revert to the previous behavior of giving reads priority, set the `multiAccept` property on the `TcpNioServerConnectionFactory` to `false`.
980+
977981
==== Pool Size
978982

979983
The pool size attribute is no longer used.

src/reference/asciidoc/whats-new.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,6 @@ See <<remote-persistent-flf>> for more information.
3636

3737
The length header used by the `ByteArrayLengthHeaderSerializer` can now include the length of the header in addition to the payload.
3838
See <<tcp-codecs>> for more information.
39+
40+
When using a `TcpNioServerConnectionFactory`, priority is now given to accepting new connections over reading from existing connections, but it is configurable.
41+
See <<note-nio>> for more information.

0 commit comments

Comments
 (0)