diff --git a/src/main/java/rx/internal/operators/OperatorElementAt.java b/src/main/java/rx/internal/operators/OperatorElementAt.java index 844eb8bbad..19a156dfa2 100644 --- a/src/main/java/rx/internal/operators/OperatorElementAt.java +++ b/src/main/java/rx/internal/operators/OperatorElementAt.java @@ -15,7 +15,10 @@ */ package rx.internal.operators; +import java.util.concurrent.atomic.AtomicBoolean; + import rx.Observable.Operator; +import rx.Producer; import rx.Subscriber; /** @@ -45,25 +48,23 @@ private OperatorElementAt(int index, T defaultValue, boolean hasDefault) { } @Override - public Subscriber call(final Subscriber subscriber) { - return new Subscriber(subscriber) { + public Subscriber call(final Subscriber child) { + Subscriber parent = new Subscriber() { private int currentIndex = 0; @Override public void onNext(T value) { - if (currentIndex == index) { - subscriber.onNext(value); - subscriber.onCompleted(); - } else { - request(1); + if (currentIndex++ == index) { + child.onNext(value); + child.onCompleted(); + unsubscribe(); } - currentIndex++; } @Override public void onError(Throwable e) { - subscriber.onError(e); + child.onError(e); } @Override @@ -71,14 +72,46 @@ public void onCompleted() { if (currentIndex <= index) { // If "subscriber.onNext(value)" is called, currentIndex must be greater than index if (hasDefault) { - subscriber.onNext(defaultValue); - subscriber.onCompleted(); + child.onNext(defaultValue); + child.onCompleted(); } else { - subscriber.onError(new IndexOutOfBoundsException(index + " is out of bounds")); + child.onError(new IndexOutOfBoundsException(index + " is out of bounds")); } } } + + @Override + public void setProducer(Producer p) { + child.setProducer(new InnerProducer(p)); + } }; + child.add(parent); + + return parent; + } + /** + * A producer that wraps another Producer and requests Long.MAX_VALUE + * when the first positive request() call comes in. + */ + static class InnerProducer extends AtomicBoolean implements Producer { + /** */ + private static final long serialVersionUID = 1L; + + final Producer actual; + + public InnerProducer(Producer actual) { + this.actual = actual; + } + @Override + public void request(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required"); + } + if (n > 0 && compareAndSet(false, true)) { + // trigger the fast-path since the operator is going + // to skip all but the indexth element + actual.request(Long.MAX_VALUE); + } + } } - }