File tree Expand file tree Collapse file tree 3 files changed +95
-0
lines changed
internal/operators/completable
test/java/io/reactivex/completable Expand file tree Collapse file tree 3 files changed +95
-0
lines changed Original file line number Diff line number Diff line change @@ -339,6 +339,23 @@ public static Completable fromFuture(final Future<?> future) {
339339 return fromAction (Functions .futureAction (future ));
340340 }
341341
342+ /**
343+ * Returns a Completable instance that runs the given Runnable for each subscriber and
344+ * emits either its exception or simply completes.
345+ * <dl>
346+ * <dt><b>Scheduler:</b></dt>
347+ * <dd>{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.</dd>
348+ * </dl>
349+ * @param run the runnable to run for each subscriber
350+ * @return the new Completable instance
351+ * @throws NullPointerException if run is null
352+ */
353+ @ SchedulerSupport (SchedulerSupport .NONE )
354+ public static Completable fromRunnable (final Runnable run ) {
355+ ObjectHelper .requireNonNull (run , "run is null" );
356+ return RxJavaPlugins .onAssembly (new CompletableFromRunnable (run ));
357+ }
358+
342359 /**
343360 * Returns a Completable instance that subscribes to the given Observable, ignores all values and
344361 * emits only the terminal event.
Original file line number Diff line number Diff line change 1+ /**
2+ * Copyright 2016 Netflix, Inc.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+ * compliance with the License. You may obtain a copy of the License at
6+ *
7+ * http://www.apache.org/licenses/LICENSE-2.0
8+ *
9+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
10+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+ * the License for the specific language governing permissions and limitations under the License.
12+ */
13+
14+ package io .reactivex .internal .operators .completable ;
15+
16+ import io .reactivex .Completable ;
17+ import io .reactivex .CompletableObserver ;
18+ import io .reactivex .disposables .Disposable ;
19+ import io .reactivex .disposables .Disposables ;
20+ import io .reactivex .exceptions .Exceptions ;
21+
22+ public final class CompletableFromRunnable extends Completable {
23+
24+ final Runnable runnable ;
25+
26+ public CompletableFromRunnable (Runnable runnable ) {
27+ this .runnable = runnable ;
28+ }
29+
30+ @ Override
31+ protected void subscribeActual (CompletableObserver s ) {
32+ Disposable d = Disposables .empty ();
33+ s .onSubscribe (d );
34+ try {
35+ runnable .run ();
36+ } catch (Throwable e ) {
37+ Exceptions .throwIfFatal (e );
38+ if (!d .isDisposed ()) {
39+ s .onError (e );
40+ }
41+ return ;
42+ }
43+ if (!d .isDisposed ()) {
44+ s .onComplete ();
45+ }
46+ }
47+ }
Original file line number Diff line number Diff line change @@ -4473,6 +4473,37 @@ public void run() {
44734473 }
44744474 }
44754475
4476+ @ Test (expected = NullPointerException .class )
4477+ public void fromRunnableNull () {
4478+ Completable .fromRunnable (null );
4479+ }
4480+
4481+ @ Test (timeout = 1000 )
4482+ public void fromRunnableNormal () {
4483+ final AtomicInteger calls = new AtomicInteger ();
4484+
4485+ Completable c = Completable .fromRunnable (new Runnable () {
4486+ @ Override
4487+ public void run () {
4488+ calls .getAndIncrement ();
4489+ }
4490+ });
4491+
4492+ c .blockingAwait ();
4493+
4494+ Assert .assertEquals (1 , calls .get ());
4495+ }
4496+
4497+ @ Test (timeout = 1000 , expected = TestException .class )
4498+ public void fromRunnableThrows () {
4499+ Completable c = Completable .fromRunnable (new Runnable () {
4500+ @ Override
4501+ public void run () { throw new TestException (); }
4502+ });
4503+
4504+ c .blockingAwait ();
4505+ }
4506+
44764507 @ Test (expected = NullPointerException .class )
44774508 public void doOnErrorNullValue () {
44784509 Completable .complete ().doOnError (null );
You can’t perform that action at this time.
0 commit comments