Skip to content

Commit 5f88bc4

Browse files
njhillcarl-mastrangelo
authored andcommitted
stub: optimize ThreadlessExecutor used for blocking calls
The `ThreadlessExecutor` currently used for blocking calls uses `LinkedBlockingQueue` which is relatively heavy both in terms of allocations and synchronization overhead (e.g. when compared to `ConcurrentLinkedQueue`). It accounts for ~10% of allocations and ~5% of allocated bytes per-call in the `TransportBenchmark` when using in-process transport with [stats and tracing disabled](#5510). Changing to use a `ConcurrentLinkedQueue` results in a ~5% speedup of that benchmark. Before: ``` Benchmark (direct) (transport) Mode Cnt Score Error Units TransportBenchmark.unaryCall1024 true INPROCESS avgt 60 1877.339 ± 46.309 ns/op TransportBenchmark.unaryCall1024 false INPROCESS avgt 60 12680.525 ± 208.684 ns/op ``` After: ``` Benchmark (direct) (transport) Mode Cnt Score Error Units TransportBenchmark.unaryCall1024 true INPROCESS avgt 60 1779.188 ± 36.769 ns/op TransportBenchmark.unaryCall1024 false INPROCESS avgt 60 12532.470 ± 238.271 ns/op ```
1 parent ecccdbb commit 5f88bc4

File tree

1 file changed

+30
-7
lines changed

1 file changed

+30
-7
lines changed

stub/src/main/java/io/grpc/stub/ClientCalls.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@
3434
import java.util.NoSuchElementException;
3535
import java.util.concurrent.ArrayBlockingQueue;
3636
import java.util.concurrent.BlockingQueue;
37+
import java.util.concurrent.ConcurrentLinkedQueue;
3738
import java.util.concurrent.ExecutionException;
3839
import java.util.concurrent.Executor;
3940
import java.util.concurrent.Future;
40-
import java.util.concurrent.LinkedBlockingQueue;
41+
import java.util.concurrent.locks.LockSupport;
4142
import java.util.logging.Level;
4243
import java.util.logging.Logger;
4344
import javax.annotation.Nullable;
@@ -627,32 +628,54 @@ public void onClose(Status status, Metadata trailers) {
627628
}
628629
}
629630

630-
private static final class ThreadlessExecutor implements Executor {
631+
@SuppressWarnings("serial")
632+
private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
633+
implements Executor {
631634
private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());
632635

633-
private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
636+
private volatile Thread waiter;
634637

635638
// Non private to avoid synthetic class
636639
ThreadlessExecutor() {}
637640

638641
/**
639642
* Waits until there is a Runnable, then executes it and all queued Runnables after it.
643+
* Must only be called by one thread at a time.
640644
*/
641645
public void waitAndDrain() throws InterruptedException {
642-
Runnable runnable = queue.take();
643-
while (runnable != null) {
646+
final Thread currentThread = Thread.currentThread();
647+
throwIfInterrupted(currentThread);
648+
Runnable runnable = poll();
649+
if (runnable == null) {
650+
waiter = currentThread;
651+
try {
652+
while ((runnable = poll()) == null) {
653+
LockSupport.park(this);
654+
throwIfInterrupted(currentThread);
655+
}
656+
} finally {
657+
waiter = null;
658+
}
659+
}
660+
do {
644661
try {
645662
runnable.run();
646663
} catch (Throwable t) {
647664
log.log(Level.WARNING, "Runnable threw exception", t);
648665
}
649-
runnable = queue.poll();
666+
} while ((runnable = poll()) != null);
667+
}
668+
669+
private static void throwIfInterrupted(Thread currentThread) throws InterruptedException {
670+
if (currentThread.isInterrupted()) {
671+
throw new InterruptedException();
650672
}
651673
}
652674

653675
@Override
654676
public void execute(Runnable runnable) {
655-
queue.add(runnable);
677+
add(runnable);
678+
LockSupport.unpark(waiter); // no-op if null
656679
}
657680
}
658681
}

0 commit comments

Comments
 (0)