Skip to content

Commit 479d1e0

Browse files
author
Aaron Tull
committed
Creating Observable#create overloads for SyncOnSubscribe and AsyncOnSubscribe
1 parent 02e6903 commit 479d1e0

File tree

3 files changed

+95
-24
lines changed

3 files changed

+95
-24
lines changed

src/main/java/rx/Observable.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,77 @@ public static <T> Observable<T> create(OnSubscribe<T> f) {
9595
return new Observable<T>(hook.onCreate(f));
9696
}
9797

98+
/**
99+
* Returns an Observable that respects the back-pressure semantics. When the returned Observable is
100+
* subscribed to it will initiate the given {@link SyncOnSubscribe}'s life cycle for
101+
* generating events.
102+
*
103+
* <p><b>Note:</b> the {@code SyncOnSubscribe} provides a generic way to fulfill data by iterating
104+
* over a (potentially stateful) function (e.g. reading data off of a channel, a parser, ). If your
105+
* data comes directly from an asyrchronous/potentially concurrent source then consider using the
106+
* {@link Observable#create(AsyncOnSubscribe) asynchronous overload}.
107+
*
108+
* <p>
109+
* <img width="640" height="200" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create-sync.png" alt="">
110+
* <p>
111+
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a> for detailed
112+
* information.
113+
* <dl>
114+
* <dt><b>Scheduler:</b></dt>
115+
* <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
116+
* </dl>
117+
*
118+
* @param <T>
119+
* the type of the items that this Observable emits
120+
* @param syncOnSubscribe
121+
* an implementation of {@link SyncOnSubscribe}. There are many static creation methods
122+
* on the class for convenience.
123+
* @return an Observable that, when a {@link Subscriber} subscribes to it, will execute the specified
124+
* function
125+
* @see {@link SyncOnSubscribe} {@code static create*} methods
126+
* @see <a href="http://reactivex.io/documentation/operators/create.html">ReactiveX operators documentation: Create</a>
127+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
128+
*/
129+
@Experimental
130+
public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe) {
131+
return new Observable<T>(hook.onCreate(syncOnSubscribe));
132+
}
133+
134+
/**
135+
* Returns an Observable that respects the back-pressure semantics. When the returned Observable is
136+
* subscribed to it will initiate the given {@link AsyncOnSubscribe}'s life cycle for
137+
* generating events.
138+
*
139+
* <p><b>Note:</b> the {@code AsyncOnSubscribe} is useful for observable sources of data that are
140+
* necessarily asynchronous (RPC, external services, etc). Typically most use cases can be solved
141+
* with the {@link Observable#create(SyncOnSubscribe) synchronous overload}.
142+
*
143+
* <p>
144+
* <img width="640" height="200" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/create-async.png" alt="">
145+
* <p>
146+
* See <a href="http://go.microsoft.com/fwlink/?LinkID=205219">Rx Design Guidelines (PDF)</a> for detailed
147+
* information.
148+
* <dl>
149+
* <dt><b>Scheduler:</b></dt>
150+
* <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
151+
* </dl>
152+
*
153+
* @param <T>
154+
* the type of the items that this Observable emits
155+
* @param asyncOnSubscribe
156+
* an implementation of {@link AsyncOnSubscribe}. There are many static creation methods
157+
* on the class for convenience.
158+
* @return an Observable that, when a {@link Subscriber} subscribes to it, will execute the specified
159+
* function
160+
* @see {@link AsyncOnSubscribe AsyncOnSubscribe} {@code static create*} methods
161+
* @see <a href="http://reactivex.io/documentation/operators/create.html">ReactiveX operators documentation: Create</a>
162+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
163+
*/
164+
@Experimental
165+
public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe) {
166+
return new Observable<T>(hook.onCreate(asyncOnSubscribe));
167+
}
168+
98169
/**
99170
* Invoked when Observable.subscribe is called.
100171
*/

