Skip to content

Commit 6c758c4

Browse files
committed
Improve Http2Pool connection reuse for concurrent acquires
Signed-off-by: zimatars <[email protected]>
1 parent 4ea7612 commit 6c758c4

File tree

3 files changed

+89
-9
lines changed

3 files changed

+89
-9
lines changed

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -406,8 +406,11 @@ void drainLoop() {
406406
log.debug(format(slot.connection.channel(), "Channel activated"));
407407
}
408408
ACQUIRED.incrementAndGet(this);
409+
// Reserve concurrency and re-offer the slot before async deliver so concurrent acquires can reuse the connection
410+
slot.incrementConcurrencyAndGet();
411+
slot.deactivate();
409412
slot.connection.channel().eventLoop().execute(() -> {
410-
borrower.deliver(new Http2PooledRef(slot));
413+
borrower.deliver(new Http2PooledRef(slot), true);
411414
drain();
412415
});
413416
}
@@ -827,17 +830,31 @@ public String toString() {
827830
}
828831

829832
void deliver(Http2PooledRef poolSlot) {
833+
deliver(poolSlot, false);
834+
}
835+
836+
void deliver(Http2PooledRef poolSlot, boolean alreadyReserved) {
830837
assert poolSlot.slot.connection.channel().eventLoop().inEventLoop();
831838
poolSlot.slot.updateMaxConcurrentStreams();
832-
if (!poolSlot.slot.canOpenStream()) {
833-
poolSlot.slot.deactivate();
839+
840+
int effectiveConcurrency = poolSlot.slot.concurrency() - (alreadyReserved ? 1 : 0);
841+
if (!poolSlot.slot.canOpenStream(effectiveConcurrency)) {
842+
if (alreadyReserved) {
843+
// Concurrency was reserved in drainLoop, rollback reservation
844+
poolSlot.slot.decrementConcurrencyAndGet();
845+
}
846+
else {
847+
poolSlot.slot.deactivate();
848+
}
834849
pool.addPending(pool.pending, this, true);
835850
return;
836851
}
837852
stopPendingCountdown(true);
838-
// Increment concurrency BEFORE deactivate so that canOpenStream() is correct for other threads
839-
poolSlot.slot.incrementConcurrencyAndGet();
840-
poolSlot.slot.deactivate();
853+
if (!alreadyReserved) {
854+
// Increment concurrency BEFORE deactivate so that canOpenStream() is correct for other threads
855+
poolSlot.slot.incrementConcurrencyAndGet();
856+
poolSlot.slot.deactivate();
857+
}
841858
if (get()) {
842859
//CANCELLED or timeout reached
843860
poolSlot.invalidate().subscribe(aVoid -> {}, e -> Operators.onErrorDropped(e, Context.empty()));
@@ -1017,7 +1034,13 @@ private long computeMaxConcurrentStreams() {
10171034
}
10181035

10191036
boolean canOpenStream() {
1020-
int concurrency = this.concurrency;
1037+
return canOpenStream(this.concurrency);
1038+
}
1039+
1040+
boolean canOpenStream(int concurrency) {
1041+
if (concurrency < 0) {
1042+
return false;
1043+
}
10211044
long max = this.maxConcurrentStreams;
10221045
// For non-HTTP/2 connections (max == 0), allow opening a stream if concurrency is 0
10231046
// For HTTP/2 connections, check that we haven't reached max concurrent streams

reactor-netty-http/src/main/java/reactor/netty/http/client/Http3Pool.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2024-2025 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2024-2026 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -88,6 +88,11 @@ boolean canOpenStream() {
8888
return true;
8989
}
9090

91+
@Override
92+
boolean canOpenStream(int concurrency) {
93+
return true;
94+
}
95+
9196
@Override
9297
boolean goAwayReceived() {
9398
ChannelHandlerContext connectionHandlerCtx = http3ClientConnectionHandlerCtx();

reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2021-2025 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2021-2026 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -144,6 +144,58 @@ void acquireRelease() {
144144
}
145145
}
146146

147+
@Test
148+
void concurrentAcquireReusesConnection() {
149+
AtomicInteger allocator = new AtomicInteger();
150+
ConcurrentLinkedQueue<EmbeddedChannel> channels = new ConcurrentLinkedQueue<>();
151+
152+
PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder =
153+
PoolBuilder.from(Mono.fromSupplier(() -> {
154+
allocator.incrementAndGet();
155+
EmbeddedChannel channel = new EmbeddedChannel(
156+
new TestChannelId(),
157+
Http2FrameCodecBuilder.forClient().build(),
158+
new Http2MultiplexHandler(new ChannelHandlerAdapter() {}));
159+
channels.add(channel);
160+
return Connection.from(channel);
161+
}))
162+
.idleResourceReuseLruOrder()
163+
.maxPendingAcquireUnbounded()
164+
.sizeBetween(0, 2);
165+
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null));
166+
167+
List<PooledRef<Connection>> acquired = new ArrayList<>();
168+
try {
169+
// Warm-up: allocate a connection and cache it.
170+
PooledRef<Connection> warm = http2Pool.acquire().block(Duration.ofSeconds(1));
171+
assertThat(warm).isNotNull();
172+
warm.release().block(Duration.ofSeconds(1));
173+
174+
// Don't run pending tasks yet, simulate concurrent acquires before deliver is scheduled.
175+
http2Pool.acquire().subscribe(acquired::add);
176+
http2Pool.acquire().subscribe(acquired::add);
177+
178+
assertThat(allocator).as("allocator count").hasValue(1);
179+
180+
channels.forEach(EmbeddedChannel::runPendingTasks);
181+
182+
assertThat(acquired).hasSize(2);
183+
assertThat(acquired.get(0).poolable())
184+
.as("both acquires should share the same H2 connection")
185+
.isSameAs(acquired.get(1).poolable());
186+
187+
for (PooledRef<Connection> ref : acquired) {
188+
ref.release().block(Duration.ofSeconds(1));
189+
}
190+
}
191+
finally {
192+
for (EmbeddedChannel channel : channels) {
193+
channel.finishAndReleaseAll();
194+
Connection.from(channel).dispose();
195+
}
196+
}
197+
}
198+
147199
@Test
148200
void doAcquireNotCalledIfBorrowerInScopeCancelledEarly() {
149201
AtomicInteger allocator = new AtomicInteger();

0 commit comments

Comments
 (0)