Skip to content

Commit faf6392

Browse files
vanniktechakarnokd
authored andcommitted
3.x: Remove vararg overloads for combineLatest in Observable + Flowable (#6635)
Fix tests.
1 parent 97790c6 commit faf6392

File tree

8 files changed

+36
-421
lines changed

8 files changed

+36
-421
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

+16-155
Large diffs are not rendered by default.

src/main/java/io/reactivex/rxjava3/core/Observable.java

+8-96
Original file line numberDiff line numberDiff line change
@@ -168,49 +168,6 @@ public static int bufferSize() {
168168
return Flowable.bufferSize();
169169
}
170170

171-
/**
172-
* Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
173-
* the source ObservableSources each time an item is received from any of the source ObservableSources, where this
174-
* aggregation is defined by a specified function.
175-
* <p>
176-
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
177-
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
178-
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
179-
* <p>
180-
* If any of the sources never produces an item but only terminates (normally or with an error), the
181-
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
182-
* If that input source is also synchronous, other sources after it will not be subscribed to.
183-
* <p>
184-
* If there are no ObservableSources provided, the resulting sequence completes immediately without emitting
185-
* any items and without any calls to the combiner function.
186-
*
187-
* <p>
188-
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/combineLatest.png" alt="">
189-
* <dl>
190-
* <dt><b>Scheduler:</b></dt>
191-
* <dd>{@code combineLatest} does not operate by default on a particular {@link Scheduler}.</dd>
192-
* </dl>
193-
*
194-
* @param <T>
195-
* the common base type of source values
196-
* @param <R>
197-
* the result type
198-
* @param sources
199-
* the collection of source ObservableSources
200-
* @param combiner
201-
* the aggregation function used to combine the items emitted by the source ObservableSources
202-
* @param bufferSize
203-
* the internal buffer size and prefetch amount applied to every source Observable
204-
* @return an Observable that emits items that are the result of combining the items emitted by the source
205-
* ObservableSources by means of the given aggregation function
206-
* @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
207-
*/
208-
@CheckReturnValue
209-
@SchedulerSupport(SchedulerSupport.NONE)
210-
public static <T, R> Observable<R> combineLatest(Function<? super Object[], ? extends R> combiner, int bufferSize, ObservableSource<? extends T>... sources) {
211-
return combineLatest(sources, combiner, bufferSize);
212-
}
213-
214171
/**
215172
* Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
216173
* the source ObservableSources each time an item is received from any of the source ObservableSources, where this
@@ -437,7 +394,7 @@ public static <T1, T2, R> Observable<R> combineLatest(
437394
BiFunction<? super T1, ? super T2, ? extends R> combiner) {
438395
ObjectHelper.requireNonNull(source1, "source1 is null");
439396
ObjectHelper.requireNonNull(source2, "source2 is null");
440-
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2);
397+
return combineLatest(new ObservableSource[] { source1, source2 }, Functions.toFunction(combiner), bufferSize());
441398
}
442399

443400
/**
@@ -482,7 +439,7 @@ public static <T1, T2, T3, R> Observable<R> combineLatest(
482439
ObjectHelper.requireNonNull(source1, "source1 is null");
483440
ObjectHelper.requireNonNull(source2, "source2 is null");
484441
ObjectHelper.requireNonNull(source3, "source3 is null");
485-
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3);
442+
return combineLatest(new ObservableSource[] { source1, source2, source3 }, Functions.toFunction(combiner), bufferSize());
486443
}
487444

488445
/**
@@ -531,7 +488,7 @@ public static <T1, T2, T3, T4, R> Observable<R> combineLatest(
531488
ObjectHelper.requireNonNull(source2, "source2 is null");
532489
ObjectHelper.requireNonNull(source3, "source3 is null");
533490
ObjectHelper.requireNonNull(source4, "source4 is null");
534-
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4);
491+
return combineLatest(new ObservableSource[] { source1, source2, source3, source4 }, Functions.toFunction(combiner), bufferSize());
535492
}
536493

537494
/**
@@ -585,7 +542,7 @@ public static <T1, T2, T3, T4, T5, R> Observable<R> combineLatest(
585542
ObjectHelper.requireNonNull(source3, "source3 is null");
586543
ObjectHelper.requireNonNull(source4, "source4 is null");
587544
ObjectHelper.requireNonNull(source5, "source5 is null");
588-
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5);
545+
return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5 }, Functions.toFunction(combiner), bufferSize());
589546
}
590547

591548
/**
@@ -643,7 +600,7 @@ public static <T1, T2, T3, T4, T5, T6, R> Observable<R> combineLatest(
643600
ObjectHelper.requireNonNull(source4, "source4 is null");
644601
ObjectHelper.requireNonNull(source5, "source5 is null");
645602
ObjectHelper.requireNonNull(source6, "source6 is null");
646-
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6);
603+
return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6 }, Functions.toFunction(combiner), bufferSize());
647604
}
648605

649606
/**
@@ -706,7 +663,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> combineLatest(
706663
ObjectHelper.requireNonNull(source5, "source5 is null");
707664
ObjectHelper.requireNonNull(source6, "source6 is null");
708665
ObjectHelper.requireNonNull(source7, "source7 is null");
709-
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7);
666+
return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6, source7 }, Functions.toFunction(combiner), bufferSize());
710667
}
711668

712669
/**
@@ -773,7 +730,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> combineLatest(
773730
ObjectHelper.requireNonNull(source6, "source6 is null");
774731
ObjectHelper.requireNonNull(source7, "source7 is null");
775732
ObjectHelper.requireNonNull(source8, "source8 is null");
776-
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7, source8);
733+
return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6, source7, source8 }, Functions.toFunction(combiner), bufferSize());
777734
}
778735

779736
/**
@@ -845,7 +802,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLates
845802
ObjectHelper.requireNonNull(source7, "source7 is null");
846803
ObjectHelper.requireNonNull(source8, "source8 is null");
847804
ObjectHelper.requireNonNull(source9, "source9 is null");
848-
return combineLatest(Functions.toFunction(combiner), bufferSize(), source1, source2, source3, source4, source5, source6, source7, source8, source9);
805+
return combineLatest(new ObservableSource[] { source1, source2, source3, source4, source5, source6, source7, source8, source9 }, Functions.toFunction(combiner), bufferSize());
849806
}
850807

851808
/**
@@ -890,51 +847,6 @@ public static <T, R> Observable<R> combineLatestDelayError(ObservableSource<? ex
890847
return combineLatestDelayError(sources, combiner, bufferSize());
891848
}
892849

893-
/**
894-
* Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
895-
* the source ObservableSources each time an item is received from any of the source ObservableSources, where this
896-
* aggregation is defined by a specified function and delays any error from the sources until
897-
* all source ObservableSources terminate.
898-
* <p>
899-
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/combineLatestDelayError.png" alt="">
900-
* <p>
901-
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
902-
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
903-
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
904-
* <p>
905-
* If any of the sources never produces an item but only terminates (normally or with an error), the
906-
* resulting sequence terminates immediately (normally or with all the errors accumulated till that point).
907-
* If that input source is also synchronous, other sources after it will not be subscribed to.
908-
* <p>
909-
* If there are no ObservableSources provided, the resulting sequence completes immediately without emitting
910-
* any items and without any calls to the combiner function.
911-
*
912-
* <dl>
913-
* <dt><b>Scheduler:</b></dt>
914-
* <dd>{@code combineLatestDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
915-
* </dl>
916-
*
917-
* @param <T>
918-
* the common base type of source values
919-
* @param <R>
920-
* the result type
921-
* @param sources
922-
* the collection of source ObservableSources
923-
* @param combiner
924-
* the aggregation function used to combine the items emitted by the source ObservableSources
925-
* @param bufferSize
926-
* the internal buffer size and prefetch amount applied to every source Observable
927-
* @return an Observable that emits items that are the result of combining the items emitted by the source
928-
* ObservableSources by means of the given aggregation function
929-
* @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
930-
*/
931-
@CheckReturnValue
932-
@SchedulerSupport(SchedulerSupport.NONE)
933-
public static <T, R> Observable<R> combineLatestDelayError(Function<? super Object[], ? extends R> combiner,
934-
int bufferSize, ObservableSource<? extends T>... sources) {
935-
return combineLatestDelayError(sources, combiner, bufferSize);
936-
}
937-
938850
/**
939851
* Combines a collection of source ObservableSources by emitting an item that aggregates the latest values of each of
940852
* the source ObservableSources each time an item is received from any of the source ObservableSources, where this

src/test/java/io/reactivex/rxjava3/flowable/FlowableNullTests.java

-76
Original file line numberDiff line numberDiff line change
@@ -76,27 +76,6 @@ public void ambIterableOneIsNull() {
7676
.assertError(NullPointerException.class);
7777
}
7878

79-
@Test(expected = NullPointerException.class)
80-
public void combineLatestVarargsNull() {
81-
Flowable.combineLatestDelayError(new Function<Object[], Object>() {
82-
@Override
83-
public Object apply(Object[] v) {
84-
return 1;
85-
}
86-
}, (Publisher<Object>[])null);
87-
}
88-
89-
@SuppressWarnings("unchecked")
90-
@Test(expected = NullPointerException.class)
91-
public void combineLatestVarargsOneIsNull() {
92-
Flowable.combineLatestDelayError(new Function<Object[], Object>() {
93-
@Override
94-
public Object apply(Object[] v) {
95-
return 1;
96-
}
97-
}, Flowable.never(), null).blockingLast();
98-
}
99-
10079
@Test(expected = NullPointerException.class)
10180
public void combineLatestIterableNull() {
10281
Flowable.combineLatestDelayError((Iterable<Publisher<Object>>)null, new Function<Object[], Object>() {
@@ -133,23 +112,6 @@ public Object apply(Object[] v) {
133112
}).blockingLast();
134113
}
135114

136-
@SuppressWarnings("unchecked")
137-
@Test(expected = NullPointerException.class)
138-
public void combineLatestVarargsFunctionNull() {
139-
Flowable.combineLatestDelayError(null, Flowable.never());
140-
}
141-
142-
@SuppressWarnings("unchecked")
143-
@Test(expected = NullPointerException.class)
144-
public void combineLatestVarargsFunctionReturnsNull() {
145-
Flowable.combineLatestDelayError(new Function<Object[], Object>() {
146-
@Override
147-
public Object apply(Object[] v) {
148-
return null;
149-
}
150-
}, just1).blockingLast();
151-
}
152-
153115
@SuppressWarnings("unchecked")
154116
@Test(expected = NullPointerException.class)
155117
public void combineLatestIterableFunctionNull() {
@@ -2759,12 +2721,6 @@ public void combineLatestDelayErrorIterableFunctionNull() {
27592721
Flowable.combineLatestDelayError(Arrays.asList(just1), null, 128);
27602722
}
27612723

2762-
@SuppressWarnings("unchecked")
2763-
@Test(expected = NullPointerException.class)
2764-
public void combineLatestDelayErrorVarargsFunctionNull() {
2765-
Flowable.combineLatestDelayError(null, 128, Flowable.never());
2766-
}
2767-
27682724
@Test(expected = NullPointerException.class)
27692725
public void zipFlowableNull() {
27702726
Flowable.zip((Flowable<Flowable<Object>>)null, new Function<Object[], Object>() {
@@ -2795,27 +2751,6 @@ public void concatFlowableNull() {
27952751
Flowable.concat((Flowable<Flowable<Object>>)null);
27962752
}
27972753

2798-
@Test(expected = NullPointerException.class)
2799-
public void combineLatestDelayErrorVarargsNull() {
2800-
Flowable.combineLatestDelayError(new Function<Object[], Object>() {
2801-
@Override
2802-
public Object apply(Object[] v) {
2803-
return 1;
2804-
}
2805-
}, 128, (Flowable<Object>[])null);
2806-
}
2807-
2808-
@SuppressWarnings("unchecked")
2809-
@Test(expected = NullPointerException.class)
2810-
public void combineLatestDelayErrorVarargsOneIsNull() {
2811-
Flowable.combineLatestDelayError(new Function<Object[], Object>() {
2812-
@Override
2813-
public Object apply(Object[] v) {
2814-
return 1;
2815-
}
2816-
}, 128, Flowable.never(), null).blockingLast();
2817-
}
2818-
28192754
@Test(expected = NullPointerException.class)
28202755
public void combineLatestDelayErrorIterableNull() {
28212756
Flowable.combineLatestDelayError((Iterable<Flowable<Object>>)null, new Function<Object[], Object>() {
@@ -2876,15 +2811,4 @@ public void delaySubscriptionOtherNull() {
28762811
public void sampleFlowableNull() {
28772812
just1.sample(null);
28782813
}
2879-
2880-
@SuppressWarnings("unchecked")
2881-
@Test(expected = NullPointerException.class)
2882-
public void combineLatestDelayErrorVarargsFunctionReturnsNull() {
2883-
Flowable.combineLatestDelayError(new Function<Object[], Object>() {
2884-
@Override
2885-
public Object apply(Object[] v) {
2886-
return null;
2887-
}
2888-
}, 128, just1).blockingLast();
2889-
}
28902814
}

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -1305,15 +1305,14 @@ public Object apply(Object a, Object b) throws Exception {
13051305
@Test
13061306
public void errorDelayed() {
13071307
Flowable.combineLatestDelayError(
1308+
new Publisher[] { Flowable.error(new TestException()), Flowable.just(1) },
13081309
new Function<Object[], Object>() {
13091310
@Override
13101311
public Object apply(Object[] a) throws Exception {
13111312
return a;
13121313
}
13131314
},
1314-
128,
1315-
Flowable.error(new TestException()),
1316-
Flowable.just(1)
1315+
128
13171316
)
13181317
.test()
13191318
.assertFailure(TestException.class);
@@ -1323,15 +1322,14 @@ public Object apply(Object[] a) throws Exception {
13231322
@Test
13241323
public void errorDelayed2() {
13251324
Flowable.combineLatestDelayError(
1325+
new Publisher[] { Flowable.error(new TestException()).startWithItem(1), Flowable.empty() },
13261326
new Function<Object[], Object>() {
13271327
@Override
13281328
public Object apply(Object[] a) throws Exception {
13291329
return a;
13301330
}
13311331
},
1332-
128,
1333-
Flowable.error(new TestException()).startWithItem(1),
1334-
Flowable.empty()
1332+
128
13351333
)
13361334
.test()
13371335
.assertFailure(TestException.class);

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -926,15 +926,14 @@ public Object apply(Object a, Object b) throws Exception {
926926
@Test
927927
public void errorDelayed() {
928928
Observable.combineLatestDelayError(
929+
new ObservableSource[] { Observable.error(new TestException()), Observable.just(1) },
929930
new Function<Object[], Object>() {
930931
@Override
931932
public Object apply(Object[] a) throws Exception {
932933
return a;
933934
}
934935
},
935-
128,
936-
Observable.error(new TestException()),
937-
Observable.just(1)
936+
128
938937
)
939938
.test()
940939
.assertFailure(TestException.class);
@@ -944,15 +943,14 @@ public Object apply(Object[] a) throws Exception {
944943
@Test
945944
public void errorDelayed2() {
946945
Observable.combineLatestDelayError(
946+
new ObservableSource[] { Observable.error(new TestException()).startWithItem(1), Observable.empty() },
947947
new Function<Object[], Object>() {
948948
@Override
949949
public Object apply(Object[] a) throws Exception {
950950
return a;
951951
}
952952
},
953-
128,
954-
Observable.error(new TestException()).startWithItem(1),
955-
Observable.empty()
953+
128
956954
)
957955
.test()
958956
.assertFailure(TestException.class);

0 commit comments

Comments
 (0)