Skip to content

Zip and CombineLatest Operators: Generic Order and More Arities #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion language-adaptors/rxjava-scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ sourceSets {
}

dependencies {
compile 'org.scala-lang:scala-library:2.10.2'
// pinning to 2.10.1 as having issues with 2.10.2
compile 'org.scala-lang:scala-library:2.10.1'

compile project(':rxjava-core')

Expand Down
257 changes: 225 additions & 32 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@
import rx.util.functions.Func2;
import rx.util.functions.Func3;
import rx.util.functions.Func4;
import rx.util.functions.Func5;
import rx.util.functions.Func6;
import rx.util.functions.Func7;
import rx.util.functions.Func8;
import rx.util.functions.Func9;
import rx.util.functions.FuncN;
import rx.util.functions.Function;

Expand Down Expand Up @@ -908,18 +913,17 @@ public static <T> Observable<T> from(Future<T> future, long timeout, TimeUnit un
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param w0
* @param o1
* one source Observable
* @param w1
* @param o2
* another source Observable
* @param reduceFunction
* a function that, when applied to a pair of items, each emitted by one of the two
* source Observables, results in an item that will be emitted by the resulting
* Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R, T0, T1> Observable<R> zip(Observable<? extends T0> w0, Observable<? extends T1> w1, Func2<? super T0, ? super T1, ? extends R> reduceFunction) {
return create(OperationZip.zip(w0, w1, reduceFunction));
public static <T1, T2, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Func2<? super T1, ? super T2, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, zipFunction));
}

/**
Expand Down Expand Up @@ -981,19 +985,19 @@ public static <T> Observable<Boolean> sequenceEqual(Observable<T> first, Observa
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param w0
* @param o1
* one source Observable
* @param w1
* another source Observable
* @param w2
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param function
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R, T0, T1, T2> Observable<R> zip(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> function) {
return create(OperationZip.zip(w0, w1, w2, function));
public static <T1, T2, T3, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, zipFunction));
}

/**
Expand All @@ -1010,21 +1014,210 @@ public static <R, T0, T1, T2> Observable<R> zip(Observable<? extends T0> w0, Obs
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param w0
* @param o1
* one source Observable
* @param w1
* another source Observable
* @param w2
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param w3
* @param o4
* a fourth source Observable
* @param reduceFunction
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, T5, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Func5<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param o6
* a sixth source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, T5, T6, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6,
Func6<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param o6
* a sixth source Observable
* @param o7
* a seventh source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Observable<? extends T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> reduceFunction) {
return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction));
public static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7,
Func7<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param o6
* a sixth source Observable
* @param o7
* a seventh source Observable
* @param o8
* an eighth source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Func8<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, zipFunction));
}

/**
* Returns an Observable that emits the results of a function of your choosing applied to
* combinations of four items emitted, in sequence, by four other Observables.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/zip.png">
* <p> {@code zip} applies this function in strict sequence, so the first item emitted by the
* new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item
* emitted by {@code w3}; the second item emitted by
* the new Observable will be the result of the function applied to the second item emitted by
* each of those Observables; and so forth.
* <p>
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations
* of the source Observable that emits the fewest items.
*
* @param o1
* one source Observable
* @param o2
* a second source Observable
* @param o3
* a third source Observable
* @param o4
* a fourth source Observable
* @param o5
* a fifth source Observable
* @param o6
* a sixth source Observable
* @param o7
* a seventh source Observable
* @param o8
* an eighth source Observable
* @param o9
* a ninth source Observable
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Observable<? extends T9> o9, Func9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipFunction) {
return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9, zipFunction));
}

/**
Expand All @@ -1033,30 +1226,30 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<? extends T0> w0,
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png">
*
* @param w0
* @param o1
* The first source observable.
* @param w1
* @param o2
* The second source observable.
* @param combineFunction
* The aggregation function used to combine the source observable values.
* @return An Observable that combines the source Observables with the given combine function
*/
public static <R, T0, T1> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<? super T0, ? super T1, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction));
public static <T1, T2, R> Observable<R> combineLatest(Observable<T1> o1, Observable<T2> o2, Func2<? super T1, ? super T2, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(o1, o2, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<? super T0, ? super T1, ? super T2, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction));
public static <T1, T2, T3, R> Observable<R> combineLatest(Observable<T1> o1, Observable<T2> o2, Observable<T3> o3, Func3<? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(o1, o2, o3, combineFunction));
}

/**
* @see #combineLatest(Observable, Observable, Func2)
*/
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction));
public static <T1, T2, T3, T4, R> Observable<R> combineLatest(Observable<T1> o1, Observable<T2> o2, Observable<T3> o3, Observable<T4> o4, Func4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> combineFunction) {
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, combineFunction));
}

/**
Expand Down Expand Up @@ -1307,7 +1500,7 @@ public Observable<R> call(List<Observable<?>> wsList) {
*
* @param ws
* A collection of source Observables
* @param reduceFunction
* @param zipFunction
* a function that, when applied to an item emitted by each of the source
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
Expand Down
Loading