From da149c728fb65d39e9c6650188a785ad0d89b65e Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Fri, 17 Nov 2017 23:34:06 -0800 Subject: [PATCH] core: make SerializingExecutor SPSC --- .../io/grpc/internal/SerializingExecutor.java | 118 ++++++++---------- 1 file changed, 49 insertions(+), 69 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/SerializingExecutor.java b/core/src/main/java/io/grpc/internal/SerializingExecutor.java index e5957452cf9..98b3ddc9b50 100644 --- a/core/src/main/java/io/grpc/internal/SerializingExecutor.java +++ b/core/src/main/java/io/grpc/internal/SerializingExecutor.java @@ -16,16 +16,10 @@ package io.grpc.internal; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.base.Preconditions; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; /** * Executor ensuring that all {@link Runnable} tasks submitted are executed in order @@ -34,88 +28,74 @@ */ // TODO(madongfly): figure out a way to not expose it or move it to transport package. public final class SerializingExecutor implements Executor, Runnable { - private static final Logger log = - Logger.getLogger(SerializingExecutor.class.getName()); - - private static final AtomicIntegerFieldUpdater runStateUpdater = - AtomicIntegerFieldUpdater.newUpdater(SerializingExecutor.class, "runState"); - private static final int STOPPED = 0; - private static final int RUNNING = -1; - /** Underlying executor that all submitted Runnable objects are run on. */ - private final Executor executor; + private static final class Node { - /** A list of Runnables to be run in order. */ - private final Queue runQueue = new ConcurrentLinkedQueue(); + volatile Node next; - private volatile int runState = STOPPED; + Runnable walkable; - /** - * Creates a SerializingExecutor, running tasks using {@code executor}. - * - * @param executor Executor in which tasks should be run. Must not be null. - */ - public SerializingExecutor(Executor executor) { - Preconditions.checkNotNull(executor, "'executor' must not be null."); - this.executor = executor; + Node(Runnable r) { + this.rb = r; + } } - /** - * Runs the given runnable strictly after all Runnables that were submitted - * before it, and using the {@code executor} passed to the constructor. . - */ - @Override - public void execute(Runnable r) { - runQueue.add(checkNotNull(r, "'r' must not be null.")); - schedule(r); - } + private Node head; + private Node tail; - private void schedule(@Nullable Runnable removable) { - if (runStateUpdater.compareAndSet(this, STOPPED, RUNNING)) { - boolean success = false; - try { - executor.execute(this); - success = true; - } finally { - // It is possible that at this point that there are still tasks in - // the queue, it would be nice to keep trying but the error may not - // be recoverable. So we update our state and propagate so that if - // our caller deems it recoverable we won't be stuck. - if (!success) { - if (removable != null) { - // This case can only be reached if 'this' was not currently running, and we failed to - // reschedule. The item should still be in the queue for removal. - // ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not - // throw if the item to remove is null. If removable is present in the queue twice, - // the wrong one may be removed. It doesn't seem possible for this case to exist today. - // This is important to run in case of RejectedExectuionException, so that future calls - // to execute don't succeed and accidentally run a previous runnable. - runQueue.remove(removable); - } - runStateUpdater.set(this, STOPPED); - } - } - } + private final Executor delegate; + + private final AtomicBoolean running = new AtomicBoolean(); + + public SerializingExecutor(Executor delegate) { + this.delegate = delegate; + head = tail = new Node(null); } @Override public void run() { + Node n; Runnable r; try { - while ((r = runQueue.poll()) != null) { + while ((n = tail.next) != null) { + r = n.rb; + n.rb = null; + tail = n; try { r.run(); } catch (RuntimeException e) { - // Log it and keep going. - log.log(Level.SEVERE, "Exception while executing runnable " + r, e); + Logger.getLogger(getClass().getName()).log(Level.SEVERE, "bad", e); } } } finally { - runStateUpdater.set(this, STOPPED); + running.set(false); + } + if (tail.next != null) { + schedule(); } - if (!runQueue.isEmpty()) { - // we didn't enqueue anything but someone else did. - schedule(null); + } + + @Override + public void execute(Runnable command) { + Node n = new Node(command); + head.next = n; + head = n; + + schedule(); + } + + private void schedule() { + if (running.getAndSet(true)) { + return; + } + boolean success = false; + try { + delegate.execute(this); + success = true; + } finally { + if (!success) { + running.set(false); + } } } }