Description
When using HTTP/2, Http2Pool should preferentially multiplex concurrent requests on the same connection (until max concurrent streams is reached). However, under concurrent acquires, the pool can allocate a new connection even though an existing HTTP/2 connection still has available stream capacity.
This seems to be caused by a small timing window in reactor.netty.http.client.Http2Pool#drainLoop: the selected slot is removed from the internal queue, but it is only re-offered back to the pool asynchronously on the Channel's EventLoop. A second concurrent acquire that runs before the EventLoop task executes may not see the reusable connection and may trigger a new allocation.
Relevant code
reactor.netty.http.client.Http2Pool#drainLoop schedules delivery on the EventLoop:
slot.connection.channel().eventLoop().execute(() -> {
    borrower.deliver(new Http2PooledRef(slot));
    drain();
});
Borrower#deliver is where concurrency is incremented and the slot is re-offered back to the pool. Because this happens asynchronously, there is a window where the slot is not visible to subsequent borrowers.
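To make the window concrete, here is a minimal, self-contained sketch of the same sequence using plain Java collections in place of the real pool internals (the slot queue and the channel's EventLoop are simulated; nothing below is actual Http2Pool code):
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedQueue;
// Simplified model only: "idleSlots" stands in for the pool's slot queue and
// "eventLoopTasks" for the channel's EventLoop task queue (like EmbeddedChannel's pending tasks).
public class VisibilityWindowSketch {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> idleSlots = new ConcurrentLinkedQueue<>();
        Deque<Runnable> eventLoopTasks = new ArrayDeque<>();
        idleSlots.add("h2-connection-1"); // one cached HTTP/2 connection with free stream capacity
        // First acquire: the slot is polled from the queue and the deliver is scheduled asynchronously.
        String slot = idleSlots.poll();
        eventLoopTasks.add(() -> idleSlots.offer(slot)); // the re-offer only happens inside the deliver task
        // Second acquire arrives before the event loop has run the pending task:
        // the queue looks empty, so the pool would allocate a new connection.
        System.out.println("slot visible to second acquire: " + (idleSlots.peek() != null)); // false
        // Once the event loop runs, the slot becomes visible again, but the extra allocation already happened.
        eventLoopTasks.forEach(Runnable::run);
        System.out.println("slot visible after pending tasks ran: " + (idleSlots.peek() != null)); // true
    }
}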
Expected behavior
With HTTP/2 multiplexing enabled and the connection still below max concurrent streams:
- concurrent acquires should reuse the same HTTP/2 connection (open multiple streams),
- and should not allocate additional connections due to scheduling/timing.
Actual behavior
Under concurrent acquires, the pool may allocate a new connection even though an existing HTTP/2 connection can still open a new stream.
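For context, this is the kind of client-side usage that can surface the behavior; the following is only a hedged sketch (the local h2c endpoint at http://localhost:8080 is an assumption), while the detailed reproducer below exercises the pool directly:
import reactor.core.publisher.Mono;
import reactor.netty.http.HttpProtocol;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
public class ConcurrentH2Requests {
    public static void main(String[] args) {
        // Pool allows up to 2 connections, so an unnecessary second connection is possible.
        HttpClient client = HttpClient.create(ConnectionProvider.create("h2-pool", 2))
                .protocol(HttpProtocol.H2C); // assumes a local cleartext HTTP/2 server
        Mono<String> first = client.get().uri("http://localhost:8080/").responseContent().aggregate().asString();
        Mono<String> second = client.get().uri("http://localhost:8080/").responseContent().aggregate().asString();
        // Fire both requests concurrently; with multiplexing they should share one connection.
        Mono.zip(first, second).block();
    }
}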
Steps to Reproduce
This can be reproduced with a setup that:
- Warms up the pool so one HTTP/2 connection is created and cached.
- Triggers two acquires back-to-back / concurrently, while delaying the EventLoop execution of the deliver task.
- Observes allocator invocation count (or allocated connections) > 1.
Pseudo steps:
- Create an HTTP/2 channel (e.g. EmbeddedChannel with Http2FrameCodec + Http2MultiplexHandler)
- Build Http2Pool with max connections > 1
- Acquire once and release to keep the connection cached
- Call acquire().subscribe(...) twice without running the channel pending tasks
- Check allocator is invoked only once (expected) but currently can be invoked twice (actual)
- Then run pending tasks to complete the delivers
Reproduction Code
The following JUnit 5 test demonstrates the issue. On affected versions it fails with something like expected: 1 but was: 2 (second allocation happens even though the cached H2 connection can still open a new stream).
package reactor.netty.http.client;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.internal.shaded.reactor.pool.PoolBuilder;
import reactor.netty.internal.shaded.reactor.pool.PoolConfig;
import reactor.netty.internal.shaded.reactor.pool.PooledRef;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
class Http2PoolReproducerTest {

    @Test
    void concurrentAcquireAllocatesExtraConnection() {
        AtomicInteger allocator = new AtomicInteger();
        ConcurrentLinkedQueue<EmbeddedChannel> channels = new ConcurrentLinkedQueue<>();
        PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder =
                PoolBuilder.from(Mono.fromSupplier(() -> {
                               allocator.incrementAndGet();
                               EmbeddedChannel channel = new EmbeddedChannel(
                                       Http2FrameCodecBuilder.forClient().build(),
                                       new Http2MultiplexHandler(new ChannelHandlerAdapter() {}));
                               channels.add(channel);
                               return Connection.from(channel);
                           }))
                           .idleResourceReuseLruOrder()
                           .maxPendingAcquireUnbounded()
                           .sizeBetween(0, 2);
        Http2Pool pool = poolBuilder.build(config -> new Http2Pool(config, null));

        List<PooledRef<Connection>> acquired = new ArrayList<>();
        try {
            // Warm-up: allocate a connection and cache it.
            PooledRef<Connection> warm = pool.acquire().block(Duration.ofSeconds(1));
            assertThat(warm).isNotNull();
            warm.release().block(Duration.ofSeconds(1));

            // Don't run pending tasks yet, simulate concurrent acquires before deliver is scheduled.
            pool.acquire().subscribe(acquired::add);
            pool.acquire().subscribe(acquired::add);

            // Expected: still 1 (reuse cached H2 connection). Actual (before fix): becomes 2.
            assertThat(allocator).as("allocator count").hasValue(1);

            channels.forEach(EmbeddedChannel::runPendingTasks);
            assertThat(acquired).hasSize(2);
            assertThat(acquired.get(0).poolable())
                    .as("both acquires should share the same H2 connection")
                    .isSameAs(acquired.get(1).poolable());
        }
        finally {
            for (PooledRef<Connection> ref : acquired) {
                ref.release().block(Duration.ofSeconds(1));
            }
            for (EmbeddedChannel channel : channels) {
                channel.finishAndReleaseAll();
                Connection.from(channel).dispose();
            }
        }
    }
}
Proposed direction
Consider eliminating the visibility window by reserving stream capacity and re-offering the slot back to the pool before scheduling the async deliver, so concurrent borrowers can still see and reuse the connection.
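As a rough illustration of that direction (not the actual change in the linked PR), the simplified model from the Description section can be reordered so the slot is re-offered before the asynchronous hand-off:
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedQueue;
// Same simplified model as above; only the ordering changes:
// reserve a stream and re-offer the slot synchronously, keep just the delivery on the event loop.
public class ReserveBeforeDeliverSketch {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> idleSlots = new ConcurrentLinkedQueue<>();
        Deque<Runnable> eventLoopTasks = new ArrayDeque<>();
        idleSlots.add("h2-connection-1");
        String slot = idleSlots.poll();
        // In the real pool this step would increment the slot's concurrency (reserve a stream).
        idleSlots.offer(slot); // re-offer before scheduling, so concurrent borrowers still see the slot
        eventLoopTasks.add(() -> { /* borrower.deliver(new Http2PooledRef(slot)) stays asynchronous */ });
        // A second acquire arriving before the event loop task now finds the connection reusable.
        System.out.println("slot visible to second acquire: " + (idleSlots.peek() != null)); // true
        eventLoopTasks.forEach(Runnable::run);
    }
}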
Environment
- Reactor Netty version: 1.3.1
- JVM version (java -version): 21
Related PR
I’ve opened PR #4051 with a focused improvement and a regression test.