Skip to content

Commit 5781c45

Browse files
committed
Simplify StreamFactoryFactory implementations
* Remove builders * Remove now-unused factory configuration * Simplify usage in MongoClient instantiation JAVA-5161
1 parent 7fbd48b commit 5781c45

File tree

10 files changed

+41
-180
lines changed

10 files changed

+41
-180
lines changed

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java

+2-9
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.net.SocketAddress;
2828
import java.net.StandardSocketOptions;
2929
import java.nio.ByteBuffer;
30-
import java.nio.channels.AsynchronousChannelGroup;
3130
import java.nio.channels.AsynchronousSocketChannel;
3231
import java.nio.channels.CompletionHandler;
3332
import java.util.LinkedList;
@@ -44,14 +43,12 @@
4443
public final class AsynchronousSocketChannelStream extends AsynchronousChannelStream {
4544
private final ServerAddress serverAddress;
4645
private final SocketSettings settings;
47-
private final AsynchronousChannelGroup group;
4846

4947
public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final SocketSettings settings,
50-
final PowerOfTwoBufferPool bufferProvider, final AsynchronousChannelGroup group) {
48+
final PowerOfTwoBufferPool bufferProvider) {
5149
super(serverAddress, settings, bufferProvider);
5250
this.serverAddress = serverAddress;
5351
this.settings = settings;
54-
this.group = group;
5552
}
5653

