Skip to content

New operators: concatEmptyWith and mergeEmptyWith. #3060

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 83 commits into from

Conversation

NiteshKant
Copy link
Member

As discussed in issue #3037, the primary use of these operators is to be applied to Observable<Void> so that they can be merged and concatenated with an Observable of a different type.

Both these operators raise an error if the source Observable emits any item.

@NiteshKant
Copy link
Member Author

Marked the new operators as @Experimental, once the review is complete, I will squash the commits into one.

@NiteshKant
Copy link
Member Author

Renamed operators to mergeEmptyWith and concatEmptyWith. Based on offline discussions with @abersnaze and @benjchristensen these names better suit the intent of these operators.

Also, removed the swithMap() + mergeWith/concatWith implementations with custom operators that are much lighter weight for this case (source never emits an item) vis-a-vis concat and merge.

@benjchristensen
Copy link
Member

Waiting on confirmation of names: #3037 (comment)

* an item. So, this usually is useful for {@code Observable<Void>} and results in cleaner code as opposed to using
* a {@link #cast(Class)}, something like:
*
{@code Observable.<Void>empty().cast(String.class).concatWith(Observable.just("Hello"))}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: no * before the line

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@NiteshKant NiteshKant changed the title New operators: continueWith and mergeError. New operators: concatEmptyWith and mergeEmptyWith. Jul 14, 2015
/*Nothing from this producer will ever be requested as we never expect any items to be emitted from the
parent. The only thing w.r.t backpressure that is required is to store the requested count from the
child till the time the alternate observable is subscribed. So, this producer is just ignored and the
configured EmptyProducer on the child stores any buffered requested items till subscription to alternate.*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does the subscriber know the producer doesn't emit if we never request it too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if the this should call producer.request(Long.MAX_VALUE);.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the operator uses ProducerArbiter, the child request will reach the main source and there is no need for extra requests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added producer.request(Long.MAX_VALUE) to the source Observable for both the operators.
There was a thought to send the request to the source only after the subscriber requested anything. Although, this made sense but in this context, it was not making much of a difference but adding complexity of synchronization between request(n) and setProducer(). The argument against being that for a source that never emits an item, should the terminal events be delayed till request(n) or a subscription indicates start of processing.

@benjchristensen
Copy link
Member

Thanks @akarnokd for the review.

As discussed in issue ReactiveX#3037, the primary use of these operators is to be applied to `Observable<Void>` so that they can be merged and concatenated with an Observable of a different type.

Both these operators raise an error if the source Observable emits any item.

Review comments
@NiteshKant
Copy link
Member Author

Apart from the open discussions, I think I am ready for another round of review.

The travis build failed because of the test rx.BackpressureTests > testMergeAsyncThenObserveOn failing. I don't think my changes have anything to do with that, is that a flaky test?

}

if (requestToAlternate) {
this.altProducer.request(missedRequested);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You access missedRequested and altProducer outside of their guard.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@benjchristensen
Copy link
Member

@akarnokd It seems you have open questions/issues on this PR still, is that correct?

public void onCompleted() {
synchronized (this) {
if (parentCompleted) {
delegate.onCompleted();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't call onXXX methods while holding a lock.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

duhh .. that was a bad oversight at my end. Thanks for catching it!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@akarnokd
Copy link
Member

@benjchristensen Found a few problems with this PR in the meantime.

@NiteshKant
Copy link
Member Author

@akarnokd I would wait for your further comments to make all changes together. So, get the review comments coming, they are good :)

@benjchristensen benjchristensen modified the milestone: 1.0.x Aug 28, 2015
@abersnaze
Copy link
Contributor

@NiteshKant is this ready to be merged?

stevegury and others added 25 commits October 9, 2015 13:00
sigificent -> significant
alreay -> already
…ed commit)

Squashed commits:
[c6e43fc] 1.0.15. Beta/Deprecation of Subject state peeking methods.

This should give users one release to prepare for the class structure
changes.
This came up in a
[Stackoverflow](http://stackoverflow.com/questions/32889008/do-operators-instead-of-a-whole-subscriber)
answer. If the doOnError's callback or the doOnEach's onError method
throws, any non-fatal exception replaced the original error which got
lost. This PR will wrap them both into a CompositeException.

2.x note: since Java 8 supports `addSuppressed` all callbacks in this
situation either attach to the original exception or the original
exception is attached to the callback's exception.
Slight change to make the distinction between `@Beta` and `@Experimental` explicit and meaningful.
As discussed in issue ReactiveX#3037, the primary use of these operators is to be applied to `Observable<Void>` so that they can be merged and concatenated with an Observable of a different type.

Both these operators raise an error if the source Observable emits any item.
Conflicts:
	src/main/java/rx/internal/operators/OperatorConcatEmptyWith.java
	src/main/java/rx/internal/operators/OperatorMergeEmptyWith.java
@NiteshKant
Copy link
Member Author

I addressed the open comments apart from the suggestion of calling unsubscribe() post onError(), which I do not understand/agree with.

I see that my rebase caused a bazillion other commits to show here, may be I should close this and issue a new PR?

@akarnokd
Copy link
Member

akarnokd commented Oct 9, 2015

Please do create a new and clean PR.

@NiteshKant
Copy link
Member Author

@akarnokd created #3430 as a replacement for this one.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.