Skip to content

add Subscribers.wrap #3065

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 2 additions & 14 deletions src/main/java/rx/internal/operators/OnSubscribeDefer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Func0;
import rx.observers.Subscribers;

/**
* Do not create the Observable until an Observer subscribes; create a fresh Observable on each
Expand Down Expand Up @@ -46,20 +47,7 @@ public void call(final Subscriber<? super T> s) {
s.onError(t);
return;
}
o.unsafeSubscribe(new Subscriber<T>(s) {
@Override
public void onNext(T t) {
s.onNext(t);
}
@Override
public void onError(Throwable e) {
s.onError(e);
}
@Override
public void onCompleted() {
s.onCompleted();
}
});
o.unsafeSubscribe(Subscribers.wrap(s));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import rx.Observable.OnSubscribe;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.observers.Subscribers;

/**
* Delays the subscription to the source by the given amount, running on the given scheduler.
Expand Down Expand Up @@ -49,20 +50,7 @@ public void call(final Subscriber<? super T> s) {
@Override
public void call() {
if (!s.isUnsubscribed()) {
source.unsafeSubscribe(new Subscriber<T>(s) {
@Override
public void onNext(T t) {
s.onNext(t);
}
@Override
public void onError(Throwable e) {
s.onError(e);
}
@Override
public void onCompleted() {
s.onCompleted();
}
});
source.unsafeSubscribe(Subscribers.wrap(s));
}
}
}, time, unit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.*;
import rx.Observable.OnSubscribe;
import rx.functions.Func0;
import rx.observers.Subscribers;

/**
* Delays the subscription until the Observable<U> emits an event.
Expand All @@ -42,20 +43,7 @@ public void call(final Subscriber<? super T> child) {
@Override
public void onCompleted() {
// subscribe to actual source
source.unsafeSubscribe(new Subscriber<T>(child) {
@Override
public void onNext(T t) {
child.onNext(t);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onCompleted() {
child.onCompleted();
}
});
source.unsafeSubscribe(Subscribers.wrap(child));
}

@Override
Expand Down
16 changes: 2 additions & 14 deletions src/main/java/rx/internal/operators/OnSubscribeUsing.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import rx.Observable.OnSubscribe;
import rx.exceptions.CompositeException;
import rx.functions.*;
import rx.observers.Subscribers;

/**
* Constructs an observable sequence that depends on a resource object.
Expand Down Expand Up @@ -68,20 +69,7 @@ public void call(final Subscriber<? super T> subscriber) {
observable = source;
try {
// start
observable.unsafeSubscribe(new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
});
observable.unsafeSubscribe(Subscribers.wrap(subscriber));
} catch (Throwable e) {
Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly);
if (disposeError != null)
Expand Down
16 changes: 2 additions & 14 deletions src/main/java/rx/internal/operators/OperatorDoOnSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Action0;
import rx.observers.Subscribers;

/**
* This operator modifies an {@link rx.Observable} so a given action is invoked when the {@link rx.Observable} is subscribed.
Expand All @@ -39,19 +40,6 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
subscribe.call();
// Pass through since this operator is for notification only, there is
// no change to the stream whatsoever.
return new Subscriber<T>(child) {
@Override
public void onNext(T t) {
child.onNext(t);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onCompleted() {
child.onCompleted();
}
};
return Subscribers.wrap(child);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is less obvious as to why it needs to be wrapped. The duplicate onStart problem seems like something we should fix inside Subscriber rather than needing to object allocate and wrap.

}
}
19 changes: 2 additions & 17 deletions src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import rx.Observable.Operator;
import rx.*;
import rx.functions.Action0;
import rx.observers.Subscribers;
import rx.subscriptions.Subscriptions;

/**
Expand All @@ -41,22 +42,6 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {

// Pass through since this operator is for notification only, there is
// no change to the stream whatsoever.
return new Subscriber<T>(child) {
@Override
public void onStart() {
}
@Override
public void onNext(T t) {
child.onNext(t);
}
@Override
public void onError(Throwable e) {
child.onError(e);
}
@Override
public void onCompleted() {
child.onCompleted();
}
};
return Subscribers.wrap(child);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}
38 changes: 38 additions & 0 deletions src/main/java/rx/observers/Subscribers.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import rx.Observer;
import rx.Subscriber;
import rx.annotations.Experimental;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -198,4 +199,41 @@ public final void onNext(T args) {
};
}

/**
* Returns a new {@link Subscriber} that passes all events to
* <code>subscriber</code>, has backpressure controlled by
* <code>subscriber</code> and uses the subscription list of
* <code>subscriber</code> when {@link Subscriber#add(rx.Subscription)} is
* called.
*
* @param subscriber
* the Subscriber to wrap.
*
* @return a new Subscriber that passes all events to
* <code>subscriber</code>, has backpressure controlled by
* <code>subscriber</code> and uses <code>subscriber</code> to
* manage unsubscription.
*
*/
@Experimental
public static <T> Subscriber<T> wrap(final Subscriber<? super T> subscriber) {
return new Subscriber<T>(subscriber) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}

};
}
}