Skip to content

1.x: fix takeLast() backpressure #3839

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 89 additions & 10 deletions src/main/java/rx/internal/operators/BackpressureUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.util.concurrent.atomic.*;

import rx.Subscriber;
import rx.functions.Func1;
import rx.internal.util.UtilityFunctions;

/**
* Utility functions for use with backpressure.
Expand Down Expand Up @@ -140,6 +142,59 @@ public static long addCap(long a, long b) {
* @param actual the subscriber to receive the values
*/
public static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super T> actual) {
postCompleteDone(requested, queue, actual, UtilityFunctions.<T>identity());
}

/**
* Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
*
* <p>
* Post-completion backpressure handles the case when a source produces values based on
* requests when it is active but more values are available even after its completion.
* In this case, the onCompleted() can't just emit the contents of the queue but has to
* coordinate with the requested amounts. This requires two distinct modes: active and
* completed. In active mode, requests flow through and the queue is not accessed but
* in completed mode, requests no-longer reach the upstream but help in draining the queue.
*
* @param <T> the value type to emit
* @param requested the holder of current requested amount
* @param n the value requested;
* @param queue the queue holding values to be emitted after completion
* @param actual the subscriber to receive the values
* @return true if in the active mode and the request amount of n can be relayed to upstream, false if
* in the post-completed mode and the queue is draining.
*/
public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super T> actual) {
return postCompleteRequest(requested, n, queue, actual, UtilityFunctions.<T>identity());
}

/**
* Signals the completion of the main sequence and switches to post-completion replay mode
* and allows exit transformation on the queued values.
*
* <p>
* Don't modify the queue after calling this method!
*
* <p>
* Post-completion backpressure handles the case when a source produces values based on
* requests when it is active but more values are available even after its completion.
* In this case, the onCompleted() can't just emit the contents of the queue but has to
* coordinate with the requested amounts. This requires two distinct modes: active and
* completed. In active mode, requests flow through and the queue is not accessed but
* in completed mode, requests no-longer reach the upstream but help in draining the queue.
* <p>
* The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since
* request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't
* allowed.
*
* @param <T> the value type in the queue
* @param <R> the value type to emit
* @param requested the holder of current requested amount
* @param queue the queue holding values to be emitted after completion
* @param actual the subscriber to receive the values
* @param exitTransform the transformation to apply on the dequeued value to get the value to be emitted
*/
public static <T, R> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T, ? extends R> exitTransform) {
for (;;) {
long r = requested.get();

Expand All @@ -156,15 +211,16 @@ public static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Su
// are requests available start draining the queue
if (r != 0L) {
// if the switch happened when there was outstanding requests, start draining
postCompleteDrain(requested, queue, actual);
postCompleteDrain(requested, queue, actual, exitTransform);
}
return;
}
}
}

/**
* Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
* Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests
* and allows exit transformation on the queued values.
*
* <p>
* Post-completion backpressure handles the case when a source produces values based on
Expand All @@ -174,15 +230,17 @@ public static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Su
* completed. In active mode, requests flow through and the queue is not accessed but
* in completed mode, requests no-longer reach the upstream but help in draining the queue.
*
* @param <T> the value type to emit
* @param <T> the value type in the queue
* @param <R> the value type to emit
* @param requested the holder of current requested amount
* @param n the value requested;
* @param queue the queue holding values to be emitted after completion
* @param actual the subscriber to receive the values
* @param exitTransform the transformation to apply on the dequeued value to get the value to be emitted
* @return true if in the active mode and the request amount of n can be relayed to upstream, false if
* in the post-completed mode and the queue is draining.
*/
public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super T> actual) {
public static <T, R> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T, ? extends R> exitTransform) {
if (n < 0L) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
Expand All @@ -209,7 +267,7 @@ public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queu
// if there was no outstanding request before and in
// the post-completed state, start draining
if (r == COMPLETED_MASK) {
postCompleteDrain(requested, queue, actual);
postCompleteDrain(requested, queue, actual, exitTransform);
return false;
}
// returns true for active mode and false if the completed flag was set
Expand All @@ -219,16 +277,37 @@ public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queu
}

