diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 919548fd32..f9c2d98ab9 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -9017,7 +9017,7 @@ public final Observable> window(int count) { * *
*
Backpressure Support:
- *
The operator has limited backpressure support. If {@code count} == {@code skip}, the operator honors backpressure on its outer subscriber, ignores backpressure in its inner Observables + *
The operator honors backpressure on its outer subscriber, ignores backpressure in its inner Observables * but each of them will emit at most {@code count} elements.
*
Scheduler:
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithSize.java b/src/main/java/rx/internal/operators/OperatorWindowWithSize.java index ed22a68bd6..62763f1948 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithSize.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithSize.java @@ -48,9 +48,13 @@ public OperatorWindowWithSize(int size, int skip) { @Override public Subscriber call(Subscriber> child) { if (skip == size) { - return new ExactSubscriber(child); + ExactSubscriber e = new ExactSubscriber(child); + e.init(); + return e; } - return new InexactSubscriber(child); + InexactSubscriber ie = new InexactSubscriber(child); + ie.init(); + return ie; } /** Subscriber with exact, non-overlapping window bounds. */ final class ExactSubscriber extends Subscriber { @@ -58,7 +62,6 @@ final class ExactSubscriber extends Subscriber { int count; BufferUntilSubscriber window; volatile boolean noWindow = true; - final Subscription parentSubscription = this; public ExactSubscriber(Subscriber> child) { /** * See https://github.com/ReactiveX/RxJava/issues/1546 @@ -69,13 +72,15 @@ public ExactSubscriber(Subscriber> child) { /* * Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself) */ + } + void init() { child.add(Subscriptions.create(new Action0() { @Override public void call() { // if no window we unsubscribe up otherwise wait until window ends if (noWindow) { - parentSubscription.unsubscribe(); + unsubscribe(); } } @@ -111,7 +116,7 @@ public void onNext(T t) { window = null; noWindow = true; if (child.isUnsubscribed()) { - parentSubscription.unsubscribe(); + unsubscribe(); return; } } @@ -139,7 +144,7 @@ final class InexactSubscriber extends Subscriber { final Subscriber> child; int count; final List> chunks = new LinkedList>(); - final Subscription parentSubscription = this; + volatile boolean noWindow = true; public InexactSubscriber(Subscriber> child) { /** @@ -148,6 +153,9 @@ public InexactSubscriber(Subscriber> child) { * applies to the outer, not the inner. */ this.child = child; + } + + void init() { /* * Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself) */ @@ -156,24 +164,38 @@ public InexactSubscriber(Subscriber> child) { @Override public void call() { // if no window we unsubscribe up otherwise wait until window ends - if (chunks == null || chunks.size() == 0) { - parentSubscription.unsubscribe(); + if (noWindow) { + unsubscribe(); } } })); - } - - @Override - public void onStart() { - // no backpressure as we are controlling data flow by window size - request(Long.MAX_VALUE); + + child.setProducer(new Producer() { + @Override + public void request(long n) { + if (n > 0) { + long u = n * size; + if (((u >>> 31) != 0) && (u / n != size)) { + u = Long.MAX_VALUE; + } + requestMore(u); + } + } + }); } + void requestMore(long n) { + request(n); + } + @Override public void onNext(T t) { if (count++ % skip == 0) { if (!child.isUnsubscribed()) { + if (chunks.isEmpty()) { + noWindow = false; + } CountedSubject cs = createCountedSubject(); chunks.add(cs); child.onNext(cs.producer); @@ -189,9 +211,11 @@ public void onNext(T t) { cs.consumer.onCompleted(); } } - if (chunks.size() == 0 && child.isUnsubscribed()) { - parentSubscription.unsubscribe(); - return; + if (chunks.isEmpty()) { + noWindow = true; + if (child.isUnsubscribed()) { + unsubscribe(); + } } } @@ -199,6 +223,7 @@ public void onNext(T t) { public void onError(Throwable e) { List> list = new ArrayList>(chunks); chunks.clear(); + noWindow = true; for (CountedSubject cs : list) { cs.consumer.onError(e); } @@ -209,6 +234,7 @@ public void onError(Throwable e) { public void onCompleted() { List> list = new ArrayList>(chunks); chunks.clear(); + noWindow = true; for (CountedSubject cs : list) { cs.consumer.onCompleted(); } diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithTime.java b/src/main/java/rx/internal/operators/OperatorWindowWithTime.java index dd80a06a38..cac94c5ba0 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithTime.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithTime.java @@ -15,21 +15,17 @@ */ package rx.internal.operators; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; -import rx.Observable; + +import rx.*; import rx.Observable.Operator; -import rx.Observer; -import rx.Scheduler; import rx.Scheduler.Worker; -import rx.Subscriber; +import rx.Observable; +import rx.Observer; import rx.functions.Action0; -import rx.observers.SerializedObserver; -import rx.observers.SerializedSubscriber; +import rx.observers.*; +import rx.subscriptions.Subscriptions; /** * Creates windows of values into the source sequence with timed window creation, length and size bounds. @@ -62,15 +58,16 @@ public OperatorWindowWithTime(long timespan, long timeshift, TimeUnit unit, int @Override public Subscriber call(Subscriber> child) { Worker worker = scheduler.createWorker(); - child.add(worker); if (timespan == timeshift) { ExactSubscriber s = new ExactSubscriber(child, worker); + s.add(worker); s.scheduleExact(); return s; } InexactSubscriber s = new InexactSubscriber(child, worker); + s.add(worker); s.startNewChunk(); s.scheduleChunk(); return s; @@ -118,11 +115,19 @@ final class ExactSubscriber extends Subscriber { volatile State state; public ExactSubscriber(Subscriber> child, Worker worker) { - super(child); this.child = new SerializedSubscriber>(child); this.worker = worker; this.guard = new Object(); this.state = State.empty(); + child.add(Subscriptions.create(new Action0() { + @Override + public void call() { + // if there is no active window, unsubscribe the upstream + if (state.consumer == null) { + unsubscribe(); + } + } + })); } @Override @@ -132,7 +137,6 @@ public void onStart() { @Override public void onNext(T t) { - List localQueue; synchronized (guard) { if (emitting) { if (queue == null) { @@ -141,29 +145,29 @@ public void onNext(T t) { queue.add(t); return; } - localQueue = queue; - queue = null; emitting = true; } - boolean once = true; boolean skipFinal = false; try { - do { - drain(localQueue); - if (once) { - once = false; - emitValue(t); - } + if (!emitValue(t)) { + return; + } + + for (;;) { + List localQueue; synchronized (guard) { localQueue = queue; - queue = null; if (localQueue == null) { emitting = false; skipFinal = true; return; } + queue = null; + } + if (!drain(localQueue)) { + return; } - } while (!child.isUnsubscribed()); + } } finally { if (!skipFinal) { synchronized (guard) { @@ -172,13 +176,15 @@ public void onNext(T t) { } } } - void drain(List queue) { + boolean drain(List queue) { if (queue == null) { - return; + return true; } for (Object o : queue) { if (o == NEXT_SUBJECT) { - replaceSubject(); + if (!replaceSubject()) { + return false; + } } else if (nl.isError(o)) { error(nl.getError(o)); @@ -190,23 +196,35 @@ void drain(List queue) { } else { @SuppressWarnings("unchecked") T t = (T)o; - emitValue(t); + if (!emitValue(t)) { + return false; + } } } + return true; } - void replaceSubject() { + boolean replaceSubject() { Observer s = state.consumer; if (s != null) { s.onCompleted(); } + // if child has unsubscribed, unsubscribe upstream instead of opening a new window + if (child.isUnsubscribed()) { + state = state.clear(); + unsubscribe(); + return false; + } BufferUntilSubscriber bus = BufferUntilSubscriber.create(); state = state.create(bus, bus); child.onNext(bus); + return true; } - void emitValue(T t) { + boolean emitValue(T t) { State s = state; if (s.consumer == null) { - replaceSubject(); + if (!replaceSubject()) { + return false; + } s = state; } s.consumer.onNext(t); @@ -217,6 +235,7 @@ void emitValue(T t) { s = s.next(); } state = s; + return true; } @Override @@ -285,7 +304,6 @@ public void call() { }, 0, timespan, unit); } void nextWindow() { - List localQueue; synchronized (guard) { if (emitting) { if (queue == null) { @@ -294,29 +312,29 @@ void nextWindow() { queue.add(NEXT_SUBJECT); return; } - localQueue = queue; - queue = null; emitting = true; } - boolean once = true; boolean skipFinal = false; try { - do { - drain(localQueue); - if (once) { - once = false; - replaceSubject(); - } + if (!replaceSubject()) { + return; + } + for (;;) { + List localQueue; synchronized (guard) { localQueue = queue; - queue = null; if (localQueue == null) { emitting = false; skipFinal = true; return; } + queue = null; } - } while (!child.isUnsubscribed()); + + if (!drain(localQueue)) { + return; + } + } } finally { if (!skipFinal) { synchronized (guard) { diff --git a/src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java b/src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java index ed8333e5ec..9dade31fbc 100644 --- a/src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java +++ b/src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java @@ -16,18 +16,21 @@ package rx.internal.operators; import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Test; +import org.junit.*; -import static org.mockito.Mockito.*; import rx.*; +import rx.Observable.OnSubscribe; import rx.Observable; import rx.Observer; import rx.functions.*; +import rx.internal.util.UtilityFunctions; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -245,4 +248,80 @@ public void onCompleted() { verify(o, times(1)).onCompleted(); // 1 inner } + public static Observable hotStream() { + return Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber s) { + while (!s.isUnsubscribed()) { + // burst some number of items + for (int i = 0; i < Math.random() * 20; i++) { + s.onNext(i); + } + try { + // sleep for a random amount of time + // NOTE: Only using Thread.sleep here as an artificial demo. + Thread.sleep((long) (Math.random() * 200)); + } catch (Exception e) { + // do nothing + } + } + System.out.println("Hot done."); + } + }).subscribeOn(Schedulers.newThread()); // use newThread since we are using sleep to block + } + + @Test + public void testTakeFlatMapCompletes() { + TestSubscriber ts = new TestSubscriber(); + + final int indicator = 999999999; + + hotStream() + .window(10) + .take(2) + .flatMap(new Func1, Observable>() { + @Override + public Observable call(Observable w) { + return w.startWith(indicator); + } + }).subscribe(ts); + + ts.awaitTerminalEvent(2, TimeUnit.SECONDS); + ts.assertCompleted(); + Assert.assertFalse(ts.getOnNextEvents().isEmpty()); + } + + @Test + @SuppressWarnings("unchecked") + public void testBackpressureOuterInexact() { + TestSubscriber> ts = new TestSubscriber>(0); + + Observable.range(1, 5).window(2, 1) + .map(new Func1, Observable>>() { + @Override + public Observable> call(Observable t) { + return t.toList(); + } + }).concatMap(UtilityFunctions.>>identity()) + .subscribe(ts); + + ts.assertNoErrors(); + ts.assertNoValues(); + ts.assertNotCompleted(); + + ts.requestMore(2); + + ts.assertValues(Arrays.asList(1, 2), Arrays.asList(2, 3)); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(5); + + System.out.println(ts.getOnNextEvents()); + + ts.assertValues(Arrays.asList(1, 2), Arrays.asList(2, 3), + Arrays.asList(3, 4), Arrays.asList(4, 5), Arrays.asList(5)); + ts.assertNoErrors(); + ts.assertCompleted(); + } } \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorWindowWithTimeTest.java b/src/test/java/rx/internal/operators/OperatorWindowWithTimeTest.java index 22e6906463..34c3739c88 100644 --- a/src/test/java/rx/internal/operators/OperatorWindowWithTimeTest.java +++ b/src/test/java/rx/internal/operators/OperatorWindowWithTimeTest.java @@ -26,6 +26,7 @@ import rx.Observable; import rx.Observer; import rx.functions.*; +import rx.observers.TestSubscriber; import rx.schedulers.TestScheduler; public class OperatorWindowWithTimeTest { @@ -171,4 +172,25 @@ public void testExactWindowSize() { assertEquals(Arrays.asList(10), lists.get(3)); } + @Test + public void testTakeFlatMapCompletes() { + TestSubscriber ts = new TestSubscriber(); + + final int indicator = 999999999; + + OperatorWindowWithSizeTest.hotStream() + .window(300, TimeUnit.MILLISECONDS) + .take(10) + .flatMap(new Func1, Observable>() { + @Override + public Observable call(Observable w) { + return w.startWith(indicator); + } + }).subscribe(ts); + + ts.awaitTerminalEvent(5, TimeUnit.SECONDS); + ts.assertCompleted(); + Assert.assertFalse(ts.getOnNextEvents().isEmpty()); + } + }