Skip to content

Switch to NIO #195

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 44 commits into from
Oct 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
7af1936
Implement basic NIO support
acogoluegnes Sep 7, 2016
ceb9caf
Fix loops and IO frame reading
acogoluegnes Sep 8, 2016
ea1d67a
Fix connection sequence
acogoluegnes Sep 8, 2016
482449a
Use only NIO threads
acogoluegnes Sep 8, 2016
f695768
Set a limit in write queue
acogoluegnes Sep 8, 2016
a682f45
Improve frame handling
acogoluegnes Sep 9, 2016
fe2ecff
Fix connection
acogoluegnes Sep 9, 2016
5c080c0
Handle heartbeat and shutdown
acogoluegnes Sep 9, 2016
ef7eeb5
Handle connection shutdown in IO loops
acogoluegnes Sep 12, 2016
6335aa4
Create FrameHandlerFactory interface
acogoluegnes Sep 12, 2016
ed3deab
Clean NIO state automatically
acogoluegnes Sep 13, 2016
2a638c8
Polish
acogoluegnes Sep 13, 2016
554dc6a
Add NIO params
acogoluegnes Sep 14, 2016
85f6737
Merge branch 'master' into rabbitmq-java-client-11
acogoluegnes Sep 15, 2016
0386b4e
Re-organize NIO classes
acogoluegnes Sep 15, 2016
2d337d9
Add WriteRequest abstraction
acogoluegnes Sep 15, 2016
3f87858
Merge branch 'master' into rabbitmq-java-client-11
acogoluegnes Sep 15, 2016
65cfd4d
Handle TLS in NIO
acogoluegnes Sep 19, 2016
4bec5d1
Merge branch 'master' into rabbitmq-java-client-11
acogoluegnes Sep 19, 2016
d6a7478
Use streams in plain NIO loops
acogoluegnes Sep 19, 2016
44684c9
Use same logic for plain/TLS in IO loops
acogoluegnes Sep 20, 2016
c554cf9
Merge branch 'master' into rabbitmq-java-client-11
acogoluegnes Sep 20, 2016
e533072
Close SSL engine on channel closing
acogoluegnes Sep 20, 2016
22df994
Merge branch 'master' into rabbitmq-java-client-11
acogoluegnes Sep 20, 2016
4083b65
Check SocketChannel is open before reading/writing
acogoluegnes Sep 20, 2016
2826752
Use one thread for NIO
acogoluegnes Sep 21, 2016
0b8c4dd
Add tests for TLS on NIO
acogoluegnes Sep 21, 2016
5dba9cc
Document NIO API
acogoluegnes Sep 22, 2016
5717a22
Fix TLS handshake
acogoluegnes Sep 22, 2016
0532439
Improve error messages
acogoluegnes Sep 26, 2016
3a1320e
Merge branch 'master' into rabbitmq-java-client-11
acogoluegnes Sep 28, 2016
3a3eafb
Add TLS handshake test
acogoluegnes Sep 28, 2016
ee94a9e
Add TLS handshake test to test suite
acogoluegnes Sep 28, 2016
6168e97
Shutdown NIO loop ASAP
acogoluegnes Sep 29, 2016
9f3fe90
Add connection re-attemps to stabilize SSL tests
acogoluegnes Sep 29, 2016
33cd661
Repeat reading from socket when reading frame
acogoluegnes Sep 30, 2016
e680acd
Merge branch 'master' into rabbitmq-java-client-11
acogoluegnes Sep 30, 2016
278c835
Check Connection availability in NIO loop
acogoluegnes Sep 30, 2016
2f19ad0
Merge branch 'master' into rabbitmq-java-client-11
acogoluegnes Oct 3, 2016
22bf675
Remove TLS handshake looping test
acogoluegnes Oct 3, 2016
d74b125
Use default ThreadFactory for NIO in test suite
acogoluegnes Oct 4, 2016
6d912dd
Handle EOF properly in NIO
acogoluegnes Oct 5, 2016
b652b00
Add use-nio profile for Maven test suite
acogoluegnes Oct 5, 2016
9a8db70
Bump NIO buffers to 32 K
acogoluegnes Oct 5, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,28 @@
</build>
</profile>

<!--
Profile to activate the NIO mode in the test suite:
mvn verify -P use-nio
-->
<profile>
<id>use-nio</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<systemPropertyVariables>
<use.nio>true</use.nio>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
</profile>

<profile>
<!--
The "release" Maven profile is used to push release artifacts to a
Expand Down
72 changes: 66 additions & 6 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package com.rabbitmq.client;

import com.rabbitmq.client.impl.*;
import com.rabbitmq.client.impl.nio.NioParams;
import com.rabbitmq.client.impl.nio.SocketChannelFrameHandlerFactory;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;

