Skip to content

Commit 6c87f8f

Browse files
Merge pull request #231 from mairbek/multicast
Multicast
2 parents d8181f5 + 9d48d21 commit 6c87f8f

File tree

6 files changed

+462
-119
lines changed

6 files changed

+462
-119
lines changed

rxjava-core/src/main/java/rx/Observable.java

+28-1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.mockito.Mockito;
3737
import org.mockito.MockitoAnnotations;
3838

39+
import rx.observables.ConnectableObservable;
3940
import rx.observables.GroupedObservable;
4041
import rx.operators.OperationAll;
4142
import rx.operators.OperationConcat;
@@ -49,6 +50,7 @@
4950
import rx.operators.OperationMerge;
5051
import rx.operators.OperationMergeDelayError;
5152
import rx.operators.OperationMostRecent;
53+
import rx.operators.OperatorMulticast;
5254
import rx.operators.OperationNext;
5355
import rx.operators.OperationObserveOn;
5456
import rx.operators.OperationOnErrorResumeNextViaFunction;
@@ -72,6 +74,7 @@
7274
import rx.plugins.RxJavaErrorHandler;
7375
import rx.plugins.RxJavaObservableExecutionHook;
7476
import rx.plugins.RxJavaPlugins;
77+
import rx.subjects.Subject;
7578
import rx.subscriptions.BooleanSubscription;
7679
import rx.subscriptions.Subscriptions;
7780
import rx.util.AtomicObservableSubscription;
@@ -585,6 +588,17 @@ public void call(Object args) {
585588
});
586589
}
587590

