Skip to content

Performance optimizations. #105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 72 additions & 21 deletions rxjava-core/src/main/java/rx/observables/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import rx.util.Func2;
import rx.util.Func3;
import rx.util.Func4;
import rx.util.FuncN;
import rx.util.Functions;

/**
Expand Down Expand Up @@ -301,7 +302,7 @@ private static void handleError(Exception e) {
* @param args
*/
private void executeCallback(final Object callback, Object... args) {
Functions.execute(callback, args);
Functions.from(callback).call(args);
}

/**
Expand Down Expand Up @@ -395,11 +396,13 @@ public static <T> Observable<T> create(Func1<Observer<T>, Subscription> func) {
* @return a Observable that, when a Observer subscribes to it, will execute the given function
*/
public static <T> Observable<T> create(final Object callback) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(callback);
return create(new Func1<Observer<T>, Subscription>() {

@Override
public Subscription call(Observer<T> t1) {
return Functions.execute(callback, t1);
return (Subscription) _f.call(t1);
}

});
Expand Down Expand Up @@ -469,11 +472,13 @@ public static <T> Observable<T> filter(Observable<T> that, Func1<T, Boolean> pre
* evaluates as true
*/
public static <T> Observable<T> filter(Observable<T> that, final Object function) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return filter(that, new Func1<T, Boolean>() {

@Override
public Boolean call(T t1) {
return Functions.execute(function, t1);
return (Boolean) _f.call(t1);

}

Expand Down Expand Up @@ -597,11 +602,14 @@ public static <T, R> Observable<R> map(Observable<T> sequence, Func1<T, R> func)
* in the sequence emitted by the source Observable
*/
public static <T, R> Observable<R> map(Observable<T> sequence, final Object function) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return map(sequence, new Func1<T, R>() {

@SuppressWarnings("unchecked")
@Override
public R call(T t1) {
return Functions.execute(function, t1);
return (R) _f.call(t1);
}

});
Expand Down Expand Up @@ -656,11 +664,14 @@ public static <T, R> Observable<R> mapMany(Observable<T> sequence, Func1<T, Obse
* Observables obtained from this transformation
*/
public static <T, R> Observable<R> mapMany(Observable<T> sequence, final Object function) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return mapMany(sequence, new Func1<T, R>() {

@SuppressWarnings("unchecked")
@Override
public R call(T t1) {
return Functions.execute(function, t1);
return (R) _f.call(t1);
}

});
Expand Down Expand Up @@ -876,11 +887,14 @@ public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, fina
* @return the source Observable, with its behavior modified as described
*/
public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, final Object resumeFunction) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(resumeFunction);
return onErrorResumeNext(that, new Func1<Exception, Observable<T>>() {

@SuppressWarnings("unchecked")
@Override
public Observable<T> call(Exception e) {
return Functions.execute(resumeFunction, e);
return (Observable<T>) _f.call(e);
}
});
}
Expand Down Expand Up @@ -999,11 +1013,14 @@ public static <T> Observable<T> reduce(Observable<T> sequence, Func2<T, T, T> ac
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(final Observable<T> sequence, final Object accumulator) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(accumulator);
return reduce(sequence, new Func2<T, T, T>() {

@SuppressWarnings("unchecked")
@Override
public T call(T t1, T t2) {
return Functions.execute(accumulator, t1, t2);
return (T) _f.call(t1, t2);
}

});
Expand Down Expand Up @@ -1071,11 +1088,14 @@ public static <T> Observable<T> reduce(Observable<T> sequence, T initialValue, F
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(final Observable<T> sequence, final T initialValue, final Object accumulator) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(accumulator);
return reduce(sequence, initialValue, new Func2<T, T, T>() {

@SuppressWarnings("unchecked")
@Override
public T call(T t1, T t2) {
return Functions.execute(accumulator, t1, t2);
return (T) _f.call(t1, t2);
}

});
Expand Down Expand Up @@ -1126,11 +1146,14 @@ public static <T> Observable<T> scan(Observable<T> sequence, Func2<T, T, T> accu
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
*/
public static <T> Observable<T> scan(final Observable<T> sequence, final Object accumulator) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(accumulator);
return scan(sequence, new Func2<T, T, T>() {

@SuppressWarnings("unchecked")
@Override
public T call(T t1, T t2) {
return Functions.execute(accumulator, t1, t2);
return (T) _f.call(t1, t2);
}

});
Expand Down Expand Up @@ -1185,11 +1208,14 @@ public static <T> Observable<T> scan(Observable<T> sequence, T initialValue, Fun
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
*/
public static <T> Observable<T> scan(final Observable<T> sequence, final T initialValue, final Object accumulator) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(accumulator);
return scan(sequence, initialValue, new Func2<T, T, T>() {

@SuppressWarnings("unchecked")
@Override
public T call(T t1, T t2) {
return Functions.execute(accumulator, t1, t2);
return (T) _f.call(t1, t2);
}

});
Expand Down Expand Up @@ -1359,11 +1385,13 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2
* @return
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, final Object sortFunction) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(sortFunction);
return OperationToObservableSortedList.toSortedList(sequence, new Func2<T, T, Integer>() {

@Override
public Integer call(T t1, T t2) {
return Functions.execute(sortFunction, t1, t2);
return (Integer) _f.call(t1, t2);
}

});
Expand Down Expand Up @@ -1422,11 +1450,14 @@ public static <R, T0, T1> Observable<R> zip(Observable<T0> w0, Observable<T1> w1
* @return a Observable that emits the zipped results
*/
public static <R, T0, T1> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, final Object function) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return zip(w0, w1, new Func2<T0, T1, R>() {

@SuppressWarnings("unchecked")
@Override
public R call(T0 t0, T1 t1) {
return Functions.execute(function, t0, t1);
return (R) _f.call(t0, t1);
}

});
Expand Down Expand Up @@ -1493,11 +1524,14 @@ public static <R, T0, T1, T2> Observable<R> zip(Observable<T0> w0, Observable<T1
* @return a Observable that emits the zipped results
*/
public static <R, T0, T1, T2> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, final Object function) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return zip(w0, w1, w2, new Func3<T0, T1, T2, R>() {

@SuppressWarnings("unchecked")
@Override
public R call(T0 t0, T1 t1, T2 t2) {
return Functions.execute(function, t0, t1, t2);
return (R) _f.call(t0, t1, t2);
}

});
Expand Down Expand Up @@ -1566,11 +1600,14 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observabl
* @return a Observable that emits the zipped results
*/
public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, final Object function) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(function);
return zip(w0, w1, w2, w3, new Func4<T0, T1, T2, T3, R>() {

@SuppressWarnings("unchecked")
@Override
public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
return Functions.execute(function, t0, t1, t2, t3);
return (R) _f.call(t0, t1, t2, t3);
}

});
Expand All @@ -1588,7 +1625,7 @@ public R call(T0 t0, T1 t1, T2 t2, T3 t3) {
* @return a Observable that emits only those items in the original Observable that the filter
* evaluates as <code>true</code>
*/
public Observable<T> filter(Func1<Boolean, T> predicate) {
public Observable<T> filter(Func1<T, Boolean> predicate) {
return filter(this, predicate);
}

Expand All @@ -1605,10 +1642,12 @@ public Observable<T> filter(Func1<Boolean, T> predicate) {
* evaluates as "true"
*/
public Observable<T> filter(final Object callback) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(callback);
return filter(this, new Func1<T, Boolean>() {

public Boolean call(T t1) {
return Functions.execute(callback, t1);
return (Boolean) _f.call(t1);
}
});
}
Expand Down Expand Up @@ -1638,7 +1677,7 @@ public Observable<T> last() {
* @return a Observable that emits a sequence that is the result of applying the transformation
* closure to each item in the sequence emitted by the input Observable.
*/
public <R> Observable<R> map(Func1<R, T> func) {
public <R> Observable<R> map(Func1<T, R> func) {
return map(this, func);
}

Expand All @@ -1655,10 +1694,13 @@ public <R> Observable<R> map(Func1<R, T> func) {
* closure to each item in the sequence emitted by the input Observable.
*/
public <R> Observable<R> map(final Object callback) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(callback);
return map(this, new Func1<T, R>() {

@SuppressWarnings("unchecked")
public R call(T t1) {
return Functions.execute(callback, t1);
return (R) _f.call(t1);
}
});
}
Expand Down Expand Up @@ -1698,10 +1740,13 @@ public <R> Observable<R> mapMany(Func1<T, Observable<R>> func) {
* Observables obtained from this transformation.
*/
public <R> Observable<R> mapMany(final Object callback) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(callback);
return mapMany(this, new Func1<T, Observable<R>>() {

@SuppressWarnings("unchecked")
public Observable<R> call(T t1) {
return Functions.execute(callback, t1);
return (Observable<R>) _f.call(t1);
}
});
}
Expand Down Expand Up @@ -1771,10 +1816,13 @@ public Observable<T> onErrorResumeNext(final Func1<Exception, Observable<T>> res
* @return the original Observable with appropriately modified behavior
*/
public Observable<T> onErrorResumeNext(final Object resumeFunction) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(resumeFunction);
return onErrorResumeNext(this, new Func1<Exception, Observable<T>>() {

@SuppressWarnings("unchecked")
public Observable<T> call(Exception e) {
return Functions.execute(resumeFunction, e);
return (Observable<T>) _f.call(e);
}
});
}
Expand Down Expand Up @@ -1857,10 +1905,13 @@ public Observable<T> onErrorReturn(Func1<Exception, T> resumeFunction) {
* @return the original Observable with appropriately modified behavior
*/
public Observable<T> onErrorReturn(final Object resumeFunction) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(resumeFunction);
return onErrorReturn(this, new Func1<Exception, T>() {

@SuppressWarnings("unchecked")
public T call(Exception e) {
return Functions.execute(resumeFunction, e);
return (T) _f.call(e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
/* package */final class AtomicObserver<T> implements Observer<T> {

/** Allow changing between forcing single or allowing multi-threaded execution of onNext */
private static boolean allowMultiThreaded = true;
private static boolean allowMultiThreaded = false;
static {
String v = System.getProperty("rx.onNext.multithreaded.enabled");
if (v != null) {
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Action0.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Action0 {
public interface Action0 extends Function {
public void call();
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Action1.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Action1<T1> {
public interface Action1<T1> extends Function {
public void call(T1 t1);
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Action2.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Action2<T1, T2> {
public interface Action2<T1, T2> extends Function {
public void call(T1 t1, T2 t2);
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Action3.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Action3<T1, T2, T3> {
public interface Action3<T1, T2, T3> extends Function {
public void call(T1 t1, T2 t2, T3 t3);
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Func0.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Func0<R> {
public interface Func0<R> extends Function {
public R call();
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Func1.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Func1<T1, R> {
public interface Func1<T1, R> extends Function {
public R call(T1 t1);
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Func2.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Func2<T1, T2, R> {
public interface Func2<T1, T2, R> extends Function {
public R call(T1 t1, T2 t2);
}
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/util/Func3.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@
*/
package rx.util;

public interface Func3<T1, T2, T3, R> {
public interface Func3<T1, T2, T3, R> extends Function {
public R call(T1 t1, T2 t2, T3 t3);
}
Loading