Skip to content

Commit fcf13c2

Browse files
Merge pull request #430 from zsxwing/issue-428
Fixed issue #428
2 parents 67b3f4d + ede1288 commit fcf13c2

File tree

2 files changed

+42
-1
lines changed

2 files changed

+42
-1
lines changed

rxjava-core/src/main/java/rx/operators/ChunkedOperation.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,11 @@ public void emitChunk(Chunk<T, C> chunk) {
187187
return;
188188
}
189189

190-
subscription.unsubscribe();
190+
// Fixed issue 428.
191+
// As unsubscribe will cancel the Future, and the currrent thread's interrupt status
192+
// will be set. So we need to emit the chunk before unsubscribe.
191193
super.emitChunk(chunk);
194+
subscription.unsubscribe();
192195
createChunk();
193196
}
194197

rxjava-core/src/main/java/rx/operators/OperationBuffer.java

+38
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
*/
1616
package rx.operators;
1717

18+
import static org.junit.Assert.assertFalse;
19+
1820
import java.util.ArrayList;
1921
import java.util.List;
22+
import java.util.concurrent.CountDownLatch;
2023
import java.util.concurrent.TimeUnit;
2124

2225
import org.junit.Before;
@@ -37,6 +40,7 @@
3740
import rx.util.Opening;
3841
import rx.util.Openings;
3942
import rx.util.functions.Action0;
43+
import rx.util.functions.Action1;
4044
import rx.util.functions.Func0;
4145
import rx.util.functions.Func1;
4246

@@ -631,6 +635,40 @@ public Subscription onSubscribe(Observer<? super Closing> observer) {
631635
inOrder.verify(observer, Mockito.times(1)).onCompleted();
632636
}
633637

638+
@Test
639+
public void testLongTimeAction() throws InterruptedException {
640+
final CountDownLatch latch = new CountDownLatch(1);
641+
LongTimeAction action = new LongTimeAction(latch);
642+
Observable.from(1).buffer(10, TimeUnit.MILLISECONDS, 10)
643+
.subscribe(action);
644+
latch.await();
645+
assertFalse(action.fail);
646+
}
647+
648+
private static class LongTimeAction implements Action1<List<Integer>> {
649+
650+
CountDownLatch latch;
651+
boolean fail = false;
652+
653+
public LongTimeAction(CountDownLatch latch) {
654+
this.latch = latch;
655+
}
656+
657+
@Override
658+
public void call(List<Integer> t1) {
659+
try {
660+
if (fail) {
661+
return;
662+
}
663+
Thread.sleep(200);
664+
} catch (InterruptedException e) {
665+
fail = true;
666+
} finally {
667+
latch.countDown();
668+
}
669+
}
670+
}
671+
634672
private List<String> list(String... args) {
635673
List<String> list = new ArrayList<String>();
636674
for (String arg : args) {

0 commit comments

Comments
 (0)