Skip to content

Commit bb7709e

Browse files
authored
When an explicit EventLoopGroup is configured ensure only one connection pool is created (#3321)
Fixes #3316
1 parent f520606 commit bb7709e

File tree

2 files changed

+71
-4
lines changed

2 files changed

+71
-4
lines changed

reactor-netty-core/src/main/java/reactor/netty/transport/Transport.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2020-2021 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -231,8 +231,7 @@ public <O> T option(ChannelOption<O> key, @Nullable O value) {
231231
* @return a new {@link Transport} reference
232232
*/
233233
public T runOn(EventLoopGroup eventLoopGroup) {
234-
Objects.requireNonNull(eventLoopGroup, "eventLoopGroup");
235-
return runOn(preferNative -> eventLoopGroup);
234+
return runOn(new EventLoopGroupLoopResources(eventLoopGroup));
236235
}
237236

238237
/**
@@ -389,4 +388,38 @@ public final T wiretap(String category, LogLevel level, AdvancedByteBufFormat fo
389388
protected abstract T duplicate();
390389

391390
static final Logger log = Loggers.getLogger(Transport.class);
391+
392+
static final class EventLoopGroupLoopResources implements LoopResources {
393+
394+
final EventLoopGroup eventLoopGroup;
395+
396+
EventLoopGroupLoopResources(EventLoopGroup eventLoopGroup) {
397+
Objects.requireNonNull(eventLoopGroup, "eventLoopGroup");
398+
this.eventLoopGroup = eventLoopGroup;
399+
}
400+
401+
@Override
402+
public EventLoopGroup onServer(boolean useNative) {
403+
return eventLoopGroup;
404+
}
405+
406+
@Override
407+
public boolean equals(Object o) {
408+
if (this == o) {
409+
return true;
410+
}
411+
if (o == null || getClass() != o.getClass()) {
412+
return false;
413+
}
414+
EventLoopGroupLoopResources that = (EventLoopGroupLoopResources) o;
415+
return Objects.equals(eventLoopGroup, that.eventLoopGroup);
416+
}
417+
418+
@Override
419+
public int hashCode() {
420+
int result = 1;
421+
result = 31 * result + Objects.hashCode(eventLoopGroup);
422+
return result;
423+
}
424+
}
392425
}

reactor-netty-core/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017-2021 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2017-2024 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import java.util.Map;
2626
import java.util.concurrent.ConcurrentMap;
2727
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.ExecutionException;
2829
import java.util.concurrent.Executors;
2930
import java.util.concurrent.Future;
3031
import java.util.concurrent.ScheduledExecutorService;
@@ -51,6 +52,7 @@
5152
import reactor.core.publisher.Signal;
5253
import reactor.netty.Connection;
5354
import reactor.netty.ConnectionObserver;
55+
import reactor.netty.DisposableChannel;
5456
import reactor.netty.DisposableServer;
5557
import reactor.netty.SocketUtils;
5658
import reactor.netty.channel.ChannelMetricsRecorder;
@@ -527,6 +529,38 @@ private void doTestIssue1790(boolean fifoPool) {
527529
}
528530
}
529531

532+
@Test
533+
void testIssue3316() throws ExecutionException, InterruptedException {
534+
DisposableServer disposableServer =
535+
TcpServer.create()
536+
.port(0)
537+
.handle((in, out) -> out.send(in.receive().retain()))
538+
.bindNow();
539+
540+
DefaultPooledConnectionProvider provider =
541+
(DefaultPooledConnectionProvider) ConnectionProvider.create("testIssue3316", 400);
542+
EventLoopGroup group = new NioEventLoopGroup();
543+
try {
544+
Flux.range(0, 400)
545+
.flatMap(i ->
546+
TcpClient.create(provider)
547+
.port(disposableServer.port())
548+
.runOn(group)
549+
.connect())
550+
.doOnNext(DisposableChannel::dispose)
551+
.blockLast(Duration.ofSeconds(5));
552+
553+
assertThat(provider.channelPools.size()).isEqualTo(1);
554+
}
555+
finally {
556+
disposableServer.disposeNow();
557+
provider.disposeLater()
558+
.block(Duration.ofSeconds(5));
559+
group.shutdownGracefully()
560+
.get();
561+
}
562+
}
563+
530564
static final class PoolImpl extends AtomicInteger implements InstrumentedPool<PooledConnection> {
531565

532566
@Override

0 commit comments

Comments
 (0)