Skip to content

Commit bcb2a60

Browse files
committed
Factored a pure interface out of Observable: IObservable.
The IObservable interface supports the core contract of observability: handling of each element, detection of completion, and detection of errors. The concrete Observable class is now focused on its rightful role: providing a set of highly usable fluent utilities for working with IObservables. For another example of this pattern, see e.g. Guava’s FluentIterable vs. the JDK’s Iterable. (If not for backward compatibility, Observable might have been called “FluentObservable” and IObservable simply “Observable”.)
1 parent 13cd6a9 commit bcb2a60

File tree

2 files changed

+100
-4
lines changed

2 files changed

+100
-4
lines changed
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
19+
/**
20+
* The Observable interface that implements the Reactive Pattern.
21+
* <p>
22+
* The documentation for this interface and its implementations makes use of
23+
* marble diagrams. The following legend explains these diagrams:
24+
* <p>
25+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/legend.png">
26+
* <p>
27+
* For more information see the
28+
* <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Wiki</a>
29+
*
30+
* @param <T> the type of the item emitted by the Observable.
31+
*/
32+
public interface IObservable<T> {
33+
34+
/**
35+
* An {@link Observer} must call an Observable's {@code subscribe} method in
36+
* order to receive items and notifications from the Observable.
37+
* <p>
38+
* A typical implementation of {@code subscribe} does the following:
39+
* <ol>
40+
* <li>It stores a reference to the Observer in a collection object, such as
41+
* a {@code List<T>} object.</li>
42+
* <li>It returns a reference to the {@link Subscription} interface. This
43+
* enables Observers to unsubscribe, that is, to stop receiving items
44+
* and notifications before the Observable stops sending them, which
45+
* also invokes the Observer's {@link Observer#onCompleted onCompleted}
46+
* method.</li>
47+
* </ol><p>
48+
* An <code>IObservable&lt;T&gt;</code> instance is responsible for accepting
49+
* all subscriptions and notifying all Observers. Unless the documentation
50+
* for a particular <code>IObservable&lt;T&gt;</code> implementation
51+
* indicates otherwise, Observers should make no assumptions about the order
52+
* in which multiple Observers will receive their notifications.
53+
* <p>
54+
* For more information see the
55+
* <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Wiki</a>
56+
*
57+
* @param observer the Observer
58+
* @return a {@link Subscription} reference with which the {@link Observer}
59+
* can stop receiving items before the Observable has finished
60+
* sending them
61+
* @throws IllegalArgumentException if the {@link Observer} provided as the
62+
* argument to {@code subscribe()} is
63+
* {@code null}.
64+
*/
65+
public abstract Subscription subscribe(Observer<? super T> observer);
66+
67+
}

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,14 @@
130130
import rx.util.functions.Function;
131131

132132
/**
133-
* The Observable interface that implements the Reactive Pattern.
133+
* An implementation of the {@link IObservable} interface that provides
134+
* overloaded methods for subscribing as well as delegate methods to the
135+
* various operators in a fluent style.
134136
* <p>
135-
* This interface provides overloaded methods for subscribing as well as
136-
* delegate methods to the various operators.
137+
* It is expected that most applications will lean heavily upon this
138+
* particular IObservable implementation. To simplify the documentation,
139+
* instances of the IObservable interface generally, whether or not of this
140+
* class in particular, will be referred to simply as "Observables".
137141
* <p>
138142
* The documentation for this interface makes use of marble diagrams. The
139143
* following legend explains these diagrams:
@@ -145,7 +149,7 @@
145149
*
146150
* @param <T> the type of the item emitted by the Observable
147151
*/
148-
public class Observable<T> {
152+
public class Observable<T> implements IObservable<T> {
149153

150154
private final static ConcurrentHashMap<Class, Boolean> internalClassMap = new ConcurrentHashMap<Class, Boolean>();
151155

@@ -213,6 +217,7 @@ protected Observable(OnSubscribeFunc<T> onSubscribe) {
213217
* argument to {@code subscribe()} is
214218
* {@code null}
215219
*/
220+
@Override
216221
public Subscription subscribe(Observer<? super T> observer) {
217222
// allow the hook to intercept and/or decorate
218223
OnSubscribeFunc<T> onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe);
@@ -2201,6 +2206,30 @@ public Observable<Timestamped<T>> timestamp() {
22012206
return create(OperationTimestamp.timestamp(this));
22022207
}
22032208

2209+
/**
2210+
* If the given {@link IObservable} is an {@link Observable} already,
2211+
* simply return it, cast to its concrete type. If not, wrap it in a new
2212+
* Observable that will delegate its own {@link #subscribe(Observer)} to
2213+
* the given Observable.
2214+
*
2215+
* @return the given Observable if it is of the correct type, or a new
2216+
* one that delegates to it if not.
2217+
*/
2218+
public static <T> Observable<T> from(final IObservable<T> observable) {
2219+
if (null == observable) {
2220+
throw new NullPointerException("IObservable argument");
2221+
} else if (observable instanceof Observable<?>) {
2222+
return (Observable<T>) observable;
2223+
} else {
2224+
return create(new OnSubscribeFunc<T>() {
2225+
@Override
2226+
public Subscription onSubscribe(Observer<? super T> observer) {
2227+
return observable.subscribe(observer);
2228+
}
2229+
});
2230+
}
2231+
}
2232+
22042233
/**
22052234
* Converts a {@link Future} into an Observable.
22062235
* <p>

0 commit comments

Comments
 (0)