Skip to content

Commit e5d9d9c

Browse files
Performance optimizations for dynamic function execution.
- memoize dynamic function constructions so executions are statically typed after a single dynamic lookup instead of dynamic on every execution
1 parent aba4cbb commit e5d9d9c

File tree

19 files changed

+491
-144
lines changed

19 files changed

+491
-144
lines changed

rxjava-core/src/main/java/rx/observables/Observable.java

Lines changed: 72 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import rx.util.Func2;
5353
import rx.util.Func3;
5454
import rx.util.Func4;
55+
import rx.util.FuncN;
5556
import rx.util.Functions;
5657

5758
/**
@@ -301,7 +302,7 @@ private static void handleError(Exception e) {
301302
* @param args
302303
*/
303304
private void executeCallback(final Object callback, Object... args) {
304-
Functions.execute(callback, args);
305+
Functions.from(callback).call(args);
305306
}
306307

307308
/**
@@ -395,11 +396,13 @@ public static <T> Observable<T> create(Func1<Observer<T>, Subscription> func) {
395396
* @return a Observable that, when a Observer subscribes to it, will execute the given function
396397
*/
397398
public static <T> Observable<T> create(final Object callback) {
399+
@SuppressWarnings("rawtypes")
400+
final FuncN _f = Functions.from(callback);
398401
return create(new Func1<Observer<T>, Subscription>() {
399402

400403
@Override
401404
public Subscription call(Observer<T> t1) {
402-
return Functions.execute(callback, t1);
405+
return (Subscription) _f.call(t1);
403406
}
404407

405408
});
@@ -469,11 +472,13 @@ public static <T> Observable<T> filter(Observable<T> that, Func1<T, Boolean> pre
469472
* evaluates as true
470473
*/
471474
public static <T> Observable<T> filter(Observable<T> that, final Object function) {
475+
@SuppressWarnings("rawtypes")
476+
final FuncN _f = Functions.from(function);
472477
return filter(that, new Func1<T, Boolean>() {
473478

474479
@Override
475480
public Boolean call(T t1) {
476-
return Functions.execute(function, t1);
481+
return (Boolean) _f.call(t1);
477482

478483
}
479484

@@ -597,11 +602,14 @@ public static <T, R> Observable<R> map(Observable<T> sequence, Func1<T, R> func)
597602
* in the sequence emitted by the source Observable
598603
*/
599604
public static <T, R> Observable<R> map(Observable<T> sequence, final Object function) {
605+
@SuppressWarnings("rawtypes")
606+
final FuncN _f = Functions.from(function);
600607
return map(sequence, new Func1<T, R>() {
601608

609+
@SuppressWarnings("unchecked")
602610
@Override
603611
public R call(T t1) {
604-
return Functions.execute(function, t1);
612+
return (R) _f.call(t1);
605613
}
606614

607615
});
@@ -656,11 +664,14 @@ public static <T, R> Observable<R> mapMany(Observable<T> sequence, Func1<T, Obse
656664
* Observables obtained from this transformation
657665
*/
658666
public static <T, R> Observable<R> mapMany(Observable<T> sequence, final Object function) {
667+
@SuppressWarnings("rawtypes")
668+
final FuncN _f = Functions.from(function);
659669
return mapMany(sequence, new Func1<T, R>() {
660670

671+
@SuppressWarnings("unchecked")
661672
@Override
662673
public R call(T t1) {
663-
return Functions.execute(function, t1);
674+
return (R) _f.call(t1);
664675
}
665676

666677
});
@@ -876,11 +887,14 @@ public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, fina
876887
* @return the source Observable, with its behavior modified as described
877888
*/
878889
public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, final Object resumeFunction) {
890+
@SuppressWarnings("rawtypes")
891+
final FuncN _f = Functions.from(resumeFunction);
879892
return onErrorResumeNext(that, new Func1<Exception, Observable<T>>() {
880893

894+
@SuppressWarnings("unchecked")
881895
@Override
882896
public Observable<T> call(Exception e) {
883-
return Functions.execute(resumeFunction, e);
897+
return (Observable<T>) _f.call(e);
884898
}
885899
});
886900
}
@@ -999,11 +1013,14 @@ public static <T> Observable<T> reduce(Observable<T> sequence, Func2<T, T, T> ac
9991013
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
10001014
*/
10011015
public static <T> Observable<T> reduce(final Observable<T> sequence, final Object accumulator) {
1016+
@SuppressWarnings("rawtypes")
1017+
final FuncN _f = Functions.from(accumulator);
10021018
return reduce(sequence, new Func2<T, T, T>() {
10031019

1020+
@SuppressWarnings("unchecked")
10041021
@Override
10051022
public T call(T t1, T t2) {
1006-
return Functions.execute(accumulator, t1, t2);
1023+
return (T) _f.call(t1, t2);
10071024
}
10081025

10091026
});
@@ -1071,11 +1088,14 @@ public static <T> Observable<T> reduce(Observable<T> sequence, T initialValue, F
10711088
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
10721089
*/
10731090
public static <T> Observable<T> reduce(final Observable<T> sequence, final T initialValue, final Object accumulator) {
1091+
@SuppressWarnings("rawtypes")
1092+
final FuncN _f = Functions.from(accumulator);
10741093
return reduce(sequence, initialValue, new Func2<T, T, T>() {
10751094

1095+
@SuppressWarnings("unchecked")
10761096
@Override
10771097
public T call(T t1, T t2) {
1078-
return Functions.execute(accumulator, t1, t2);
1098+
return (T) _f.call(t1, t2);
10791099
}
10801100

10811101
});
@@ -1126,11 +1146,14 @@ public static <T> Observable<T> scan(Observable<T> sequence, Func2<T, T, T> accu
11261146
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
11271147
*/
11281148
public static <T> Observable<T> scan(final Observable<T> sequence, final Object accumulator) {
1149+
@SuppressWarnings("rawtypes")
1150+
final FuncN _f = Functions.from(accumulator);
11291151
return scan(sequence, new Func2<T, T, T>() {
11301152

1153+
@SuppressWarnings("unchecked")
11311154
@Override
11321155
public T call(T t1, T t2) {
1133-
return Functions.execute(accumulator, t1, t2);
1156+
return (T) _f.call(t1, t2);
11341157
}
11351158

11361159
});
@@ -1185,11 +1208,14 @@ public static <T> Observable<T> scan(Observable<T> sequence, T initialValue, Fun
11851208
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
11861209
*/
11871210
public static <T> Observable<T> scan(final Observable<T> sequence, final T initialValue, final Object accumulator) {
1211+
@SuppressWarnings("rawtypes")
1212+
final FuncN _f = Functions.from(accumulator);
11881213
return scan(sequence, initialValue, new Func2<T, T, T>() {
11891214

1215+
@SuppressWarnings("unchecked")
11901216
@Override
11911217
public T call(T t1, T t2) {
1192-
return Functions.execute(accumulator, t1, t2);
1218+
return (T) _f.call(t1, t2);
11931219
}
11941220

11951221
});
@@ -1359,11 +1385,13 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2
13591385
* @return
13601386
*/
13611387
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, final Object sortFunction) {
1388+
@SuppressWarnings("rawtypes")
1389+
final FuncN _f = Functions.from(sortFunction);
13621390
return OperationToObservableSortedList.toSortedList(sequence, new Func2<T, T, Integer>() {
13631391

13641392
@Override
13651393
public Integer call(T t1, T t2) {
1366-
return Functions.execute(sortFunction, t1, t2);
1394+
return (Integer) _f.call(t1, t2);
13671395
}
13681396

13691397
});
@@ -1422,11 +1450,14 @@ public static <R, T0, T1> Observable<R> zip(Observable<T0> w0, Observable<T1> w1
14221450
* @return a Observable that emits the zipped results
14231451
*/
14241452
public static <R, T0, T1> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, final Object function) {
1453+
@SuppressWarnings("rawtypes")
1454+
final FuncN _f = Functions.from(function);
14251455
return zip(w0, w1, new Func2<T0, T1, R>() {
14261456

1457+
@SuppressWarnings("unchecked")
14271458
@Override
14281459
public R call(T0 t0, T1 t1) {
1429-
return Functions.execute(function, t0, t1);
1460+
return (R) _f.call(t0, t1);
14301461
}
14311462

14321463
});
@@ -1493,11 +1524,14 @@ public static <R, T0, T1, T2> Observable<R> zip(Observable<T0> w0, Observable<T1
14931524
* @return a Observable that emits the zipped results
14941525
*/
14951526
public static <R, T0, T1, T2> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, final Object function) {
1527+
@SuppressWarnings("rawtypes")
1528+
final FuncN _f = Functions.from(function);
14961529
return zip(w0, w1, w2, new Func3<T0, T1, T2, R>() {
14971530

1531+
@SuppressWarnings("unchecked")
14981532
@Override
14991533
public R call(T0 t0, T1 t1, T2 t2) {
1500-
return Functions.execute(function, t0, t1, t2);
1534+
return (R) _f.call(t0, t1, t2);
15011535
}
15021536

15031537
});
@@ -1566,11 +1600,14 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observabl
15661600
* @return a Observable that emits the zipped results
15671601
*/
15681602
public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, final Object function) {
1603+
@SuppressWarnings("rawtypes")
1604+
final FuncN _f = Functions.from(function);
15691605
return zip(w0, w1, w2, w3, new Func4<T0, T1, T2, T3, R>() {
15701606

1607+
@SuppressWarnings("unchecked")
15711608
@Override
15721609
public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
1573-
return Functions.execute(function, t0, t1, t2, t3);
1610+
return (R) _f.call(t0, t1, t2, t3);
15741611
}
15751612

15761613
});
@@ -1588,7 +1625,7 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
15881625
* @return a Observable that emits only those items in the original Observable that the filter
15891626
* evaluates as <code>true</code>
15901627
*/
1591-
public Observable<T> filter(Func1<Boolean, T> predicate) {
1628+
public Observable<T> filter(Func1<T, Boolean> predicate) {
15921629
return filter(this, predicate);
15931630
}
15941631

@@ -1605,10 +1642,12 @@ public Observable<T> filter(Func1<Boolean, T> predicate) {
16051642
* evaluates as "true"
16061643
*/
16071644
public Observable<T> filter(final Object callback) {
1645+
@SuppressWarnings("rawtypes")
1646+
final FuncN _f = Functions.from(callback);
16081647
return filter(this, new Func1<T, Boolean>() {
16091648

16101649
public Boolean call(T t1) {
1611-
return Functions.execute(callback, t1);
1650+
return (Boolean) _f.call(t1);
16121651
}
16131652
});
16141653
}
@@ -1638,7 +1677,7 @@ public Observable<T> last() {
16381677
* @return a Observable that emits a sequence that is the result of applying the transformation
16391678
* closure to each item in the sequence emitted by the input Observable.
16401679
*/
1641-
public <R> Observable<R> map(Func1<R, T> func) {
1680+
public <R> Observable<R> map(Func1<T, R> func) {
16421681
return map(this, func);
16431682
}
16441683

@@ -1655,10 +1694,13 @@ public <R> Observable<R> map(Func1<R, T> func) {
16551694
* closure to each item in the sequence emitted by the input Observable.
16561695
*/
16571696
public <R> Observable<R> map(final Object callback) {
1697+
@SuppressWarnings("rawtypes")
1698+
final FuncN _f = Functions.from(callback);
16581699
return map(this, new Func1<T, R>() {
16591700

1701+
@SuppressWarnings("unchecked")
16601702
public R call(T t1) {
1661-
return Functions.execute(callback, t1);
1703+
return (R) _f.call(t1);
16621704
}
16631705
});
16641706
}
@@ -1698,10 +1740,13 @@ public <R> Observable<R> mapMany(Func1<T, Observable<R>> func) {
16981740
* Observables obtained from this transformation.
16991741
*/
17001742
public <R> Observable<R> mapMany(final Object callback) {
1743+
@SuppressWarnings("rawtypes")
1744+
final FuncN _f = Functions.from(callback);
17011745
return mapMany(this, new Func1<T, Observable<R>>() {
17021746

1747+
@SuppressWarnings("unchecked")
17031748
public Observable<R> call(T t1) {
1704-
return Functions.execute(callback, t1);
1749+
return (Observable<R>) _f.call(t1);
17051750
}
17061751
});
17071752
}
@@ -1771,10 +1816,13 @@ public Observable<T> onErrorResumeNext(final Func1<Exception, Observable<T>> res
17711816
* @return the original Observable with appropriately modified behavior
17721817
*/
17731818
public Observable<T> onErrorResumeNext(final Object resumeFunction) {
1819+
@SuppressWarnings("rawtypes")
1820+
final FuncN _f = Functions.from(resumeFunction);
17741821
return onErrorResumeNext(this, new Func1<Exception, Observable<T>>() {
17751822

1823+
@SuppressWarnings("unchecked")
17761824
public Observable<T> call(Exception e) {
1777-
return Functions.execute(resumeFunction, e);
1825+
return (Observable<T>) _f.call(e);
17781826
}
17791827
});
17801828
}
@@ -1857,10 +1905,13 @@ public Observable<T> onErrorReturn(Func1<Exception, T> resumeFunction) {
18571905
* @return the original Observable with appropriately modified behavior
18581906
*/
18591907
public Observable<T> onErrorReturn(final Object resumeFunction) {
1908+
@SuppressWarnings("rawtypes")
1909+
final FuncN _f = Functions.from(resumeFunction);
18601910
return onErrorReturn(this, new Func1<Exception, T>() {
18611911

1912+
@SuppressWarnings("unchecked")
18621913
public T call(Exception e) {
1863-
return Functions.execute(resumeFunction, e);
1914+
return (T) _f.call(e);
18641915
}
18651916
});
18661917
}

rxjava-core/src/main/java/rx/observables/operations/AtomicObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
/* package */final class AtomicObserver<T> implements Observer<T> {
4848

4949
/** Allow changing between forcing single or allowing multi-threaded execution of onNext */
50-
private static boolean allowMultiThreaded = true;
50+
private static boolean allowMultiThreaded = false;
5151
static {
5252
String v = System.getProperty("rx.onNext.multithreaded.enabled");
5353
if (v != null) {

rxjava-core/src/main/java/rx/util/Action0.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
*/
1616
package rx.util;
1717

18-
public interface Action0 {
18+
public interface Action0 extends Function {
1919
public void call();
2020
}

rxjava-core/src/main/java/rx/util/Action1.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
*/
1616
package rx.util;
1717

18-
public interface Action1<T1> {
18+
public interface Action1<T1> extends Function {
1919
public void call(T1 t1);
2020
}

rxjava-core/src/main/java/rx/util/Action2.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
*/
1616
package rx.util;
1717

18-
public interface Action2<T1, T2> {
18+
public interface Action2<T1, T2> extends Function {
1919
public void call(T1 t1, T2 t2);
2020
}

rxjava-core/src/main/java/rx/util/Action3.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
*/
1616
package rx.util;
1717

18-
public interface Action3<T1, T2, T3> {
18+
public interface Action3<T1, T2, T3> extends Function {
1919
public void call(T1 t1, T2 t2, T3 t3);
2020
}

rxjava-core/src/main/java/rx/util/Func0.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
*/
1616
package rx.util;
1717

18-
public interface Func0<R> {
18+
public interface Func0<R> extends Function {
1919
public R call();
2020
}

rxjava-core/src/main/java/rx/util/Func1.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
*/
1616
package rx.util;
1717

18-
public interface Func1<T1, R> {
18+
public interface Func1<T1, R> extends Function {
1919
public R call(T1 t1);
2020
}

rxjava-core/src/main/java/rx/util/Func2.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
*/
1616
package rx.util;
1717

18-
public interface Func2<T1, T2, R> {
18+
public interface Func2<T1, T2, R> extends Function {
1919
public R call(T1 t1, T2 t2);
2020
}

rxjava-core/src/main/java/rx/util/Func3.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@
1515
*/
1616
package rx.util;
1717

18-
public interface Func3<T1, T2, T3, R> {
18+
public interface Func3<T1, T2, T3, R> extends Function {
1919
public R call(T1 t1, T2 t2, T3 t3);
2020
}

0 commit comments

Comments
 (0)