Skip to content

visually distinguish operators that use schedulers, etc. #531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 3, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 25 additions & 19 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,8 @@ public static <T> Observable<T> just(T value) {
* Returns an Observable that emits a single item and then completes on a
* specified scheduler.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/just.s.png">
* <p>
* This is a scheduler version of {@link Observable#just(Object)}.
*
* @param value the item to pass to the {@link Observer}'s
Expand Down Expand Up @@ -1387,7 +1389,6 @@ public static <T> Observable<T> concat(Observable<? extends T> t1, Observable<?
* @param t1 an Observable to be concatenated
* @param t2 an Observable to be concatenated
* @param t3 an Observable to be concatenated
*
* @return an Observable that emits items that are the result of combining
* the items emitted by the {@code source} Observables, one after
* the other
Expand Down Expand Up @@ -1972,7 +1973,7 @@ public static Observable<Long> interval(long interval, TimeUnit unit) {
/**
* Emits an item each time interval (containing a sequential number).
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/interval.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/interval.s.png">
*
* @param interval interval size in time units (see below)
* @param unit time units to use for the interval size
Expand Down Expand Up @@ -2021,7 +2022,7 @@ public Observable<T> debounce(long timeout, TimeUnit unit) {
* Note: If events keep firing faster than the timeout then no data will be
* emitted.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/debounce.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/debounce.s.png">
* <p>
* Information on debounce vs throttle:
* <p>
Expand Down Expand Up @@ -2081,7 +2082,7 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
* Note: If events keep firing faster than the timeout then no data will be
* emitted.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/throttleWithTimeout.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/throttleWithTimeout.s.png">
* <p>
* Information on debounce vs throttle:
* <p>
Expand Down Expand Up @@ -2131,7 +2132,7 @@ public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
* This differs from {@link #throttleLast} in that this only tracks passage
* of time whereas {@link #throttleLast} ticks at scheduled intervals.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/throttleFirst.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/throttleFirst.s.png">
*
* @param skipDuration time to wait before sending another item after
* emitting the last item
Expand Down Expand Up @@ -2174,11 +2175,13 @@ public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
* scheduled interval whereas {@link #throttleFirst} does not tick, it just
* tracks passage of time.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/throttleLast.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/throttleLast.s.png">
*
* @param intervalDuration duration of windows within which the last item
* will be emitted
* @param unit the unit of time for the specified interval
* @param scheduler the {@link Scheduler} to use internally to manage the
* timers that handle timeout for each event
* @return an Observable that performs the throttle operation
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#takelast">RxJava Wiki: throttleLast()</a>
* @see #sample(long, TimeUnit, Scheduler)
Expand Down Expand Up @@ -2227,7 +2230,7 @@ public static <T> Observable<T> from(Future<? extends T> future) {
/**
* Converts a {@link Future} into an Observable.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/from.Future.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/from.Future.s.png">
* <p>
* You can convert any object that supports the {@link Future} interface
* into an Observable that emits the return value of the {@link Future#get}
Expand Down Expand Up @@ -2913,7 +2916,7 @@ public Observable<List<T>> buffer(long timespan, TimeUnit unit) {
/**
* Creates an Observable that produces buffers of collected values.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/buffer5.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/buffer5.s.png">
* <p>
* This Observable produces connected, non-overlapping buffers, each of a
* fixed duration specified by the <code>timespan</code> argument. When the
Expand Down Expand Up @@ -2966,7 +2969,7 @@ public Observable<List<T>> buffer(long timespan, TimeUnit unit, int count) {
* first). When the source Observable completes or encounters an error, the
* current buffer is emitted and the event is propagated.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/buffer6.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/buffer6.s.png">
*
* @param timespan the period of time each buffer collects values before it
* should be emitted and replaced with a new buffer
Expand Down Expand Up @@ -3016,7 +3019,7 @@ public Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit)
* source Observable completes or encounters an error, the current buffer is
* emitted and the event is propagated.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/buffer7.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/buffer7.s.png">
*
* @param timespan the period of time each buffer collects values before it
* should be emitted
Expand Down Expand Up @@ -3153,7 +3156,7 @@ public Observable<Observable<T>> window(long timespan, TimeUnit unit) {
* source Observable completes or encounters an error, the current window is
* emitted and the event is propagated.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window5.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window5.s.png">
*
* @param timespan the period of time each window collects items before it
* should be emitted and replaced with a new window
Expand Down Expand Up @@ -3201,7 +3204,7 @@ public Observable<Observable<T>> window(long timespan, TimeUnit unit, int count)
* first). When the source Observable completes or encounters an error, the
* current window is emitted and the event is propagated.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window6.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window6.s.png">
*
* @param timespan the period of time each window collects values before it
* should be emitted and replaced with a new window
Expand Down Expand Up @@ -3251,7 +3254,7 @@ public Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit
* source Observable completes or encounters an error, the current window is
* emitted and the event is propagated.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window7.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/window7.s.png">
*
* @param timespan the period of time each window collects values before it
* should be emitted
Expand Down Expand Up @@ -4239,7 +4242,6 @@ public <R> Observable<R> parallel(Func1<Observable<T>, Observable<R>> f) {
* {@link Scheduler}
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#parallel">RxJava Wiki: parallel()</a>
*/

public <R> Observable<R> parallel(final Func1<Observable<T>, Observable<R>> f, final Scheduler s) {
return OperationParallel.parallel(this, f, s);
}
Expand Down Expand Up @@ -4288,6 +4290,7 @@ public static <T> Observable<Observable<T>> parallelMerge(Observable<Observable<
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/parallelMerge.png">
*
* @param parallelObservables the number of Observables to merge into
* @param scheduler
* @return an Observable of Observables constrained to number defined by
* <code>parallelObservables</code>.
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#parallelmerge">RxJava Wiki: parallelMerge()</a>
Expand Down Expand Up @@ -4427,7 +4430,7 @@ public Observable<T> sample(long period, TimeUnit unit) {
* Returns an Observable that emits the results of sampling the items
* emitted by the source Observable at a specified time interval.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sample.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sample.s.png">
*
* @param period the sampling rate
* @param unit the {@link TimeUnit} in which <code>period</code> is defined
Expand Down Expand Up @@ -5233,7 +5236,7 @@ public Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? exten
* isn't received within the specified timeout duration starting from its
* predecessor, a TimeoutException is propagated to the observer.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.1.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.1s.png">
*
* @param timeout maximum duration between values before a timeout occurs
* @param timeUnit the unit of time which applies to the
Expand All @@ -5255,7 +5258,7 @@ public Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler schedule
* predecessor, the other observable sequence is used to produce future
* messages from that point on.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.2.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeout.2s.png">
*
* @param timeout maximum duration between values before a timeout occurs
* @param timeUnit the unit of time which applies to the
Expand Down Expand Up @@ -5289,7 +5292,7 @@ public Observable<TimeInterval<T>> timeInterval() {
* Records the time interval between consecutive items emitted by an
* Observable, using the specified Scheduler to compute time intervals.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeInterval.png">
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/timeInterval.s.png">
*
* @param scheduler Scheduler used to compute time intervals
* @return an Observable that emits time interval information items
Expand Down Expand Up @@ -5595,6 +5598,8 @@ public void onNext(T args) { }

/**
* Invokes an action for each item emitted by an Observable.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/doOnEach.e.png">
*
* @param onNext the action to invoke for each item in the source sequence
* @param onError the action to invoke when the source Observable calls
Expand Down Expand Up @@ -5626,6 +5631,8 @@ public void onNext(T args) {

/**
* Invokes an action for each item emitted by an Observable.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/doOnEach.ce.png">
*
* @param onNext the action to invoke for each item in the source sequence
* @param onError the action to invoke when the source Observable calls
Expand Down Expand Up @@ -5655,7 +5662,6 @@ public void onNext(T args) {

};


return create(OperationDoOnEach.doOnEach(this, observer));
}

Expand Down