diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 78780ab31b..ab0c8d3746 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -95,6 +95,77 @@ public static Observable create(OnSubscribe f) { return new Observable(hook.onCreate(f)); } + /** + * Returns an Observable that respects the back-pressure semantics. When the returned Observable is + * subscribed to it will initiate the given {@link SyncOnSubscribe}'s life cycle for + * generating events. + * + *

Note: the {@code SyncOnSubscribe} provides a generic way to fulfill data by iterating + * over a (potentially stateful) function (e.g. reading data off of a channel, a parser, ). If your + * data comes directly from an asyrchronous/potentially concurrent source then consider using the + * {@link Observable#create(AsyncOnSubscribe) asynchronous overload}. + * + *

+ * + *

+ * See Rx Design Guidelines (PDF) for detailed + * information. + *

+ *
Scheduler:
+ *
{@code create} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of the items that this Observable emits + * @param syncOnSubscribe + * an implementation of {@link SyncOnSubscribe}. There are many static creation methods + * on the class for convenience. + * @return an Observable that, when a {@link Subscriber} subscribes to it, will execute the specified + * function + * @see {@link SyncOnSubscribe} {@code static create*} methods + * @see ReactiveX operators documentation: Create + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public static Observable create(SyncOnSubscribe syncOnSubscribe) { + return new Observable(hook.onCreate(syncOnSubscribe)); + } + + /** + * Returns an Observable that respects the back-pressure semantics. When the returned Observable is + * subscribed to it will initiate the given {@link AsyncOnSubscribe}'s life cycle for + * generating events. + * + *

Note: the {@code AsyncOnSubscribe} is useful for observable sources of data that are + * necessarily asynchronous (RPC, external services, etc). Typically most use cases can be solved + * with the {@link Observable#create(SyncOnSubscribe) synchronous overload}. + * + *

+ * + *

+ * See Rx Design Guidelines (PDF) for detailed + * information. + *

+ *
Scheduler:
+ *
{@code create} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param + * the type of the items that this Observable emits + * @param asyncOnSubscribe + * an implementation of {@link AsyncOnSubscribe}. There are many static creation methods + * on the class for convenience. + * @return an Observable that, when a {@link Subscriber} subscribes to it, will execute the specified + * function + * @see {@link AsyncOnSubscribe AsyncOnSubscribe} {@code static create*} methods + * @see ReactiveX operators documentation: Create + * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) + */ + @Experimental + public static Observable create(AsyncOnSubscribe asyncOnSubscribe) { + return new Observable(hook.onCreate(asyncOnSubscribe)); + } + /** * Invoked when Observable.subscribe is called. */ diff --git a/src/main/java/rx/observables/AsyncOnSubscribe.java b/src/main/java/rx/observables/AsyncOnSubscribe.java index d95dc82b9d..24de19c149 100644 --- a/src/main/java/rx/observables/AsyncOnSubscribe.java +++ b/src/main/java/rx/observables/AsyncOnSubscribe.java @@ -106,10 +106,10 @@ protected void onUnsubscribe(S state) { * @param next * produces data to the downstream subscriber (see * {@link #next(Object, long, Observer) next(S, long, Observer)}) - * @return an OnSubscribe that emits data in a protocol compatible with back-pressure. + * @return an AsyncOnSubscribe that emits data in a protocol compatible with back-pressure. */ @Experimental - public static OnSubscribe createSingleState(Func0 generator, + public static AsyncOnSubscribe createSingleState(Func0 generator, final Action3>> next) { Func3>, S> nextFunc = new Func3>, S>() { @@ -134,11 +134,11 @@ public S call(S state, Long requested, Observer> subscri * {@link #next(Object, long, Observer) next(S, long, Observer)}) * @param onUnsubscribe * clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) - * @return an OnSubscribe that emits data downstream in a protocol compatible with + * @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with * back-pressure. */ @Experimental - public static OnSubscribe createSingleState(Func0 generator, + public static AsyncOnSubscribe createSingleState(Func0 generator, final Action3>> next, final Action1 onUnsubscribe) { Func3>, S> nextFunc = @@ -162,11 +162,11 @@ public S call(S state, Long requested, Observer> subscri * {@link #next(Object, long, Observer) next(S, long, Observer)}) * @param onUnsubscribe * clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) - * @return an OnSubscribe that emits data downstream in a protocol compatible with + * @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with * back-pressure. */ @Experimental - public static OnSubscribe createStateful(Func0 generator, + public static AsyncOnSubscribe createStateful(Func0 generator, Func3>, ? extends S> next, Action1 onUnsubscribe) { return new AsyncOnSubscribeImpl(generator, next, onUnsubscribe); @@ -181,11 +181,11 @@ public static OnSubscribe createStateful(Func0 generator, * @param next * produces data to the downstream subscriber (see * {@link #next(Object, long, Observer) next(S, long, Observer)}) - * @return an OnSubscribe that emits data downstream in a protocol compatible with + * @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with * back-pressure. */ @Experimental - public static OnSubscribe createStateful(Func0 generator, + public static AsyncOnSubscribe createStateful(Func0 generator, Func3>, ? extends S> next) { return new AsyncOnSubscribeImpl(generator, next); } @@ -200,11 +200,11 @@ public static OnSubscribe createStateful(Func0 generator, * @param next * produces data to the downstream subscriber (see * {@link #next(Object, long, Observer) next(S, long, Observer)}) - * @return an OnSubscribe that emits data downstream in a protocol compatible with + * @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with * back-pressure. */ @Experimental - public static OnSubscribe createStateless(final Action2>> next) { + public static AsyncOnSubscribe createStateless(final Action2>> next) { Func3>, Void> nextFunc = new Func3>, Void>() { @Override @@ -227,11 +227,11 @@ public Void call(Void state, Long requested, Observer> s * {@link #next(Object, long, Observer) next(S, long, Observer)}) * @param onUnsubscribe * clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) - * @return an OnSubscribe that emits data downstream in a protocol compatible with + * @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with * back-pressure. */ @Experimental - public static OnSubscribe createStateless(final Action2>> next, + public static AsyncOnSubscribe createStateless(final Action2>> next, final Action0 onUnsubscribe) { Func3>, Void> nextFunc = new Func3>, Void>() { diff --git a/src/main/java/rx/observables/SyncOnSubscribe.java b/src/main/java/rx/observables/SyncOnSubscribe.java index f8cda8dde0..910a5acddb 100644 --- a/src/main/java/rx/observables/SyncOnSubscribe.java +++ b/src/main/java/rx/observables/SyncOnSubscribe.java @@ -124,10 +124,10 @@ protected void onUnsubscribe(S state) { * @param next * produces data to the downstream subscriber (see {@link #next(Object, Subscriber) * next(S, Subscriber)}) - * @return an OnSubscribe that emits data in a protocol compatible with back-pressure. + * @return a SyncOnSubscribe that emits data in a protocol compatible with back-pressure. */ @Experimental - public static OnSubscribe createSingleState(Func0 generator, + public static SyncOnSubscribe createSingleState(Func0 generator, final Action2> next) { Func2, S> nextFunc = new Func2, S>() { @Override @@ -152,11 +152,11 @@ public S call(S state, Observer subscriber) { * next(S, Subscriber)}) * @param onUnsubscribe * clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) - * @return an OnSubscribe that emits data downstream in a protocol compatible with + * @return a SyncOnSubscribe that emits data downstream in a protocol compatible with * back-pressure. */ @Experimental - public static OnSubscribe createSingleState(Func0 generator, + public static SyncOnSubscribe createSingleState(Func0 generator, final Action2> next, final Action1 onUnsubscribe) { Func2, S> nextFunc = new Func2, S>() { @@ -180,11 +180,11 @@ public S call(S state, Observer subscriber) { * next(S, Subscriber)}) * @param onUnsubscribe * clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) - * @return an OnSubscribe that emits data downstream in a protocol compatible with + * @return a SyncOnSubscribe that emits data downstream in a protocol compatible with * back-pressure. */ @Experimental - public static OnSubscribe createStateful(Func0 generator, + public static SyncOnSubscribe createStateful(Func0 generator, Func2, ? extends S> next, Action1 onUnsubscribe) { return new SyncOnSubscribeImpl(generator, next, onUnsubscribe); @@ -199,11 +199,11 @@ public static OnSubscribe createStateful(Func0 generator, * @param next * produces data to the downstream subscriber (see {@link #next(Object, Subscriber) * next(S, Subscriber)}) - * @return an OnSubscribe that emits data downstream in a protocol compatible with + * @return a SyncOnSubscribe that emits data downstream in a protocol compatible with * back-pressure. */ @Experimental - public static OnSubscribe createStateful(Func0 generator, + public static SyncOnSubscribe createStateful(Func0 generator, Func2, ? extends S> next) { return new SyncOnSubscribeImpl(generator, next); } @@ -218,11 +218,11 @@ public static OnSubscribe createStateful(Func0 generator, * @param next * produces data to the downstream subscriber (see {@link #next(Object, Subscriber) * next(S, Subscriber)}) - * @return an OnSubscribe that emits data downstream in a protocol compatible with + * @return a SyncOnSubscribe that emits data downstream in a protocol compatible with * back-pressure. */ @Experimental - public static OnSubscribe createStateless(final Action1> next) { + public static SyncOnSubscribe createStateless(final Action1> next) { Func2, Void> nextFunc = new Func2, Void>() { @Override public Void call(Void state, Observer subscriber) { @@ -245,11 +245,11 @@ public Void call(Void state, Observer subscriber) { * next(S, Subscriber)}) * @param onUnsubscribe * clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)}) - * @return an OnSubscribe that emits data downstream in a protocol compatible with + * @return a SyncOnSubscribe that emits data downstream in a protocol compatible with * back-pressure. */ @Experimental - public static OnSubscribe createStateless(final Action1> next, + public static SyncOnSubscribe createStateless(final Action1> next, final Action0 onUnsubscribe) { Func2, Void> nextFunc = new Func2, Void>() { @Override