/**
* Drains the queue based on the outstanding requests in post-completed mode (only!).
* Drains the queue based on the outstanding requests in post-completed mode (only!)
* and allows exit transformation on the queued values.
*
* @param <T> the value type to emit
* @param <T> the value type in the queue
* @param <R> the value type to emit
* @param requested the holder of current requested amount
* @param queue the queue holding values to be emitted after completion
* @param actual the subscriber to receive the values
* @param subscriber the subscriber to receive the values
* @param exitTransform the transformation to apply on the dequeued value to get the value to be emitted
*/
static <T> void postCompleteDrain(AtomicLong requested, Queue<T> queue, Subscriber<? super T> subscriber) {
static <T, R> void postCompleteDrain(AtomicLong requested, Queue<T> queue, Subscriber<? super R> subscriber, Func1<? super T, ? extends R> exitTransform) {

long r = requested.get();

// Run on a fast-path if the downstream is unbounded
if (r == Long.MAX_VALUE) {
for (;;) {
if (subscriber.isUnsubscribed()) {
return;
}

T v = queue.poll();

if (v == null) {
subscriber.onCompleted();
return;
}

subscriber.onNext(exitTransform.call(v));
}
}
/*
* Since we are supposed to be in the post-complete state,
* requested will have its top bit set.
Expand Down Expand Up @@ -264,7 +343,7 @@ static <T> void postCompleteDrain(AtomicLong requested, Queue<T> queue, Subscrib
return;
}

subscriber.onNext(v);
subscriber.onNext(exitTransform.call(v));

e++;
}
Expand Down
95 changes: 58 additions & 37 deletions src/main/java/rx/internal/operators/OperatorTakeLast.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@
package rx.internal.operators;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

import rx.*;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Func1;

/**
* Returns an Observable that emits the at most the last <code>count</code> items emitted by the source Observable.
* <p>
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/last.png" alt="">
*
* @param <T> the value type
*/
public final class OperatorTakeLast<T> implements Operator<T, T> {

Expand All @@ -39,44 +42,62 @@ public OperatorTakeLast(int count) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final Deque<Object> deque = new ArrayDeque<Object>();
final NotificationLite<T> notification = NotificationLite.instance();
final TakeLastQueueProducer<T> producer = new TakeLastQueueProducer<T>(notification, deque, subscriber);
subscriber.setProducer(producer);

return new Subscriber<T>(subscriber) {

// no backpressure up as it wants to receive and discard all but the last
final TakeLastSubscriber<T> parent = new TakeLastSubscriber<T>(subscriber, count);

subscriber.add(parent);
subscriber.setProducer(new Producer() {
@Override
public void onStart() {
// we do this to break the chain of the child subscriber being passed through
request(Long.MAX_VALUE);
public void request(long n) {
parent.requestMore(n);
}

@Override
public void onCompleted() {
deque.offer(notification.completed());
producer.startEmitting();
}

@Override
public void onError(Throwable e) {
deque.clear();
subscriber.onError(e);
});

return parent;
}

static final class TakeLastSubscriber<T> extends Subscriber<T> implements Func1<Object, T> {
final Subscriber<? super T> actual;
final AtomicLong requested;
final ArrayDeque<Object> queue;
final int count;
final NotificationLite<T> nl;

public TakeLastSubscriber(Subscriber<? super T> actual, int count) {
this.actual = actual;
this.count = count;
this.requested = new AtomicLong();
this.queue = new ArrayDeque<Object>();
this.nl = NotificationLite.instance();
}

@Override
public void onNext(T t) {
if (queue.size() == count) {
queue.poll();
}

@Override
public void onNext(T value) {
if (count == 0) {
// If count == 0, we do not need to put value into deque and
// remove it at once. We can ignore the value directly.
return;
}
if (deque.size() == count) {
deque.removeFirst();
}
deque.offerLast(notification.next(value));
queue.offer(nl.next(t));
}

@Override
public void onError(Throwable e) {
queue.clear();
actual.onError(e);
}

@Override
public void onCompleted() {
BackpressureUtils.postCompleteDone(requested, queue, actual, this);
}

@Override
public T call(Object t) {
return nl.getValue(t);
}

void requestMore(long n) {
if (n > 0L) {
BackpressureUtils.postCompleteRequest(requested, n, queue, actual, this);
}
};
}
}
}
Loading