Skip to content

Commit c81f438

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Add Completable.andThen(MaybeSource) (#4616)
* 2.x: Add Completable.andThen(MaybeSource) * Add missing import * Requested changes
1 parent d85d40e commit c81f438

File tree

4 files changed

+203
-1
lines changed

4 files changed

+203
-1
lines changed

src/main/java/io/reactivex/Completable.java

+21
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.reactivex.internal.operators.completable.*;
2727
import io.reactivex.internal.operators.flowable.FlowableDelaySubscriptionOther;
2828
import io.reactivex.internal.operators.maybe.MaybeFromCompletable;
29+
import io.reactivex.internal.operators.maybe.MaybeDelayWithCompletable;
2930
import io.reactivex.internal.operators.observable.ObservableDelaySubscriptionOther;
3031
import io.reactivex.internal.operators.single.SingleDelayWithCompletable;
3132
import io.reactivex.internal.util.ExceptionHelper;
@@ -799,6 +800,26 @@ public final <T> Single<T> andThen(SingleSource<T> next) {
799800
return RxJavaPlugins.onAssembly(new SingleDelayWithCompletable<T>(next, this));
800801
}
801802

803+
/**
804+
* Returns a {@link Maybe} which will subscribe to this Completable and once that is completed then
805+
* will subscribe to the {@code next} MaybeSource. An error event from this Completable will be
806+
* propagated to the downstream subscriber and will result in skipping the subscription of the
807+
* Maybe.
808+
* <dl>
809+
* <dt><b>Scheduler:</b></dt>
810+
* <dd>{@code andThen} does not operate by default on a particular {@link Scheduler}.</dd>
811+
* </dl>
812+
*
813+
* @param <T> the value type of the next MaybeSource
814+
* @param next the Maybe to subscribe after this Completable is completed, not null
815+
* @return Maybe that composes this Completable and next
816+
*/
817+
@SchedulerSupport(SchedulerSupport.NONE)
818+
public final <T> Maybe<T> andThen(MaybeSource<T> next) {
819+
ObjectHelper.requireNonNull(next, "next is null");
820+
return RxJavaPlugins.onAssembly(new MaybeDelayWithCompletable<T>(next, this));
821+
}
822+
802823
/**
803824
* Returns a Completable that first runs this Completable
804825
* and then the other completable.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import io.reactivex.CompletableObserver;
17+
import io.reactivex.CompletableSource;
18+
import io.reactivex.Maybe;
19+
import io.reactivex.MaybeObserver;
20+
import io.reactivex.MaybeSource;
21+
import io.reactivex.disposables.Disposable;
22+
import io.reactivex.internal.disposables.DisposableHelper;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
25+
public final class MaybeDelayWithCompletable<T> extends Maybe<T> {
26+
27+
final MaybeSource<T> source;
28+
29+
final CompletableSource other;
30+
31+
public MaybeDelayWithCompletable(MaybeSource<T> source, CompletableSource other) {
32+
this.source = source;
33+
this.other = other;
34+
}
35+
36+
@Override
37+
protected void subscribeActual(MaybeObserver<? super T> subscriber) {
38+
other.subscribe(new OtherObserver<T>(subscriber, source));
39+
}
40+
41+
static final class OtherObserver<T>
42+
extends AtomicReference<Disposable>
43+
implements CompletableObserver, Disposable {
44+
private static final long serialVersionUID = 703409937383992161L;
45+
46+
final MaybeObserver<? super T> actual;
47+
48+
final MaybeSource<T> source;
49+
50+
OtherObserver(MaybeObserver<? super T> actual, MaybeSource<T> source) {
51+
this.actual = actual;
52+
this.source = source;
53+
}
54+
55+
@Override
56+
public void onSubscribe(Disposable d) {
57+
if (DisposableHelper.setOnce(this, d)) {
58+
59+
actual.onSubscribe(this);
60+
}
61+
}
62+
63+
@Override
64+
public void onError(Throwable e) {
65+
actual.onError(e);
66+
}
67+
68+
@Override
69+
public void onComplete() {
70+
source.subscribe(new DelayWithMainObserver<T>(this, actual));
71+
}
72+
73+
@Override
74+
public void dispose() {
75+
DisposableHelper.dispose(this);
76+
}
77+
78+
@Override
79+
public boolean isDisposed() {
80+
return DisposableHelper.isDisposed(get());
81+
}
82+
}
83+
84+
static final class DelayWithMainObserver<T> implements MaybeObserver<T> {
85+
86+
final AtomicReference<Disposable> parent;
87+
88+
final MaybeObserver<? super T> actual;
89+
90+
DelayWithMainObserver(AtomicReference<Disposable> parent, MaybeObserver<? super T> actual) {
91+
this.parent = parent;
92+
this.actual = actual;
93+
}
94+
95+
@Override
96+
public void onSubscribe(Disposable d) {
97+
DisposableHelper.replace(parent, d);
98+
}
99+
100+
@Override
101+
public void onSuccess(T value) {
102+
actual.onSuccess(value);
103+
}
104+
105+
@Override
106+
public void onError(Throwable e) {
107+
actual.onError(e);
108+
}
109+
110+
@Override
111+
public void onComplete() {
112+
actual.onComplete();
113+
}
114+
}
115+
}

src/main/java/io/reactivex/internal/operators/single/SingleDelayWithCompletable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ static final class OtherObserver<T>
5353

5454
@Override
5555
public void onSubscribe(Disposable d) {
56-
if (DisposableHelper.set(this, d)) {
56+
if (DisposableHelper.setOnce(this, d)) {
5757

5858
actual.onSubscribe(this);
5959
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import io.reactivex.Completable;
17+
import io.reactivex.Maybe;
18+
import org.junit.Test;
19+
20+
public class CompletableAndThenTest {
21+
@Test(expected = NullPointerException.class)
22+
public void andThenMaybeNull() {
23+
Completable.complete()
24+
.andThen((Maybe<Object>) null);
25+
}
26+
27+
@Test
28+
public void andThenMaybeCompleteValue() {
29+
Completable.complete()
30+
.andThen(Maybe.just(1))
31+
.test()
32+
.assertResult(1);
33+
}
34+
35+
@Test
36+
public void andThenMaybeCompleteError() {
37+
Completable.complete()
38+
.andThen(Maybe.error(new RuntimeException("test")))
39+
.test()
40+
.assertNotComplete()
41+
.assertNoValues()
42+
.assertError(RuntimeException.class)
43+
.assertErrorMessage("test");
44+
}
45+
46+
@Test
47+
public void andThenMaybeCompleteEmpty() {
48+
Completable.complete()
49+
.andThen(Maybe.empty())
50+
.test()
51+
.assertNoValues()
52+
.assertNoErrors()
53+
.assertComplete();
54+
}
55+
56+
@Test
57+
public void andThenMaybeError() {
58+
Completable.error(new RuntimeException("bla"))
59+
.andThen(Maybe.empty())
60+
.test()
61+
.assertNotComplete()
62+
.assertNoValues()
63+
.assertError(RuntimeException.class)
64+
.assertErrorMessage("bla");
65+
}
66+
}

0 commit comments

Comments
 (0)