diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java
index 4ae993775b..cfcd1c5935 100644
--- a/src/main/java/io/reactivex/Completable.java
+++ b/src/main/java/io/reactivex/Completable.java
@@ -339,6 +339,23 @@ public static Completable fromFuture(final Future> future) {
return fromAction(Functions.futureAction(future));
}
+ /**
+ * Returns a Completable instance that runs the given Runnable for each subscriber and
+ * emits either its exception or simply completes.
+ *
+ * - Scheduler:
+ * - {@code fromRunnable} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param run the runnable to run for each subscriber
+ * @return the new Completable instance
+ * @throws NullPointerException if run is null
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public static Completable fromRunnable(final Runnable run) {
+ ObjectHelper.requireNonNull(run, "run is null");
+ return RxJavaPlugins.onAssembly(new CompletableFromRunnable(run));
+ }
+
/**
* Returns a Completable instance that subscribes to the given Observable, ignores all values and
* emits only the terminal event.
diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableFromRunnable.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromRunnable.java
new file mode 100644
index 0000000000..851b0df4c3
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableFromRunnable.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.completable;
+
+import io.reactivex.Completable;
+import io.reactivex.CompletableObserver;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.disposables.Disposables;
+import io.reactivex.exceptions.Exceptions;
+
+public final class CompletableFromRunnable extends Completable {
+
+ final Runnable runnable;
+
+ public CompletableFromRunnable(Runnable runnable) {
+ this.runnable = runnable;
+ }
+
+ @Override
+ protected void subscribeActual(CompletableObserver s) {
+ Disposable d = Disposables.empty();
+ s.onSubscribe(d);
+ try {
+ runnable.run();
+ } catch (Throwable e) {
+ Exceptions.throwIfFatal(e);
+ if (!d.isDisposed()) {
+ s.onError(e);
+ }
+ return;
+ }
+ if (!d.isDisposed()) {
+ s.onComplete();
+ }
+ }
+}
diff --git a/src/test/java/io/reactivex/completable/CompletableTest.java b/src/test/java/io/reactivex/completable/CompletableTest.java
index 682aeb32a5..421c0ad09f 100644
--- a/src/test/java/io/reactivex/completable/CompletableTest.java
+++ b/src/test/java/io/reactivex/completable/CompletableTest.java
@@ -4473,6 +4473,37 @@ public void run() {
}
}
+ @Test(expected = NullPointerException.class)
+ public void fromRunnableNull() {
+ Completable.fromRunnable(null);
+ }
+
+ @Test(timeout = 1000)
+ public void fromRunnableNormal() {
+ final AtomicInteger calls = new AtomicInteger();
+
+ Completable c = Completable.fromRunnable(new Runnable() {
+ @Override
+ public void run() {
+ calls.getAndIncrement();
+ }
+ });
+
+ c.blockingAwait();
+
+ Assert.assertEquals(1, calls.get());
+ }
+
+ @Test(timeout = 1000, expected = TestException.class)
+ public void fromRunnableThrows() {
+ Completable c = Completable.fromRunnable(new Runnable() {
+ @Override
+ public void run() { throw new TestException(); }
+ });
+
+ c.blockingAwait();
+ }
+
@Test(expected = NullPointerException.class)
public void doOnErrorNullValue() {
Completable.complete().doOnError(null);