Skip to content

Commit a62e924

Browse files
committed
Fix race condition in pending acquire timer management (#4073)
There was a synchronisation issue between pendingOffer and drainLoop where a timer could be set after stopPendingCountdown was called, resulting in a timer that would never be stopped and would incorrectly fire even though the borrower had already been served. Signed-off-by: Violeta Georgieva <696661+violetagg@users.noreply.github.com>
1 parent dcd3eb0 commit a62e924

File tree

1 file changed

+22
-4
lines changed
  • reactor-netty-http/src/main/java/reactor/netty/http/client

1 file changed

+22
-4
lines changed

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -656,8 +656,11 @@ void pendingOffer(Borrower borrower) {
656656
int permits = poolConfig.allocationStrategy().estimatePermitCount();
657657
if (permits + estimateStreamsCount < postOffer) {
658658
borrower.pendingAcquireStart = clock.millis();
659-
if (!borrower.acquireTimeout.isZero()) {
660-
borrower.timeoutTask = poolConfig.pendingAcquireTimer().apply(borrower, borrower.acquireTimeout);
659+
if (!borrower.acquireTimeout.isZero() && borrower.timeoutTask == Borrower.TIMEOUT_DISPOSED) {
660+
Disposable task = poolConfig.pendingAcquireTimer().apply(borrower, borrower.acquireTimeout);
661+
if (!borrower.setTimeoutTask(task)) {
662+
task.dispose();
663+
}
661664
}
662665
}
663666

@@ -763,14 +766,18 @@ void scheduleEviction() {
763766
static final class Borrower extends AtomicBoolean implements Scannable, Subscription, Runnable {
764767

765768
static final Disposable TIMEOUT_DISPOSED = Disposables.disposed();
769+
static final Disposable TIMEOUT_STOPPED = Disposables.disposed();
766770

767771
final Duration acquireTimeout;
768772
final CoreSubscriber<? super Http2PooledRef> actual;
769773
final Http2Pool pool;
770774

771775
long pendingAcquireStart;
772776

773-
Disposable timeoutTask;
777+
volatile Disposable timeoutTask;
778+
@SuppressWarnings("rawtypes")
779+
static final AtomicReferenceFieldUpdater<Borrower, Disposable> TIMEOUT_TASK =
780+
AtomicReferenceFieldUpdater.newUpdater(Borrower.class, Disposable.class, "timeoutTask");
774781

775782
Borrower(CoreSubscriber<? super Http2PooledRef> actual, Http2Pool pool, Duration acquireTimeout) {
776783
this.acquireTimeout = acquireTimeout;
@@ -851,6 +858,16 @@ void fail(Throwable error) {
851858
}
852859
}
853860

861+
/**
862+
* Atomically set the timeout task if not already stopped.
863+
*
864+
* @return true if the task was set, false if countdown was already stopped
865+
* @see #stopPendingCountdown(boolean)
866+
*/
867+
boolean setTimeoutTask(Disposable task) {
868+
return TIMEOUT_TASK.compareAndSet(this, TIMEOUT_DISPOSED, task);
869+
}
870+
854871
void stopPendingCountdown(boolean success) {
855872
if (pendingAcquireStart > 0) {
856873
if (success) {
@@ -862,7 +879,8 @@ void stopPendingCountdown(boolean success) {
862879

863880
pendingAcquireStart = 0;
864881
}
865-
timeoutTask.dispose();
882+
Disposable task = TIMEOUT_TASK.getAndSet(this, TIMEOUT_STOPPED);
883+
task.dispose();
866884
}
867885
}
868886

0 commit comments

Comments
 (0)