Skip to content

Commit c181169

Browse files
authored
3.x: Reenable XFlatMapTest.maybeSingle, add missing Single operators (#6893)
1 parent 5f6aafc commit c181169

13 files changed

+1153
-13
lines changed

src/main/java/io/reactivex/rxjava3/core/Maybe.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -3653,7 +3653,7 @@ public final <R> Maybe<R> flatMap(@NonNull Function<? super T, ? extends MaybeSo
36533653
}
36543654

36553655
/**
3656-
* Maps the {@code onSuccess}, {@code onError} or {@code onComplete} signals of this {@code Maybe} into {@link MaybeSource} and emits that
3656+
* Maps the {@code onSuccess}, {@code onError} or {@code onComplete} signals of the current {@code Maybe} into a {@link MaybeSource} and emits that
36573657
* {@code MaybeSource}'s signals.
36583658
* <p>
36593659
* <img width="640" height="354" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMap.mmm.png" alt="">
@@ -3691,7 +3691,7 @@ public final <R> Maybe<R> flatMap(
36913691
* Returns a {@code Maybe} that emits the results of a specified function to the pair of values emitted by the
36923692
* current {@code Maybe} and a specified mapped {@link MaybeSource}.
36933693
* <p>
3694-
* <img width="640" height="390" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMap.r.png" alt="">
3694+
* <img width="640" height="268" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMap.combiner.png" alt="">
36953695
* <dl>
36963696
* <dt><b>Scheduler:</b></dt>
36973697
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>

src/main/java/io/reactivex/rxjava3/core/Single.java

+66
Original file line numberDiff line numberDiff line change
@@ -3196,6 +3196,72 @@ public final <R> Single<R> flatMap(@NonNull Function<? super T, ? extends Single
31963196
return RxJavaPlugins.onAssembly(new SingleFlatMap<>(this, mapper));
31973197
}
31983198

3199+
/**
3200+
* Returns a {@code Single} that emits the results of a specified function to the pair of values emitted by the
3201+
* current {@code Single} and a specified mapped {@link SingleSource}.
3202+
* <p>
3203+
* <img width="640" height="268" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMap.combiner.png" alt="">
3204+
* <dl>
3205+
* <dt><b>Scheduler:</b></dt>
3206+
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
3207+
* </dl>
3208+
*
3209+
* @param <U>
3210+
* the type of items emitted by the {@code SingleSource} returned by the {@code mapper} function
3211+
* @param <R>
3212+
* the type of items emitted by the resulting {@code Single}
3213+
* @param mapper
3214+
* a function that returns a {@code SingleSource} for the item emitted by the current {@code Single}
3215+
* @param combiner
3216+
* a function that combines one item emitted by each of the source and collection {@code SingleSource} and
3217+
* returns an item to be emitted by the resulting {@code SingleSource}
3218+
* @return the new {@code Single} instance
3219+
* @throws NullPointerException if {@code mapper} or {@code combiner} is {@code null}
3220+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
3221+
* @since 3.0.0
3222+
*/
3223+
@CheckReturnValue
3224+
@NonNull
3225+
@SchedulerSupport(SchedulerSupport.NONE)
3226+
public final <U, R> Single<R> flatMap(@NonNull Function<? super T, ? extends SingleSource<? extends U>> mapper,
3227+
@NonNull BiFunction<? super T, ? super U, ? extends R> combiner) {
3228+
Objects.requireNonNull(mapper, "mapper is null");
3229+
Objects.requireNonNull(combiner, "combiner is null");
3230+
return RxJavaPlugins.onAssembly(new SingleFlatMapBiSelector<>(this, mapper, combiner));
3231+
}
3232+
3233+
/**
3234+
* Maps the {@code onSuccess} or {@code onError} signals of the current {@code Single} into a {@link SingleSource} and emits that
3235+
* {@code SingleSource}'s signals.
3236+
* <p>
3237+
* <img width="640" height="449" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMap.notification.png" alt="">
3238+
* <dl>
3239+
* <dt><b>Scheduler:</b></dt>
3240+
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
3241+
* </dl>
3242+
*
3243+
* @param <R>
3244+
* the result type
3245+
* @param onSuccessMapper
3246+
* a function that returns a {@code SingleSource} to merge for the {@code onSuccess} item emitted by this {@code Single}
3247+
* @param onErrorMapper
3248+
* a function that returns a {@code SingleSource} to merge for an {@code onError} notification from this {@code Single}
3249+
* @return the new {@code Single} instance
3250+
* @throws NullPointerException if {@code onSuccessMapper} or {@code onErrorMapper} is {@code null}
3251+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
3252+
* @since 3.0.0
3253+
*/
3254+
@CheckReturnValue
3255+
@NonNull
3256+
@SchedulerSupport(SchedulerSupport.NONE)
3257+
public final <R> Single<R> flatMap(
3258+
@NonNull Function<? super T, ? extends SingleSource<? extends R>> onSuccessMapper,
3259+
@NonNull Function<? super Throwable, ? extends SingleSource<? extends R>> onErrorMapper) {
3260+
Objects.requireNonNull(onSuccessMapper, "onSuccessMapper is null");
3261+
Objects.requireNonNull(onErrorMapper, "onErrorMapper is null");
3262+
return RxJavaPlugins.onAssembly(new SingleFlatMapNotification<>(this, onSuccessMapper, onErrorMapper));
3263+
}
3264+
31993265
/**
32003266
* Returns a {@link Maybe} that is based on applying a specified function to the item emitted by the current {@code Single},
32013267
* where that function returns a {@link MaybeSource}.

src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapNotification.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,9 @@ public void onSuccess(T value) {
109109
return;
110110
}
111111

112-
source.subscribe(new InnerObserver());
112+
if (!isDisposed()) {
113+
source.subscribe(new InnerObserver());
114+
}
113115
}
114116

115117
@Override
@@ -124,7 +126,9 @@ public void onError(Throwable e) {
124126
return;
125127
}
126128

127-
source.subscribe(new InnerObserver());
129+
if (!isDisposed()) {
130+
source.subscribe(new InnerObserver());
131+
}
128132
}
129133

130134
@Override
@@ -139,7 +143,9 @@ public void onComplete() {
139143
return;
140144
}
141145

142-
source.subscribe(new InnerObserver());
146+
if (!isDisposed()) {
147+
source.subscribe(new InnerObserver());
148+
}
143149
}
144150

145151
final class InnerObserver implements MaybeObserver<R> {

src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapSingle.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,9 @@ public void onSuccess(T value) {
8989
return;
9090
}
9191

92-
ss.subscribe(new FlatMapSingleObserver<R>(this, downstream));
92+
if (!isDisposed()) {
93+
ss.subscribe(new FlatMapSingleObserver<R>(this, downstream));
94+
}
9395
}
9496

9597
@Override

src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapObservable.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,9 @@ public void onSuccess(T t) {
106106
return;
107107
}
108108

109-
o.subscribe(this);
109+
if (!isDisposed()) {
110+
o.subscribe(this);
111+
}
110112
}
111113

112114
}

src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapPublisher.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,9 @@ public void onSuccess(T t) {
116116
return;
117117
}
118118

119-
p.subscribe(this);
119+
if (get() != SubscriptionHelper.CANCELLED) {
120+
p.subscribe(this);
121+
}
120122
}
121123

122124
@Override

src/main/java/io/reactivex/rxjava3/internal/operators/mixed/SingleFlatMapObservable.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,9 @@ public void onSuccess(T t) {
106106
return;
107107
}
108108

109-
o.subscribe(this);
109+
if (!isDisposed()) {
110+
o.subscribe(this);
111+
}
110112
}
111113

112114
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.single;
15+
16+
import java.util.Objects;
17+
import java.util.concurrent.atomic.AtomicReference;
18+
19+
import io.reactivex.rxjava3.core.*;
20+
import io.reactivex.rxjava3.disposables.Disposable;
21+
import io.reactivex.rxjava3.exceptions.Exceptions;
22+
import io.reactivex.rxjava3.functions.*;
23+
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
24+
25+
/**
26+
* Maps a source item to another SingleSource then calls a BiFunction with the
27+
* original item and the secondary item to generate the final result.
28+
*
29+
* @param <T> the main value type
30+
* @param <U> the second value type
31+
* @param <R> the result value type
32+
* @since 3.0.0
33+
*/
34+
public final class SingleFlatMapBiSelector<T, U, R> extends Single<R> {
35+
36+
final SingleSource<T> source;
37+
38+
final Function<? super T, ? extends SingleSource<? extends U>> mapper;
39+
40+
final BiFunction<? super T, ? super U, ? extends R> resultSelector;
41+
42+
public SingleFlatMapBiSelector(SingleSource<T> source,
43+
Function<? super T, ? extends SingleSource<? extends U>> mapper,
44+
BiFunction<? super T, ? super U, ? extends R> resultSelector) {
45+
this.source = source;
46+
this.mapper = mapper;
47+
this.resultSelector = resultSelector;
48+
}
49+
50+
@Override
51+
protected void subscribeActual(SingleObserver<? super R> observer) {
52+
source.subscribe(new FlatMapBiMainObserver<T, U, R>(observer, mapper, resultSelector));
53+
}
54+
55+
static final class FlatMapBiMainObserver<T, U, R>
56+
implements SingleObserver<T>, Disposable {
57+
58+
final Function<? super T, ? extends SingleSource<? extends U>> mapper;
59+
60+
final InnerObserver<T, U, R> inner;
61+
62+
FlatMapBiMainObserver(SingleObserver<? super R> actual,
63+
Function<? super T, ? extends SingleSource<? extends U>> mapper,
64+
BiFunction<? super T, ? super U, ? extends R> resultSelector) {
65+
this.inner = new InnerObserver<>(actual, resultSelector);
66+
this.mapper = mapper;
67+
}
68+
69+
@Override
70+
public void dispose() {
71+
DisposableHelper.dispose(inner);
72+
}
73+
74+
@Override
75+
public boolean isDisposed() {
76+
return DisposableHelper.isDisposed(inner.get());
77+
}
78+
79+
@Override
80+
public void onSubscribe(Disposable d) {
81+
if (DisposableHelper.setOnce(inner, d)) {
82+
inner.downstream.onSubscribe(this);
83+
}
84+
}
85+
86+
@Override
87+
public void onSuccess(T value) {
88+
SingleSource<? extends U> next;
89+
90+
try {
91+
next = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null MaybeSource");
92+
} catch (Throwable ex) {
93+
Exceptions.throwIfFatal(ex);
94+
inner.downstream.onError(ex);
95+
return;
96+
}
97+
98+
if (DisposableHelper.replace(inner, null)) {
99+
inner.value = value;
100+
next.subscribe(inner);
101+
}
102+
}
103+
104+
@Override
105+
public void onError(Throwable e) {
106+
inner.downstream.onError(e);
107+
}
108+
109+
static final class InnerObserver<T, U, R>
110+
extends AtomicReference<Disposable>
111+
implements SingleObserver<U> {
112+
113+
private static final long serialVersionUID = -2897979525538174559L;
114+
115+
final SingleObserver<? super R> downstream;
116+
117+
final BiFunction<? super T, ? super U, ? extends R> resultSelector;
118+
119+
T value;
120+
121+
InnerObserver(SingleObserver<? super R> actual,
122+
BiFunction<? super T, ? super U, ? extends R> resultSelector) {
123+
this.downstream = actual;
124+
this.resultSelector = resultSelector;
125+
}
126+
127+
@Override
128+
public void onSubscribe(Disposable d) {
129+
DisposableHelper.setOnce(this, d);
130+
}
131+
132+
@Override
133+
public void onSuccess(U value) {
134+
T t = this.value;
135+
this.value = null;
136+
137+
R r;
138+
139+
try {
140+
r = Objects.requireNonNull(resultSelector.apply(t, value), "The resultSelector returned a null value");
141+
} catch (Throwable ex) {
142+
Exceptions.throwIfFatal(ex);
143+
downstream.onError(ex);
144+
return;
145+
}
146+
147+
downstream.onSuccess(r);
148+
}
149+
150+
@Override
151+
public void onError(Throwable e) {
152+
downstream.onError(e);
153+
}
154+
}
155+
}
156+
}

0 commit comments

Comments
 (0)