Skip to content

Commit 6647bd5

Browse files
Merge pull request #621 from Applied-Duality/CleanFixes
SerialSubscription & From
2 parents cfb8f5f + 3921da7 commit 6647bd5

File tree

5 files changed

+94
-72
lines changed

5 files changed

+94
-72
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/subscriptions/SerialSubscription.scala

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,15 @@ object SerialSubscription {
3535
/**
3636
* Represents a [[rx.lang.scala.Subscription]] that can be checked for status.
3737
*/
38-
class SerialSubscription private[scala] (serial: rx.subscriptions.SerialSubscription) extends Subscription {
38+
class SerialSubscription private[scala] (override val asJavaSubscription: rx.subscriptions.SerialSubscription) extends Subscription {
3939

40-
/*
41-
* As long as rx.subscriptions.SerialSubscription has no isUnsubscribed,
42-
* we need to intercept and do it ourselves.
43-
*/
44-
override val asJavaSubscription: rx.subscriptions.SerialSubscription = new rx.subscriptions.SerialSubscription() {
45-
override def unsubscribe(): Unit = {
46-
if(unsubscribed.compareAndSet(false, true)) { serial.unsubscribe() }
47-
}
48-
override def setSubscription(subscription: rx.Subscription): Unit = serial.setSubscription(subscription)
49-
override def getSubscription(): rx.Subscription = serial.getSubscription()
50-
}
40+
override def unsubscribe(): Unit = asJavaSubscription.unsubscribe()
41+
override def isUnsubscribed: Boolean = asJavaSubscription.isUnsubscribed
5142

52-
def subscription_=(value: Subscription): this.type = { asJavaSubscription.setSubscription(value.asJavaSubscription); this }
43+
def subscription_=(value: Subscription): this.type = {
44+
asJavaSubscription.setSubscription(value.asJavaSubscription)
45+
this
46+
}
5347
def subscription: Subscription = Subscription(asJavaSubscription.getSubscription)
5448

5549
}

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,7 @@ public static <T> Observable<T> error(Throwable exception, Scheduler scheduler)
731731
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
732732
*/
733733
public static <T> Observable<T> from(Iterable<? extends T> iterable) {
734-
return create(OperationToObservableIterable.toObservableIterable(iterable));
734+
return from(iterable, Schedulers.currentThread());
735735
}
736736

737737
/**
@@ -750,7 +750,7 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable) {
750750
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212140.aspx">MSDN: Observable.ToObservable</a>
751751
*/
752752
public static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler scheduler) {
753-
return from(iterable).observeOn(scheduler);
753+
return create(OperationToObservableIterable.toObservableIterable(iterable, scheduler));
754754
}
755755

756756
/**
@@ -763,14 +763,35 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler s
763763
* {@link Subscription} is returned, it is not possible to unsubscribe from
764764
* the sequence before it completes.
765765
*
766-
* @param items the source sequence
766+
* @param items the source array
767767
* @param <T> the type of items in the Array and the type of items to be
768768
* emitted by the resulting Observable
769769
* @return an Observable that emits each item in the source Array
770770
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
771771
*/
772772
public static <T> Observable<T> from(T[] items) {
773-
return create(OperationToObservableIterable.toObservableIterable(Arrays.asList(items)));
773+
return from(Arrays.asList(items));
774+
}
775+
776+
/**
777+
* Converts an Array into an Observable.
778+
* <p>
779+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/from.png">
780+
* <p>
781+
* Note: the entire array is immediately emitted each time an
782+
* {@link Observer} subscribes. Since this occurs before the
783+
* {@link Subscription} is returned, it is not possible to unsubscribe from
784+
* the sequence before it completes.
785+
*
786+
* @param items the source array
787+
* @param scheduler the scheduler to emit the items of the array
788+
* @param <T> the type of items in the Array and the type of items to be
789+
* emitted by the resulting Observable
790+
* @return an Observable that emits each item in the source Array
791+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#from">RxJava Wiki: from()</a>
792+
*/
793+
public static <T> Observable<T> from(T[] items, Scheduler scheduler) {
794+
return from(Arrays.asList(items), scheduler);
774795
}
775796

776797
/**
@@ -827,7 +848,7 @@ public static <T> Observable<T> from(T t1, T t2) {
827848
* subscribes. Since this occurs before the {@link Subscription} is
828849
* returned, it is not possible to unsubscribe from the sequence before it
829850
* completes.
830-
*
851+
*
831852
* @param t1 first item
832853
* @param t2 second item
833854
* @param t3 third item
@@ -1012,11 +1033,6 @@ public static <T> Observable<T> from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
10121033
* <p>
10131034
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/from.png">
10141035
* <p>
1015-
* Note: the items will be immediately emitted each time an {@link Observer}
1016-
* subscribes. Since this occurs before the {@link Subscription} is
1017-
* returned, it is not possible to unsubscribe from the sequence before it
1018-
* completes.
1019-
*
10201036
* @param t1 first item
10211037
* @param t2 second item
10221038
* @param t3 third item
@@ -1044,11 +1060,6 @@ public static <T> Observable<T> from(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T
10441060
* <p>
10451061
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/range.png">
10461062
* <p>
1047-
* Note: the entire range is immediately emitted each time an
1048-
* {@link Observer} subscribes. Since this occurs before the
1049-
* {@link Subscription} is returned, it is not possible to unsubscribe from
1050-
* the sequence before it completes.
1051-
*
10521063
* @param start the value of the first Integer in the sequence
10531064
* @param count the number of sequential Integers to generate
10541065
* @return an Observable that emits a range of sequential Integers
@@ -1073,7 +1084,7 @@ public static Observable<Integer> range(int start, int count) {
10731084
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211896.aspx">Observable.Range Method (Int32, Int32, IScheduler)</a>
10741085
*/
10751086
public static Observable<Integer> range(int start, int count, Scheduler scheduler) {
1076-
return range(start, count).observeOn(scheduler);
1087+
return from(Range.createWithCount(start, count), scheduler);
10771088
}
10781089

10791090
/**
@@ -1120,10 +1131,7 @@ public static <T> Observable<T> defer(Func0<? extends Observable<? extends T>> o
11201131
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#just">RxJava Wiki: just()</a>
11211132
*/
11221133
public static <T> Observable<T> just(T value) {
1123-
List<T> list = new ArrayList<T>();
1124-
list.add(value);
1125-
1126-
return from(list);
1134+
return from(Arrays.asList((value)));
11271135
}
11281136

11291137
/**
@@ -1142,7 +1150,7 @@ public static <T> Observable<T> just(T value) {
11421150
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#just">RxJava Wiki: just()</a>
11431151
*/
11441152
public static <T> Observable<T> just(T value, Scheduler scheduler) {
1145-
return just(value).observeOn(scheduler);
1153+
return from(Arrays.asList((value)), scheduler);
11461154
}
11471155

11481156
/**

rxjava-core/src/main/java/rx/operators/OperationToObservableIterable.java

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,14 @@
1717

1818
import rx.Observable.OnSubscribeFunc;
1919
import rx.Observer;
20+
import rx.Scheduler;
2021
import rx.Subscription;
22+
import rx.schedulers.Schedulers;
2123
import rx.subscriptions.Subscriptions;
24+
import rx.util.functions.Action0;
25+
import rx.util.functions.Action1;
26+
27+
import java.util.Iterator;
2228

2329
/**
2430
* Converts an Iterable sequence into an Observable.
@@ -30,24 +36,42 @@
3036
*/
3137
public final class OperationToObservableIterable<T> {
3238

39+
public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
40+
return new ToObservableIterable<T>(list, scheduler);
41+
}
42+
3343
public static <T> OnSubscribeFunc<T> toObservableIterable(Iterable<? extends T> list) {
34-
return new ToObservableIterable<T>(list);
44+
return toObservableIterable(list, Schedulers.currentThread());
3545
}
3646

3747
private static class ToObservableIterable<T> implements OnSubscribeFunc<T> {
38-
public ToObservableIterable(Iterable<? extends T> list) {
48+
49+
public ToObservableIterable(Iterable<? extends T> list, Scheduler scheduler) {
3950
this.iterable = list;
51+
this.scheduler = scheduler;
4052
}
4153

42-
public Iterable<? extends T> iterable;
43-
44-
public Subscription onSubscribe(Observer<? super T> observer) {
45-
for (T item : iterable) {
46-
observer.onNext(item);
47-
}
48-
observer.onCompleted();
54+
Scheduler scheduler;
55+
final Iterable<? extends T> iterable;
4956

50-
return Subscriptions.empty();
57+
public Subscription onSubscribe(final Observer<? super T> observer) {
58+
final Iterator<? extends T> iterator = iterable.iterator();
59+
return scheduler.schedule(new Action1<Action0>() {
60+
@Override
61+
public void call(Action0 self) {
62+
try {
63+
if (iterator.hasNext()) {
64+
T x = iterator.next();
65+
observer.onNext(x);
66+
self.call();
67+
} else {
68+
observer.onCompleted();
69+
}
70+
} catch (Exception e) {
71+
observer.onError(e);
72+
}
73+
}
74+
});
5175
}
5276
}
5377
}

rxjava-core/src/test/java/rx/ObservableWindowTests.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,21 @@ public class ObservableWindowTests {
3030
@Test
3131
public void testWindow() {
3232
final ArrayList<List<Integer>> lists = new ArrayList<List<Integer>>();
33-
Observable.from(1, 2, 3, 4, 5, 6)
34-
.window(3).map(new Func1<Observable<Integer>, List<Integer>>() {
3533

36-
@Override
37-
public List<Integer> call(Observable<Integer> o) {
38-
return o.toList().toBlockingObservable().single();
39-
}
34+
Observable.concat(Observable.from(1, 2, 3, 4, 5, 6).window(3).map(new Func1<Observable<Integer>, Observable<List<Integer>>>() {
35+
@Override
36+
public Observable<List<Integer>> call(Observable<Integer> xs) {
37+
return xs.toList();
38+
}
39+
})).toBlockingObservable().forEach(new Action1<List<Integer>>() {
4040

41-
}).toBlockingObservable().forEach(new Action1<List<Integer>>() {
41+
@Override
42+
public void call(List<Integer> xs) {
43+
lists.add(xs);
44+
}
45+
});
4246

43-
@Override
44-
public void call(List<Integer> t) {
45-
lists.add(t);
46-
}
47-
});
48-
49-
assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[] { 1, 2, 3 });
47+
assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[]{1, 2, 3});
5048
assertArrayEquals(lists.get(1).toArray(new Integer[3]), new Integer[] { 4, 5, 6 });
5149
assertEquals(2, lists.size());
5250

rxjava-core/src/test/java/rx/operators/OperationWindowTest.java

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import rx.Observable;
2929
import rx.Observer;
3030
import rx.Subscription;
31+
import rx.schedulers.Schedulers;
3132
import rx.schedulers.TestScheduler;
3233
import rx.subscriptions.Subscriptions;
3334
import rx.util.functions.Action0;
@@ -44,21 +45,18 @@ public void before() {
4445
scheduler = new TestScheduler();
4546
}
4647

47-
private static <T> List<List<T>> toLists(Observable<Observable<T>> observable) {
48-
final List<T> list = new ArrayList<T>();
49-
final List<List<T>> lists = new ArrayList<List<T>>();
48+
private static <T> List<List<T>> toLists(Observable<Observable<T>> observables) {
5049

51-
observable.subscribe(new Action1<Observable<T>>() {
50+
final List<List<T>> lists = new ArrayList<List<T>>();
51+
Observable.concat(observables.map(new Func1<Observable<T>, Observable<List<T>>>() {
5252
@Override
53-
public void call(Observable<T> tObservable) {
54-
tObservable.subscribe(new Action1<T>() {
55-
@Override
56-
public void call(T t) {
57-
list.add(t);
58-
}
59-
});
60-
lists.add(new ArrayList<T>(list));
61-
list.clear();
53+
public Observable<List<T>> call(Observable<T> xs) {
54+
return xs.toList();
55+
}
56+
})).toBlockingObservable().forEach(new Action1<List<T>>() {
57+
@Override
58+
public void call(List<T> xs) {
59+
lists.add(xs);
6260
}
6361
});
6462
return lists;
@@ -90,7 +88,7 @@ public void testSkipAndCountGaplessEindows() {
9088

9189
@Test
9290
public void testOverlappingWindows() {
93-
Observable<String> subject = Observable.from("zero", "one", "two", "three", "four", "five");
91+
Observable<String> subject = Observable.from(new String[]{"zero", "one", "two", "three", "four", "five"}, Schedulers.currentThread());
9492
Observable<Observable<String>> windowed = Observable.create(window(subject, 3, 1));
9593

9694
List<List<String>> windows = toLists(windowed);

0 commit comments

Comments
 (0)