src/main/java/rx/observables/AsyncOnSubscribe.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,10 @@ protected void onUnsubscribe(S state) {
106106
* @param next
107107
* produces data to the downstream subscriber (see
108108
* {@link #next(Object, long, Observer) next(S, long, Observer)})
109-
* @return an OnSubscribe that emits data in a protocol compatible with back-pressure.
109+
* @return an AsyncOnSubscribe that emits data in a protocol compatible with back-pressure.
110110
*/
111111
@Experimental
112-
public static <S, T> OnSubscribe<T> createSingleState(Func0<? extends S> generator,
112+
public static <S, T> AsyncOnSubscribe<S, T> createSingleState(Func0<? extends S> generator,
113113
final Action3<? super S, Long, ? super Observer<Observable<? extends T>>> next) {
114114
Func3<S, Long, ? super Observer<Observable<? extends T>>, S> nextFunc =
115115
new Func3<S, Long, Observer<Observable<? extends T>>, S>() {
@@ -134,11 +134,11 @@ public S call(S state, Long requested, Observer<Observable<? extends T>> subscri
134134
* {@link #next(Object, long, Observer) next(S, long, Observer)})
135135
* @param onUnsubscribe
136136
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
137-
* @return an OnSubscribe that emits data downstream in a protocol compatible with
137+
* @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with
138138
* back-pressure.
139139
*/
140140
@Experimental
141-
public static <S, T> OnSubscribe<T> createSingleState(Func0<? extends S> generator,
141+
public static <S, T> AsyncOnSubscribe<S, T> createSingleState(Func0<? extends S> generator,
142142
final Action3<? super S, Long, ? super Observer<Observable<? extends T>>> next,
143143
final Action1<? super S> onUnsubscribe) {
144144
Func3<S, Long, Observer<Observable<? extends T>>, S> nextFunc =
@@ -162,11 +162,11 @@ public S call(S state, Long requested, Observer<Observable<? extends T>> subscri
162162
* {@link #next(Object, long, Observer) next(S, long, Observer)})
163163
* @param onUnsubscribe
164164
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
165-
* @return an OnSubscribe that emits data downstream in a protocol compatible with
165+
* @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with
166166
* back-pressure.
167167
*/
168168
@Experimental
169-
public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
169+
public static <S, T> AsyncOnSubscribe<S, T> createStateful(Func0<? extends S> generator,
170170
Func3<? super S, Long, ? super Observer<Observable<? extends T>>, ? extends S> next,
171171
Action1<? super S> onUnsubscribe) {
172172
return new AsyncOnSubscribeImpl<S, T>(generator, next, onUnsubscribe);
@@ -181,11 +181,11 @@ public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
181181
* @param next
182182
* produces data to the downstream subscriber (see
183183
* {@link #next(Object, long, Observer) next(S, long, Observer)})
184-
* @return an OnSubscribe that emits data downstream in a protocol compatible with
184+
* @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with
185185
* back-pressure.
186186
*/
187187
@Experimental
188-
public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
188+
public static <S, T> AsyncOnSubscribe<S, T> createStateful(Func0<? extends S> generator,
189189
Func3<? super S, Long, ? super Observer<Observable<? extends T>>, ? extends S> next) {
190190
return new AsyncOnSubscribeImpl<S, T>(generator, next);
191191
}
@@ -200,11 +200,11 @@ public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
200200
* @param next
201201
* produces data to the downstream subscriber (see
202202
* {@link #next(Object, long, Observer) next(S, long, Observer)})
203-
* @return an OnSubscribe that emits data downstream in a protocol compatible with
203+
* @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with
204204
* back-pressure.
205205
*/
206206
@Experimental
207-
public static <T> OnSubscribe<T> createStateless(final Action2<Long, ? super Observer<Observable<? extends T>>> next) {
207+
public static <T> AsyncOnSubscribe<Void, T> createStateless(final Action2<Long, ? super Observer<Observable<? extends T>>> next) {
208208
Func3<Void, Long, Observer<Observable<? extends T>>, Void> nextFunc =
209209
new Func3<Void, Long, Observer<Observable<? extends T>>, Void>() {
210210
@Override
@@ -227,11 +227,11 @@ public Void call(Void state, Long requested, Observer<Observable<? extends T>> s
227227
* {@link #next(Object, long, Observer) next(S, long, Observer)})
228228
* @param onUnsubscribe
229229
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
230-
* @return an OnSubscribe that emits data downstream in a protocol compatible with
230+
* @return an AsyncOnSubscribe that emits data downstream in a protocol compatible with
231231
* back-pressure.
232232
*/
233233
@Experimental
234-
public static <T> OnSubscribe<T> createStateless(final Action2<Long, ? super Observer<Observable<? extends T>>> next,
234+
public static <T> AsyncOnSubscribe<Void, T> createStateless(final Action2<Long, ? super Observer<Observable<? extends T>>> next,
235235
final Action0 onUnsubscribe) {
236236
Func3<Void, Long, Observer<Observable<? extends T>>, Void> nextFunc =
237237
new Func3<Void, Long, Observer<Observable<? extends T>>, Void>() {

src/main/java/rx/observables/SyncOnSubscribe.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,10 @@ protected void onUnsubscribe(S state) {
124124
* @param next
125125
* produces data to the downstream subscriber (see {@link #next(Object, Subscriber)
126126
* next(S, Subscriber)})
127-
* @return an OnSubscribe that emits data in a protocol compatible with back-pressure.
127+
* @return a SyncOnSubscribe that emits data in a protocol compatible with back-pressure.
128128
*/
129129
@Experimental
130-
public static <S, T> OnSubscribe<T> createSingleState(Func0<? extends S> generator,
130+
public static <S, T> SyncOnSubscribe<S, T> createSingleState(Func0<? extends S> generator,
131131
final Action2<? super S, ? super Observer<? super T>> next) {
132132
Func2<S, ? super Observer<? super T>, S> nextFunc = new Func2<S, Observer<? super T>, S>() {
133133
@Override
@@ -152,11 +152,11 @@ public S call(S state, Observer<? super T> subscriber) {
152152
* next(S, Subscriber)})
153153
* @param onUnsubscribe
154154
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
155-
* @return an OnSubscribe that emits data downstream in a protocol compatible with
155+
* @return a SyncOnSubscribe that emits data downstream in a protocol compatible with
156156
* back-pressure.
157157
*/
158158
@Experimental
159-
public static <S, T> OnSubscribe<T> createSingleState(Func0<? extends S> generator,
159+
public static <S, T> SyncOnSubscribe<S, T> createSingleState(Func0<? extends S> generator,
160160
final Action2<? super S, ? super Observer<? super T>> next,
161161
final Action1<? super S> onUnsubscribe) {
162162
Func2<S, Observer<? super T>, S> nextFunc = new Func2<S, Observer<? super T>, S>() {
@@ -180,11 +180,11 @@ public S call(S state, Observer<? super T> subscriber) {
180180
* next(S, Subscriber)})
181181
* @param onUnsubscribe
182182
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
183-
* @return an OnSubscribe that emits data downstream in a protocol compatible with
183+
* @return a SyncOnSubscribe that emits data downstream in a protocol compatible with
184184
* back-pressure.
185185
*/
186186
@Experimental
187-
public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
187+
public static <S, T> SyncOnSubscribe<S, T> createStateful(Func0<? extends S> generator,
188188
Func2<? super S, ? super Observer<? super T>, ? extends S> next,
189189
Action1<? super S> onUnsubscribe) {
190190
return new SyncOnSubscribeImpl<S, T>(generator, next, onUnsubscribe);
@@ -199,11 +199,11 @@ public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
199199
* @param next
200200
* produces data to the downstream subscriber (see {@link #next(Object, Subscriber)
201201
* next(S, Subscriber)})
202-
* @return an OnSubscribe that emits data downstream in a protocol compatible with
202+
* @return a SyncOnSubscribe that emits data downstream in a protocol compatible with
203203
* back-pressure.
204204
*/
205205
@Experimental
206-
public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
206+
public static <S, T> SyncOnSubscribe<S, T> createStateful(Func0<? extends S> generator,
207207
Func2<? super S, ? super Observer<? super T>, ? extends S> next) {
208208
return new SyncOnSubscribeImpl<S, T>(generator, next);
209209
}
@@ -218,11 +218,11 @@ public static <S, T> OnSubscribe<T> createStateful(Func0<? extends S> generator,
218218
* @param next
219219
* produces data to the downstream subscriber (see {@link #next(Object, Subscriber)
220220
* next(S, Subscriber)})
221-
* @return an OnSubscribe that emits data downstream in a protocol compatible with
221+
* @return a SyncOnSubscribe that emits data downstream in a protocol compatible with
222222
* back-pressure.
223223
*/
224224
@Experimental
225-
public static <T> OnSubscribe<T> createStateless(final Action1<? super Observer<? super T>> next) {
225+
public static <T> SyncOnSubscribe<Void, T> createStateless(final Action1<? super Observer<? super T>> next) {
226226
Func2<Void, Observer<? super T>, Void> nextFunc = new Func2<Void, Observer<? super T>, Void>() {
227227
@Override
228228
public Void call(Void state, Observer<? super T> subscriber) {
@@ -245,11 +245,11 @@ public Void call(Void state, Observer<? super T> subscriber) {
245245
* next(S, Subscriber)})
246246
* @param onUnsubscribe
247247
* clean up behavior (see {@link #onUnsubscribe(Object) onUnsubscribe(S)})
248-
* @return an OnSubscribe that emits data downstream in a protocol compatible with
248+
* @return a SyncOnSubscribe that emits data downstream in a protocol compatible with
249249
* back-pressure.
250250
*/
251251
@Experimental
252-
public static <T> OnSubscribe<T> createStateless(final Action1<? super Observer<? super T>> next,
252+
public static <T> SyncOnSubscribe<Void, T> createStateless(final Action1<? super Observer<? super T>> next,
253253
final Action0 onUnsubscribe) {
254254
Func2<Void, Observer<? super T>, Void> nextFunc = new Func2<Void, Observer<? super T>, Void>() {
255255
@Override

0 commit comments

Comments
 (0)