Skip to content

Commit b64a1b6

Browse files
authored
Fix race condition in pending acquire timer management (#4073)
There was a synchronisation issue between pendingOffer and drainLoop where a timer could be set after stopPendingCountdown was called, resulting in a timer that would never be stopped and would incorrectly fire even though the borrower had already been served. Signed-off-by: Violeta Georgieva <[email protected]>
1 parent 45f68ba commit b64a1b6

File tree

1 file changed

+22
-4
lines changed
  • reactor-netty-http/src/main/java/reactor/netty/http/client

1 file changed

+22
-4
lines changed

reactor-netty-http/src/main/java/reactor/netty/http/client/Http2Pool.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -655,8 +655,11 @@ void pendingOffer(Borrower borrower) {
655655
int permits = poolConfig.allocationStrategy().estimatePermitCount();
656656
if (permits + estimateStreamsCount < postOffer) {
657657
borrower.pendingAcquireStart = clock.millis();
658-
if (!borrower.acquireTimeout.isZero()) {
659-
borrower.timeoutTask = poolConfig.pendingAcquireTimer().apply(borrower, borrower.acquireTimeout);
658+
if (!borrower.acquireTimeout.isZero() && borrower.timeoutTask == Borrower.TIMEOUT_DISPOSED) {
659+
Disposable task = poolConfig.pendingAcquireTimer().apply(borrower, borrower.acquireTimeout);
660+
if (!borrower.setTimeoutTask(task)) {
661+
task.dispose();
662+
}
660663
}
661664
}
662665

@@ -760,14 +763,18 @@ void scheduleEviction() {
760763
static final class Borrower extends AtomicBoolean implements Scannable, Subscription, Runnable {
761764

762765
static final Disposable TIMEOUT_DISPOSED = Disposables.disposed();
766+
static final Disposable TIMEOUT_STOPPED = Disposables.disposed();
763767

764768
final Duration acquireTimeout;
765769
final CoreSubscriber<? super Http2PooledRef> actual;
766770
final Http2Pool pool;
767771

768772
long pendingAcquireStart;
769773

770-
Disposable timeoutTask;
774+
volatile Disposable timeoutTask;
775+
@SuppressWarnings("rawtypes")
776+
static final AtomicReferenceFieldUpdater<Borrower, Disposable> TIMEOUT_TASK =
777+
AtomicReferenceFieldUpdater.newUpdater(Borrower.class, Disposable.class, "timeoutTask");
771778

772779
Borrower(CoreSubscriber<? super Http2PooledRef> actual, Http2Pool pool, Duration acquireTimeout) {
773780
this.acquireTimeout = acquireTimeout;
@@ -855,6 +862,16 @@ void fail(Throwable error) {
855862
}
856863
}
857864

865+
/**
866+
* Atomically set the timeout task if not already stopped.
867+
*
868+
* @return true if the task was set, false if countdown was already stopped
869+
* @see #stopPendingCountdown(boolean)
870+
*/
871+
boolean setTimeoutTask(Disposable task) {
872+
return TIMEOUT_TASK.compareAndSet(this, TIMEOUT_DISPOSED, task);
873+
}
874+
858875
void stopPendingCountdown(boolean success) {
859876
if (pendingAcquireStart > 0) {
860877
if (success) {
@@ -866,7 +883,8 @@ void stopPendingCountdown(boolean success) {
866883

867884
pendingAcquireStart = 0;
868885
}
869-
timeoutTask.dispose();
886+
Disposable task = TIMEOUT_TASK.getAndSet(this, TIMEOUT_STOPPED);
887+
task.dispose();
870888
}
871889
}
872890

0 commit comments

Comments
 (0)