Skip to content

New operators: concatEmptyWith and mergeEmptyWith. #3060

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 83 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
83 commits
Select commit Hold shift + click to select a range
009b3e2
New operators: `concatEmptyWith` and `mergeEmptyWith`.
Jul 2, 2015
d77a655
Revert "No need to allocate a new head node."
abersnaze Jul 17, 2015
fb2e540
Revert "Operator replay() now supports backpressure"
abersnaze Jul 17, 2015
09957fb
Revert "If cache() now supports backpressure, correct javadocs to ind…
abersnaze Jul 17, 2015
a641321
Revert "cache now supports backpressure"
abersnaze Jul 17, 2015
ad29ab2
Fix autoConnect calling onStart twice.
Jul 20, 2015
aa6361a
Private toObservable renamed to asObservable
benjchristensen Jul 20, 2015
048b435
Single.toObservable
benjchristensen Jul 20, 2015
59b539b
1.0.13
benjchristensen Jul 20, 2015
79c7cd2
Fix request != 0 checking in the scalar paths of merge()
Jul 20, 2015
ccddec4
reduce probability of ExecutorSchedulerTest.testOnBackpressureDrop fa…
davidmoten Jul 16, 2015
efefdb7
break tests as approach timeout so that don't fail on slow machines
davidmoten Jul 16, 2015
336327f
Add "since" annotations to javadocs for new Experimental/Beta methods
DavidMGross Jul 14, 2015
b5cec41
window() behavior changed, so did marble diagram & thus its size
DavidMGross Jul 21, 2015
56da24d
remove OperatorOnErrorFlatMap because unused
davidmoten Jul 24, 2015
e886052
Unit tests and cleanup of JCTools' queues.
akarnokd Jul 24, 2015
1aab0f7
Fix take swallowing exception if thrown by the exactly the nth onNext
akarnokd Jul 24, 2015
2f815f7
Test coverage of rx.functions utility methods.
akarnokd Jul 24, 2015
36167ff
cache() now supports backpressure (again)
akarnokd Jul 27, 2015
9263fc2
Movet LinkedArrayListTest to the test section.
akarnokd Jul 27, 2015
e0476ed
Operator replay() now supports backpressure (again)
akarnokd Jul 27, 2015
038e1ac
No InterruptedException with synchronous BlockingObservable
ypresto Jul 29, 2015
bee4f4b
Improve performance of NewThreadWorker.tryEnableCancelPolicy().
artem-zinnatullin Jul 29, 2015
8b5cbb0
Fix retry with predicate ignoring backpressure.
Aug 3, 2015
04c03fb
add backpressure to OperatorMaterialize
davidmoten Jul 23, 2015
34dfe3b
Add links to page that explains The Observable Contract
DavidMGross Aug 6, 2015
86f071c
Implemented Observable.x(ConversionFunc) to allow external extensions…
Jul 16, 2015
662c954
eliminate javadoc compiler warnings, add "since" stub
DavidMGross Aug 7, 2015
6287105
Correct scheduler memory leak test for from(Executor) and added check
Aug 9, 2015
829c6ce
Fix for BackpressureUtils method javadoc
artem-zinnatullin Aug 10, 2015
893b417
Remove redundant cast in Exceptions
artem-zinnatullin Aug 10, 2015
df9d3cb
Remove unnecessary static modifier
artem-zinnatullin Aug 10, 2015
e31c0d3
fix SynchronizedQueue.equals
davidmoten Jul 24, 2015
7980405
Remove redundant type parameter in EmptyAction
jacek-rzrz Jul 29, 2015
9004776
Test coverage for the observers package.
akarnokd Jul 27, 2015
25bbcf1
OperatorSwitch - fix lost requests race condition using ProducerArbiter
davidmoten Jul 13, 2015
adbbd61
FromIterable overhead reduction.
Aug 7, 2015
55dccd0
Range overhead reduction
Aug 11, 2015
996f366
Remove redundant final modifier from static method in Actions
artem-zinnatullin Aug 10, 2015
6808ce9
Version 1.0.14
stevegury Aug 12, 2015
3b8976d
Update README.md
benjchristensen Aug 21, 2015
fac460d
catch onCompleted unsubscribe error and report to RxJavaPlugin error …
davidmoten Aug 14, 2015
b757ece
BackpressureUtils capped add/multiply methods + tests
akarnokd Aug 22, 2015
49692e4
Fixed negative request due to unsubscription of a large requester
akarnokd Aug 20, 2015
6d4f25c
MapNotification producer NPE fix
Aug 24, 2015
5804bed
Scan backpressure and first emission fix
akarnokd Aug 21, 2015
e5ec3f0
Fix to Notification equals method.
wrightm Aug 27, 2015
619abfb
Refactored exception reporting of most operators.
Aug 24, 2015
3dcfe18
Implementing the SyncOnSubscribe
Jul 29, 2015
55fcd41
Fixing concurrent unsubscribe case of SyncOnSubscribe
Sep 2, 2015
f2a8400
Add Observable.fromCallable() as a companion for Observable.defer()
artem-zinnatullin Aug 14, 2015
2df3c64
test/subjects: Use statically imported never() methods
Turbo87 Sep 11, 2015
9cc4b1b
BehaviorSubjectTest: Fix verification in testCompletedAfterErrorIsNot…
Turbo87 Sep 11, 2015
eb945d8
BehaviorSubjectTest: Simplify testUnsubscriptionCase() test
Turbo87 Sep 11, 2015
1e56564
Implemented the AsyncOnSubscribe
Aug 27, 2015
df2d934
Lint fixes for unnecessary unboxing.
Sep 21, 2015
135312e
Use ternary for comparison in place of Long.compareTo for Java 6 supp…
Sep 21, 2015
9334b2a
Remove unused field updater from SubjectSubscriptionManager
artem-zinnatullin Sep 21, 2015
f9ab970
Fix typo in a comment inside Observable.subscribe
stevegury Sep 28, 2015
d6aabba
Make field final and remove unnecessary unboxing in OnSubscribeRedo.R…
artem-zinnatullin Sep 20, 2015
8859bcf
Fix synchronization on non-final field in BufferUntilSubscriber
artem-zinnatullin Sep 20, 2015
3dd46e6
Fix to a bunch of bugs and issues with AsyncOnSubscribe
akarnokd Sep 19, 2015
6f73a02
Fix for take() reentrancy bug.
akarnokd Sep 29, 2015
bf0f3e9
Schedulers shutdown capability.
Aug 12, 2015
bf1359f
Hiding start(), moved test to compensate.
akarnokd Sep 30, 2015
730be58
pull back the Experimental/Beta of the changes until 1.1.x (+1 squash…
akarnokd Oct 1, 2015
29775c9
Added warning to `Observable.doOnRequest` javadoc.
Sep 29, 2015
9522235
DoOnEach: report both original exception and callback exception.
akarnokd Oct 2, 2015
53c523c
Remove unused private method from CachedObservable and make "state" f…
artem-zinnatullin Sep 20, 2015
79f2ae7
Safer error handling in BlockingOperatorToFuture
artem-zinnatullin Sep 20, 2015
c929e0d
Remove unnecessary onStart in OperatorGroupBy
Sep 11, 2015
4cac9f5
Eager concatMap
akarnokd Oct 3, 2015
3bafb53
Make BlockingOperatorToIterator exert backpressure.
vqvu Sep 16, 2015
782b0e7
Request data in batches.
vqvu Sep 16, 2015
2d0ff46
BlockingObservable + subscribe methods.
akarnokd Oct 8, 2015
7aef4f8
Update README.md
abersnaze Oct 8, 2015
324514a
Renaming Observable#x to Observable#extend
Oct 8, 2015
50c2234
Add Single.doOnError()
artem-zinnatullin Oct 8, 2015
6634212
Add Single.fromCallable()
artem-zinnatullin Oct 8, 2015
ffcd875
New operators: `concatEmptyWith` and `mergeEmptyWith`.
Jul 2, 2015
22f6ba7
Add Single.doOnSuccess()
artem-zinnatullin Oct 8, 2015
8569ff2
Merge branch '1.x' of https://github.com/Netflix/RxJava into 1.x
Oct 9, 2015
3f6ed32
Merge branch '1.x' of https://github.com/NiteshKant/RxJava into 1.x
Oct 9, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -9986,6 +9986,75 @@ public final <T2, R> Observable<R> zipWith(Observable<? extends T2> other, Func2
return zip(this, other, zipFunction);
}

/**
* Returns an Observable that upon completion of the source Observable subscribes to the passed {@code other}
* Observable and then emits all items emitted by that Observable. This function does not expect the source
* Observable to emit any item, in case, the source Observable, emits any item, an {@link IllegalStateException}
* is raised.
* <p>
*
* This is different than {@link #concatWith(Observable)} as it does not expect the source Observable to ever emit
* an item. So, this usually is useful for {@code Observable<Void>} and results in cleaner code as opposed to using
* a {@link #cast(Class)}, something like:
*
* {@code Observable.<Void>empty().cast(String.class).concatWith(Observable.just("Hello"))}
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatEmptyWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* <dt><b>Backpressure:</b></dt>
* <dd>{@code concatEmptyWith} does not propagate any demands from the subscriber to the source {@code Observable}
* as it never expects the source to ever emit an item. All demands are sent to the {@code other}
* {@code Observable}.</dd>
*
* @return an Observable that upon completion of the source, starts emitting items from the {@code other}
* Observable.
* @throws IllegalStateException If the source emits any item.
*
* @see #mergeEmptyWith(Observable)
*/
@Experimental
public final <R> Observable<R> concatEmptyWith(Observable<R> other) {
return lift(new OperatorConcatEmptyWith<T, R>(other));
}

/**
* Returns an Observable that only listens for error from the source Observable and emit items only from the passed
* {@code other} Observable. This function does not expect the source Observable to emit any item, in case, the
* source Observable, emits any item, an {@link IllegalStateException} is raised.
* <p>
*
* This is different than {@link #mergeWith(Observable)} as it does not expect the source Observable to ever emit
* an item. So, this usually is useful for using on {@code Observable<Void>} and results in cleaner code as opposed
* to using a {@link #cast(Class)}, something like:
* {@code Observable.<Void>empty().cast(String.class).mergeWith(Observable.just("Hello"))}
*
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeEmptyWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>{@code mergeEmptyWith} does not propagate any demands from the subscriber to the source {@code Observable}
* as it never expects the source to ever emit an item. All demands are sent to the {@code other}
* {@code Observable}.</dd>
* </dl>
*
* @return an Observable that only listens for errors from the source and starts emitting items from the
* {@code other} Observable on subscription.
* Observable.
* @throws IllegalStateException If the source emits any item.
*
* @see #concatEmptyWith(Observable)
*/
@Experimental
public final <R> Observable<R> mergeEmptyWith(Observable<R> other) {
return lift(new OperatorMergeEmptyWith<T, R>(other));
}

/**
* An Observable that never sends any information to an {@link Observer}.
* This Observable is useful primarily for testing purposes.
Expand Down
203 changes: 203 additions & 0 deletions src/main/java/rx/internal/operators/OperatorConcatEmptyWith.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.operators;

import rx.Observable;
import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.internal.producers.ProducerArbiter;
import rx.subscriptions.SerialSubscription;

/**
* Returns an Observable that emits an error if any item is emitted by the source and emits items from the supplied
* alternate {@code Observable} after the source completes.
*
* @param <T> the source value type
* @param <R> the result value type
*/
public final class OperatorConcatEmptyWith<T, R> implements Operator<R, T> {

private final Observable<? extends R> alternate;

public OperatorConcatEmptyWith(Observable<? extends R> alternate) {
this.alternate = alternate;
}

@Override
public Subscriber<? super T> call(Subscriber<? super R> child) {
final SerialSubscription ssub = new SerialSubscription();
final ParentSubscriber parent = new ParentSubscriber(child, ssub, alternate);
ssub.set(parent);
child.add(ssub);
child.setProducer(parent.emptyProducer);
return parent;
}

private final class ParentSubscriber extends Subscriber<T> {

private final Subscriber<? super R> child;
private final SerialSubscription ssub;
private final EmptyProducer emptyProducer;
private final Observable<? extends R> alternate;

ParentSubscriber(Subscriber<? super R> child, final SerialSubscription ssub, Observable<? extends R> alternate) {
this.child = child;
this.ssub = ssub;
this.emptyProducer = new EmptyProducer();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use ProducerArbiter instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my reasoning down.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

this.alternate = alternate;
}

@Override
public void setProducer(final Producer producer) {
/*
* Always request Max from the parent as we never really expect the parent to emit an item, so the
* actual value does not matter. However, if the parent producer is waiting for a request to emit
* a terminal event, not requesting the same will cause a deadlock of the parent never completing and
* the child never subscribed.
*/
producer.request(Long.MAX_VALUE);
}

@Override
public void onCompleted() {
if (!child.isUnsubscribed()) {
AlternateSubscriber as = new AlternateSubscriber(child, emptyProducer);
ssub.set(as);
alternate.unsafeSubscribe(as);
}
}

@Override
public void onError(Throwable e) {
child.onError(e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest calling unsubscribe() here and or in onNext to stop a non-empty producer as soon as possible.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I was assuming that there will be a SafeSubscriber down the line which will unsubscribe post onError() and we don't need it here. Is this a general practice we follow in all operators?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, if an onError or onCompleted comes the natural way, we can be fairly sure the upstream also cleaned up. However, if you turn an onNext into an onError or onCompleted, the upstream has no idea you have lost interest in further values and it can take a long time before an unsubscribe arrives from downstream. See the take() operator for instance.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually do not understand the need for calling unsubscribe() here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are calling onError from onNext and the upstream may not receive an unsubscribe in time and keeps emitting onNexts. Better yet, you should call unsubscribe in onNext to stop the upstream immediately.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is designed to be concatenating an Observable<Void>, which legally should not be emitting any item. Should we be guarding against such illegal sources?

}

@Override
public void onNext(T t) {
onError(new IllegalStateException("Concat empty with source emitted an item: " + t));
}
}

private final class AlternateSubscriber extends Subscriber<R> {

private final EmptyProducer emptyProducer;
private final Subscriber<? super R> child;

AlternateSubscriber(Subscriber<? super R> child, EmptyProducer emptyProducer) {
this.child = child;
this.emptyProducer = emptyProducer;
}

@Override
public void setProducer(final Producer producer) {
emptyProducer.setAltProducer(producer);
}

@Override
public void onCompleted() {
child.onCompleted();
}

@Override
public void onError(Throwable e) {
child.onError(e);
}

@Override
public void onNext(R r) {
child.onNext(r);
}
}

/**
* This is a producer implementation that does the following:
*
* <ul>
* <li>If the alternate producer has not yet arrived, store the total requested count from downstream.</li>
* <li>If the alternate producer has arrived, then relay the request demand to it.</li>
* <li>Request {@link Long#MAX_VALUE} from the parent producer, the first time the child requests anything.</li>
* </ul>
*
* Since, this is only applicable to this operator, it does not check for emissions from the source, as the source
* is never expected to emit any item. Thus it is "lighter" weight than {@link ProducerArbiter}
*/
private static final class EmptyProducer implements Producer {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is unnecessary, the functionality is already covered by ProducerArbiter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started off with ProducerArbiter and then thought that the functionality in ProducerArbiter is much larger than what is required here primarily because of the fact that the first producer here would never ever emit an item and hence the use case that ProducerArbiter is solving of resuming from an unsatisfied request from downstream, is a potential overhead. I would love to benchmark if there is a difference at all (although by sheer LOC it seems like an overhead) with this approach.

I have put this reasoning behind this class in the javadoc as a disclaimer :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.


/*Total requested items till the time the alternate producer arrives.*/
private long missedRequested; /*Guarded by this*/
/*Producer from the alternate Observable for this operator*/
private Producer altProducer; /*Guarded by this*/

@Override
public void request(final long requested) {
if (requested < 0) {
throw new IllegalArgumentException("Requested items can not be negative.");
}

if (requested == 0) {
return;
}

boolean requestToAlternate = false;
Producer _altProducer;
synchronized (this) {
if (null == altProducer) {
/*Accumulate requested till the time an alternate producer arrives.*/
long r = this.missedRequested;
long u = r + requested;
if (u < 0) {
u = Long.MAX_VALUE;
}
this.missedRequested = u;
} else {
/*If the alternate producer exists, then relay a valid request. The missed requested will be
requested from the alt producer on setProducer()*/
requestToAlternate = true;
}

_altProducer = altProducer;
}

if (requestToAlternate) {
_altProducer.request(requested);
}
}

private void setAltProducer(final Producer altProducer) {
if (null == altProducer) {
throw new IllegalArgumentException("Producer can not be null.");
}

boolean requestToAlternate = false;
long _missedRequested;

synchronized (this) {
if (0 != missedRequested) {
/*Something was requested from the source Observable, relay that to the new producer*/
requestToAlternate = true;
}

this.altProducer = altProducer;
_missedRequested = missedRequested;
}

if (requestToAlternate) {
altProducer.request(_missedRequested);
}
}
}
}
Loading