Skip to content

Commit ab21265

Browse files
authored
2.x: improve performance of Observable.flatMapIterable (#4612)
1 parent d3a5776 commit ab21265

File tree

8 files changed

+418
-4
lines changed

8 files changed

+418
-4
lines changed

src/main/java/io/reactivex/Observable.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -5736,7 +5736,7 @@ public final <R> Observable<R> concatMapEagerDelayError(Function<? super T, ? ex
57365736
@SchedulerSupport(SchedulerSupport.NONE)
57375737
public final <U> Observable<U> concatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper) {
57385738
ObjectHelper.requireNonNull(mapper, "mapper is null");
5739-
return concatMap(ObservableInternalHelper.flatMapIntoIterable(mapper));
5739+
return RxJavaPlugins.onAssembly(new ObservableFlattenIterable<T, U>(this, mapper));
57405740
}
57415741

57425742
/**
@@ -7188,7 +7188,7 @@ public final <U, R> Observable<R> flatMap(Function<? super T, ? extends Observab
71887188
@SchedulerSupport(SchedulerSupport.NONE)
71897189
public final <U> Observable<U> flatMapIterable(final Function<? super T, ? extends Iterable<? extends U>> mapper) {
71907190
ObjectHelper.requireNonNull(mapper, "mapper is null");
7191-
return flatMap(ObservableInternalHelper.flatMapIntoIterable(mapper));
7191+
return RxJavaPlugins.onAssembly(new ObservableFlattenIterable<T, U>(this, mapper));
71927192
}
71937193

71947194
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.observable;
15+
16+
import java.util.Iterator;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.functions.Function;
22+
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.functions.ObjectHelper;
24+
import io.reactivex.plugins.RxJavaPlugins;
25+
26+
/**
27+
* Maps a sequence into an Iterable and emits its values.
28+
*
29+
* @param <T> the input value type to map to Iterable
30+
* @param <R> the element type of the Iterable and the output
31+
*/
32+
public final class ObservableFlattenIterable<T, R> extends AbstractObservableWithUpstream<T, R> {
33+
34+
final Function<? super T, ? extends Iterable<? extends R>> mapper;
35+
36+
public ObservableFlattenIterable(ObservableSource<T> source,
37+
Function<? super T, ? extends Iterable<? extends R>> mapper) {
38+
super(source);
39+
this.mapper = mapper;
40+
}
41+
42+
@Override
43+
protected void subscribeActual(Observer<? super R> observer) {
44+
source.subscribe(new FlattenIterableObserver<T, R>(observer, mapper));
45+
}
46+
47+
static final class FlattenIterableObserver<T, R> implements Observer<T>, Disposable {
48+
final Observer<? super R> actual;
49+
50+
final Function<? super T, ? extends Iterable<? extends R>> mapper;
51+
52+
Disposable d;
53+
54+
FlattenIterableObserver(Observer<? super R> actual, Function<? super T, ? extends Iterable<? extends R>> mapper) {
55+
this.actual = actual;
56+
this.mapper = mapper;
57+
}
58+
59+
@Override
60+
public void onSubscribe(Disposable d) {
61+
if (DisposableHelper.validate(this.d, d)) {
62+
this.d = d;
63+
64+
actual.onSubscribe(this);
65+
}
66+
}
67+
68+
@Override
69+
public void onNext(T value) {
70+
if (d == DisposableHelper.DISPOSED) {
71+
return;
72+
}
73+
74+
Iterator<? extends R> it;
75+
76+
try {
77+
it = mapper.apply(value).iterator();
78+
} catch (Throwable ex) {
79+
Exceptions.throwIfFatal(ex);
80+
d.dispose();
81+
onError(ex);
82+
return;
83+
}
84+
85+
Observer<? super R> a = actual;
86+
87+
for (;;) {
88+
boolean b;
89+
90+
try {
91+
b = it.hasNext();
92+
} catch (Throwable ex) {
93+
Exceptions.throwIfFatal(ex);
94+
d.dispose();
95+
onError(ex);
96+
return;
97+
}
98+
99+
if (b) {
100+
R v;
101+
102+
try {
103+
v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
104+
} catch (Throwable ex) {
105+
Exceptions.throwIfFatal(ex);
106+
d.dispose();
107+
onError(ex);
108+
return;
109+
}
110+
111+
a.onNext(v);
112+
} else {
113+
break;
114+
}
115+
}
116+
}
117+
118+
@Override
119+
public void onError(Throwable e) {
120+
if (d == DisposableHelper.DISPOSED) {
121+
RxJavaPlugins.onError(e);
122+
return;
123+
}
124+
actual.onError(e);
125+
}
126+
127+
@Override
128+
public void onComplete() {
129+
if (d == DisposableHelper.DISPOSED) {
130+
return;
131+
}
132+
actual.onComplete();
133+
}
134+
135+
@Override
136+
public boolean isDisposed() {
137+
return d.isDisposed();
138+
}
139+
140+
@Override
141+
public void dispose() {
142+
d.dispose();
143+
d = DisposableHelper.DISPOSED;
144+
}
145+
}
146+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.concurrent.TimeUnit;
17+
18+
import org.openjdk.jmh.annotations.*;
19+
import org.openjdk.jmh.infra.Blackhole;
20+
import org.reactivestreams.Publisher;
21+
22+
import io.reactivex.functions.Function;
23+
24+
@BenchmarkMode(Mode.Throughput)
25+
@Warmup(iterations = 5)
26+
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
27+
@OutputTimeUnit(TimeUnit.SECONDS)
28+
@Fork(value = 1)
29+
@State(Scope.Thread)
30+
public class FlatMapJustPerf {
31+
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
32+
public int times;
33+
34+
Flowable<Integer> flowable;
35+
36+
Observable<Integer> observable;
37+
38+
@Setup
39+
public void setup() {
40+
Integer[] array = new Integer[times];
41+
42+
flowable = Flowable.fromArray(array).flatMap(new Function<Integer, Publisher<Integer>>() {
43+
@Override
44+
public Publisher<Integer> apply(Integer v) throws Exception {
45+
return Flowable.just(v);
46+
}
47+
});
48+
49+
observable = Observable.fromArray(array).flatMap(new Function<Integer, Observable<Integer>>() {
50+
@Override
51+
public Observable<Integer> apply(Integer v) throws Exception {
52+
return Observable.just(v);
53+
}
54+
});
55+
}
56+
57+
@Benchmark
58+
public void flowable(Blackhole bh) {
59+
flowable.subscribe(new PerfConsumer(bh));
60+
}
61+
62+
@Benchmark
63+
public void observable(Blackhole bh) {
64+
observable.subscribe(new PerfConsumer(bh));
65+
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.*;
17+
import java.util.concurrent.TimeUnit;
18+
19+
import org.openjdk.jmh.annotations.*;
20+
import org.openjdk.jmh.infra.Blackhole;
21+
22+
import io.reactivex.functions.Function;
23+
24+
@BenchmarkMode(Mode.Throughput)
25+
@Warmup(iterations = 5)
26+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
27+
@OutputTimeUnit(TimeUnit.SECONDS)
28+
@Fork(value = 1)
29+
@State(Scope.Thread)
30+
public class FlattenCrossMapPerf {
31+
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
32+
public int times;
33+
34+
Flowable<Integer> flowable;
35+
36+
Observable<Integer> observable;
37+
38+
@Setup
39+
public void setup() {
40+
Integer[] array = new Integer[times];
41+
Arrays.fill(array, 777);
42+
43+
Integer[] arrayInner = new Integer[1000000 / times];
44+
Arrays.fill(arrayInner, 888);
45+
46+
final Iterable<Integer> list = Arrays.asList(arrayInner);
47+
48+
flowable = Flowable.fromArray(array).flatMapIterable(new Function<Integer, Iterable<Integer>>() {
49+
@Override
50+
public Iterable<Integer> apply(Integer v) throws Exception {
51+
return list;
52+
}
53+
});
54+
55+
observable = Observable.fromArray(array).flatMapIterable(new Function<Integer, Iterable<Integer>>() {
56+
@Override
57+
public Iterable<Integer> apply(Integer v) throws Exception {
58+
return list;
59+
}
60+
});
61+
}
62+
63+
@Benchmark
64+
public void flowable(Blackhole bh) {
65+
flowable.subscribe(new PerfConsumer(bh));
66+
}
67+
68+
@Benchmark
69+
public void observable(Blackhole bh) {
70+
observable.subscribe(new PerfConsumer(bh));
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.*;
17+
import java.util.concurrent.TimeUnit;
18+
19+
import org.openjdk.jmh.annotations.*;
20+
import org.openjdk.jmh.infra.Blackhole;
21+
22+
import io.reactivex.functions.Function;
23+
24+
@BenchmarkMode(Mode.Throughput)
25+
@Warmup(iterations = 5)
26+
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
27+
@OutputTimeUnit(TimeUnit.SECONDS)
28+
@Fork(value = 1)
29+
@State(Scope.Thread)
30+
public class FlattenJustPerf {
31+
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
32+
public int times;
33+
34+
Flowable<Integer> flowable;
35+
36+
Observable<Integer> observable;
37+
38+
@Setup
39+
public void setup() {
40+
Integer[] array = new Integer[times];
41+
Arrays.fill(array, 777);
42+
43+
final Iterable<Integer> singletonList = Collections.singletonList(1);
44+
45+
flowable = Flowable.fromArray(array).flatMapIterable(new Function<Integer, Iterable<Integer>>() {
46+
@Override
47+
public Iterable<Integer> apply(Integer v) throws Exception {
48+
return singletonList;
49+
}
50+
});
51+
52+
observable = Observable.fromArray(array).flatMapIterable(new Function<Integer, Iterable<Integer>>() {
53+
@Override
54+
public Iterable<Integer> apply(Integer v) throws Exception {
55+
return singletonList;
56+
}
57+
});
58+
}
59+
60+
@Benchmark
61+
public void flowable(Blackhole bh) {
62+
flowable.subscribe(new PerfConsumer(bh));
63+
}
64+
65+
@Benchmark
66+
public void observable(Blackhole bh) {
67+
observable.subscribe(new PerfConsumer(bh));
68+
}
69+
}

0 commit comments

Comments
 (0)