Skip to content

Commit 009b3e2

Browse files
author
Nitesh Kant
committed
New operators: concatEmptyWith and mergeEmptyWith.
As discussed in issue #3037, the primary use of these operators is to be applied to `Observable<Void>` so that they can be merged and concatenated with an Observable of a different type. Both these operators raise an error if the source Observable emits any item. Review comments
1 parent cb9d1eb commit 009b3e2

File tree

5 files changed

+701
-0
lines changed

5 files changed

+701
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9467,6 +9467,75 @@ public final <T2, R> Observable<R> zipWith(Observable<? extends T2> other, Func2
94679467
return zip(this, other, zipFunction);
94689468
}
94699469

9470+
/**
9471+
* Returns an Observable that upon completion of the source Observable subscribes to the passed {@code other}
9472+
* Observable and then emits all items emitted by that Observable. This function does not expect the source
9473+
* Observable to emit any item, in case, the source Observable, emits any item, an {@link IllegalStateException}
9474+
* is raised.
9475+
* <p>
9476+
*
9477+
* This is different than {@link #concatWith(Observable)} as it does not expect the source Observable to ever emit
9478+
* an item. So, this usually is useful for {@code Observable<Void>} and results in cleaner code as opposed to using
9479+
* a {@link #cast(Class)}, something like:
9480+
*
9481+
* {@code Observable.<Void>empty().cast(String.class).concatWith(Observable.just("Hello"))}
9482+
*
9483+
* <dl>
9484+
* <dt><b>Scheduler:</b></dt>
9485+
* <dd>{@code concatEmptyWith} does not operate by default on a particular {@link Scheduler}.</dd>
9486+
* </dl>
9487+
*
9488+
* <dt><b>Backpressure:</b></dt>
9489+
* <dd>{@code concatEmptyWith} does not propagate any demands from the subscriber to the source {@code Observable}
9490+
* as it never expects the source to ever emit an item. All demands are sent to the {@code other}
9491+
* {@code Observable}.</dd>
9492+
*
9493+
* @return an Observable that upon completion of the source, starts emitting items from the {@code other}
9494+
* Observable.
9495+
* @throws IllegalStateException If the source emits any item.
9496+
*
9497+
* @see #mergeEmptyWith(Observable)
9498+
*/
9499+
@Experimental
9500+
public final <R> Observable<R> concatEmptyWith(Observable<R> other) {
9501+
return lift(new OperatorConcatEmptyWith<T, R>(other));
9502+
}
9503+
9504+
/**
9505+
* Returns an Observable that only listens for error from the source Observable and emit items only from the passed
9506+
* {@code other} Observable. This function does not expect the source Observable to emit any item, in case, the
9507+
* source Observable, emits any item, an {@link IllegalStateException} is raised.
9508+
* <p>
9509+
*
9510+
* This is different than {@link #mergeWith(Observable)} as it does not expect the source Observable to ever emit
9511+
* an item. So, this usually is useful for using on {@code Observable<Void>} and results in cleaner code as opposed
9512+
* to using a {@link #cast(Class)}, something like:
9513+
* {@code Observable.<Void>empty().cast(String.class).mergeWith(Observable.just("Hello"))}
9514+
*
9515+
* <dl>
9516+
* <dt><b>Scheduler:</b></dt>
9517+
* <dd>{@code mergeEmptyWith} does not operate by default on a particular {@link Scheduler}.</dd>
9518+
* </dl>
9519+
*
9520+
* <dl>
9521+
* <dt><b>Backpressure:</b></dt>
9522+
* <dd>{@code mergeEmptyWith} does not propagate any demands from the subscriber to the source {@code Observable}
9523+
* as it never expects the source to ever emit an item. All demands are sent to the {@code other}
9524+
* {@code Observable}.</dd>
9525+
* </dl>
9526+
*
9527+
* @return an Observable that only listens for errors from the source and starts emitting items from the
9528+
* {@code other} Observable on subscription.
9529+
* Observable.
9530+
* @throws IllegalStateException If the source emits any item.
9531+
*
9532+
* @see #concatEmptyWith(Observable)
9533+
*/
9534+
@Experimental
9535+
public final <R> Observable<R> mergeEmptyWith(Observable<R> other) {
9536+
return lift(new OperatorMergeEmptyWith<T, R>(other));
9537+
}
9538+
94709539
/**
94719540
* An Observable that never sends any information to an {@link Observer}.
94729541
* This Observable is useful primarily for testing purposes.
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.Operator;
20+
import rx.Producer;
21+
import rx.Subscriber;
22+
import rx.internal.producers.ProducerArbiter;
23+
import rx.subscriptions.SerialSubscription;
24+
25+
/**
26+
* Returns an Observable that emits an error if any item is emitted by the source and emits items from the supplied
27+
* alternate {@code Observable} after the source completes.
28+
*
29+
* @param <T> the source value type
30+
* @param <R> the result value type
31+
*/
32+
public final class OperatorConcatEmptyWith<T, R> implements Operator<R, T> {
33+
34+
private final Observable<? extends R> alternate;
35+
36+
public OperatorConcatEmptyWith(Observable<? extends R> alternate) {
37+
this.alternate = alternate;
38+
}
39+
40+
@Override
41+
public Subscriber<? super T> call(Subscriber<? super R> child) {
42+
final SerialSubscription ssub = new SerialSubscription();
43+
final ParentSubscriber parent = new ParentSubscriber(child, ssub, alternate);
44+
ssub.set(parent);
45+
child.add(ssub);
46+
child.setProducer(parent.emptyProducer);
47+
return parent;
48+
}
49+
50+
private final class ParentSubscriber extends Subscriber<T> {
51+
52+
private final Subscriber<? super R> child;
53+
private final SerialSubscription ssub;
54+
private final EmptyProducer emptyProducer;
55+
private final Observable<? extends R> alternate;
56+
57+
ParentSubscriber(Subscriber<? super R> child, final SerialSubscription ssub, Observable<? extends R> alternate) {
58+
this.child = child;
59+
this.ssub = ssub;
60+
this.emptyProducer = new EmptyProducer();
61+
this.alternate = alternate;
62+
}
63+
64+
@Override
65+
public void setProducer(final Producer producer) {
66+
/*
67+
* Always request Max from the parent as we never really expect the parent to emit an item, so the
68+
* actual value does not matter. However, if the parent producer is waiting for a request to emit
69+
* a terminal event, not requesting the same will cause a deadlock of the parent never completing and
70+
* the child never subscribed.
71+
*/
72+
producer.request(Long.MAX_VALUE);
73+
}
74+
75+
@Override
76+
public void onCompleted() {
77+
if (!child.isUnsubscribed()) {
78+
AlternateSubscriber as = new AlternateSubscriber(child, emptyProducer);
79+
ssub.set(as);
80+
alternate.unsafeSubscribe(as);
81+
}
82+
}
83+
84+
@Override
85+
public void onError(Throwable e) {
86+
child.onError(e);
87+
}
88+
89+
@Override
90+
public void onNext(T t) {
91+
onError(new IllegalStateException("Concat empty with source emitted an item: " + t));
92+
}
93+
}
94+
95+
private final class AlternateSubscriber extends Subscriber<R> {
96+
97+
private final EmptyProducer emptyProducer;
98+
private final Subscriber<? super R> child;
99+
100+
AlternateSubscriber(Subscriber<? super R> child, EmptyProducer emptyProducer) {
101+
this.child = child;
102+
this.emptyProducer = emptyProducer;
103+
}
104+
105+
@Override
106+
public void setProducer(final Producer producer) {
107+
emptyProducer.setAltProducer(producer);
108+
}
109+
110+
@Override
111+
public void onCompleted() {
112+
child.onCompleted();
113+
}
114+
115+
@Override
116+
public void onError(Throwable e) {
117+
child.onError(e);
118+
}
119+
120+
@Override
121+
public void onNext(R r) {
122+
child.onNext(r);
123+
}
124+
}
125+
126+
/**
127+
* This is a producer implementation that does the following:
128+
*
129+
* <ul>
130+
* <li>If the alternate producer has not yet arrived, store the total requested count from downstream.</li>
131+
* <li>If the alternate producer has arrived, then relay the request demand to it.</li>
132+
* <li>Request {@link Long#MAX_VALUE} from the parent producer, the first time the child requests anything.</li>
133+
* </ul>
134+
*
135+
* Since, this is only applicable to this operator, it does not check for emissions from the source, as the source
136+
* is never expected to emit any item. Thus it is "lighter" weight than {@link ProducerArbiter}
137+
*/
138+
private static final class EmptyProducer implements Producer {
139+
140+
/*Total requested items till the time the alternate producer arrives.*/
141+
private long missedRequested; /*Guarded by this*/
142+
/*Producer from the alternate Observable for this operator*/
143+
private Producer altProducer; /*Guarded by this*/
144+
145+
@Override
146+
public void request(final long requested) {
147+
if (requested < 0) {
148+
throw new IllegalArgumentException("Requested items can not be negative.");
149+
}
150+
151+
if (requested == 0) {
152+
return;
153+
}
154+
155+
boolean requestToAlternate = false;
156+
157+
synchronized (this) {
158+
if (null == altProducer) {
159+
/*Accumulate requested till the time an alternate producer arrives.*/
160+
long r = this.missedRequested;
161+
long u = r + requested;
162+
if (u < 0) {
163+
u = Long.MAX_VALUE;
164+
}
165+
this.missedRequested = u;
166+
} else {
167+
/*If the alternate producer exists, then relay a valid request. The missed requested will be
168+
requested from the alt producer on setProducer()*/
169+
requestToAlternate = true;
170+
}
171+
}
172+
173+
if (requestToAlternate) {
174+
altProducer.request(requested);
175+
}
176+
}
177+
178+
private void setAltProducer(Producer altProducer) {
179+
if (null == altProducer) {
180+
throw new IllegalArgumentException("Producer can not be null.");
181+
}
182+
183+
boolean requestToAlternate = false;
184+
185+
synchronized (this) {
186+
if (0 != missedRequested) {
187+
/*Something was requested from the source Observable, relay that to the new producer*/
188+
requestToAlternate = true;
189+
}
190+
this.altProducer = altProducer;
191+
}
192+
193+
if (requestToAlternate) {
194+
this.altProducer.request(missedRequested);
195+
}
196+
}
197+
}
198+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable;
19+
import rx.Observable.Operator;
20+
import rx.Producer;
21+
import rx.Subscriber;
22+
import rx.observers.SerializedSubscriber;
23+
24+
/**
25+
* Returns an Observable that emits an error if any item is emitted by the source and emits items from the supplied
26+
* alternate {@code Observable}. The errors from source are propagated as-is.
27+
*
28+
* @param <T> the source value type
29+
* @param <R> the result value type
30+
*/
31+
public final class OperatorMergeEmptyWith<T, R> implements Operator<R, T> {
32+
33+
private final Observable<? extends R> alternate;
34+
35+
public OperatorMergeEmptyWith(Observable<? extends R> alternate) {
36+
this.alternate = alternate;
37+
}
38+
39+
@Override
40+
public Subscriber<? super T> call(final Subscriber<? super R> child) {
41+
final ChildSubscriber wrappedChild = new ChildSubscriber(child);
42+
final ParentSubscriber parent = new ParentSubscriber(wrappedChild);
43+
wrappedChild.add(parent);
44+
alternate.unsafeSubscribe(wrappedChild);
45+
return parent;
46+
}
47+
48+
private final class ParentSubscriber extends Subscriber<T> {
49+
50+
private final ChildSubscriber child;
51+
52+
ParentSubscriber(ChildSubscriber child) {
53+
this.child = child;
54+
}
55+
56+
@Override
57+
public void setProducer(final Producer producer) {
58+
/*
59+
* Always request Max from the parent as we never really expect the parent to emit an item, so the
60+
* actual value does not matter. However, if the parent producer is waiting for a request to emit
61+
* a terminal event, not requesting the same will cause the merged Observable to never complete.
62+
*/
63+
producer.request(Long.MAX_VALUE);
64+
}
65+
66+
@Override
67+
public void onCompleted() {
68+
child.parentCompleted();
69+
}
70+
71+
@Override
72+
public void onError(Throwable e) {
73+
child.onError(e);
74+
}
75+
76+
@Override
77+
public void onNext(T t) {
78+
onError(new IllegalStateException("Merge empty with source emitted an item: " + t));
79+
}
80+
}
81+
82+
private final class ChildSubscriber extends SerializedSubscriber<R> {
83+
84+
private final Subscriber<? super R> delegate;
85+
private boolean parentCompleted; /*Guarded by this*/
86+
private boolean childCompleted; /*Guarded by this*/
87+
88+
ChildSubscriber(Subscriber<? super R> delegate) {
89+
super(delegate);
90+
this.delegate = delegate;
91+
}
92+
93+
@Override
94+
public void onCompleted() {
95+
synchronized (this) {
96+
if (parentCompleted) {
97+
delegate.onCompleted();
98+
}
99+
childCompleted = true;
100+
}
101+
}
102+
103+
@Override
104+
public void onError(Throwable e) {
105+
delegate.onError(e);
106+
}
107+
108+
@Override
109+
public void onNext(R r) {
110+
delegate.onNext(r);
111+
}
112+
113+
public void parentCompleted() {
114+
synchronized (this) {
115+
if (childCompleted) {
116+
delegate.onCompleted();
117+
}
118+
parentCompleted = true;
119+
}
120+
}
121+
}
122+
}

0 commit comments

Comments
 (0)