Skip to content

Commit 0fa142f

Browse files
committed
1.x: fix takeLast() backpressure (#3839)
1 parent 53c31cd commit 0fa142f

File tree

6 files changed

+396
-242
lines changed

6 files changed

+396
-242
lines changed

src/main/java/rx/internal/operators/BackpressureUtils.java

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.util.concurrent.atomic.*;
2020

2121
import rx.Subscriber;
22+
import rx.functions.Func1;
23+
import rx.internal.util.UtilityFunctions;
2224

2325
/**
2426
* Utility functions for use with backpressure.
@@ -140,6 +142,59 @@ public static long addCap(long a, long b) {
140142
* @param actual the subscriber to receive the values
141143
*/
142144
public static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super T> actual) {
145+
postCompleteDone(requested, queue, actual, UtilityFunctions.<T>identity());
146+
}
147+
148+
/**
149+
* Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
150+
*
151+
* <p>
152+
* Post-completion backpressure handles the case when a source produces values based on
153+
* requests when it is active but more values are available even after its completion.
154+
* In this case, the onCompleted() can't just emit the contents of the queue but has to
155+
* coordinate with the requested amounts. This requires two distinct modes: active and
156+
* completed. In active mode, requests flow through and the queue is not accessed but
157+
* in completed mode, requests no-longer reach the upstream but help in draining the queue.
158+
*
159+
* @param <T> the value type to emit
160+
* @param requested the holder of current requested amount
161+
* @param n the value requested;
162+
* @param queue the queue holding values to be emitted after completion
163+
* @param actual the subscriber to receive the values
164+
* @return true if in the active mode and the request amount of n can be relayed to upstream, false if
165+
* in the post-completed mode and the queue is draining.
166+
*/
167+
public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super T> actual) {
168+
return postCompleteRequest(requested, n, queue, actual, UtilityFunctions.<T>identity());
169+
}
170+
171+
/**
172+
* Signals the completion of the main sequence and switches to post-completion replay mode
173+
* and allows exit transformation on the queued values.
174+
*
175+
* <p>
176+
* Don't modify the queue after calling this method!
177+
*
178+
* <p>
179+
* Post-completion backpressure handles the case when a source produces values based on
180+
* requests when it is active but more values are available even after its completion.
181+
* In this case, the onCompleted() can't just emit the contents of the queue but has to
182+
* coordinate with the requested amounts. This requires two distinct modes: active and
183+
* completed. In active mode, requests flow through and the queue is not accessed but
184+
* in completed mode, requests no-longer reach the upstream but help in draining the queue.
185+
* <p>
186+
* The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since
187+
* request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't
188+
* allowed.
189+
*
190+
* @param <T> the value type in the queue
191+
* @param <R> the value type to emit
192+
* @param requested the holder of current requested amount
193+
* @param queue the queue holding values to be emitted after completion
194+
* @param actual the subscriber to receive the values
195+
* @param exitTransform the transformation to apply on the dequeued value to get the value to be emitted
196+
*/
197+
public static <T, R> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T, ? extends R> exitTransform) {
143198
for (;;) {
144199
long r = requested.get();
145200

@@ -156,15 +211,16 @@ public static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Su
156211
// are requests available start draining the queue
157212
if (r != 0L) {
158213
// if the switch happened when there was outstanding requests, start draining
159-
postCompleteDrain(requested, queue, actual);
214+
postCompleteDrain(requested, queue, actual, exitTransform);
160215
}
161216
return;
162217
}
163218
}
164219
}
165220

166221
/**
167-
* Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
222+
* Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests
223+
* and allows exit transformation on the queued values.
168224
*
169225
* <p>
170226
* Post-completion backpressure handles the case when a source produces values based on
@@ -174,15 +230,17 @@ public static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Su
174230
* completed. In active mode, requests flow through and the queue is not accessed but
175231
* in completed mode, requests no-longer reach the upstream but help in draining the queue.
176232
*
177-
* @param <T> the value type to emit
233+
* @param <T> the value type in the queue
234+
* @param <R> the value type to emit
178235
* @param requested the holder of current requested amount
179236
* @param n the value requested;
180237
* @param queue the queue holding values to be emitted after completion
181238
* @param actual the subscriber to receive the values
239+
* @param exitTransform the transformation to apply on the dequeued value to get the value to be emitted
182240
* @return true if in the active mode and the request amount of n can be relayed to upstream, false if
183241
* in the post-completed mode and the queue is draining.
184242
*/
185-
public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super T> actual) {
243+
public static <T, R> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T, ? extends R> exitTransform) {
186244
if (n < 0L) {
187245
throw new IllegalArgumentException("n >= 0 required but it was " + n);
188246
}
@@ -209,7 +267,7 @@ public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queu
209267
// if there was no outstanding request before and in
210268
// the post-completed state, start draining
211269
if (r == COMPLETED_MASK) {
212-
postCompleteDrain(requested, queue, actual);
270+
postCompleteDrain(requested, queue, actual, exitTransform);
213271
return false;
214272
}
215273
// returns true for active mode and false if the completed flag was set
@@ -219,16 +277,37 @@ public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queu
219277
}
220278

