Skip to content

Change Lift to use rx.Observable.Operator #857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 12, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package rx.operators;

import rx.Observable.Operator;
import rx.Observer;
import rx.Subscriber;
import rx.plugins.DebugNotification;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.Subscription;
import rx.operators.DebugSubscriber;
import rx.operators.Operator;
import rx.util.functions.Action1;
import rx.util.functions.Actions;
import rx.util.functions.Func1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@

import rx.Notification;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Observer;
import rx.observers.SafeSubscriber;
import rx.operators.DebugSubscriber;
import rx.operators.Operator;
import rx.plugins.DebugNotification.Kind;

public class DebugNotification<T> {
public static enum Kind {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import rx.Notification;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Observer;
import rx.observers.SafeSubscriber;
import rx.operators.DebugSubscriber;
import rx.operators.Operator;

public class NotificationEvent<T> {
public static enum Kind {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.operators.Operator;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

Expand Down
24 changes: 16 additions & 8 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationUsing;
import rx.operators.OperationWindow;
import rx.operators.Operator;
import rx.operators.OperatorCast;
import rx.operators.OperatorDoOnEach;
import rx.operators.OperatorFilter;
Expand Down Expand Up @@ -174,7 +173,7 @@ public class Observable<T> {
* {@link OnSubscribe} to be executed when {@link #subscribe(Subscriber)} is called
*/
protected Observable(OnSubscribe<T> f) {
this.f = f;
this.f = hook.onCreate(f);
}

private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
Expand Down Expand Up @@ -206,19 +205,28 @@ protected Observable(OnSubscribe<T> f) {
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.create.aspx">MSDN: Observable.Create</a>
*/
public final static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
return new Observable<T>(f);
}

/**
*
* Invoked when Obserable.subscribe is called.
*/
public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {

// cover for generics insanity
}

/**
* Operator function for lifting into an Observable.
*/
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}


/**
*
* @Deprecated
*/
@Deprecated
public final static <T> Observable<T> create(final OnSubscribeFunc<T> f) {
return new Observable<T>(new OnSubscribe<T>() {

Expand Down Expand Up @@ -253,11 +261,11 @@ public static interface OnSubscribeFunc<T> extends Function {
* @param bind
* @return an Observable that emits values that are the result of applying the bind function to the values of the current Observable
*/
public <R> Observable<R> lift(final Func1<Subscriber<? super R>, Subscriber<? super T>> bind) {
public <R> Observable<R> lift(final Operator<R, T> bind) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
subscribe(hook.onLift((Operator<R, T>) bind).call(o));
subscribe(hook.onLift(bind).call(o));
}
});
}
Expand Down
1 change: 0 additions & 1 deletion rxjava-core/src/main/java/rx/joins/JoinObserver1.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import rx.Observable;
import rx.Subscriber;
import rx.observers.SafeSubscriber;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Action1;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import rx.operators.OperationNext;
import rx.operators.OperationToFuture;
import rx.operators.OperationToIterator;
import rx.operators.SafeObservableSubscription;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

Expand Down
1 change: 0 additions & 1 deletion rxjava-core/src/main/java/rx/observers/Observers.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package rx.observers;

import rx.Observer;
import rx.Subscriber;
import rx.util.OnErrorNotImplementedException;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
Expand Down
1 change: 0 additions & 1 deletion rxjava-core/src/main/java/rx/observers/TestObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package rx.observers;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscription;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

Expand Down
1 change: 0 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperationDelay.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import rx.observers.SynchronizedObserver;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.CompositeException;
import rx.util.functions.Action0;

/**
* This behaves like {@link OperatorMerge} except that if any of the merged Observables notify of
Expand Down
1 change: 0 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperationSkip.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import rx.Scheduler.Inner;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

/**
Expand Down
1 change: 0 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperationTimer.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscription;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

/**
Expand Down
8 changes: 0 additions & 8 deletions rxjava-core/src/main/java/rx/operators/Operator.java

This file was deleted.

1 change: 1 addition & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorCast.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.operators;

import rx.Observable.Operator;
import rx.Subscriber;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.operators;

import rx.Observable.Operator;
import rx.Observer;
import rx.Subscriber;

Expand Down
5 changes: 1 addition & 4 deletions rxjava-core/src/main/java/rx/operators/OperatorFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
*/
package rx.operators;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.Subscription;
import rx.observables.GroupedObservable;
import rx.util.functions.Func1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.observables.GroupedObservable;
import rx.subjects.PublishSubject;
Expand Down
1 change: 1 addition & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.operators;

import rx.Observable.Operator;
import rx.Subscriber;
import rx.util.functions.Func1;

Expand Down
1 change: 1 addition & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.observers.SynchronizedSubscriber;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable.Operator;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.operators;

import rx.Observable;
import rx.Observable.Operator;
import rx.Scheduler;
import rx.Subscriber;
import rx.observables.GroupedObservable;
Expand Down
1 change: 1 addition & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorRepeat.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package rx.operators;

import rx.Observable;
import rx.Observable.Operator;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.operators;

import rx.Observable;
import rx.Observable.Operator;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
Expand Down
1 change: 1 addition & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.operators;

import rx.Observable.Operator;
import rx.Subscriber;
import rx.subscriptions.CompositeSubscription;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.Subscription;
import rx.observers.SynchronizedSubscriber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.operators;

import rx.Observable.Operator;
import rx.Scheduler;
import rx.Subscriber;
import rx.util.Timestamped;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.ArrayList;
import java.util.List;

import rx.Observable.Operator;
import rx.Subscriber;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Comparator;
import java.util.List;

import rx.Observable.Operator;
import rx.Subscriber;
import rx.util.functions.Func2;

Expand Down
1 change: 1 addition & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorZip.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable;
import rx.Observable.Operator;
import rx.Observer;
import rx.Subscriber;
import rx.subscriptions.CompositeSubscription;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.Iterator;

import rx.Observable.Operator;
import rx.Subscriber;
import rx.observers.Subscribers;
import rx.util.functions.Func2;
Expand Down
1 change: 1 addition & 0 deletions rxjava-core/src/main/java/rx/operators/SafeObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import rx.Observer;
import rx.Subscription;
import rx.observers.SynchronizedObserver;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.Subscriptions;
import rx.util.CompositeException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.OnSubscribeFunc;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.Subscription;
import rx.operators.Operator;
import rx.util.functions.Func1;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.Observable.OnSubscribe;
import rx.operators.SafeObservableSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package rx.subscriptions;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

Expand Down
Loading