From 852e870eb0892c1f9c1587d06b2aafaf90245e35 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Jan 2013 14:15:06 -0800 Subject: [PATCH 1/5] Performance optimizations. - memoize dynamic function constructions so executions are statically typed after a single dynamic lookup instead of dynamic on every execution - use chained observable to reduce composition costs (execute list of functions rather than compositionally invoking onNext) --- .../main/java/rx/observables/Observable.java | 163 +++++- .../operations/AtomicObserver.java | 2 +- .../src/main/java/rx/util/Action0.java | 2 +- .../src/main/java/rx/util/Action1.java | 2 +- .../src/main/java/rx/util/Action2.java | 2 +- .../src/main/java/rx/util/Action3.java | 2 +- rxjava-core/src/main/java/rx/util/Func0.java | 2 +- rxjava-core/src/main/java/rx/util/Func1.java | 2 +- rxjava-core/src/main/java/rx/util/Func2.java | 2 +- rxjava-core/src/main/java/rx/util/Func3.java | 2 +- rxjava-core/src/main/java/rx/util/Func4.java | 2 +- rxjava-core/src/main/java/rx/util/Func5.java | 2 +- rxjava-core/src/main/java/rx/util/Func6.java | 2 +- rxjava-core/src/main/java/rx/util/Func7.java | 2 +- rxjava-core/src/main/java/rx/util/Func8.java | 2 +- rxjava-core/src/main/java/rx/util/Func9.java | 2 +- rxjava-core/src/main/java/rx/util/FuncN.java | 2 +- .../src/main/java/rx/util/Function.java | 10 + .../src/main/java/rx/util/Functions.java | 500 ++++++++++++++---- 19 files changed, 561 insertions(+), 144 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/util/Function.java diff --git a/rxjava-core/src/main/java/rx/observables/Observable.java b/rxjava-core/src/main/java/rx/observables/Observable.java index 9d41cef5d6..7e130e77fb 100644 --- a/rxjava-core/src/main/java/rx/observables/Observable.java +++ b/rxjava-core/src/main/java/rx/observables/Observable.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; import org.junit.Before; import org.junit.Test; @@ -52,6 +53,7 @@ import rx.util.Func2; import rx.util.Func3; import rx.util.Func4; +import rx.util.FuncN; import rx.util.Functions; /** @@ -301,7 +303,7 @@ private static void handleError(Exception e) { * @param args */ private void executeCallback(final Object callback, Object... args) { - Functions.execute(callback, args); + Functions.from(callback).call(args); } /** @@ -395,11 +397,13 @@ public static Observable create(Func1, Subscription> func) { * @return a Observable that, when a Observer subscribes to it, will execute the given function */ public static Observable create(final Object callback) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(callback); return create(new Func1, Subscription>() { @Override public Subscription call(Observer t1) { - return Functions.execute(callback, t1); + return (Subscription) _f.call(t1); } }); @@ -469,11 +473,13 @@ public static Observable filter(Observable that, Func1 pre * evaluates as true */ public static Observable filter(Observable that, final Object function) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(function); return filter(that, new Func1() { @Override public Boolean call(T t1) { - return Functions.execute(function, t1); + return (Boolean) _f.call(t1); } @@ -575,7 +581,9 @@ public static Observable last(final Observable that) { * in the sequence emitted by the source Observable */ public static Observable map(Observable sequence, Func1 func) { - return OperationMap.map(sequence, func); + // return OperationMap.map(sequence, func); + + return (Observable) ChainedObservable.chain(sequence).addFunction(func); } /** @@ -597,11 +605,13 @@ public static Observable map(Observable sequence, Func1 func) * in the sequence emitted by the source Observable */ public static Observable map(Observable sequence, final Object function) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(function); return map(sequence, new Func1() { @Override public R call(T t1) { - return Functions.execute(function, t1); + return (R) _f.call(t1); } }); @@ -656,11 +666,13 @@ public static Observable mapMany(Observable sequence, Func1 Observable mapMany(Observable sequence, final Object function) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(function); return mapMany(sequence, new Func1() { @Override public R call(T t1) { - return Functions.execute(function, t1); + return (R) _f.call(t1); } }); @@ -876,11 +888,14 @@ public static Observable onErrorResumeNext(final Observable that, fina * @return the source Observable, with its behavior modified as described */ public static Observable onErrorResumeNext(final Observable that, final Object resumeFunction) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(resumeFunction); return onErrorResumeNext(that, new Func1>() { + @SuppressWarnings("unchecked") @Override public Observable call(Exception e) { - return Functions.execute(resumeFunction, e); + return (Observable) _f.call(e); } }); } @@ -999,11 +1014,14 @@ public static Observable reduce(Observable sequence, Func2 ac * @see Wikipedia: Fold (higher-order function) */ public static Observable reduce(final Observable sequence, final Object accumulator) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(accumulator); return reduce(sequence, new Func2() { + @SuppressWarnings("unchecked") @Override public T call(T t1, T t2) { - return Functions.execute(accumulator, t1, t2); + return (T) _f.call(t1, t2); } }); @@ -1071,11 +1089,14 @@ public static Observable reduce(Observable sequence, T initialValue, F * @see Wikipedia: Fold (higher-order function) */ public static Observable reduce(final Observable sequence, final T initialValue, final Object accumulator) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(accumulator); return reduce(sequence, initialValue, new Func2() { + @SuppressWarnings("unchecked") @Override public T call(T t1, T t2) { - return Functions.execute(accumulator, t1, t2); + return (T) _f.call(t1, t2); } }); @@ -1126,11 +1147,14 @@ public static Observable scan(Observable sequence, Func2 accu * @see MSDN: Observable.Scan */ public static Observable scan(final Observable sequence, final Object accumulator) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(accumulator); return scan(sequence, new Func2() { + @SuppressWarnings("unchecked") @Override public T call(T t1, T t2) { - return Functions.execute(accumulator, t1, t2); + return (T) _f.call(t1, t2); } }); @@ -1185,11 +1209,14 @@ public static Observable scan(Observable sequence, T initialValue, Fun * @see MSDN: Observable.Scan */ public static Observable scan(final Observable sequence, final T initialValue, final Object accumulator) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(accumulator); return scan(sequence, initialValue, new Func2() { + @SuppressWarnings("unchecked") @Override public T call(T t1, T t2) { - return Functions.execute(accumulator, t1, t2); + return (T) _f.call(t1, t2); } }); @@ -1359,11 +1386,13 @@ public static Observable> toSortedList(Observable sequence, Func2 * @return */ public static Observable> toSortedList(Observable sequence, final Object sortFunction) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(sortFunction); return OperationToObservableSortedList.toSortedList(sequence, new Func2() { @Override public Integer call(T t1, T t2) { - return Functions.execute(sortFunction, t1, t2); + return (Integer) _f.call(t1, t2); } }); @@ -1422,11 +1451,14 @@ public static Observable zip(Observable w0, Observable w1 * @return a Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, final Object function) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(function); return zip(w0, w1, new Func2() { + @SuppressWarnings("unchecked") @Override public R call(T0 t0, T1 t1) { - return Functions.execute(function, t0, t1); + return (R) _f.call(t0, t1); } }); @@ -1493,11 +1525,14 @@ public static Observable zip(Observable w0, Observable Observable zip(Observable w0, Observable w1, Observable w2, final Object function) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(function); return zip(w0, w1, w2, new Func3() { + @SuppressWarnings("unchecked") @Override public R call(T0 t0, T1 t1, T2 t2) { - return Functions.execute(function, t0, t1, t2); + return (R) _f.call(t0, t1, t2); } }); @@ -1566,11 +1601,14 @@ public static Observable zip(Observable w0, Observabl * @return a Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, final Object function) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(function); return zip(w0, w1, w2, w3, new Func4() { + @SuppressWarnings("unchecked") @Override public R call(T0 t0, T1 t1, T2 t2, T3 t3) { - return Functions.execute(function, t0, t1, t2, t3); + return (R) _f.call(t0, t1, t2, t3); } }); @@ -1605,10 +1643,12 @@ public Observable filter(Func1 predicate) { * evaluates as "true" */ public Observable filter(final Object callback) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(callback); return filter(this, new Func1() { public Boolean call(T t1) { - return Functions.execute(callback, t1); + return (Boolean) _f.call(t1); } }); } @@ -1638,7 +1678,7 @@ public Observable last() { * @return a Observable that emits a sequence that is the result of applying the transformation * closure to each item in the sequence emitted by the input Observable. */ - public Observable map(Func1 func) { + public Observable map(Func1 func) { return map(this, func); } @@ -1655,10 +1695,13 @@ public Observable map(Func1 func) { * closure to each item in the sequence emitted by the input Observable. */ public Observable map(final Object callback) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(callback); return map(this, new Func1() { + @SuppressWarnings("unchecked") public R call(T t1) { - return Functions.execute(callback, t1); + return (R) _f.call(t1); } }); } @@ -1698,10 +1741,13 @@ public Observable mapMany(Func1> func) { * Observables obtained from this transformation. */ public Observable mapMany(final Object callback) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(callback); return mapMany(this, new Func1>() { + @SuppressWarnings("unchecked") public Observable call(T t1) { - return Functions.execute(callback, t1); + return (Observable) _f.call(t1); } }); } @@ -1771,10 +1817,13 @@ public Observable onErrorResumeNext(final Func1> res * @return the original Observable with appropriately modified behavior */ public Observable onErrorResumeNext(final Object resumeFunction) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(resumeFunction); return onErrorResumeNext(this, new Func1>() { + @SuppressWarnings("unchecked") public Observable call(Exception e) { - return Functions.execute(resumeFunction, e); + return (Observable) _f.call(e); } }); } @@ -1857,10 +1906,13 @@ public Observable onErrorReturn(Func1 resumeFunction) { * @return the original Observable with appropriately modified behavior */ public Observable onErrorReturn(final Object resumeFunction) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(resumeFunction); return onErrorReturn(this, new Func1() { + @SuppressWarnings("unchecked") public T call(Exception e) { - return Functions.execute(resumeFunction, e); + return (T) _f.call(e); } }); } @@ -2171,6 +2223,75 @@ public Observable> toSortedList(final Object sortFunction) { return toSortedList(this, sortFunction); } + /** + * Used to chain functions together rather than compose them so we reduce the size of the stack and number of onNext calls. + * + * @param + */ + private static class ChainedObservable extends Observable { + + private final Observable actual; + // we use rawtypes as we can have a list of functions each converting to different types + @SuppressWarnings("rawtypes") + private final ConcurrentLinkedQueue functions; + + @SuppressWarnings({ "rawtypes" }) + private ChainedObservable(Observable actual) { + if (actual instanceof ChainedObservable) { + throw new IllegalStateException("You shouldn't wrap a ChainedObservable with another ChainedObservable"); + } else { + // or use the existing one + this.actual = actual; + this.functions = new ConcurrentLinkedQueue(); + } + } + + public static ChainedObservable chain(Observable observable) { + if (observable instanceof ChainedObservable) { + return (ChainedObservable) observable; + } else { + return new ChainedObservable(observable); + } + } + + public ChainedObservable addFunction(Object function) { + // get an implementation of FuncN to memoize the function and then + // add to the functions list to be applied when this observable it subscribed to + this.functions.add(Functions.from(function)); + return this; + } + + @Override + public Subscription subscribe(final Observer observer) { + return actual.subscribe(new Observer() { + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + observer.onError(e); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public void onNext(T v) { + // perform all functions + Object value = (Integer) v; + for (FuncN f : functions) { + value = f.call(value); + } + // now that we've performed all functions pass the resulting value to the actual observer + observer.onNext((T) value); + } + + }); + } + + } + public static class UnitTest { @Mock diff --git a/rxjava-core/src/main/java/rx/observables/operations/AtomicObserver.java b/rxjava-core/src/main/java/rx/observables/operations/AtomicObserver.java index 40ec4f09ae..e99ca40745 100644 --- a/rxjava-core/src/main/java/rx/observables/operations/AtomicObserver.java +++ b/rxjava-core/src/main/java/rx/observables/operations/AtomicObserver.java @@ -47,7 +47,7 @@ /* package */final class AtomicObserver implements Observer { /** Allow changing between forcing single or allowing multi-threaded execution of onNext */ - private static boolean allowMultiThreaded = true; + private static boolean allowMultiThreaded = false; static { String v = System.getProperty("rx.onNext.multithreaded.enabled"); if (v != null) { diff --git a/rxjava-core/src/main/java/rx/util/Action0.java b/rxjava-core/src/main/java/rx/util/Action0.java index b8cc6d9c99..7d3693fc03 100644 --- a/rxjava-core/src/main/java/rx/util/Action0.java +++ b/rxjava-core/src/main/java/rx/util/Action0.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Action0 { +public interface Action0 extends Function { public void call(); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Action1.java b/rxjava-core/src/main/java/rx/util/Action1.java index efdc4daa44..13ac3923cd 100644 --- a/rxjava-core/src/main/java/rx/util/Action1.java +++ b/rxjava-core/src/main/java/rx/util/Action1.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Action1 { +public interface Action1 extends Function { public void call(T1 t1); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Action2.java b/rxjava-core/src/main/java/rx/util/Action2.java index 21461f7cb5..09706aa3d1 100644 --- a/rxjava-core/src/main/java/rx/util/Action2.java +++ b/rxjava-core/src/main/java/rx/util/Action2.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Action2 { +public interface Action2 extends Function { public void call(T1 t1, T2 t2); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Action3.java b/rxjava-core/src/main/java/rx/util/Action3.java index 6630b124ba..a59c461412 100644 --- a/rxjava-core/src/main/java/rx/util/Action3.java +++ b/rxjava-core/src/main/java/rx/util/Action3.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Action3 { +public interface Action3 extends Function { public void call(T1 t1, T2 t2, T3 t3); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Func0.java b/rxjava-core/src/main/java/rx/util/Func0.java index 1934902c91..f224d6b5e6 100644 --- a/rxjava-core/src/main/java/rx/util/Func0.java +++ b/rxjava-core/src/main/java/rx/util/Func0.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Func0 { +public interface Func0 extends Function { public R call(); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Func1.java b/rxjava-core/src/main/java/rx/util/Func1.java index ae67de59c5..ee01451df8 100644 --- a/rxjava-core/src/main/java/rx/util/Func1.java +++ b/rxjava-core/src/main/java/rx/util/Func1.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Func1 { +public interface Func1 extends Function { public R call(T1 t1); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Func2.java b/rxjava-core/src/main/java/rx/util/Func2.java index 5c9fa7936d..7969722ab6 100644 --- a/rxjava-core/src/main/java/rx/util/Func2.java +++ b/rxjava-core/src/main/java/rx/util/Func2.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Func2 { +public interface Func2 extends Function { public R call(T1 t1, T2 t2); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Func3.java b/rxjava-core/src/main/java/rx/util/Func3.java index 8147d35706..10197d825a 100644 --- a/rxjava-core/src/main/java/rx/util/Func3.java +++ b/rxjava-core/src/main/java/rx/util/Func3.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Func3 { +public interface Func3 extends Function { public R call(T1 t1, T2 t2, T3 t3); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Func4.java b/rxjava-core/src/main/java/rx/util/Func4.java index 2a4df7212b..2c4c408c8a 100644 --- a/rxjava-core/src/main/java/rx/util/Func4.java +++ b/rxjava-core/src/main/java/rx/util/Func4.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Func4 { +public interface Func4 extends Function { public R call(T1 t1, T2 t2, T3 t3, T4 t4); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Func5.java b/rxjava-core/src/main/java/rx/util/Func5.java index 888334f4be..854d530ca4 100644 --- a/rxjava-core/src/main/java/rx/util/Func5.java +++ b/rxjava-core/src/main/java/rx/util/Func5.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Func5 { +public interface Func5 extends Function { public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Func6.java b/rxjava-core/src/main/java/rx/util/Func6.java index 1e036a5aff..a5a682c48b 100644 --- a/rxjava-core/src/main/java/rx/util/Func6.java +++ b/rxjava-core/src/main/java/rx/util/Func6.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Func6 { +public interface Func6 extends Function { public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Func7.java b/rxjava-core/src/main/java/rx/util/Func7.java index 6c6559013a..47b5f7f5aa 100644 --- a/rxjava-core/src/main/java/rx/util/Func7.java +++ b/rxjava-core/src/main/java/rx/util/Func7.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Func7 { +public interface Func7 extends Function { public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Func8.java b/rxjava-core/src/main/java/rx/util/Func8.java index 2263823a7b..533ec972c9 100644 --- a/rxjava-core/src/main/java/rx/util/Func8.java +++ b/rxjava-core/src/main/java/rx/util/Func8.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Func8 { +public interface Func8 extends Function { public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Func9.java b/rxjava-core/src/main/java/rx/util/Func9.java index 2de318b122..af362640fa 100644 --- a/rxjava-core/src/main/java/rx/util/Func9.java +++ b/rxjava-core/src/main/java/rx/util/Func9.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface Func9 { +public interface Func9 extends Function { public R call(T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/FuncN.java b/rxjava-core/src/main/java/rx/util/FuncN.java index b1cedff38e..c94bb140cf 100644 --- a/rxjava-core/src/main/java/rx/util/FuncN.java +++ b/rxjava-core/src/main/java/rx/util/FuncN.java @@ -15,6 +15,6 @@ */ package rx.util; -public interface FuncN { +public interface FuncN extends Function { public R call(Object... args); } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/Function.java b/rxjava-core/src/main/java/rx/util/Function.java new file mode 100644 index 0000000000..5ec8206d98 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/Function.java @@ -0,0 +1,10 @@ +package rx.util; + +/** + * All Func and Action interfaces extend from this. + *

+ * Marker interface to allow isntanceof checks. + */ +public interface Function { + +} diff --git a/rxjava-core/src/main/java/rx/util/Functions.java b/rxjava-core/src/main/java/rx/util/Functions.java index 33c89dc350..9cb9da15fc 100644 --- a/rxjava-core/src/main/java/rx/util/Functions.java +++ b/rxjava-core/src/main/java/rx/util/Functions.java @@ -79,23 +79,17 @@ public static Collection getRegisteredLanguageAdaptors( * @param function * @param args */ - @SuppressWarnings("unchecked") - public static R execute(Object function, Object... args) { - // if we have a tracer then log the start - long startTime = -1; - if (tracer != null && tracer.isTraceEnabled()) { - try { - startTime = System.nanoTime(); - tracer.traceStart(function, args); - } catch (Exception e) { - logger.warn("Failed to trace log.", e); - } + @SuppressWarnings({ "rawtypes" }) + public static FuncN from(final Object function) { + if (function == null) { + throw new RuntimeException("function is null. Can't send arguments to null function."); } - // perform controller logic to determine what type of function we received and execute it - try { - if (function == null) { - throw new RuntimeException("function is null. Can't send arguments to null function."); - } + + /* check for typed Rx Function implementation first */ + if (function instanceof Function) { + return fromFunction((Function) function); + } else { + /* not an Rx Function so try language adaptors */ /* * TODO the following code needs to be evaluated for performance @@ -104,98 +98,219 @@ public static R execute(Object function, Object... args) { */ // check for language adaptor - for (@SuppressWarnings("rawtypes") - Class c : languageAdaptors.keySet()) { + for (final Class c : languageAdaptors.keySet()) { if (c.isInstance(function)) { - // found the language adaptor so execute - return (R) languageAdaptors.get(c).call(function, args); + final FunctionLanguageAdaptor la = languageAdaptors.get(c); + // found the language adaptor so wrap in FuncN and return + return new FuncN() { + + @Override + public Object call(Object... args) { + return la.call(function, args); + } + + }; } } // no language adaptor found + } + + // no support found + throw new RuntimeException("Unsupported closure type: " + function.getClass().getSimpleName()); + } + + // + // @SuppressWarnings("unchecked") + // private static R executionRxFunction(Function function, Object... args) { + // // check Func* classes + // if (function instanceof Func0) { + // Func0 f = (Func0) function; + // if (args.length != 0) { + // throw new RuntimeException("The closure was Func0 and expected no arguments, but we received: " + args.length); + // } + // return (R) f.call(); + // } else if (function instanceof Func1) { + // Func1 f = (Func1) function; + // if (args.length != 1) { + // throw new RuntimeException("The closure was Func1 and expected 1 argument, but we received: " + args.length); + // } + // return f.call(args[0]); + // } else if (function instanceof Func2) { + // Func2 f = (Func2) function; + // if (args.length != 2) { + // throw new RuntimeException("The closure was Func2 and expected 2 arguments, but we received: " + args.length); + // } + // return f.call(args[0], args[1]); + // } else if (function instanceof Func3) { + // Func3 f = (Func3) function; + // if (args.length != 3) { + // throw new RuntimeException("The closure was Func3 and expected 3 arguments, but we received: " + args.length); + // } + // return (R) f.call(args[0], args[1], args[2]); + // } else if (function instanceof Func4) { + // Func4 f = (Func4) function; + // if (args.length != 1) { + // throw new RuntimeException("The closure was Func4 and expected 4 arguments, but we received: " + args.length); + // } + // return f.call(args[0], args[1], args[2], args[3]); + // } else if (function instanceof Func5) { + // Func5 f = (Func5) function; + // if (args.length != 1) { + // throw new RuntimeException("The closure was Func5 and expected 5 arguments, but we received: " + args.length); + // } + // return f.call(args[0], args[1], args[2], args[3], args[4]); + // } else if (function instanceof Func6) { + // Func6 f = (Func6) function; + // if (args.length != 1) { + // throw new RuntimeException("The closure was Func6 and expected 6 arguments, but we received: " + args.length); + // } + // return f.call(args[0], args[1], args[2], args[3], args[4], args[5]); + // } else if (function instanceof Func7) { + // Func7 f = (Func7) function; + // if (args.length != 1) { + // throw new RuntimeException("The closure was Func7 and expected 7 arguments, but we received: " + args.length); + // } + // return f.call(args[0], args[1], args[2], args[3], args[4], args[5], args[6]); + // } else if (function instanceof Func8) { + // Func8 f = (Func8) function; + // if (args.length != 1) { + // throw new RuntimeException("The closure was Func8 and expected 8 arguments, but we received: " + args.length); + // } + // return f.call(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]); + // } else if (function instanceof Func9) { + // Func9 f = (Func9) function; + // if (args.length != 1) { + // throw new RuntimeException("The closure was Func9 and expected 9 arguments, but we received: " + args.length); + // } + // return f.call(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8]); + // } else if (function instanceof FuncN) { + // FuncN f = (FuncN) function; + // return f.call(args); + // } else if (function instanceof Action0) { + // Action0 f = (Action0) function; + // if (args.length != 1) { + // throw new RuntimeException("The closure was Action0 and expected 0 arguments, but we received: " + args.length); + // } + // f.call(); + // return null; + // } else if (function instanceof Action1) { + // Action1 f = (Action1) function; + // if (args.length != 1) { + // throw new RuntimeException("The closure was Action1 and expected 1 argument, but we received: " + args.length); + // } + // f.call(args[0]); + // return null; + // } else if (function instanceof Action2) { + // Action2 f = (Action2) function; + // if (args.length != 1) { + // throw new RuntimeException("The closure was Action2 and expected 2 argument, but we received: " + args.length); + // } + // f.call(args[0], args[1]); + // return null; + // } else if (function instanceof Action3) { + // Action3 f = (Action3) function; + // if (args.length != 1) { + // throw new RuntimeException("The closure was Action1 and expected 1 argument, but we received: " + args.length); + // } + // f.call(args[0], args[1], args[2]); + // return null; + // } + // + // throw new RuntimeException("Unknown implementation of Function: " + function.getClass().getSimpleName()); + // } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private static FuncN fromFunction(Function function) { + // check Func* classes + if (function instanceof Func0) { + return fromFunc((Func0) function); + } else if (function instanceof Func1) { + return fromFunc((Func1) function); + } else if (function instanceof Func2) { + return fromFunc((Func2) function); + } else if (function instanceof Func3) { + return fromFunc((Func3) function); + } else if (function instanceof Func4) { + return fromFunc((Func4) function); + } else if (function instanceof Func5) { + return fromFunc((Func5) function); + } else if (function instanceof Func6) { + return fromFunc((Func6) function); + } else if (function instanceof Func7) { + return fromFunc((Func7) function); + } else if (function instanceof Func8) { + return fromFunc((Func8) function); + } else if (function instanceof Func9) { + return fromFunc((Func9) function); + } else if (function instanceof FuncN) { + return (FuncN) function; + } else if (function instanceof Action0) { + return fromAction((Action0) function); + } else if (function instanceof Action1) { + return fromAction((Action1) function); + } else if (function instanceof Action2) { + return fromAction((Action2) function); + } else if (function instanceof Action3) { + return fromAction((Action3) function); + } - // check Func* classes - if (function instanceof Func0) { - Func0 f = (Func0) function; + throw new RuntimeException("Unknown implementation of Function: " + function.getClass().getSimpleName()); + } + + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ + public static FuncN fromFunc(final Func0 f) { + return new FuncN() { + + @Override + public R call(Object... args) { if (args.length != 0) { - throw new RuntimeException("The closure was Func0 and expected no arguments, but we received: " + args.length); - } - return (R) f.call(); - } else if (function instanceof Func1) { - Func1 f = (Func1) function; - if (args.length != 1) { - throw new RuntimeException("The closure was Func1 and expected 1 argument, but we received: " + args.length); - } - return f.call(args[0]); - } else if (function instanceof Func2) { - Func2 f = (Func2) function; - if (args.length != 2) { - throw new RuntimeException("The closure was Func2 and expected 2 arguments, but we received: " + args.length); + throw new RuntimeException("Func0 expecting 0 arguments."); } - return f.call(args[0], args[1]); - } else if (function instanceof Func3) { - Func3 f = (Func3) function; - if (args.length != 3) { - throw new RuntimeException("The closure was Func3 and expected 3 arguments, but we received: " + args.length); - } - return (R) f.call(args[0], args[1], args[2]); - } else if (function instanceof Func4) { - Func4 f = (Func4) function; - if (args.length != 1) { - throw new RuntimeException("The closure was Func4 and expected 4 arguments, but we received: " + args.length); - } - return f.call(args[0], args[1], args[2], args[3]); - } else if (function instanceof FuncN) { - FuncN f = (FuncN) function; - return f.call(args); + return f.call(); } - // no support found - throw new RuntimeException("Unsupported closure type: " + function.getClass().getSimpleName()); - } finally { - // if we have a tracer then log the end - if (tracer != null && tracer.isTraceEnabled()) { - try { - tracer.traceEnd(startTime, System.nanoTime(), function, args); - } catch (Exception e) { - logger.warn("Failed to trace log.", e); - } - } - } + }; } + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ public static FuncN fromFunc(final Func1 f) { return new FuncN() { - /** - * If it can't cast to this it should throw an exception as that means code is using this wrong. - *

- * We unfortunately need FuncN to be Object and this is a bridge between typed and non-typed hence this being unchecked - */ @SuppressWarnings("unchecked") @Override public R call(Object... args) { - if (args.length == 0) { - return f.call(null); - } else { - return f.call((T0) args[0]); + if (args.length != 1) { + throw new RuntimeException("Func1 expecting 1 argument."); } + return f.call((T0) args[0]); } }; } + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ public static FuncN fromFunc(final Func2 f) { return new FuncN() { - /** - * If it can't cast to this it should throw an exception as that means code is using this wrong. - *

- * We unfortunately need FuncN to be Object and this is a bridge between typed and non-typed hence this being unchecked - */ @SuppressWarnings("unchecked") @Override public R call(Object... args) { - if (args.length < 2) { + if (args.length != 2) { throw new RuntimeException("Func2 expecting 2 arguments."); } return f.call((T0) args[0], (T1) args[1]); @@ -204,18 +319,19 @@ public R call(Object... args) { }; } + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ public static FuncN fromFunc(final Func3 f) { return new FuncN() { - /** - * If it can't cast to this it should throw an exception as that means code is using this wrong. - *

- * We unfortunately need FuncN to be Object and this is a bridge between typed and non-typed hence this being unchecked - */ @SuppressWarnings("unchecked") @Override public R call(Object... args) { - if (args.length < 3) { + if (args.length != 3) { throw new RuntimeException("Func3 expecting 3 arguments."); } return f.call((T0) args[0], (T1) args[1], (T2) args[2]); @@ -224,18 +340,19 @@ public R call(Object... args) { }; } + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ public static FuncN fromFunc(final Func4 f) { return new FuncN() { - /** - * If it can't cast to this it should throw an exception as that means code is using this wrong. - *

- * We unfortunately need FuncN to be Object and this is a bridge between typed and non-typed hence this being unchecked - */ @SuppressWarnings("unchecked") @Override public R call(Object... args) { - if (args.length < 4) { + if (args.length != 4) { throw new RuntimeException("Func4 expecting 4 arguments."); } return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3]); @@ -244,27 +361,196 @@ public R call(Object... args) { }; } - private static volatile FunctionTraceLogger tracer = null; + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ + public static FuncN fromFunc(final Func5 f) { + return new FuncN() { - public static interface FunctionTraceLogger { - public boolean isTraceEnabled(); + @SuppressWarnings("unchecked") + @Override + public R call(Object... args) { + if (args.length != 5) { + throw new RuntimeException("Func5 expecting 5 arguments."); + } + return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3], (T4) args[4]); + } - public void traceStart(Object closure, Object... args); + }; + } - /** - * - * @param start - * nanoTime - * @param end - * nanoTime - * @param closure - * @param args - */ - public void traceEnd(long start, long end, Object closure, Object... args); + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ + public static FuncN fromFunc(final Func6 f) { + return new FuncN() { + + @SuppressWarnings("unchecked") + @Override + public R call(Object... args) { + if (args.length != 6) { + throw new RuntimeException("Func6 expecting 6 arguments."); + } + return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3], (T4) args[4], (T5) args[5]); + } + + }; } - public static void registerTraceLogger(FunctionTraceLogger tracer) { - Functions.tracer = tracer; + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ + public static FuncN fromFunc(final Func7 f) { + return new FuncN() { + + @SuppressWarnings("unchecked") + @Override + public R call(Object... args) { + if (args.length != 7) { + throw new RuntimeException("Func7 expecting 7 arguments."); + } + return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3], (T4) args[4], (T5) args[5], (T6) args[6]); + } + + }; + } + + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ + public static FuncN fromFunc(final Func8 f) { + return new FuncN() { + + @SuppressWarnings("unchecked") + @Override + public R call(Object... args) { + if (args.length != 8) { + throw new RuntimeException("Func8 expecting 8 arguments."); + } + return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3], (T4) args[4], (T5) args[5], (T6) args[6], (T7) args[7]); + } + + }; + } + + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ + public static FuncN fromFunc(final Func9 f) { + return new FuncN() { + + @SuppressWarnings("unchecked") + @Override + public R call(Object... args) { + if (args.length != 9) { + throw new RuntimeException("Func9 expecting 9 arguments."); + } + return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3], (T4) args[4], (T5) args[5], (T6) args[6], (T7) args[7], (T8) args[8]); + } + + }; + } + + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ + public static FuncN fromAction(final Action0 f) { + return new FuncN() { + + @Override + public Void call(Object... args) { + if (args.length != 0) { + throw new RuntimeException("Action0 expecting 0 arguments."); + } + f.call(); + return null; + } + + }; + } + + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ + public static FuncN fromAction(final Action1 f) { + return new FuncN() { + + @SuppressWarnings("unchecked") + @Override + public Void call(Object... args) { + if (args.length != 1) { + throw new RuntimeException("Action1 expecting 1 argument."); + } + f.call((T0) args[0]); + return null; + } + + }; + } + + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ + public static FuncN fromAction(final Action2 f) { + return new FuncN() { + + @SuppressWarnings("unchecked") + @Override + public Void call(Object... args) { + if (args.length != 2) { + throw new RuntimeException("Action3 expecting 2 arguments."); + } + f.call((T0) args[0], (T1) args[1]); + return null; + } + + }; + } + + /** + * Convert a function to FuncN to allow heterogeneous handling of functions with different arities. + * + * @param f + * @return {@link FuncN} + */ + public static FuncN fromAction(final Action3 f) { + return new FuncN() { + + @SuppressWarnings("unchecked") + @Override + public Void call(Object... args) { + if (args.length != 3) { + throw new RuntimeException("Action3 expecting 3 arguments."); + } + f.call((T0) args[0], (T1) args[1], (T2) args[2]); + return null; + } + + }; } } From ed722adf2afaee1b76e7035ef2bb5661a91f668e Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Jan 2013 16:13:00 -0800 Subject: [PATCH 2/5] predicate order was wrong after last round of refactoring --- rxjava-core/src/main/java/rx/observables/Observable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/observables/Observable.java b/rxjava-core/src/main/java/rx/observables/Observable.java index 7e130e77fb..3ed31a19f0 100644 --- a/rxjava-core/src/main/java/rx/observables/Observable.java +++ b/rxjava-core/src/main/java/rx/observables/Observable.java @@ -1626,7 +1626,7 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) { * @return a Observable that emits only those items in the original Observable that the filter * evaluates as true */ - public Observable filter(Func1 predicate) { + public Observable filter(Func1 predicate) { return filter(this, predicate); } From 79b0d583964037330736e6ffa84e4e3df82b768d Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Jan 2013 16:13:25 -0800 Subject: [PATCH 3/5] disable chaining --- rxjava-core/src/main/java/rx/observables/Observable.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/observables/Observable.java b/rxjava-core/src/main/java/rx/observables/Observable.java index 3ed31a19f0..df5f8df0c8 100644 --- a/rxjava-core/src/main/java/rx/observables/Observable.java +++ b/rxjava-core/src/main/java/rx/observables/Observable.java @@ -581,9 +581,9 @@ public static Observable last(final Observable that) { * in the sequence emitted by the source Observable */ public static Observable map(Observable sequence, Func1 func) { - // return OperationMap.map(sequence, func); + return OperationMap.map(sequence, func); - return (Observable) ChainedObservable.chain(sequence).addFunction(func); +// return (Observable) ChainedObservable.chain(sequence).addFunction(func); } /** From 937bdabeef962b626bc7eb4d206f6202483b6069 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Jan 2013 18:55:20 -0800 Subject: [PATCH 4/5] =?UTF-8?q?Remove=20ChainedObservable=20prototype=20?= =?UTF-8?q?=E2=80=A6=20leave=20just=20the=20function=20optimizations?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/rx/observables/Observable.java | 74 +------------------ 1 file changed, 1 insertion(+), 73 deletions(-) diff --git a/rxjava-core/src/main/java/rx/observables/Observable.java b/rxjava-core/src/main/java/rx/observables/Observable.java index df5f8df0c8..711cdeaea1 100644 --- a/rxjava-core/src/main/java/rx/observables/Observable.java +++ b/rxjava-core/src/main/java/rx/observables/Observable.java @@ -22,7 +22,6 @@ import java.util.Arrays; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentLinkedQueue; import org.junit.Before; import org.junit.Test; @@ -581,9 +580,7 @@ public static Observable last(final Observable that) { * in the sequence emitted by the source Observable */ public static Observable map(Observable sequence, Func1 func) { - return OperationMap.map(sequence, func); - -// return (Observable) ChainedObservable.chain(sequence).addFunction(func); + return OperationMap.map(sequence, func); } /** @@ -2223,75 +2220,6 @@ public Observable> toSortedList(final Object sortFunction) { return toSortedList(this, sortFunction); } - /** - * Used to chain functions together rather than compose them so we reduce the size of the stack and number of onNext calls. - * - * @param - */ - private static class ChainedObservable extends Observable { - - private final Observable actual; - // we use rawtypes as we can have a list of functions each converting to different types - @SuppressWarnings("rawtypes") - private final ConcurrentLinkedQueue functions; - - @SuppressWarnings({ "rawtypes" }) - private ChainedObservable(Observable actual) { - if (actual instanceof ChainedObservable) { - throw new IllegalStateException("You shouldn't wrap a ChainedObservable with another ChainedObservable"); - } else { - // or use the existing one - this.actual = actual; - this.functions = new ConcurrentLinkedQueue(); - } - } - - public static ChainedObservable chain(Observable observable) { - if (observable instanceof ChainedObservable) { - return (ChainedObservable) observable; - } else { - return new ChainedObservable(observable); - } - } - - public ChainedObservable addFunction(Object function) { - // get an implementation of FuncN to memoize the function and then - // add to the functions list to be applied when this observable it subscribed to - this.functions.add(Functions.from(function)); - return this; - } - - @Override - public Subscription subscribe(final Observer observer) { - return actual.subscribe(new Observer() { - - @Override - public void onCompleted() { - observer.onCompleted(); - } - - @Override - public void onError(Exception e) { - observer.onError(e); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public void onNext(T v) { - // perform all functions - Object value = (Integer) v; - for (FuncN f : functions) { - value = f.call(value); - } - // now that we've performed all functions pass the resulting value to the actual observer - observer.onNext((T) value); - } - - }); - } - - } - public static class UnitTest { @Mock From 55aa7fa59f3fb173649c6323fb4e78d0a20a3541 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 21 Jan 2013 18:56:36 -0800 Subject: [PATCH 5/5] suppress warnings --- rxjava-core/src/main/java/rx/observables/Observable.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rxjava-core/src/main/java/rx/observables/Observable.java b/rxjava-core/src/main/java/rx/observables/Observable.java index 711cdeaea1..18918e24d9 100644 --- a/rxjava-core/src/main/java/rx/observables/Observable.java +++ b/rxjava-core/src/main/java/rx/observables/Observable.java @@ -606,6 +606,7 @@ public static Observable map(Observable sequence, final Object func final FuncN _f = Functions.from(function); return map(sequence, new Func1() { + @SuppressWarnings("unchecked") @Override public R call(T t1) { return (R) _f.call(t1); @@ -667,6 +668,7 @@ public static Observable mapMany(Observable sequence, final Object final FuncN _f = Functions.from(function); return mapMany(sequence, new Func1() { + @SuppressWarnings("unchecked") @Override public R call(T t1) { return (R) _f.call(t1);