diff --git a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java index 29877d480..522cdda2f 100644 --- a/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java +++ b/reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java @@ -391,7 +391,8 @@ void drainLoop() { // when cached connections are below minimum connections, then allocate a new connection boolean belowMinConnections = minConnections > 0 && poolConfig.allocationStrategy().permitGranted() < minConnections; - Slot slot = belowMinConnections ? null : findConnection(resources); + int resourcesCount = idleSize; + Slot slot = belowMinConnections ? null : findConnection(resources, resourcesCount); if (slot != null) { Borrower borrower = pollPending(borrowers, true); if (borrower == null || borrower.get()) { @@ -412,10 +413,12 @@ void drainLoop() { }); } else { - int resourcesCount = idleSize; if (minConnections > 0 && poolConfig.allocationStrategy().permitGranted() >= minConnections && - resourcesCount == 0) { + poolConfig.allocationStrategy().permitGranted() > resourcesCount) { + // connections allocations were triggered + } + else if (minConnections == 0 && poolConfig.allocationStrategy().permitGranted() > resourcesCount) { // connections allocations were triggered } else { @@ -539,8 +542,7 @@ void evictInBackground() { scheduleEviction(); } - @Nullable Slot findConnection(ConcurrentLinkedQueue resources) { - int resourcesCount = idleSize; + @Nullable Slot findConnection(ConcurrentLinkedQueue resources, int resourcesCount) { while (resourcesCount > 0) { // There are connections in the queue diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java index c04b5cc55..d2b06a68f 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PoolTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2025 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2026 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -144,6 +144,58 @@ void acquireRelease() { } } + @Test + void concurrentAcquireReusesConnection() { + AtomicInteger allocator = new AtomicInteger(); + ConcurrentLinkedQueue channels = new ConcurrentLinkedQueue<>(); + + PoolBuilder> poolBuilder = + PoolBuilder.from(Mono.fromSupplier(() -> { + allocator.incrementAndGet(); + EmbeddedChannel channel = new EmbeddedChannel( + new TestChannelId(), + Http2FrameCodecBuilder.forClient().build(), + new Http2MultiplexHandler(new ChannelHandlerAdapter() {})); + channels.add(channel); + return Connection.from(channel); + })) + .idleResourceReuseLruOrder() + .maxPendingAcquireUnbounded() + .sizeBetween(0, 2); + Http2Pool http2Pool = poolBuilder.build(config -> new Http2Pool(config, null)); + + List> acquired = new ArrayList<>(); + try { + // Warm-up: allocate a connection and cache it. + PooledRef warm = http2Pool.acquire().block(Duration.ofSeconds(1)); + assertThat(warm).isNotNull(); + warm.release().block(Duration.ofSeconds(1)); + + // Don't run pending tasks yet, simulate concurrent acquires before deliver is scheduled. + http2Pool.acquire().subscribe(acquired::add); + http2Pool.acquire().subscribe(acquired::add); + + assertThat(allocator).as("allocator count").hasValue(1); + + channels.forEach(EmbeddedChannel::runPendingTasks); + + assertThat(acquired).hasSize(2); + assertThat(acquired.get(0).poolable()) + .as("both acquires should share the same H2 connection") + .isSameAs(acquired.get(1).poolable()); + + for (PooledRef ref : acquired) { + ref.release().block(Duration.ofSeconds(1)); + } + } + finally { + for (EmbeddedChannel channel : channels) { + channel.finishAndReleaseAll(); + Connection.from(channel).dispose(); + } + } + } + @Test void doAcquireNotCalledIfBorrowerInScopeCancelledEarly() { AtomicInteger allocator = new AtomicInteger(); diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PooledConnectionProviderCustomMetricsTest.java b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PooledConnectionProviderCustomMetricsTest.java index 6cd005868..e47e9f24c 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PooledConnectionProviderCustomMetricsTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/client/Http2PooledConnectionProviderCustomMetricsTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2026 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,6 +42,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.awaitility.Awaitility.await; import static reactor.netty.http.HttpProtocol.H2; class Http2PooledConnectionProviderCustomMetricsTest extends BaseHttpTest { @@ -78,7 +79,7 @@ void measureActiveStreamsSize() throws InterruptedException { .maxConnections(10) .build(); - CountDownLatch latch = new CountDownLatch(5); + CountDownLatch latch = new CountDownLatch(1); HttpClient httpClient = createClient(pool, disposableServer::address) .protocol(H2) @@ -94,10 +95,14 @@ void measureActiveStreamsSize() throws InterruptedException { .subscribe()); assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); - assertThat(isRegistered.get()).isTrue(); - HttpConnectionPoolMetrics actual = metrics.get(); - assertThat(actual).isNotNull(); - assertThat(actual.activeStreamSize()).isEqualTo(5); + + await().atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertThat(isRegistered.get()).isTrue(); + HttpConnectionPoolMetrics actual = metrics.get(); + assertThat(actual).isNotNull(); + assertThat(actual.activeStreamSize()).isEqualTo(5); + }); } finally { pool.disposeLater().block(Duration.ofSeconds(5)); diff --git a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java index 651dfd2b3..c51652dc8 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java +++ b/reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2011-2025 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2011-2026 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1854,7 +1854,9 @@ void testGracefulShutdownH2(HttpProtocol[] serverProtocols, HttpProtocol[] clien private void doTestGracefulShutdown(HttpServer server, HttpClient client) throws Exception { CountDownLatch latch1 = new CountDownLatch(2); CountDownLatch latch2 = new CountDownLatch(2); - CountDownLatch latchGoAway = new CountDownLatch(2); + // HTTP/2 GOAWAY is a connection-level frame. With multiplexing enabled, both concurrent requests + // can share the same connection, so we should only expect at least one GOAWAY to be observed. + CountDownLatch latchGoAway = new CountDownLatch(1); CountDownLatch latch3 = new CountDownLatch(1); LoopResources loop = LoopResources.create("testGracefulShutdown"); group = new DefaultChannelGroup(executor); @@ -1906,7 +1908,7 @@ public void channelRead(@NotNull ChannelHandlerContext ctx, @NotNull Object msg) // Stop accepting incoming requests, wait at most 3s for the active requests to finish disposableServer.disposeNow(); - assertThat(latchGoAway.await(30, TimeUnit.SECONDS)).as("2 GOAWAY should have been received").isTrue(); + assertThat(latchGoAway.await(30, TimeUnit.SECONDS)).as("GOAWAY should have been received").isTrue(); assertThat(latch2.await(30, TimeUnit.SECONDS)).isTrue(); // Dispose the event loop