221279
/**
222-
* Drains the queue based on the outstanding requests in post-completed mode (only!).
280+
* Drains the queue based on the outstanding requests in post-completed mode (only!)
281+
* and allows exit transformation on the queued values.
223282
*
224-
* @param <T> the value type to emit
283+
* @param <T> the value type in the queue
284+
* @param <R> the value type to emit
225285
* @param requested the holder of current requested amount
226286
* @param queue the queue holding values to be emitted after completion
227-
* @param actual the subscriber to receive the values
287+
* @param subscriber the subscriber to receive the values
288+
* @param exitTransform the transformation to apply on the dequeued value to get the value to be emitted
228289
*/
229-
static <T> void postCompleteDrain(AtomicLong requested, Queue<T> queue, Subscriber<? super T> subscriber) {
290+
static <T, R> void postCompleteDrain(AtomicLong requested, Queue<T> queue, Subscriber<? super R> subscriber, Func1<? super T, ? extends R> exitTransform) {
230291

231292
long r = requested.get();
293+
294+
// Run on a fast-path if the downstream is unbounded
295+
if (r == Long.MAX_VALUE) {
296+
for (;;) {
297+
if (subscriber.isUnsubscribed()) {
298+
return;
299+
}
300+
301+
T v = queue.poll();
302+
303+
if (v == null) {
304+
subscriber.onCompleted();
305+
return;
306+
}
307+
308+
subscriber.onNext(exitTransform.call(v));
309+
}
310+
}
232311
/*
233312
* Since we are supposed to be in the post-complete state,
234313
* requested will have its top bit set.
@@ -264,7 +343,7 @@ static <T> void postCompleteDrain(AtomicLong requested, Queue<T> queue, Subscrib
264343
return;
265344
}
266345

267-
subscriber.onNext(v);
346+
subscriber.onNext(exitTransform.call(v));
268347

269348
e++;
270349
}

src/main/java/rx/internal/operators/OperatorTakeLast.java

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616
package rx.internal.operators;
1717

1818
import java.util.ArrayDeque;
19-
import java.util.Deque;
19+
import java.util.concurrent.atomic.AtomicLong;
2020

21+
import rx.*;
2122
import rx.Observable.Operator;
22-
import rx.Subscriber;
23+
import rx.functions.Func1;
2324

2425
/**
2526
* Returns an Observable that emits the at most the last <code>count</code> items emitted by the source Observable.
2627
* <p>
2728
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/last.png" alt="">
29+
*
30+
* @param <T> the value type
2831
*/
2932
public final class OperatorTakeLast<T> implements Operator<T, T> {
3033

@@ -39,44 +42,62 @@ public OperatorTakeLast(int count) {
3942

4043
@Override
4144
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
42-
final Deque<Object> deque = new ArrayDeque<Object>();
43-
final NotificationLite<T> notification = NotificationLite.instance();
44-
final TakeLastQueueProducer<T> producer = new TakeLastQueueProducer<T>(notification, deque, subscriber);
45-
subscriber.setProducer(producer);
46-
47-
return new Subscriber<T>(subscriber) {
48-
49-
// no backpressure up as it wants to receive and discard all but the last
45+
final TakeLastSubscriber<T> parent = new TakeLastSubscriber<T>(subscriber, count);
46+
47+
subscriber.add(parent);
48+
subscriber.setProducer(new Producer() {
5049
@Override
51-
public void onStart() {
52-
// we do this to break the chain of the child subscriber being passed through
53-
request(Long.MAX_VALUE);
50+
public void request(long n) {
51+
parent.requestMore(n);
5452
}
55-
56-
@Override
57-
public void onCompleted() {
58-
deque.offer(notification.completed());
59-
producer.startEmitting();
60-
}
61-
62-
@Override
63-
public void onError(Throwable e) {
64-
deque.clear();
65-
subscriber.onError(e);
53+
});
54+
55+
return parent;
56+
}
57+
58+
static final class TakeLastSubscriber<T> extends Subscriber<T> implements Func1<Object, T> {
59+
final Subscriber<? super T> actual;
60+
final AtomicLong requested;
61+
final ArrayDeque<Object> queue;
62+
final int count;
63+
final NotificationLite<T> nl;
64+
65+
public TakeLastSubscriber(Subscriber<? super T> actual, int count) {
66+
this.actual = actual;
67+
this.count = count;
68+
this.requested = new AtomicLong();
69+
this.queue = new ArrayDeque<Object>();
70+
this.nl = NotificationLite.instance();
71+
}
72+
73+
@Override
74+
public void onNext(T t) {
75+
if (queue.size() == count) {
76+
queue.poll();
6677
}
67-
68-
@Override
69-
public void onNext(T value) {
70-
if (count == 0) {
71-
// If count == 0, we do not need to put value into deque and
72-
// remove it at once. We can ignore the value directly.
73-
return;
74-
}
75-
if (deque.size() == count) {
76-
deque.removeFirst();
77-
}
78-
deque.offerLast(notification.next(value));
78+
queue.offer(nl.next(t));
79+
}
80+
81+
@Override
82+
public void onError(Throwable e) {
83+
queue.clear();
84+
actual.onError(e);
85+
}
86+
87+
@Override
88+
public void onCompleted() {
89+
BackpressureUtils.postCompleteDone(requested, queue, actual, this);
90+
}
91+
92+
@Override
93+
public T call(Object t) {
94+
return nl.getValue(t);
95+
}
96+
97+
void requestMore(long n) {
98+
if (n > 0L) {
99+
BackpressureUtils.postCompleteRequest(requested, n, queue, actual, this);
79100
}
80-
};
101+
}
81102
}
82103
}

0 commit comments

Comments
 (0)