Skip to content

core: make SerializingExecutor SPSC [DO NOT MERGE] #3778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 49 additions & 69 deletions core/src/main/java/io/grpc/internal/SerializingExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,10 @@

package io.grpc.internal;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.Preconditions;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Executor ensuring that all {@link Runnable} tasks submitted are executed in order
Expand All @@ -34,88 +28,74 @@
*/
// TODO(madongfly): figure out a way to not expose it or move it to transport package.
public final class SerializingExecutor implements Executor, Runnable {
private static final Logger log =
Logger.getLogger(SerializingExecutor.class.getName());

private static final AtomicIntegerFieldUpdater<SerializingExecutor> runStateUpdater =
AtomicIntegerFieldUpdater.newUpdater(SerializingExecutor.class, "runState");
private static final int STOPPED = 0;
private static final int RUNNING = -1;

/** Underlying executor that all submitted Runnable objects are run on. */
private final Executor executor;
private static final class Node {

/** A list of Runnables to be run in order. */
private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<Runnable>();
volatile Node next;

private volatile int runState = STOPPED;
Runnable walkable;

/**
* Creates a SerializingExecutor, running tasks using {@code executor}.
*
* @param executor Executor in which tasks should be run. Must not be null.
*/
public SerializingExecutor(Executor executor) {
Preconditions.checkNotNull(executor, "'executor' must not be null.");
this.executor = executor;
Node(Runnable r) {
this.rb = r;
}
}

/**
* Runs the given runnable strictly after all Runnables that were submitted
* before it, and using the {@code executor} passed to the constructor. .
*/
@Override
public void execute(Runnable r) {
runQueue.add(checkNotNull(r, "'r' must not be null."));
schedule(r);
}
private Node head;
private Node tail;

private void schedule(@Nullable Runnable removable) {
if (runStateUpdater.compareAndSet(this, STOPPED, RUNNING)) {
boolean success = false;
try {
executor.execute(this);
success = true;
} finally {
// It is possible that at this point that there are still tasks in
// the queue, it would be nice to keep trying but the error may not
// be recoverable. So we update our state and propagate so that if
// our caller deems it recoverable we won't be stuck.
if (!success) {
if (removable != null) {
// This case can only be reached if 'this' was not currently running, and we failed to
// reschedule. The item should still be in the queue for removal.
// ConcurrentLinkedQueue claims that null elements are not allowed, but seems to not
// throw if the item to remove is null. If removable is present in the queue twice,
// the wrong one may be removed. It doesn't seem possible for this case to exist today.
// This is important to run in case of RejectedExectuionException, so that future calls
// to execute don't succeed and accidentally run a previous runnable.
runQueue.remove(removable);
}
runStateUpdater.set(this, STOPPED);
}
}
}
private final Executor delegate;

private final AtomicBoolean running = new AtomicBoolean();

public SerializingExecutor(Executor delegate) {
this.delegate = delegate;
head = tail = new Node(null);
}

@Override
public void run() {
Node n;
Runnable r;
try {
while ((r = runQueue.poll()) != null) {
while ((n = tail.next) != null) {
r = n.rb;
n.rb = null;
tail = n;
try {
r.run();
} catch (RuntimeException e) {
// Log it and keep going.
log.log(Level.SEVERE, "Exception while executing runnable " + r, e);
Logger.getLogger(getClass().getName()).log(Level.SEVERE, "bad", e);
}
}
} finally {
runStateUpdater.set(this, STOPPED);
running.set(false);
}
if (tail.next != null) {
schedule();
}
if (!runQueue.isEmpty()) {
// we didn't enqueue anything but someone else did.
schedule(null);
}

@Override
public void execute(Runnable command) {
Node n = new Node(command);
head.next = n;
head = n;

schedule();
}

private void schedule() {
if (running.getAndSet(true)) {
return;
}
boolean success = false;
try {
delegate.execute(this);
success = true;
} finally {
if (!success) {
running.set(false);
}
}
}
}