Skip to content

GH-2837: TCP NIO Prioritize accept #2865

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ else if (key.isAcceptable()) {
doAccept(selector, server, now);
}
catch (Exception e) {
logger.error("Exception accepting new connection", e);
logger.error("Exception accepting new connection(s)", e);
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
*/
public class TcpNioServerConnectionFactory extends AbstractServerConnectionFactory {

private boolean multiAccept = true;

private volatile ServerSocketChannel serverChannel;

private volatile boolean usingDirectBuffers;
Expand All @@ -64,6 +66,18 @@ public TcpNioServerConnectionFactory(int port) {
super(port);
}

/**
* Set to false to only accept one connection per iteration over the
* selector keys. This might be necessary to avoid accepts overwhelming
* reads of existing sockets. By default when the {@code OP_ACCEPT} operation
* is ready, we will keep accepting connections in a loop until no more arrive.
* @param multiAccept false to accept connections one-at-a-time.
* @since 5.1.4
*/
public void setMultiAccept(boolean multiAccept) {
this.multiAccept = multiAccept;
}

@Override
public String getComponentType() {
return "tcp-nio-server-connection-factory";
Expand Down Expand Up @@ -204,40 +218,45 @@ private void doSelect(ServerSocketChannel server, final Selector selectorToSelec
protected void doAccept(final Selector selectorForNewSocket, ServerSocketChannel server, long now) {
logger.debug("New accept");
try {
SocketChannel channel = server.accept();
if (isShuttingDown()) {
if (logger.isInfoEnabled()) {
logger.info("New connection from " + channel.socket().getInetAddress().getHostAddress()
+ ":" + channel.socket().getPort()
+ " rejected; the server is in the process of shutting down.");
}
channel.close();
}
else {
try {
channel.configureBlocking(false);
Socket socket = channel.socket();
setSocketAttributes(socket);
TcpNioConnection connection = createTcpNioConnection(channel);
if (connection == null) {
return;
SocketChannel channel = null;
do {
channel = server.accept();
if (channel != null) {
if (isShuttingDown()) {
if (logger.isInfoEnabled()) {
logger.info("New connection from " + channel.socket().getInetAddress().getHostAddress()
+ ":" + channel.socket().getPort()
+ " rejected; the server is in the process of shutting down.");
}
channel.close();
}
connection.setTaskExecutor(getTaskExecutor());
connection.setLastRead(now);
if (getSslHandshakeTimeout() != null && connection instanceof TcpNioSSLConnection) {
((TcpNioSSLConnection) connection).setHandshakeTimeout(getSslHandshakeTimeout());
else {
try {
channel.configureBlocking(false);
Socket socket = channel.socket();
setSocketAttributes(socket);
TcpNioConnection connection = createTcpNioConnection(channel);
if (connection == null) {
return;
}
connection.setTaskExecutor(getTaskExecutor());
connection.setLastRead(now);
if (getSslHandshakeTimeout() != null && connection instanceof TcpNioSSLConnection) {
((TcpNioSSLConnection) connection).setHandshakeTimeout(getSslHandshakeTimeout());
}
this.channelMap.put(channel, connection);
channel.register(selectorForNewSocket, SelectionKey.OP_READ, connection);
connection.publishConnectionOpenEvent();
}
catch (IOException e) {
logger.error("Exception accepting new connection from "
+ channel.socket().getInetAddress().getHostAddress()
+ ":" + channel.socket().getPort(), e);
channel.close();
}
}
this.channelMap.put(channel, connection);
channel.register(selectorForNewSocket, SelectionKey.OP_READ, connection);
connection.publishConnectionOpenEvent();
}
catch (IOException e) {
logger.error("Exception accepting new connection from "
+ channel.socket().getInetAddress().getHostAddress()
+ ":" + channel.socket().getPort(), e);
channel.close();
}
}
} while (this.multiAccept && channel != null);
}
catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -73,6 +74,7 @@
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.integration.ip.event.IpIntegrationEvent;
import org.springframework.integration.ip.tcp.connection.TcpNioConnection.ChannelInputStream;
import org.springframework.integration.ip.tcp.serializer.ByteArrayCrLfSerializer;
import org.springframework.integration.ip.tcp.serializer.MapJsonSerializer;
Expand Down Expand Up @@ -506,7 +508,7 @@ public void transferHeaders() throws Exception {
doAnswer(new Answer<Integer>() {

@Override
public Integer answer(InvocationOnMock invocation) throws Throwable {
public Integer answer(InvocationOnMock invocation) {
ByteBuffer buff = invocation.getArgument(0);
byte[] bytes = written.toByteArray();
buff.put(bytes);
Expand Down Expand Up @@ -837,6 +839,57 @@ public void testNoDelayOnClose() throws Exception {
assertThat(watch.getLastTaskTimeMillis()).isLessThan(950L);
}

@Test
public void testMultiAccept() throws InterruptedException, IOException {
testMulti(true);
}

@Test
public void testNoMultiAccept() throws InterruptedException, IOException {
testMulti(false);
}

private void testMulti(boolean multiAccept) throws InterruptedException, IOException {
CountDownLatch serverReadyLatch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(21);
List<Socket> sockets = new ArrayList<>();
TcpNioServerConnectionFactory server = new TcpNioServerConnectionFactory(0);
try {
List<IpIntegrationEvent> events = Collections.synchronizedList(new ArrayList<>());
List<Message<?>> messages = Collections.synchronizedList(new ArrayList<>());
server.setMultiAccept(multiAccept);
server.setApplicationEventPublisher(e -> {
if (e instanceof TcpConnectionServerListeningEvent) {
serverReadyLatch.countDown();
}
events.add((IpIntegrationEvent) e);
latch.countDown();
});
server.registerListener(m -> {
messages.add(m);
latch.countDown();
return false;
});
server.afterPropertiesSet();
server.start();
assertThat(serverReadyLatch.await(10, TimeUnit.SECONDS)).isTrue();
for (int i = 0; i < 10; i++) {
Socket socket = SocketFactory.getDefault().createSocket("localhost", server.getPort());
socket.getOutputStream().write("foo\r\n".getBytes());
sockets.add(socket);
}
assertThat(latch.await(10, TimeUnit.SECONDS));
assertThat(events).hasSize(11); // server ready + 10 opens
assertThat(messages).hasSize(10);
}
finally {
for (Socket socket : sockets) {
socket.close();
}
server.stop();
}
}

private void readFully(InputStream is, byte[] buff) throws IOException {
for (int i = 0; i < buff.length; i++) {
buff[i] = (byte) is.read();
Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/ip.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,10 @@ Alternatively, you can insert a resequencer downstream of the inbound endpoint t
If you set `apply-sequence` to `true` on the connection factory, messages arriving on a TCP connection have `sequenceNumber` and `correlationId` headers set.
The resequencer uses these headers to return the messages to their proper sequence.

IMPORTANT: Starting with version 5.1.4, priority is given to accepting new connections over reading from existing connections.
This should, generally, have little impact unless you have a very high rate of new incoming connections.
If you wish to revert to the previous behavior of giving reads priority, set the `multiAccept` property on the `TcpNioServerConnectionFactory` to `false`.

==== Pool Size

The pool size attribute is no longer used.
Expand Down
3 changes: 3 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,6 @@ See <<remote-persistent-flf>> for more information.

The length header used by the `ByteArrayLengthHeaderSerializer` can now include the length of the header in addition to the payload.
See <<tcp-codecs>> for more information.

When using a `TcpNioServerConnectionFactory`, priority is now given to accepting new connections over reading from existing connections, but it is configurable.
See <<note-nio>> for more information.