Skip to content

3.x: elementAt, first - constrain upstream requests #6620

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -9494,8 +9494,7 @@ public final Flowable<T> doOnTerminate(final Action onTerminate) {
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAt.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner
* (i.e., no backpressure applied to it).</dd>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code elementAt} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand All @@ -9507,7 +9506,7 @@ public final Flowable<T> doOnTerminate(final Action onTerminate) {
* @see <a href="http://reactivex.io/documentation/operators/elementat.html">ReactiveX operators documentation: ElementAt</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> elementAt(long index) {
if (index < 0) {
Expand All @@ -9523,8 +9522,7 @@ public final Maybe<T> elementAt(long index) {
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAtOrDefault.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner
* (i.e., no backpressure applied to it).</dd>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code elementAt} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand All @@ -9541,7 +9539,7 @@ public final Maybe<T> elementAt(long index) {
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> elementAt(long index, T defaultItem) {
if (index < 0) {
Expand All @@ -9558,8 +9556,7 @@ public final Single<T> elementAt(long index, T defaultItem) {
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAtOrDefault.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner
* (i.e., no backpressure applied to it).</dd>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code elementAtOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand All @@ -9573,7 +9570,7 @@ public final Single<T> elementAt(long index, T defaultItem) {
* @see <a href="http://reactivex.io/documentation/operators/elementat.html">ReactiveX operators documentation: ElementAt</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> elementAtOrError(long index) {
if (index < 0) {
Expand Down Expand Up @@ -9617,8 +9614,7 @@ public final Flowable<T> filter(Predicate<? super T> predicate) {
* <img width="640" height="237" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstElement.m.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
* unbounded manner (i.e., without applying backpressure).</dd>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code firstElement} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand All @@ -9627,7 +9623,7 @@ public final Flowable<T> filter(Predicate<? super T> predicate) {
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Maybe<T> firstElement() {
return elementAt(0);
Expand All @@ -9640,8 +9636,7 @@ public final Maybe<T> firstElement() {
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/first.s.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
* unbounded manner (i.e., without applying backpressure).</dd>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code first} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand All @@ -9653,7 +9648,7 @@ public final Maybe<T> firstElement() {
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> first(T defaultItem) {
return elementAt(0, defaultItem);
Expand All @@ -9666,8 +9661,7 @@ public final Single<T> first(T defaultItem) {
* <img width="640" height="237" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrError.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
* unbounded manner (i.e., without applying backpressure).</dd>
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in a bounded manner.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code firstOrError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand All @@ -9676,7 +9670,7 @@ public final Single<T> first(T defaultItem) {
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN
@BackpressureSupport(BackpressureKind.FULL) // take may trigger UNBOUNDED_IN
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> firstOrError() {
return elementAtOrError(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
downstream.onSubscribe(this);
s.request(Long.MAX_VALUE);
s.request(index + 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
downstream.onSubscribe(this);
s.request(Long.MAX_VALUE);
s.request(index + 1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.reactivex.disposables.Disposables;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Function;
import io.reactivex.functions.LongConsumer;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.processors.PublishProcessor;
Expand Down Expand Up @@ -69,6 +70,38 @@ public void elementAt() {
assertEquals(2, Flowable.fromArray(1, 2).elementAt(1).blockingGet()
.intValue());
}

@Test
public void elementAtConstrainsUpstreamRequests() {
final List<Long> requests = new ArrayList<Long>();
Flowable.fromArray(1, 2, 3, 4)
.doOnRequest(new LongConsumer() {
@Override
public void accept(long n) throws Throwable {
requests.add(n);
}
})
.elementAt(2)
.blockingGet()
.intValue();
assertEquals(Arrays.asList(3L), requests);
}

@Test
public void elementAtWithDefaultConstrainsUpstreamRequests() {
final List<Long> requests = new ArrayList<Long>();
Flowable.fromArray(1, 2, 3, 4)
.doOnRequest(new LongConsumer() {
@Override
public void accept(long n) throws Throwable {
requests.add(n);
}
})
.elementAt(2, 100)
.blockingGet()
.intValue();
assertEquals(Arrays.asList(3L), requests);
}

@Test(expected = IndexOutOfBoundsException.class)
public void elementAtWithMinusIndex() {
Expand Down