Skip to content

Commit c4149a2

Browse files
committed
stub: optimize ThreadlessExecutor used for blocking calls
Replace LinkedBlockingQueue with ConcurrentLinkedQueue and explicit blocking.
1 parent 026e4c5 commit c4149a2

File tree

1 file changed

+24
-8
lines changed

1 file changed

+24
-8
lines changed

stub/src/main/java/io/grpc/stub/ClientCalls.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@
3434
import java.util.NoSuchElementException;
3535
import java.util.concurrent.ArrayBlockingQueue;
3636
import java.util.concurrent.BlockingQueue;
37+
import java.util.concurrent.ConcurrentLinkedQueue;
3738
import java.util.concurrent.ExecutionException;
3839
import java.util.concurrent.Executor;
3940
import java.util.concurrent.Future;
40-
import java.util.concurrent.LinkedBlockingQueue;
41+
import java.util.concurrent.locks.LockSupport;
4142
import java.util.logging.Level;
4243
import java.util.logging.Logger;
4344
import javax.annotation.Nullable;
@@ -627,10 +628,12 @@ public void onClose(Status status, Metadata trailers) {
627628
}
628629
}
629630

630-
private static final class ThreadlessExecutor implements Executor {
631+
@SuppressWarnings("serial")
632+
private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runnable>
633+
implements Executor {
631634
private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());
632635

633-
private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
636+
private volatile Thread waiter;
634637

635638
// Non private to avoid synthetic class
636639
ThreadlessExecutor() {}
@@ -639,20 +642,33 @@ private static final class ThreadlessExecutor implements Executor {
639642
* Waits until there is a Runnable, then executes it and all queued Runnables after it.
640643
*/
641644
public void waitAndDrain() throws InterruptedException {
642-
Runnable runnable = queue.take();
643-
while (runnable != null) {
645+
Runnable runnable = poll();
646+
if (runnable == null) {
647+
waiter = Thread.currentThread();
648+
try {
649+
while ((runnable = poll()) == null) {
650+
LockSupport.park(this);
651+
}
652+
} finally {
653+
waiter = null;
654+
}
655+
}
656+
do {
644657
try {
645658
runnable.run();
646659
} catch (Throwable t) {
647660
log.log(Level.WARNING, "Runnable threw exception", t);
648661
}
649-
runnable = queue.poll();
650-
}
662+
} while ((runnable = poll()) != null);
651663
}
652664

653665
@Override
654666
public void execute(Runnable runnable) {
655-
queue.add(runnable);
667+
add(runnable);
668+
final Thread waitingThread = waiter;
669+
if (waitingThread != null) {
670+
LockSupport.unpark(waitingThread);
671+
}
656672
}
657673
}
658674
}

0 commit comments

Comments
 (0)