Skip to content

Commit d250ae7

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Fix Flowable.elementAt on empty sources. Plus sync tests (#4707)
1 parent 212db45 commit d250ae7

File tree

2 files changed

+63
-3
lines changed

2 files changed

+63
-3
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAt.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public void onError(Throwable t) {
8787

8888
@Override
8989
public void onComplete() {
90-
if (index <= count && !done) {
90+
if (!done) {
9191
done = true;
9292
T v = defaultValue;
9393
if (v == null) {

src/test/java/io/reactivex/internal/operators/flowable/FlowableElementAtTest.java

+62-2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,15 @@
1515

1616
import static org.junit.Assert.*;
1717

18+
import io.reactivex.*;
19+
import io.reactivex.exceptions.TestException;
20+
import io.reactivex.functions.Function;
21+
import io.reactivex.internal.subscriptions.BooleanSubscription;
22+
import io.reactivex.plugins.RxJavaPlugins;
23+
import java.util.List;
1824
import java.util.NoSuchElementException;
1925
import org.junit.Test;
20-
21-
import io.reactivex.Flowable;
26+
import org.reactivestreams.*;
2227

2328
public class FlowableElementAtTest {
2429

@@ -175,4 +180,59 @@ public void elementAtOrErrorIndex1OnEmptySource() {
175180
.test()
176181
.assertFailure(NoSuchElementException.class);
177182
}
183+
184+
185+
@Test
186+
public void doubleOnSubscribe() {
187+
TestHelper.checkDoubleOnSubscribeFlowable(new Function<Flowable<Object>, Publisher<Object>>() {
188+
@Override
189+
public Publisher<Object> apply(Flowable<Object> o) throws Exception {
190+
return o.elementAt(0).toFlowable();
191+
}
192+
});
193+
}
194+
195+
@Test
196+
public void elementAtIndex1WithDefaultOnEmptySourceObservable() {
197+
Flowable.empty()
198+
.elementAt(1, 10)
199+
.toFlowable()
200+
.test()
201+
.assertResult(10);
202+
}
203+
204+
@Test
205+
public void errorFlowable() {
206+
Flowable.error(new TestException())
207+
.elementAt(1, 10)
208+
.toFlowable()
209+
.test()
210+
.assertFailure(TestException.class);
211+
}
212+
213+
@Test
214+
public void badSource() {
215+
List<Throwable> errors = TestHelper.trackPluginErrors();
216+
try {
217+
new Flowable<Integer>() {
218+
@Override
219+
protected void subscribeActual(Subscriber<? super Integer> subscriber) {
220+
subscriber.onSubscribe(new BooleanSubscription());
221+
222+
subscriber.onNext(1);
223+
subscriber.onNext(2);
224+
subscriber.onError(new TestException());
225+
subscriber.onComplete();
226+
}
227+
}
228+
.elementAt(0)
229+
.toFlowable()
230+
.test()
231+
.assertResult(1);
232+
233+
TestHelper.assertError(errors, 0, TestException.class);
234+
} finally {
235+
RxJavaPlugins.reset();
236+
}
237+
}
178238
}

0 commit comments

Comments
 (0)