import javax.net.SocketFactory;
Expand Down Expand Up @@ -108,6 +110,12 @@ public class ConnectionFactory implements Cloneable {

private MetricsCollector metricsCollector;

private boolean nio = false;
private FrameHandlerFactory frameHandlerFactory;
private NioParams nioParams = new NioParams();

private SSLContext sslContext;

/** @return the default host to use for connections */
public String getHost() {
return host;
Expand Down Expand Up @@ -541,7 +549,7 @@ public boolean isSSL(){
}

/**
* Convenience method for setting up a SSL socket factory, using
* Convenience method for setting up a SSL socket factory/engine, using
* the DEFAULT_SSL_PROTOCOL and a trusting TrustManager.
*/
public void useSslProtocol()
Expand All @@ -551,7 +559,7 @@ public void useSslProtocol()
}

/**
* Convenience method for setting up a SSL socket factory, using
* Convenience method for setting up a SSL socket factory/engine, using
* the supplied protocol and a very trusting TrustManager.
*/
public void useSslProtocol(String protocol)
Expand All @@ -561,7 +569,7 @@ public void useSslProtocol(String protocol)
}

/**
* Convenience method for setting up an SSL socket factory.
* Convenience method for setting up an SSL socket factory/engine.
* Pass in the SSL protocol to use, e.g. "TLSv1" or "TLSv1.2".
*
* @param protocol SSL protocol to use.
Expand All @@ -575,13 +583,14 @@ public void useSslProtocol(String protocol, TrustManager trustManager)
}

/**
* Convenience method for setting up an SSL socket factory.
* Convenience method for setting up an SSL socket factory/engine.
* Pass in an initialized SSLContext.
*
* @param context An initialized SSLContext
*/
public void useSslProtocol(SSLContext context) {
setSocketFactory(context.getSocketFactory());
this.sslContext = context;
}

public static String computeDefaultTlsProcotol(String[] supportedProtocols) {
Expand Down Expand Up @@ -641,8 +650,19 @@ public MetricsCollector getMetricsCollector() {
return metricsCollector;
}

protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {
return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL(), this.shutdownExecutor);
protected synchronized FrameHandlerFactory createFrameHandlerFactory() throws IOException {
if(nio) {
if(this.frameHandlerFactory == null) {
if(this.nioParams.getNioExecutor() == null && this.nioParams.getThreadFactory() == null) {
this.nioParams.setThreadFactory(getThreadFactory());
}
this.frameHandlerFactory = new SocketChannelFrameHandlerFactory(connectionTimeout, nioParams, isSSL(), sslContext);
}
return this.frameHandlerFactory;
} else {
return new SocketFrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL(), this.shutdownExecutor);
}

}

/**
Expand Down Expand Up @@ -1019,4 +1039,44 @@ public void setNetworkRecoveryInterval(int networkRecoveryInterval) {
public void setNetworkRecoveryInterval(long networkRecoveryInterval) {
this.networkRecoveryInterval = networkRecoveryInterval;
}

/**
* Sets the parameters when using NIO.
*
*
* @param nioParams
* @see NioParams
*/
public void setNioParams(NioParams nioParams) {
this.nioParams = nioParams;
}

/**
* Use non-blocking IO (NIO) for communication with the server.
* With NIO, several connections created from the same {@link ConnectionFactory}
* can use the same IO thread.
*
* A client process using a lot of not-so-active connections can benefit
* from NIO, as it would use fewer threads than with the traditional, blocking IO mode.
*
* Use {@link NioParams} to tune NIO and a {@link SocketChannelConfigurator} to
* configure the underlying {@link java.nio.channels.SocketChannel}s for connections.
*
* @see NioParams
* @see SocketChannelConfigurator
* @see java.nio.channels.SocketChannel
* @see java.nio.channels.Selector
*/
public void useNio() {
this.nio = true;
}

/**
* Use blocking IO for communication with the server.
* With blocking IO, each connection creates its own thread
* to read data from the server.
*/
public void useBlockingIo() {
this.nio = false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client;

import java.io.IOException;
import java.nio.channels.SocketChannel;

public class DefaultSocketChannelConfigurator implements SocketChannelConfigurator {

/**
* Provides a hook to insert custom configuration of the {@link SocketChannel}s
* used to connect to an AMQP server before they connect.
*
* The default behaviour of this method is to disable Nagle's
* algorithm to get more consistently low latency. However it
* may be overridden freely and there is no requirement to retain
* this behaviour.
*
* @param socketChannel The socket channel that is to be used for the Connection
*/
@Override
public void configure(SocketChannel socketChannel) throws IOException {
socketChannel.socket().setTcpNoDelay(true);
}
}
29 changes: 29 additions & 0 deletions src/main/java/com/rabbitmq/client/SocketChannelConfigurator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2
// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see
// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].

package com.rabbitmq.client;

import java.io.IOException;
import java.nio.channels.SocketChannel;

public interface SocketChannelConfigurator {

/**
* Provides a hook to insert custom configuration of the {@link SocketChannel}s
* used to connect to an AMQP server before they connect.
*/
void configure(SocketChannel socketChannel) throws IOException;

}
8 changes: 6 additions & 2 deletions src/main/java/com/rabbitmq/client/impl/AMQChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,12 @@ public void handleCompleteInboundCommand(AMQCommand command) throws IOException
if (!processAsync(command)) {
// The filter decided not to handle/consume the command,
// so it must be some reply to an earlier RPC.
nextOutstandingRpc().handleCommand(command);
markRpcFinished();
RpcContinuation nextOutstandingRpc = nextOutstandingRpc();
// the outstanding RPC can be null when calling Channel#asyncRpc
if(nextOutstandingRpc != null) {
nextOutstandingRpc.handleCommand(command);
markRpcFinished();
}
}
}

Expand Down
Loading