From 6bd2033ae7778e27a6dff20a66c370092132772b Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 31 Aug 2013 15:34:28 -0700 Subject: [PATCH 1/3] UnitTest confirming compilation failure without super/extends and success with them. - only testing zip operator at this time --- .../src/test/java/rx/CovarianceTest.java | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 rxjava-core/src/test/java/rx/CovarianceTest.java diff --git a/rxjava-core/src/test/java/rx/CovarianceTest.java b/rxjava-core/src/test/java/rx/CovarianceTest.java new file mode 100644 index 0000000000..2346b0804a --- /dev/null +++ b/rxjava-core/src/test/java/rx/CovarianceTest.java @@ -0,0 +1,99 @@ +package rx; + +import org.junit.Test; + +import rx.util.functions.Action1; +import rx.util.functions.Func2; + +/** + * Test super/extends of generics. + * + * See https://github.com/Netflix/RxJava/pull/331 + */ +public class CovarianceTest { + + /** + * This won't compile if super/extends isn't done correctly on generics + */ + @Test + public void testCovarianceOfZip() { + Observable horrors = Observable.from(new HorrorMovie()); + Observable ratings = Observable.from(new CoolRating()); + + Func2 combine = new Func2() { + @Override + public ExtendedResult call(Media m, Rating r) { + return new ExtendedResult(); + } + }; + + Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Result t1) { + System.out.println("Result: " + t1); + } + + }); + + Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Result t1) { + System.out.println("Result: " + t1); + } + + }); + + Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(ExtendedResult t1) { + System.out.println("Result: " + t1); + } + + }); + + Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Result t1) { + System.out.println("Result: " + t1); + } + + }); + + Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(Result t1) { + System.out.println("Result: " + t1); + } + + }); + + Observable. zip(horrors, ratings, combine); + + } + + static class Media { + } + + static class Movie extends Media { + } + + static class HorrorMovie extends Movie { + } + + static class Rating { + } + + static class CoolRating extends Rating { + } + + static class Result { + } + + static class ExtendedResult extends Result { + } +} From 98cd7115ec2babfe9b98baebce6bf0b267614a47 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 31 Aug 2013 15:36:23 -0700 Subject: [PATCH 2/3] =?UTF-8?q?Need=20to=20stay=20pinned=20on=20Scala=202.?= =?UTF-8?q?10.1=20still=20=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit I can't get through release process to maven central on 2.10.2 for some reason so pinning until that is solved. --- language-adaptors/rxjava-scala/build.gradle | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/language-adaptors/rxjava-scala/build.gradle b/language-adaptors/rxjava-scala/build.gradle index bac27a0c5b..e1e615519c 100644 --- a/language-adaptors/rxjava-scala/build.gradle +++ b/language-adaptors/rxjava-scala/build.gradle @@ -21,7 +21,8 @@ sourceSets { } dependencies { - compile 'org.scala-lang:scala-library:2.10.2' + // pinning to 2.10.1 as having issues with 2.10.2 + compile 'org.scala-lang:scala-library:2.10.1' compile project(':rxjava-core') From ac6a0a1ad8edd25b3c17a6fbca6960e9bcdda020 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Sat, 31 Aug 2013 16:08:54 -0700 Subject: [PATCH 3/3] Zip: Order of Generics and Artities 5-9 Changed order of generics on zip (and combineLatest) to match the rest of the project. Added arties 5, 6, 7, 8, 9 to zip operator. https://github.com/Netflix/RxJava/issues/333 Order of Generics on Zip Operator https://github.com/Netflix/RxJava/issues/103 Add more arities to zip operator --- rxjava-core/src/main/java/rx/Observable.java | 257 +++++++++++++++--- .../main/java/rx/operators/OperationZip.java | 100 ++++++- .../src/test/java/rx/CovarianceTest.java | 12 +- 3 files changed, 316 insertions(+), 53 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 68935749ba..179fbabbcf 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -83,6 +83,11 @@ import rx.util.functions.Func2; import rx.util.functions.Func3; import rx.util.functions.Func4; +import rx.util.functions.Func5; +import rx.util.functions.Func6; +import rx.util.functions.Func7; +import rx.util.functions.Func8; +import rx.util.functions.Func9; import rx.util.functions.FuncN; import rx.util.functions.Function; @@ -908,18 +913,17 @@ public static Observable from(Future future, long timeout, TimeUnit un * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations * of the source Observable that emits the fewest items. * - * @param w0 + * @param o1 * one source Observable - * @param w1 + * @param o2 * another source Observable - * @param reduceFunction - * a function that, when applied to a pair of items, each emitted by one of the two - * source Observables, results in an item that will be emitted by the resulting - * Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source + * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ - public static Observable zip(Observable w0, Observable w1, Func2 reduceFunction) { - return create(OperationZip.zip(w0, w1, reduceFunction)); + public static Observable zip(Observable o1, Observable o2, Func2 zipFunction) { + return create(OperationZip.zip(o1, o2, zipFunction)); } /** @@ -981,19 +985,19 @@ public static Observable sequenceEqual(Observable first, Observa * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations * of the source Observable that emits the fewest items. * - * @param w0 + * @param o1 * one source Observable - * @param w1 - * another source Observable - * @param w2 + * @param o2 + * a second source Observable + * @param o3 * a third source Observable - * @param function + * @param zipFunction * a function that, when applied to an item emitted by each of the source * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ - public static Observable zip(Observable w0, Observable w1, Observable w2, Func3 function) { - return create(OperationZip.zip(w0, w1, w2, function)); + public static Observable zip(Observable o1, Observable o2, Observable o3, Func3 zipFunction) { + return create(OperationZip.zip(o1, o2, o3, zipFunction)); } /** @@ -1010,21 +1014,210 @@ public static Observable zip(Observable w0, Obs * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations * of the source Observable that emits the fewest items. * - * @param w0 + * @param o1 * one source Observable - * @param w1 - * another source Observable - * @param w2 + * @param o2 + * a second source Observable + * @param o3 * a third source Observable - * @param w3 + * @param o4 * a fourth source Observable - * @param reduceFunction + * @param zipFunction + * a function that, when applied to an item emitted by each of the source + * Observables, results in an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + */ + public static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Func4 zipFunction) { + return create(OperationZip.zip(o1, o2, o3, o4, zipFunction)); + } + + /** + * Returns an Observable that emits the results of a function of your choosing applied to + * combinations of four items emitted, in sequence, by four other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the + * new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item + * emitted by {@code w3}; the second item emitted by + * the new Observable will be the result of the function applied to the second item emitted by + * each of those Observables; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations + * of the source Observable that emits the fewest items. + * + * @param o1 + * one source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source + * Observables, results in an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + */ + public static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Func5 zipFunction) { + return create(OperationZip.zip(o1, o2, o3, o4, o5, zipFunction)); + } + + /** + * Returns an Observable that emits the results of a function of your choosing applied to + * combinations of four items emitted, in sequence, by four other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the + * new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item + * emitted by {@code w3}; the second item emitted by + * the new Observable will be the result of the function applied to the second item emitted by + * each of those Observables; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations + * of the source Observable that emits the fewest items. + * + * @param o1 + * one source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param o6 + * a sixth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source + * Observables, results in an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + */ + public static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, + Func6 zipFunction) { + return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, zipFunction)); + } + + /** + * Returns an Observable that emits the results of a function of your choosing applied to + * combinations of four items emitted, in sequence, by four other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the + * new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item + * emitted by {@code w3}; the second item emitted by + * the new Observable will be the result of the function applied to the second item emitted by + * each of those Observables; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations + * of the source Observable that emits the fewest items. + * + * @param o1 + * one source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param o6 + * a sixth source Observable + * @param o7 + * a seventh source Observable + * @param zipFunction * a function that, when applied to an item emitted by each of the source * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ - public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, Func4 reduceFunction) { - return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction)); + public static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, + Func7 zipFunction) { + return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, zipFunction)); + } + + /** + * Returns an Observable that emits the results of a function of your choosing applied to + * combinations of four items emitted, in sequence, by four other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the + * new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item + * emitted by {@code w3}; the second item emitted by + * the new Observable will be the result of the function applied to the second item emitted by + * each of those Observables; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations + * of the source Observable that emits the fewest items. + * + * @param o1 + * one source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param o6 + * a sixth source Observable + * @param o7 + * a seventh source Observable + * @param o8 + * an eighth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source + * Observables, results in an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + */ + public static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, + Func8 zipFunction) { + return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, zipFunction)); + } + + /** + * Returns an Observable that emits the results of a function of your choosing applied to + * combinations of four items emitted, in sequence, by four other Observables. + *