5754
@SuppressWarnings("deprecation")
@@ -77,7 +74,7 @@ private void initializeSocketChannel(final AsyncCompletionHandler<Void> handler,
7774
SocketAddress socketAddress = socketAddressQueue.poll();
7875

7976
try {
80-
AsynchronousSocketChannel attemptConnectionChannel = AsynchronousSocketChannel.open(group);
77+
AsynchronousSocketChannel attemptConnectionChannel = AsynchronousSocketChannel.open();
8178
attemptConnectionChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
8279
attemptConnectionChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
8380
if (settings.getReceiveBufferSize() > 0) {
@@ -97,10 +94,6 @@ private void initializeSocketChannel(final AsyncCompletionHandler<Void> handler,
9794
}
9895
}
9996

100-
public AsynchronousChannelGroup getGroup() {
101-
return group;
102-
}
103-
10497
private class OpenCompletionHandler implements CompletionHandler<Void, Object> {
10598
private final AtomicReference<AsyncCompletionHandler<Void>> handlerReference;
10699
private final Queue<SocketAddress> socketAddressQueue;

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactory.java

+1-21
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@
1919
import com.mongodb.ServerAddress;
2020
import com.mongodb.connection.SocketSettings;
2121
import com.mongodb.connection.SslSettings;
22-
import com.mongodb.lang.Nullable;
23-
24-
import java.nio.channels.AsynchronousChannelGroup;
2522

2623
import static com.mongodb.assertions.Assertions.assertFalse;
2724
import static com.mongodb.assertions.Assertions.notNull;
@@ -32,7 +29,6 @@
3229
public class AsynchronousSocketChannelStreamFactory implements StreamFactory {
3330
private final PowerOfTwoBufferPool bufferProvider = PowerOfTwoBufferPool.DEFAULT;
3431
private final SocketSettings settings;
35-
private final AsynchronousChannelGroup group;
3632

3733
/**
3834
* Create a new factory with the default {@code BufferProvider} and {@code AsynchronousChannelGroup}.
@@ -41,29 +37,13 @@ public class AsynchronousSocketChannelStreamFactory implements StreamFactory {
4137
* @param sslSettings the settings for connecting via SSL
4238
*/
4339
public AsynchronousSocketChannelStreamFactory(final SocketSettings settings, final SslSettings sslSettings) {
44-
this(settings, sslSettings, null);
45-
}
46-
47-
/**
48-
* Create a new factory.
49-
*
50-
* @param settings the socket settings
51-
* @param sslSettings the SSL settings
52-
* @param group the {@code AsynchronousChannelGroup} to use or null for the default group
53-
*
54-
* @since 3.6
55-
*/
56-
public AsynchronousSocketChannelStreamFactory(final SocketSettings settings, final SslSettings sslSettings,
57-
@Nullable final AsynchronousChannelGroup group) {
5840
assertFalse(sslSettings.isEnabled());
59-
6041
this.settings = notNull("settings", settings);
61-
this.group = group;
6242
}
6343

6444
@Override
6545
public Stream create(final ServerAddress serverAddress) {
66-
return new AsynchronousSocketChannelStream(serverAddress, settings, bufferProvider, group);
46+
return new AsynchronousSocketChannelStream(serverAddress, settings, bufferProvider);
6747
}
6848

6949
}

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactory.java

+1-46
Original file line numberDiff line numberDiff line change
@@ -19,59 +19,14 @@
1919
import com.mongodb.connection.SocketSettings;
2020
import com.mongodb.connection.SslSettings;
2121

22-
import java.nio.channels.AsynchronousChannelGroup;
23-
2422
/**
2523
* A {@code StreamFactoryFactory} implementation for AsynchronousSocketChannel-based streams.
2624
*
2725
* @see java.nio.channels.AsynchronousSocketChannel
2826
*/
2927
public final class AsynchronousSocketChannelStreamFactoryFactory implements StreamFactoryFactory {
30-
private final AsynchronousChannelGroup group;
31-
32-
/**
33-
* Gets a builder for an instance of {@code AsynchronousSocketChannelStreamFactoryFactory}.
34-
* @return the builder
35-
* @since 3.6
36-
*/
37-
public static Builder builder() {
38-
return new Builder();
39-
}
40-
41-
/**
42-
* A builder for an instance of {@code AsynchronousSocketChannelStreamFactoryFactory}.
43-
*
44-
* @since 3.6
45-
*/
46-
public static final class Builder {
47-
private AsynchronousChannelGroup group;
48-
49-
/**
50-
* Sets the {@code AsynchronousChannelGroup}
51-
*
52-
* @param group the {@code AsynchronousChannelGroup}
53-
* @return this
54-
*/
55-
public Builder group(final AsynchronousChannelGroup group) {
56-
this.group = group;
57-
return this;
58-
}
59-
60-
/**
61-
* Build an instance of {@code AsynchronousSocketChannelStreamFactoryFactory}.
62-
* @return the AsynchronousSocketChannelStreamFactoryFactory
63-
*/
64-
public AsynchronousSocketChannelStreamFactoryFactory build() {
65-
return new AsynchronousSocketChannelStreamFactoryFactory(this);
66-
}
67-
}
68-
6928
@Override
7029
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
71-
return new AsynchronousSocketChannelStreamFactory(socketSettings, sslSettings, group);
72-
}
73-
74-
private AsynchronousSocketChannelStreamFactoryFactory(final Builder builder) {
75-
group = builder.group;
30+
return new AsynchronousSocketChannelStreamFactory(socketSettings, sslSettings);
7631
}
7732
}

driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java

+5-14
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,20 @@
1717
package com.mongodb.internal.connection;
1818

1919
import com.mongodb.MongoClientException;
20-
import com.mongodb.MongoClientSettings;
2120
import com.mongodb.connection.NettyTransportSettings;
2221
import com.mongodb.connection.TransportSettings;
2322
import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory;
24-
import com.mongodb.lang.Nullable;
2523

2624
/**
2725
* <p>This class is not part of the public API and may be removed or changed at any time</p>
2826
*/
2927
public final class StreamFactoryHelper {
30-
@Nullable
31-
public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final MongoClientSettings settings) {
32-
StreamFactoryFactory streamFactoryFactory = null;
33-
TransportSettings transportSettings = settings.getTransportSettings();
34-
if (transportSettings != null) {
35-
if (transportSettings instanceof NettyTransportSettings) {
36-
streamFactoryFactory =
37-
NettyStreamFactoryFactory.builder().applySettings((NettyTransportSettings) transportSettings).build();
38-
} else {
39-
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
40-
}
28+
public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final TransportSettings transportSettings) {
29+
if (transportSettings instanceof NettyTransportSettings) {
30+
return NettyStreamFactoryFactory.builder().applySettings((NettyTransportSettings) transportSettings).build();
31+
} else {
32+
throw new MongoClientException("Unsupported transport settings: " + transportSettings.getClass().getName());
4133
}
42-
return streamFactoryFactory;
4334
}
4435

4536
private StreamFactoryHelper() {

driver-core/src/test/functional/com/mongodb/internal/connection/AsyncSocketChannelStreamSpecification.groovy

+4-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.mongodb.internal.connection
22

3-
import util.spock.annotations.Slow
43
import com.mongodb.MongoSocketException
54
import com.mongodb.MongoSocketOpenException
65
import com.mongodb.ServerAddress
@@ -9,10 +8,9 @@ import com.mongodb.connection.SocketSettings
98
import com.mongodb.connection.SslSettings
109
import spock.lang.IgnoreIf
1110
import spock.lang.Specification
11+
import util.spock.annotations.Slow
1212

13-
import java.nio.channels.AsynchronousChannelGroup
1413
import java.util.concurrent.CountDownLatch
15-
import java.util.concurrent.Executors
1614

1715
import static com.mongodb.ClusterFixture.getSslSettings
1816
import static java.util.concurrent.TimeUnit.MILLISECONDS
@@ -26,8 +24,7 @@ class AsyncSocketChannelStreamSpecification extends Specification {
2624
def port = 27017
2725
def socketSettings = SocketSettings.builder().connectTimeout(100, MILLISECONDS).build()
2826
def sslSettings = SslSettings.builder().build()
29-
def channelGroup = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(5))
30-
def factoryFactory = AsynchronousSocketChannelStreamFactoryFactory.builder().group(channelGroup).build()
27+
def factoryFactory = new AsynchronousSocketChannelStreamFactoryFactory()
3128
def factory = factoryFactory.create(socketSettings, sslSettings)
3229
def inetAddresses = [new InetSocketAddress(InetAddress.getByName('192.168.255.255'), port),
3330
new InetSocketAddress(InetAddress.getByName('127.0.0.1'), port)]
@@ -51,9 +48,7 @@ class AsyncSocketChannelStreamSpecification extends Specification {
5148
def port = 27017
5249
def socketSettings = SocketSettings.builder().connectTimeout(100, MILLISECONDS).build()
5350
def sslSettings = SslSettings.builder().build()
54-
def factoryFactory = AsynchronousSocketChannelStreamFactoryFactory.builder()
55-
.group(AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(5)))
56-
.build()
51+
def factoryFactory = new AsynchronousSocketChannelStreamFactoryFactory()
5752

5853
def factory = factoryFactory.create(socketSettings, sslSettings)
5954

@@ -81,7 +76,7 @@ class AsyncSocketChannelStreamSpecification extends Specification {
8176

8277
def stream = new AsynchronousSocketChannelStream(serverAddress,
8378
SocketSettings.builder().connectTimeout(100, MILLISECONDS).build(),
84-
new PowerOfTwoBufferPool(), null)
79+
new PowerOfTwoBufferPool())
8580
def callback = new CallbackErrorHolder()
8681

8782
when:

driver-core/src/test/unit/com/mongodb/internal/connection/AsynchronousSocketChannelStreamFactoryFactorySpecification.groovy

+2-20
Original file line numberDiff line numberDiff line change
@@ -22,40 +22,22 @@ import com.mongodb.connection.SslSettings
2222
import spock.lang.Specification
2323
import spock.lang.Unroll
2424

25-
import java.nio.channels.AsynchronousChannelGroup
26-
import java.util.concurrent.ExecutorService
27-
import java.util.concurrent.Executors
28-
2925
class AsynchronousSocketChannelStreamFactoryFactorySpecification extends Specification {
3026

3127
@Unroll
3228
def 'should create the expected #description AsynchronousSocketChannelStream'() {
3329
given:
34-
def factory = factoryFactory.create(socketSettings, sslSettings)
30+
def factory = new AsynchronousSocketChannelStreamFactoryFactory().create(socketSettings, sslSettings)
3531

3632
when:
37-
AsynchronousSocketChannelStream stream = factory.create(serverAddress)
33+
AsynchronousSocketChannelStream stream = factory.create(serverAddress) as AsynchronousSocketChannelStream
3834

3935
then:
4036
stream.getSettings() == socketSettings
4137
stream.getAddress() == serverAddress
42-
(stream.getGroup() == null) == hasCustomGroup
43-
44-
cleanup:
45-
stream.getGroup()?.shutdown()
46-
47-
where:
48-
description | factoryFactory | hasCustomGroup
49-
'default' | DEFAULT_FACTORY | true
50-
'custom' | CUSTOM_FACTORY | false
5138
}
5239

5340
SocketSettings socketSettings = SocketSettings.builder().build()
5441
SslSettings sslSettings = SslSettings.builder().build()
5542
ServerAddress serverAddress = new ServerAddress()
56-
ExecutorService service = Executors.newFixedThreadPool(1)
57-
static final DEFAULT_FACTORY = AsynchronousSocketChannelStreamFactoryFactory.builder().build()
58-
static final CUSTOM_FACTORY = AsynchronousSocketChannelStreamFactoryFactory.builder()
59-
.group(AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(5)))
60-
.build()
6143
}

driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java

+1-11
Original file line numberDiff line numberDiff line change
@@ -16,36 +16,26 @@
1616

1717
package com.mongodb.internal.connection;
1818

19-
import com.mongodb.MongoClientSettings;
2019
import com.mongodb.connection.NettyTransportSettings;
2120
import com.mongodb.connection.TransportSettings;
2221
import com.mongodb.internal.connection.netty.NettyStreamFactoryFactory;
2322
import io.netty.buffer.PooledByteBufAllocator;
2423
import io.netty.channel.nio.NioEventLoopGroup;
2524
import org.junit.jupiter.api.Test;
2625

27-
import static com.mongodb.assertions.Assertions.assertNull;
2826
import static org.junit.jupiter.api.Assertions.assertEquals;
2927

3028
@SuppressWarnings("deprecation")
3129
class StreamFactoryHelperTest {
3230

33-
@Test
34-
void streamFactoryFactoryIsNullWithDefaultSettings() {
35-
MongoClientSettings settings = MongoClientSettings.builder().build();
36-
assertNull(StreamFactoryHelper.getStreamFactoryFactoryFromSettings(settings));
37-
}
3831
@Test
3932
void streamFactoryFactoryIsDerivedFromTransportSettings() {
4033
NettyTransportSettings nettyTransportSettings = TransportSettings.nettyBuilder()
4134
.eventLoopGroup(new NioEventLoopGroup())
4235
.allocator(PooledByteBufAllocator.DEFAULT)
4336
.socketChannelClass(io.netty.channel.socket.oio.OioSocketChannel.class)
4437
.build();
45-
MongoClientSettings settings = MongoClientSettings.builder()
46-
.transportSettings(nettyTransportSettings)
47-
.build();
4838
assertEquals(NettyStreamFactoryFactory.builder().applySettings(nettyTransportSettings).build(),
49-
StreamFactoryHelper.getStreamFactoryFactoryFromSettings(settings));
39+
StreamFactoryHelper.getStreamFactoryFactoryFromSettings(nettyTransportSettings));
5040
}
5141
}

0 commit comments

Comments
 (0)