Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,8 @@ void drainLoop() {
// when cached connections are below minimum connections, then allocate a new connection
boolean belowMinConnections = minConnections > 0 &&
poolConfig.allocationStrategy().permitGranted() < minConnections;
Slot slot = belowMinConnections ? null : findConnection(resources);
int resourcesCount = idleSize;
Slot slot = belowMinConnections ? null : findConnection(resources, resourcesCount);
if (slot != null) {
Borrower borrower = pollPending(borrowers, true);
if (borrower == null || borrower.get()) {
Expand All @@ -412,10 +413,12 @@ void drainLoop() {
});
}
else {
int resourcesCount = idleSize;
if (minConnections > 0 &&
poolConfig.allocationStrategy().permitGranted() >= minConnections &&
resourcesCount == 0) {
poolConfig.allocationStrategy().permitGranted() > resourcesCount) {
// connections allocations were triggered
}
else if (minConnections == 0 && poolConfig.allocationStrategy().permitGranted() > resourcesCount) {
// connections allocations were triggered
}
else {
Expand Down Expand Up @@ -539,8 +542,7 @@ void evictInBackground() {
scheduleEviction();
}

@Nullable Slot findConnection(ConcurrentLinkedQueue<Slot> resources) {
int resourcesCount = idleSize;
@Nullable Slot findConnection(ConcurrentLinkedQueue<Slot> resources, int resourcesCount) {
while (resourcesCount > 0) {
// There are connections in the queue

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2025 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2026 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -144,6 +144,58 @@ void acquireRelease() {
}
}

@Test
void concurrentAcquireReusesConnection() {
AtomicInteger allocator = new AtomicInteger();
ConcurrentLinkedQueue<EmbeddedChannel> channels = new ConcurrentLinkedQueue<>();

PoolBuilder<Connection, PoolConfig<Connection>> poolBuilder =
PoolBuilder.from(Mono.fromSupplier(() -> {
allocator.incrementAndGet();
EmbeddedChannel channel = new EmbeddedChannel(
new TestChannelId(),
Http2FrameCodecBuilder.forClient().build(),
new Http2MultiplexHandler(new ChannelHandlerAdapter() {}));
channels.add(channel);
return Connection.from(channel);
}))
.idleResourceReuseLruOrder()
.maxPendingAcquireUnbounded()
.sizeBetween(0, 2);
Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null));

List<PooledRef<Connection>> acquired = new ArrayList<>();
try {
// Warm-up: allocate a connection and cache it.
PooledRef<Connection> warm = http2Pool.acquire().block(Duration.ofSeconds(1));
assertThat(warm).isNotNull();
warm.release().block(Duration.ofSeconds(1));

// Don't run pending tasks yet, simulate concurrent acquires before deliver is scheduled.
http2Pool.acquire().subscribe(acquired::add);
http2Pool.acquire().subscribe(acquired::add);

assertThat(allocator).as("allocator count").hasValue(1);

channels.forEach(EmbeddedChannel::runPendingTasks);

assertThat(acquired).hasSize(2);
assertThat(acquired.get(0).poolable())
.as("both acquires should share the same H2 connection")
.isSameAs(acquired.get(1).poolable());

for (PooledRef<Connection> ref : acquired) {
ref.release().block(Duration.ofSeconds(1));
}
}
finally {
for (EmbeddedChannel channel : channels) {
channel.finishAndReleaseAll();
Connection.from(channel).dispose();
}
}
}

@Test
void doAcquireNotCalledIfBorrowerInScopeCancelledEarly() {
AtomicInteger allocator = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -42,6 +42,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.awaitility.Awaitility.await;
import static reactor.netty.http.HttpProtocol.H2;

class Http2PooledConnectionProviderCustomMetricsTest extends BaseHttpTest {
Expand Down Expand Up @@ -78,7 +79,7 @@ void measureActiveStreamsSize() throws InterruptedException {
.maxConnections(10)
.build();

CountDownLatch latch = new CountDownLatch(5);
CountDownLatch latch = new CountDownLatch(1);
HttpClient httpClient =
createClient(pool, disposableServer::address)
.protocol(H2)
Expand All @@ -94,10 +95,14 @@ void measureActiveStreamsSize() throws InterruptedException {
.subscribe());

assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
assertThat(isRegistered.get()).isTrue();
HttpConnectionPoolMetrics actual = metrics.get();
assertThat(actual).isNotNull();
assertThat(actual.activeStreamSize()).isEqualTo(5);

await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
assertThat(isRegistered.get()).isTrue();
HttpConnectionPoolMetrics actual = metrics.get();
assertThat(actual).isNotNull();
assertThat(actual.activeStreamSize()).isEqualTo(5);
});
}
finally {
pool.disposeLater().block(Duration.ofSeconds(5));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2026 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1854,7 +1854,9 @@ void testGracefulShutdownH2(HttpProtocol[] serverProtocols, HttpProtocol[] clien
private void doTestGracefulShutdown(HttpServer server, HttpClient client) throws Exception {
CountDownLatch latch1 = new CountDownLatch(2);
CountDownLatch latch2 = new CountDownLatch(2);
CountDownLatch latchGoAway = new CountDownLatch(2);
// HTTP/2 GOAWAY is a connection-level frame. With multiplexing enabled, both concurrent requests
// can share the same connection, so we should only expect at least one GOAWAY to be observed.
CountDownLatch latchGoAway = new CountDownLatch(1);
CountDownLatch latch3 = new CountDownLatch(1);
LoopResources loop = LoopResources.create("testGracefulShutdown");
group = new DefaultChannelGroup(executor);
Expand Down Expand Up @@ -1906,7 +1908,7 @@ public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg)
// Stop accepting incoming requests, wait at most 3s for the active requests to finish
disposableServer.disposeNow();

assertThat(latchGoAway.await(30, TimeUnit.SECONDS)).as("2 GOAWAY should have been received").isTrue();
assertThat(latchGoAway.await(30, TimeUnit.SECONDS)).as("GOAWAY should have been received").isTrue();
assertThat(latch2.await(30, TimeUnit.SECONDS)).isTrue();

// Dispose the event loop
Expand Down