+ * + *

{@code zip} applies this function in strict sequence, so the first item emitted by the + * new Observable will be the result of the function applied to the first item emitted by {@code w0}, the first item emitted by {@code w1}, the first item emitted by {@code w2}, and the first item + * emitted by {@code w3}; the second item emitted by + * the new Observable will be the result of the function applied to the second item emitted by + * each of those Observables; and so forth. + *

+ * The resulting {@code Observable} returned from {@code zip} will invoke {@link Observer#onNext onNext} as many times as the number of {@code onNext} invocations + * of the source Observable that emits the fewest items. + * + * @param o1 + * one source Observable + * @param o2 + * a second source Observable + * @param o3 + * a third source Observable + * @param o4 + * a fourth source Observable + * @param o5 + * a fifth source Observable + * @param o6 + * a sixth source Observable + * @param o7 + * a seventh source Observable + * @param o8 + * an eighth source Observable + * @param o9 + * a ninth source Observable + * @param zipFunction + * a function that, when applied to an item emitted by each of the source + * Observables, results in an item that will be emitted by the resulting Observable + * @return an Observable that emits the zipped results + */ + public static Observable zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, + Observable o9, Func9 zipFunction) { + return create(OperationZip.zip(o1, o2, o3, o4, o5, o6, o7, o8, o9, zipFunction)); } /** @@ -1033,30 +1226,30 @@ public static Observable zip(Observable w0, *

* * - * @param w0 + * @param o1 * The first source observable. - * @param w1 + * @param o2 * The second source observable. * @param combineFunction * The aggregation function used to combine the source observable values. * @return An Observable that combines the source Observables with the given combine function */ - public static Observable combineLatest(Observable w0, Observable w1, Func2 combineFunction) { - return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction)); + public static Observable combineLatest(Observable o1, Observable o2, Func2 combineFunction) { + return create(OperationCombineLatest.combineLatest(o1, o2, combineFunction)); } /** * @see #combineLatest(Observable, Observable, Func2) */ - public static Observable combineLatest(Observable w0, Observable w1, Observable w2, Func3 combineFunction) { - return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction)); + public static Observable combineLatest(Observable o1, Observable o2, Observable o3, Func3 combineFunction) { + return create(OperationCombineLatest.combineLatest(o1, o2, o3, combineFunction)); } /** * @see #combineLatest(Observable, Observable, Func2) */ - public static Observable combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Func4 combineFunction) { - return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction)); + public static Observable combineLatest(Observable o1, Observable o2, Observable o3, Observable o4, Func4 combineFunction) { + return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, combineFunction)); } /** @@ -1307,7 +1500,7 @@ public Observable call(List> wsList) { * * @param ws * A collection of source Observables - * @param reduceFunction + * @param zipFunction * a function that, when applied to an item emitted by each of the source * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java index 54d5d64d03..c7bdd451bc 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationZip.java @@ -35,6 +35,11 @@ import rx.util.functions.Func2; import rx.util.functions.Func3; import rx.util.functions.Func4; +import rx.util.functions.Func5; +import rx.util.functions.Func6; +import rx.util.functions.Func7; +import rx.util.functions.Func8; +import rx.util.functions.Func9; import rx.util.functions.FuncN; import rx.util.functions.Functions; @@ -54,34 +59,99 @@ */ public final class OperationZip { - public static Func1, Subscription> zip(Observable w0, Observable w1, Func2 zipFunction) { + public static Func1, Subscription> zip(Observable o1, Observable o2, Func2 zipFunction) { Aggregator a = new Aggregator(Functions.fromFunc(zipFunction)); - a.addObserver(new ZipObserver(a, w0)); - a.addObserver(new ZipObserver(a, w1)); + a.addObserver(new ZipObserver(a, o1)); + a.addObserver(new ZipObserver(a, o2)); return a; } - public static Func1, Subscription> zip(Observable w0, Observable w1, Observable w2, Func3 zipFunction) { + public static Func1, Subscription> zip(Observable o1, Observable o2, Observable o3, Func3 zipFunction) { Aggregator a = new Aggregator(Functions.fromFunc(zipFunction)); - a.addObserver(new ZipObserver(a, w0)); - a.addObserver(new ZipObserver(a, w1)); - a.addObserver(new ZipObserver(a, w2)); + a.addObserver(new ZipObserver(a, o1)); + a.addObserver(new ZipObserver(a, o2)); + a.addObserver(new ZipObserver(a, o3)); return a; } - public static Func1, Subscription> zip(Observable w0, Observable w1, Observable w2, Observable w3, Func4 zipFunction) { + public static Func1, Subscription> zip(Observable o1, Observable o2, Observable o3, Observable o4, Func4 zipFunction) { Aggregator a = new Aggregator(Functions.fromFunc(zipFunction)); - a.addObserver(new ZipObserver(a, w0)); - a.addObserver(new ZipObserver(a, w1)); - a.addObserver(new ZipObserver(a, w2)); - a.addObserver(new ZipObserver(a, w3)); + a.addObserver(new ZipObserver(a, o1)); + a.addObserver(new ZipObserver(a, o2)); + a.addObserver(new ZipObserver(a, o3)); + a.addObserver(new ZipObserver(a, o4)); + return a; + } + + public static Func1, Subscription> zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Func5 zipFunction) { + Aggregator a = new Aggregator(Functions.fromFunc(zipFunction)); + a.addObserver(new ZipObserver(a, o1)); + a.addObserver(new ZipObserver(a, o2)); + a.addObserver(new ZipObserver(a, o3)); + a.addObserver(new ZipObserver(a, o4)); + a.addObserver(new ZipObserver(a, o5)); + return a; + } + + public static Func1, Subscription> zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, + Func6 zipFunction) { + Aggregator a = new Aggregator(Functions.fromFunc(zipFunction)); + a.addObserver(new ZipObserver(a, o1)); + a.addObserver(new ZipObserver(a, o2)); + a.addObserver(new ZipObserver(a, o3)); + a.addObserver(new ZipObserver(a, o4)); + a.addObserver(new ZipObserver(a, o5)); + a.addObserver(new ZipObserver(a, o6)); + return a; + } + + public static Func1, Subscription> zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, + Func7 zipFunction) { + Aggregator a = new Aggregator(Functions.fromFunc(zipFunction)); + a.addObserver(new ZipObserver(a, o1)); + a.addObserver(new ZipObserver(a, o2)); + a.addObserver(new ZipObserver(a, o3)); + a.addObserver(new ZipObserver(a, o4)); + a.addObserver(new ZipObserver(a, o5)); + a.addObserver(new ZipObserver(a, o6)); + a.addObserver(new ZipObserver(a, o7)); + return a; + } + + public static Func1, Subscription> zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, + Func8 zipFunction) { + Aggregator a = new Aggregator(Functions.fromFunc(zipFunction)); + a.addObserver(new ZipObserver(a, o1)); + a.addObserver(new ZipObserver(a, o2)); + a.addObserver(new ZipObserver(a, o3)); + a.addObserver(new ZipObserver(a, o4)); + a.addObserver(new ZipObserver(a, o5)); + a.addObserver(new ZipObserver(a, o6)); + a.addObserver(new ZipObserver(a, o7)); + a.addObserver(new ZipObserver(a, o8)); + return a; + } + + public static Func1, Subscription> zip(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, + Observable o9, Func9 zipFunction) { + Aggregator a = new Aggregator(Functions.fromFunc(zipFunction)); + a.addObserver(new ZipObserver(a, o1)); + a.addObserver(new ZipObserver(a, o2)); + a.addObserver(new ZipObserver(a, o3)); + a.addObserver(new ZipObserver(a, o4)); + a.addObserver(new ZipObserver(a, o5)); + a.addObserver(new ZipObserver(a, o6)); + a.addObserver(new ZipObserver(a, o7)); + a.addObserver(new ZipObserver(a, o8)); + a.addObserver(new ZipObserver(a, o9)); return a; } @SuppressWarnings("unchecked") public static Func1, Subscription> zip(Collection> ws, FuncN zipFunction) { Aggregator a = new Aggregator(zipFunction); - for (@SuppressWarnings("rawtypes") Observable w : ws) { + for (@SuppressWarnings("rawtypes") + Observable w : ws) { ZipObserver zipObserver = new ZipObserver(a, w); a.addObserver(zipObserver); } @@ -155,7 +225,7 @@ public Aggregator(FuncN zipFunction) { /** * Receive notification of a Observer starting (meaning we should require it for aggregation) - * + * * Thread Safety => Invoke ONLY from the static factory methods at top of this class which are always an atomic execution by a single thread. * * @param w @@ -295,7 +365,7 @@ private void stop() { } public static class UnitTest { - + @SuppressWarnings("unchecked") @Test public void testCollectionSizeDifferentThanFunction() { diff --git a/rxjava-core/src/test/java/rx/CovarianceTest.java b/rxjava-core/src/test/java/rx/CovarianceTest.java index 2346b0804a..3d6db74783 100644 --- a/rxjava-core/src/test/java/rx/CovarianceTest.java +++ b/rxjava-core/src/test/java/rx/CovarianceTest.java @@ -27,7 +27,7 @@ public ExtendedResult call(Media m, Rating r) { } }; - Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { + Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { @Override public void call(Result t1) { @@ -36,7 +36,7 @@ public void call(Result t1) { }); - Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { + Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { @Override public void call(Result t1) { @@ -45,7 +45,7 @@ public void call(Result t1) { }); - Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { + Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { @Override public void call(ExtendedResult t1) { @@ -54,7 +54,7 @@ public void call(ExtendedResult t1) { }); - Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { + Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { @Override public void call(Result t1) { @@ -63,7 +63,7 @@ public void call(Result t1) { }); - Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { + Observable. zip(horrors, ratings, combine).toBlockingObservable().forEach(new Action1() { @Override public void call(Result t1) { @@ -72,7 +72,7 @@ public void call(Result t1) { }); - Observable. zip(horrors, ratings, combine); + Observable. zip(horrors, ratings, combine); }