Skip to content

Commit d3a5776

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Add Maybe.flatMapSingle (#4614)
* 2.x: Add Maybe.flatMapSingle * Changes * Fix generics
1 parent c81f438 commit d3a5776

File tree

3 files changed

+268
-1
lines changed

3 files changed

+268
-1
lines changed

src/main/java/io/reactivex/Maybe.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex;
1515

16+
import java.util.NoSuchElementException;
1617
import java.util.concurrent.*;
1718

1819
import org.reactivestreams.*;
@@ -2551,6 +2552,29 @@ public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publ
25512552
return toFlowable().flatMap(mapper);
25522553
}
25532554

2555+
/**
2556+
* Returns a {@link Single} based on applying a specified function to the item emitted by the
2557+
* source {@link Maybe}, where that function returns a {@link Single}.
2558+
* When this Maybe completes a {@link NoSuchElementException} will be thrown.
2559+
* <p>
2560+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMapSingle.png" alt="">
2561+
* <dl>
2562+
* <dt><b>Scheduler:</b></dt>
2563+
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
2564+
* </dl>
2565+
*
2566+
* @param mapper
2567+
* a function that, when applied to the item emitted by the source Maybe, returns a
2568+
* Single
2569+
* @return the Single returned from {@code mapper} when applied to the item emitted by the source Maybe
2570+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
2571+
*/
2572+
@SchedulerSupport(SchedulerSupport.NONE)
2573+
public final <R> Single<R> flatMapSingle(final Function<? super T, ? extends SingleSource<? extends R>> mapper) {
2574+
ObjectHelper.requireNonNull(mapper, "mapper is null");
2575+
return RxJavaPlugins.onAssembly(new MaybeFlatMapSingle<T, R>(this, mapper));
2576+
}
2577+
25542578
/**
25552579
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
25562580
* source {@link Maybe}, where that function returns a {@link Completable}.
@@ -2564,7 +2588,7 @@ public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publ
25642588
* @param mapper
25652589
* a function that, when applied to the item emitted by the source Maybe, returns a
25662590
* Completable
2567-
* @return the Completable returned from {@code func} when applied to the item emitted by the source Maybe
2591+
* @return the Completable returned from {@code mapper} when applied to the item emitted by the source Maybe
25682592
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
25692593
*/
25702594
@SchedulerSupport(SchedulerSupport.NONE)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import io.reactivex.MaybeObserver;
17+
import io.reactivex.MaybeSource;
18+
import io.reactivex.Single;
19+
import io.reactivex.SingleObserver;
20+
import io.reactivex.SingleSource;
21+
import io.reactivex.disposables.Disposable;
22+
import io.reactivex.exceptions.Exceptions;
23+
import io.reactivex.functions.Function;
24+
import io.reactivex.internal.disposables.DisposableHelper;
25+
import io.reactivex.internal.functions.ObjectHelper;
26+
import java.util.NoSuchElementException;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
29+
/**
30+
* Maps the success value of the source MaybeSource into a Single.
31+
* @param <T>
32+
*/
33+
public final class MaybeFlatMapSingle<T, R> extends Single<R> {
34+
35+
final MaybeSource<T> source;
36+
37+
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
38+
39+
public MaybeFlatMapSingle(MaybeSource<T> source, Function<? super T, ? extends SingleSource<? extends R>> mapper) {
40+
this.source = source;
41+
this.mapper = mapper;
42+
}
43+
44+
@Override
45+
protected void subscribeActual(SingleObserver<? super R> actual) {
46+
source.subscribe(new FlatMapMaybeObserver<T, R>(actual, mapper));
47+
}
48+
49+
static final class FlatMapMaybeObserver<T, R>
50+
extends AtomicReference<Disposable>
51+
implements MaybeObserver<T>, Disposable {
52+
53+
private static final long serialVersionUID = 4827726964688405508L;
54+
55+
final SingleObserver<? super R> actual;
56+
57+
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
58+
59+
FlatMapMaybeObserver(SingleObserver<? super R> actual, Function<? super T, ? extends SingleSource<? extends R>> mapper) {
60+
this.actual = actual;
61+
this.mapper = mapper;
62+
}
63+
64+
@Override
65+
public void dispose() {
66+
DisposableHelper.dispose(this);
67+
}
68+
69+
@Override
70+
public boolean isDisposed() {
71+
return DisposableHelper.isDisposed(get());
72+
}
73+
74+
@Override
75+
public void onSubscribe(Disposable d) {
76+
if (DisposableHelper.setOnce(this, d)) {
77+
actual.onSubscribe(this);
78+
}
79+
}
80+
81+
@Override
82+
public void onSuccess(T value) {
83+
SingleSource<? extends R> ss;
84+
85+
try {
86+
ss = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null SingleSource");
87+
} catch (Throwable ex) {
88+
Exceptions.throwIfFatal(ex);
89+
onError(ex);
90+
return;
91+
}
92+
93+
ss.subscribe(new FlatMapSingleObserver<R>(this, actual));
94+
}
95+
96+
@Override
97+
public void onError(Throwable e) {
98+
actual.onError(e);
99+
}
100+
101+
@Override
102+
public void onComplete() {
103+
actual.onError(new NoSuchElementException());
104+
}
105+
}
106+
107+
static final class FlatMapSingleObserver<R> implements SingleObserver<R> {
108+
109+
final AtomicReference<Disposable> parent;
110+
111+
final SingleObserver<? super R> actual;
112+
113+
FlatMapSingleObserver(AtomicReference<Disposable> parent, SingleObserver<? super R> actual) {
114+
this.parent = parent;
115+
this.actual = actual;
116+
}
117+
118+
@Override
119+
public void onSubscribe(final Disposable d) {
120+
DisposableHelper.replace(parent, d);
121+
}
122+
123+
@Override
124+
public void onSuccess(final R value) {
125+
actual.onSuccess(value);
126+
}
127+
128+
@Override
129+
public void onError(final Throwable e) {
130+
actual.onError(e);
131+
}
132+
}
133+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import io.reactivex.Maybe;
17+
import io.reactivex.Single;
18+
import io.reactivex.SingleSource;
19+
import io.reactivex.functions.Function;
20+
import java.util.NoSuchElementException;
21+
import org.junit.Test;
22+
23+
public class MaybeFlatMapSingleTest {
24+
@Test(expected = NullPointerException.class)
25+
public void flatMapSingleNull() {
26+
Maybe.just(1)
27+
.flatMapSingle(null);
28+
}
29+
30+
@Test
31+
public void flatMapSingleValue() {
32+
Maybe.just(1).flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
33+
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
34+
if (integer == 1) {
35+
return Single.just(2);
36+
}
37+
38+
return Single.just(1);
39+
}
40+
})
41+
.test()
42+
.assertResult(2);
43+
}
44+
45+
@Test
46+
public void flatMapSingleValueDifferentType() {
47+
Maybe.just(1).flatMapSingle(new Function<Integer, SingleSource<String>>() {
48+
@Override public SingleSource<String> apply(final Integer integer) throws Exception {
49+
if (integer == 1) {
50+
return Single.just("2");
51+
}
52+
53+
return Single.just("1");
54+
}
55+
})
56+
.test()
57+
.assertResult("2");
58+
}
59+
60+
@Test
61+
public void flatMapSingleValueNull() {
62+
Maybe.just(1).flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
63+
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
64+
return null;
65+
}
66+
})
67+
.test()
68+
.assertNoValues()
69+
.assertError(NullPointerException.class)
70+
.assertErrorMessage("The mapper returned a null SingleSource");
71+
}
72+
73+
@Test
74+
public void flatMapSingleValueErrorThrown() {
75+
Maybe.just(1).flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
76+
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
77+
throw new RuntimeException("something went terribly wrong!");
78+
}
79+
})
80+
.test()
81+
.assertNoValues()
82+
.assertError(RuntimeException.class)
83+
.assertErrorMessage("something went terribly wrong!");
84+
}
85+
86+
@Test
87+
public void flatMapSingleError() {
88+
RuntimeException exception = new RuntimeException("test");
89+
90+
Maybe.error(exception).flatMapSingle(new Function<Object, SingleSource<Object>>() {
91+
@Override public SingleSource<Object> apply(final Object integer) throws Exception {
92+
return Single.just(new Object());
93+
}
94+
})
95+
.test()
96+
.assertError(exception);
97+
}
98+
99+
@Test
100+
public void flatMapSingleEmpty() {
101+
Maybe.<Integer>empty().flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
102+
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
103+
return Single.just(2);
104+
}
105+
})
106+
.test()
107+
.assertNoValues()
108+
.assertError(NoSuchElementException.class);
109+
}
110+
}

0 commit comments

Comments
 (0)