Skip to content

Creating Observable#create overloads for SyncOnSubscribe and AsyncOnSubscribe #3738

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,77 @@ public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}

/**
* Returns an Observable that respects the back-pressure semantics. When the returned Observable is
* subscribed to it will initiate the given {@link SyncOnSubscribe}'s life cycle for
* generating events.
*
* <p><b>Note:</b> the {@code SyncOnSubscribe} provides a generic way to fulfill data by iterating
* over a (potentially stateful) function (e.g. reading data off of a channel, a parser, ). If your
* data comes directly from an asyrchronous/potentially concurrent source then consider using the
* {@link Observable#create(AsyncOnSubscribe) asynchronous overload}.
*
* <p>
* <img width="640" height="200" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create-sync.png" alt="">
* <p>
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a> for detailed
* information.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T>
* the type of the items that this Observable emits
* @param syncOnSubscribe
* an implementation of {@link SyncOnSubscribe}. There are many static creation methods
* on the class for convenience.
* @return an Observable that, when a {@link Subscriber} subscribes to it, will execute the specified
* function
* @see {@link SyncOnSubscribe} {@code static create*} methods
* @see <a href="http://reactivex.io/documentation/operators/create.html">ReactiveX operators documentation: Create</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe) {
return new Observable<T>(hook.onCreate(syncOnSubscribe));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you couldn't call create(OnSubscribe) because of overload problems.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can still call to the OnSubscribe overload and it would work just fine. The purpose behind the 2 new overloads is discoverability.

}

/**
* Returns an Observable that respects the back-pressure semantics. When the returned Observable is
* subscribed to it will initiate the given {@link AsyncOnSubscribe}'s life cycle for
* generating events.
*
* <p><b>Note:</b> the {@code AsyncOnSubscribe} is useful for observable sources of data that are
* necessarily asynchronous (RPC, external services, etc). Typically most use cases can be solved
* with the {@link Observable#create(SyncOnSubscribe) synchronous overload}.
*
* <p>
* <img width="640" height="200" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create-async.png" alt="">
* <p>
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a> for detailed
* information.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T>
* the type of the items that this Observable emits
* @param asyncOnSubscribe
* an implementation of {@link AsyncOnSubscribe}. There are many static creation methods
* on the class for convenience.
* @return an Observable that, when a {@link Subscriber} subscribes to it, will execute the specified
* function
* @see {@link AsyncOnSubscribe AsyncOnSubscribe} {@code static create*} methods
* @see <a href="http://reactivex.io/documentation/operators/create.html">ReactiveX operators documentation: Create</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe) {
return new Observable<T>(hook.onCreate(asyncOnSubscribe));
}

/**
* Invoked when Observable.subscribe is called.
*/
Expand Down
24 changes: 12 additions & 12 deletions src/main/java/rx/observables/AsyncOnSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ protected void onUnsubscribe(S state) {
* @param next
* produces data to the downstream subscriber (see
* {@link #next(Object, long, Observer) next(S, long, Observer)})
* @return an OnSubscribe that emits data in a protocol compatible with back-pressure.
* @return an AsyncOnSubscribe that emits data in a protocol compatible with back-pressure.
*/
@Experimental
public static <S, T> OnSubscribe<T> createSingleState(Func0<? extends S> generator,
public static <S, T> AsyncOnSubscribe<S, T> createSingleState(Func0<? extends S> generator,
final Action3<? super S, Long, ? super Observer<Observable<? extends T>>> next) {
Func3<S, Long, ? super Observer<Observable<? extends T>>, S> nextFunc =
new Func3<S, Long, Observer<Observable<? extends T>>, S>() {
Expand All @@ -134,11 +134,11 @@ public S call(S state, Long requested, Observer<Observable<? extends T>> subscri
* {@link #next(Object, long, Observer) next(S, long, Observer)})
* @param onUnsubscribe
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
* @return an OnSubscribe that emits data downstream in a protocol compatible with
* @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with
* back-pressure.
*/
@Experimental
public static <S, T> OnSubscribe<T> createSingleState(Func0<? extends S> generator,
public static <S, T> AsyncOnSubscribe<S, T> createSingleState(Func0<? extends S> generator,
final Action3<? super S, Long, ? super Observer<Observable<? extends T>>> next,
final Action1<? super S> onUnsubscribe) {
Func3<S, Long, Observer<Observable<? extends T>>, S> nextFunc =
Expand All @@ -162,11 +162,11 @@ public S call(S state, Long requested, Observer<Observable<? extends T>> subscri
* {@link #next(Object, long, Observer) next(S, long, Observer)})
* @param onUnsubscribe
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
* @return an OnSubscribe that emits data downstream in a protocol compatible with
* @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with
* back-pressure.
*/
@Experimental
public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
public static <S, T> AsyncOnSubscribe<S, T> createStateful(Func0<? extends S> generator,
Func3<? super S, Long, ? super Observer<Observable<? extends T>>, ? extends S> next,
Action1<? super S> onUnsubscribe) {
return new AsyncOnSubscribeImpl<S, T>(generator, next, onUnsubscribe);
Expand All @@ -181,11 +181,11 @@ public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
* @param next
* produces data to the downstream subscriber (see
* {@link #next(Object, long, Observer) next(S, long, Observer)})
* @return an OnSubscribe that emits data downstream in a protocol compatible with
* @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with
* back-pressure.
*/
@Experimental
public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
public static <S, T> AsyncOnSubscribe<S, T> createStateful(Func0<? extends S> generator,
Func3<? super S, Long, ? super Observer<Observable<? extends T>>, ? extends S> next) {
return new AsyncOnSubscribeImpl<S, T>(generator, next);
}
Expand All @@ -200,11 +200,11 @@ public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
* @param next
* produces data to the downstream subscriber (see
* {@link #next(Object, long, Observer) next(S, long, Observer)})
* @return an OnSubscribe that emits data downstream in a protocol compatible with
* @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with
* back-pressure.
*/
@Experimental
public static <T> OnSubscribe<T> createStateless(final Action2<Long, ? super Observer<Observable<? extends T>>> next) {
public static <T> AsyncOnSubscribe<Void, T> createStateless(final Action2<Long, ? super Observer<Observable<? extends T>>> next) {
Func3<Void, Long, Observer<Observable<? extends T>>, Void> nextFunc =
new Func3<Void, Long, Observer<Observable<? extends T>>, Void>() {
@Override
Expand All @@ -227,11 +227,11 @@ public Void call(Void state, Long requested, Observer<Observable<? extends T>> s
* {@link #next(Object, long, Observer) next(S, long, Observer)})
* @param onUnsubscribe
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
* @return an OnSubscribe that emits data downstream in a protocol compatible with
* @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with
* back-pressure.
*/
@Experimental
public static <T> OnSubscribe<T> createStateless(final Action2<Long, ? super Observer<Observable<? extends T>>> next,
public static <T> AsyncOnSubscribe<Void, T> createStateless(final Action2<Long, ? super Observer<Observable<? extends T>>> next,
final Action0 onUnsubscribe) {
Func3<Void, Long, Observer<Observable<? extends T>>, Void> nextFunc =
new Func3<Void, Long, Observer<Observable<? extends T>>, Void>() {
Expand Down
24 changes: 12 additions & 12 deletions src/main/java/rx/observables/SyncOnSubscribe.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ protected void onUnsubscribe(S state) {
* @param next
* produces data to the downstream subscriber (see {@link #next(Object, Subscriber)
* next(S, Subscriber)})
* @return an OnSubscribe that emits data in a protocol compatible with back-pressure.
* @return a SyncOnSubscribe that emits data in a protocol compatible with back-pressure.
*/
@Experimental
public static <S, T> OnSubscribe<T> createSingleState(Func0<? extends S> generator,
public static <S, T> SyncOnSubscribe<S, T> createSingleState(Func0<? extends S> generator,
final Action2<? super S, ? super Observer<? super T>> next) {
Func2<S, ? super Observer<? super T>, S> nextFunc = new Func2<S, Observer<? super T>, S>() {
@Override
Expand All @@ -152,11 +152,11 @@ public S call(S state, Observer<? super T> subscriber) {
* next(S, Subscriber)})
* @param onUnsubscribe
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
* @return an OnSubscribe that emits data downstream in a protocol compatible with
* @return a SyncOnSubscribe that emits data downstream in a protocol compatible with
* back-pressure.
*/
@Experimental
public static <S, T> OnSubscribe<T> createSingleState(Func0<? extends S> generator,
public static <S, T> SyncOnSubscribe<S, T> createSingleState(Func0<? extends S> generator,
final Action2<? super S, ? super Observer<? super T>> next,
final Action1<? super S> onUnsubscribe) {
Func2<S, Observer<? super T>, S> nextFunc = new Func2<S, Observer<? super T>, S>() {
Expand All @@ -180,11 +180,11 @@ public S call(S state, Observer<? super T> subscriber) {
* next(S, Subscriber)})
* @param onUnsubscribe
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
* @return an OnSubscribe that emits data downstream in a protocol compatible with
* @return a SyncOnSubscribe that emits data downstream in a protocol compatible with
* back-pressure.
*/
@Experimental
public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
public static <S, T> SyncOnSubscribe<S, T> createStateful(Func0<? extends S> generator,
Func2<? super S, ? super Observer<? super T>, ? extends S> next,
Action1<? super S> onUnsubscribe) {
return new SyncOnSubscribeImpl<S, T>(generator, next, onUnsubscribe);
Expand All @@ -199,11 +199,11 @@ public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
* @param next
* produces data to the downstream subscriber (see {@link #next(Object, Subscriber)
* next(S, Subscriber)})
* @return an OnSubscribe that emits data downstream in a protocol compatible with
* @return a SyncOnSubscribe that emits data downstream in a protocol compatible with
* back-pressure.
*/
@Experimental
public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
public static <S, T> SyncOnSubscribe<S, T> createStateful(Func0<? extends S> generator,
Func2<? super S, ? super Observer<? super T>, ? extends S> next) {
return new SyncOnSubscribeImpl<S, T>(generator, next);
}
Expand All @@ -218,11 +218,11 @@ public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
* @param next
* produces data to the downstream subscriber (see {@link #next(Object, Subscriber)
* next(S, Subscriber)})
* @return an OnSubscribe that emits data downstream in a protocol compatible with
* @return a SyncOnSubscribe that emits data downstream in a protocol compatible with
* back-pressure.
*/
@Experimental
public static <T> OnSubscribe<T> createStateless(final Action1<? super Observer<? super T>> next) {
public static <T> SyncOnSubscribe<Void, T> createStateless(final Action1<? super Observer<? super T>> next) {
Func2<Void, Observer<? super T>, Void> nextFunc = new Func2<Void, Observer<? super T>, Void>() {
@Override
public Void call(Void state, Observer<? super T> subscriber) {
Expand All @@ -245,11 +245,11 @@ public Void call(Void state, Observer<? super T> subscriber) {
* next(S, Subscriber)})
* @param onUnsubscribe
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
* @return an OnSubscribe that emits data downstream in a protocol compatible with
* @return a SyncOnSubscribe that emits data downstream in a protocol compatible with
* back-pressure.
*/
@Experimental
public static <T> OnSubscribe<T> createStateless(final Action1<? super Observer<? super T>> next,
public static <T> SyncOnSubscribe<Void, T> createStateless(final Action1<? super Observer<? super T>> next,
final Action0 onUnsubscribe) {
Func2<Void, Observer<? super T>, Void> nextFunc = new Func2<Void, Observer<? super T>, Void>() {
@Override
Expand Down