591+
/**
592+
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
593+
*
594+
* @param subject the subject to push source elements into.
595+
* @param <R> result type
596+
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
597+
*/
598+
public <R> ConnectableObservable<R> multicast(Subject<T, R> subject) {
599+
return multicast(this, subject);
600+
}
601+
588602
/**
589603
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
590604
*
@@ -2072,9 +2086,22 @@ public static <T> Iterable<T> mostRecent(Observable<T> source, T initialValue) {
20722086
return OperationMostRecent.mostRecent(source, initialValue);
20732087
}
20742088

2089+
/**
2090+
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
2091+
*
2092+
* @param source the source sequence whose elements will be pushed into the specified subject.
2093+
* @param subject the subject to push source elements into.
2094+
* @param <T> source type
2095+
* @param <R> result type
2096+
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
2097+
*/
2098+
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
2099+
return OperatorMulticast.multicast(source, subject);
2100+
}
2101+
20752102
/**
20762103
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
2077-
*
2104+
*
20782105
* @param that
20792106
* the source Observable
20802107
* @return The single element in the observable sequence.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.observables;
17+
18+
import rx.Observable;
19+
import rx.Observer;
20+
import rx.Subscription;
21+
import rx.util.functions.Func1;
22+
23+
public abstract class ConnectableObservable<T> extends Observable<T> {
24+
25+
protected ConnectableObservable(Func1<Observer<T>, Subscription> onSubscribe) {
26+
super(onSubscribe);
27+
}
28+
29+
public abstract Subscription connect();
30+
31+
}

rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import rx.Observable;
2727
import rx.Observer;
2828
import rx.Subscription;
29-
import rx.subjects.Subject;
29+
import rx.subjects.DefaultSubject;
3030
import rx.subscriptions.Subscriptions;
3131
import rx.util.AtomicObservableSubscription;
3232
import rx.util.AtomicObserver;
@@ -174,7 +174,7 @@ public Boolean call(Integer input)
174174

175175
@Test
176176
public void testTakeWhileOnSubject1() {
177-
Subject<Integer> s = Subject.create();
177+
DefaultSubject<Integer> s = DefaultSubject.create();
178178
Observable<Integer> w = (Observable<Integer>) s;
179179
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>()
180180
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import org.junit.Test;
19+
import rx.Observable;
20+
import rx.Observer;
21+
import rx.Subscription;
22+
import rx.observables.ConnectableObservable;
23+
import rx.subjects.DefaultSubject;
24+
import rx.subjects.Subject;
25+
import rx.util.functions.Func1;
26+
27+
import static org.mockito.Mockito.*;
28+
29+
public class OperatorMulticast {
30+
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
31+
return new MulticastConnectableObservable<T, R>(source, subject);
32+
}
33+
34+
private static class MulticastConnectableObservable<T, R> extends ConnectableObservable<R> {
35+
private final Object lock = new Object();
36+
37+
private final Observable<T> source;
38+
private final Subject<T, R> subject;
39+
40+
private Subscription subscription;
41+
42+
public MulticastConnectableObservable(Observable<T> source, final Subject<T, R> subject) {
43+
super(new Func1<Observer<R>, Subscription>() {
44+
@Override
45+
public Subscription call(Observer<R> observer) {
46+
return subject.subscribe(observer);
47+
}
48+
});
49+
this.source = source;
50+
this.subject = subject;
51+
}
52+
53+
public Subscription connect() {
54+
synchronized (lock) {
55+
if (subscription == null) {
56+
subscription = source.subscribe(new Observer<T>() {
57+
@Override
58+
public void onCompleted() {
59+
subject.onCompleted();
60+
}
61+
62+
@Override
63+
public void onError(Exception e) {
64+
subject.onError(e);
65+
}
66+
67+
@Override
68+
public void onNext(T args) {
69+
subject.onNext(args);
70+
}
71+
});
72+
}
73+
}
74+
75+
76+
return new Subscription() {
77+
@Override
78+
public void unsubscribe() {
79+
synchronized (lock) {
80+
if (subscription != null) {
81+
subscription.unsubscribe();
82+
subscription = null;
83+
}
84+
}
85+
}
86+
};
87+
}
88+
89+
90+
}
91+
92+
public static class UnitTest {
93+
94+
@Test
95+
public void testMulticast() {
96+
TestObservable source = new TestObservable();
97+
98+
ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
99+
DefaultSubject.<String>create());
100+
101+
Observer<String> observer = mock(Observer.class);
102+
multicasted.subscribe(observer);
103+
104+
source.sendOnNext("one");
105+
source.sendOnNext("two");
106+
107+
multicasted.connect();
108+
109+
source.sendOnNext("three");
110+
source.sendOnNext("four");
111+
source.sendOnCompleted();
112+
113+
verify(observer, never()).onNext("one");
114+
verify(observer, never()).onNext("two");
115+
verify(observer, times(1)).onNext("three");
116+
verify(observer, times(1)).onNext("four");
117+
verify(observer, times(1)).onCompleted();
118+
119+
}
120+
121+
@Test
122+
public void testMulticastConnectTwice() {
123+
TestObservable source = new TestObservable();
124+
125+
ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
126+
DefaultSubject.<String>create());
127+
128+
Observer<String> observer = mock(Observer.class);
129+
multicasted.subscribe(observer);
130+
131+
source.sendOnNext("one");
132+
133+
multicasted.connect();
134+
multicasted.connect();
135+
136+
source.sendOnNext("two");
137+
source.sendOnCompleted();
138+
139+
verify(observer, never()).onNext("one");
140+
verify(observer, times(1)).onNext("two");
141+
verify(observer, times(1)).onCompleted();
142+
143+
}
144+
145+
@Test
146+
public void testMulticastDisconnect() {
147+
TestObservable source = new TestObservable();
148+
149+
ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
150+
DefaultSubject.<String>create());
151+
152+
Observer<String> observer = mock(Observer.class);
153+
multicasted.subscribe(observer);
154+
155+
source.sendOnNext("one");
156+
157+
Subscription connection = multicasted.connect();
158+
source.sendOnNext("two");
159+
160+
connection.unsubscribe();
161+
source.sendOnNext("three");
162+
163+
multicasted.connect();
164+
source.sendOnNext("four");
165+
source.sendOnCompleted();
166+
167+
verify(observer, never()).onNext("one");
168+
verify(observer, times(1)).onNext("two");
169+
verify(observer, never()).onNext("three");
170+
verify(observer, times(1)).onNext("four");
171+
verify(observer, times(1)).onCompleted();
172+
173+
}
174+
175+
176+
private static class TestObservable extends Observable<String> {
177+
178+
Observer<String> observer = new Observer<String>() {
179+
@Override
180+
public void onCompleted() {
181+
// Do nothing
182+
}
183+
184+
@Override
185+
public void onError(Exception e) {
186+
// Do nothing
187+
}
188+
189+
@Override
190+
public void onNext(String args) {
191+
// Do nothing
192+
}
193+
};
194+
Subscription s = new Subscription() {
195+
@Override
196+
public void unsubscribe() {
197+
observer = new Observer<String>() {
198+
@Override
199+
public void onCompleted() {
200+
// Do nothing
201+
}
202+
203+
@Override
204+
public void onError(Exception e) {
205+
// Do nothing
206+
}
207+
208+
@Override
209+
public void onNext(String args) {
210+
// Do nothing
211+
}
212+
};
213+
}
214+
};
215+
216+
public TestObservable() {
217+
}
218+
219+
/* used to simulate subscription */
220+
public void sendOnCompleted() {
221+
observer.onCompleted();
222+
}
223+
224+
/* used to simulate subscription */
225+
public void sendOnNext(String value) {
226+
observer.onNext(value);
227+
}
228+
229+
/* used to simulate subscription */
230+
public void sendOnError(Exception e) {
231+
observer.onError(e);
232+
}
233+
234+
@Override
235+
public Subscription subscribe(final Observer<String> observer) {
236+
this.observer = observer;
237+
return s;
238+
}
239+
240+
}
241+
242+
}
243+
}

0 commit comments

Comments
 (0)