|
26 | 26 |
|
27 | 27 | import rx.Observable;
|
28 | 28 | import rx.Observable.OnSubscribeFunc;
|
| 29 | +import rx.subscriptions.Subscriptions; |
29 | 30 | import rx.Observer;
|
30 | 31 | import rx.Subscription;
|
31 | 32 |
|
@@ -59,6 +60,26 @@ private static class TakeLast<T> implements OnSubscribeFunc<T> {
|
59 | 60 | }
|
60 | 61 |
|
61 | 62 | public Subscription onSubscribe(Observer<? super T> observer) {
|
| 63 | + if(count == 0) { |
| 64 | + items.subscribe(new Observer<T>() { |
| 65 | + |
| 66 | + @Override |
| 67 | + public void onCompleted() { |
| 68 | + } |
| 69 | + |
| 70 | + @Override |
| 71 | + public void onError(Throwable e) { |
| 72 | + } |
| 73 | + |
| 74 | + @Override |
| 75 | + public void onNext(T args) { |
| 76 | + } |
| 77 | + |
| 78 | + }).unsubscribe(); |
| 79 | + observer.onCompleted(); |
| 80 | + return Subscriptions.empty(); |
| 81 | + } |
| 82 | + |
62 | 83 | return subscription.wrap(items.subscribe(new ItemObserver(observer)));
|
63 | 84 | }
|
64 | 85 |
|
@@ -140,6 +161,19 @@ public void testTakeLast2() {
|
140 | 161 | verify(aObserver, times(1)).onCompleted();
|
141 | 162 | }
|
142 | 163 |
|
| 164 | + @Test |
| 165 | + public void testTakeLastWithZeroCount() { |
| 166 | + Observable<String> w = Observable.from("one"); |
| 167 | + Observable<String> take = Observable.create(takeLast(w, 0)); |
| 168 | + |
| 169 | + @SuppressWarnings("unchecked") |
| 170 | + Observer<String> aObserver = mock(Observer.class); |
| 171 | + take.subscribe(aObserver); |
| 172 | + verify(aObserver, never()).onNext("one"); |
| 173 | + verify(aObserver, never()).onError(any(Throwable.class)); |
| 174 | + verify(aObserver, times(1)).onCompleted(); |
| 175 | + } |
| 176 | + |
143 | 177 | }
|
144 | 178 |
|
145 | 179 | }
|
0 commit comments