Skip to content

Adding super/extends so that Observable is covariant #331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Sep 4, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
34942ad
making Func0 covariant in its return type, cleaning up a few warnings…
Aug 31, 2013
52342a2
added variance to Func1 (hopefully) everywhere...
Aug 31, 2013
f8cba6a
added variance to Func2, too
Aug 31, 2013
21bc549
added variance to all other Func*; this breaks Scala for good, it see…
Aug 31, 2013
69c6e80
added variance to Action*, too
Aug 31, 2013
0438505
lots of Observer<? super X>
Aug 31, 2013
6b9867c
updated to Scala 2.10.2 again, repaired Scala tests, generalized two …
Aug 31, 2013
6bd2033
UnitTest confirming compilation failure without super/extends and suc…
benjchristensen Aug 31, 2013
98cd711
Need to stay pinned on Scala 2.10.1 still …
benjchristensen Aug 31, 2013
ac6a0a1
Zip: Order of Generics and Artities 5-9
benjchristensen Aug 31, 2013
ebaa616
Merge pull request #1 from benjchristensen/super-extends-additions
Sep 1, 2013
a127b06
adapted RxImplicits tests againt zip to new generics order, renamed t…
Sep 1, 2013
eba4857
generalized everything in Observable that deals with covariance of ob…
Sep 1, 2013
29289d1
Timestamped, Notification and Future are now also treated as covariant
Sep 1, 2013
78a0a1b
added an unnecessary explicit cast because the Jenkins java compiler …
Sep 1, 2013
1ca5900
Generalized all the operators, too
Sep 1, 2013
04f35cd
generalized BlockingObservable and the execution hook further
Sep 1, 2013
e962d00
added a few 'compiler' tests
Sep 1, 2013
68d181b
removed some <? super Throwable>s because that's rather unnecessary
Sep 4, 2013
51dd848
Merged in master so that the gradle pull request build has a chance t…
Sep 4, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ object RxImplicits {
* type never escapes the for-comprehension
*/
implicit class ScalaObservable[A](wrapped: Observable[A]) {
def map[B](f: A => B): Observable[B] = wrapped.map(f)
def map[B](f: A => B): Observable[B] = wrapped.map[B](f)
def flatMap[B](f: A => Observable[B]): Observable[B] = wrapped.mapMany(f)
def foreach(f: A => Unit): Unit = wrapped.toBlockingObservable.forEach(f)
def withFilter(p: A => Boolean): WithFilter = new WithFilter(p)
Expand Down Expand Up @@ -147,7 +147,7 @@ class UnitTestSuite extends JUnitSuite {
class ObservableWithException(s: Subscription, values: String*) extends Observable[String] {
var t: Thread = null

override def subscribe(observer: Observer[String]): Subscription = {
override def subscribe(observer: Observer[_ >: String]): Subscription = {
println("ObservableWithException subscribed to ...")
t = new Thread(new Runnable() {
override def run() {
Expand Down Expand Up @@ -248,7 +248,7 @@ class UnitTestSuite extends JUnitSuite {

@Test def testFlattenMerge {
val observable = Observable.from(Observable.from(1, 2, 3))
val merged = Observable.merge(observable)
val merged = Observable.merge[Int](observable)
assertSubscribeReceives(merged)(1, 2, 3)
}

Expand All @@ -272,6 +272,18 @@ class UnitTestSuite extends JUnitSuite {
assertSubscribeReceives(synchronized)(1, 2, 3)
}

@Test def testZip2() {
val colors: Observable[String] = Observable.from("red", "green", "blue")
val names: Observable[String] = Observable.from("lion-o", "cheetara", "panthro")

case class Character(color: String, name: String)

val cheetara = Character("green", "cheetara")
val panthro = Character("blue", "panthro")
val characters = Observable.zip[String, String, Character](colors, names, Character.apply _)
assertSubscribeReceives(characters)(cheetara, panthro)
}

@Test def testZip3() {
val numbers = Observable.from(1, 2, 3)
val colors = Observable.from("red", "green", "blue")
Expand All @@ -283,7 +295,7 @@ class UnitTestSuite extends JUnitSuite {
val cheetara = Character(2, "green", "cheetara")
val panthro = Character(3, "blue", "panthro")

val characters = Observable.zip(numbers, colors, names, Character.apply _)
val characters = Observable.zip[Int, String, String, Character](numbers, colors, names, Character.apply _)
assertSubscribeReceives(characters)(liono, cheetara, panthro)
}

Expand All @@ -299,7 +311,7 @@ class UnitTestSuite extends JUnitSuite {
val cheetara = Character(2, "green", "cheetara", false)
val panthro = Character(3, "blue", "panthro", false)

val characters = Observable.zip(numbers, colors, names, isLeader, Character.apply _)
val characters = Observable.zip[Int, String, String, Boolean, Character](numbers, colors, names, isLeader, Character.apply _)
assertSubscribeReceives(characters)(liono, cheetara, panthro)
}

Expand Down Expand Up @@ -338,7 +350,8 @@ class UnitTestSuite extends JUnitSuite {
@Test def testMap {
val numbers = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
val mappedNumbers = ArrayBuffer.empty[Int]
numbers.map((x: Int) => x * x).subscribe((squareVal: Int) => {
val mapped: Observable[Int] = numbers map ((x: Int) => x * x)
mapped.subscribe((squareVal: Int) => {
mappedNumbers.append(squareVal)
})
assertEquals(List(1, 4, 9, 16, 25, 36, 49, 64, 81), mappedNumbers.toList)
Expand Down Expand Up @@ -458,18 +471,9 @@ class UnitTestSuite extends JUnitSuite {
assertSubscribeReceives(skipped)(3, 4)
}

/**
* Both testTake and testTakeWhileWithIndex exposed a bug with unsubscribes not properly propagating.
* observable.take(2) produces onNext(first), onNext(second), and 4 onCompleteds
* it should produce onNext(first), onNext(second), and 1 onCompleted
*
* Switching to Observable.create(OperationTake.take(observable, 2)) works as expected
*/
@Test def testTake {
import rx.operators._

val observable = Observable.from(1, 2, 3, 4, 5)
val took = Observable.create(OperationTake.take(observable, 2))
val took = observable.take(2)
assertSubscribeReceives(took)(1, 2)
}

Expand All @@ -479,11 +483,11 @@ class UnitTestSuite extends JUnitSuite {
assertSubscribeReceives(took)(1, 3, 5)
}

/*@Test def testTakeWhileWithIndex {
val observable = Observable.from(1, 3, 5, 6, 7, 9, 11, 12, 13, 15, 17)
val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx > 4)
assertSubscribeReceives(took)(9, 11)
}*/
@Test def testTakeWhileWithIndex {
val observable = Observable.from(1, 3, 5, 7, 9, 11, 12, 13, 15, 17)
val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx < 8)
assertSubscribeReceives(took)(1, 3, 5, 7, 9, 11)
}

@Test def testTakeLast {
val observable = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
Expand Down Expand Up @@ -559,7 +563,7 @@ class UnitTestSuite extends JUnitSuite {

@Test def testFilterInForComprehension {
val doubler = (i: Int) => Observable.from(i, i)
val filteredObservable = for {
val filteredObservable: Observable[Int] = for {
i: Int <- Observable.from(1, 2, 3, 4)
j: Int <- doubler(i) if isOdd(i)
} yield j
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package rx.android.concurrency;

import android.os.Handler;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.annotation.Config;

import rx.Scheduler;
import rx.Subscription;
import rx.operators.SafeObservableSubscription;
Expand Down Expand Up @@ -39,7 +41,7 @@ public HandlerThreadScheduler(Handler handler) {
* See {@link #schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)}
*/
@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
return schedule(state, action, 0L, TimeUnit.MILLISECONDS);
}

Expand All @@ -56,7 +58,7 @@ public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscr
* @return A Subscription from which one can unsubscribe from.
*/
@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
final Scheduler _scheduler = this;
handler.postDelayed(new Runnable() {
Expand All @@ -76,6 +78,7 @@ public static final class UnitTest {
public void shouldScheduleImmediateActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
final Object state = new Object();
@SuppressWarnings("unchecked")
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
Expand All @@ -94,6 +97,7 @@ public void shouldScheduleImmediateActionOnHandlerThread() {
public void shouldScheduleDelayedActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
final Object state = new Object();
@SuppressWarnings("unchecked")
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ private SwingScheduler() {
}

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
EventQueue.invokeLater(new Runnable() {
@Override
Expand All @@ -75,7 +75,7 @@ public void call() {
}

@Override
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
long delay = unit.toMillis(dueTime);
assertThatTheDelayIsValidForTheSwingTimer(delay);
Expand Down Expand Up @@ -113,7 +113,7 @@ public void call() {
}

@Override
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
final AtomicReference<Timer> timer = new AtomicReference<Timer>();

final long delay = unit.toMillis(period);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public enum AbstractButtonSource { ; // no instances
* @see SwingObservable.fromButtonAction
*/
public static Observable<ActionEvent> fromActionOf(final AbstractButton button) {
return Observable.create(new Func1<Observer<ActionEvent>, Subscription>() {
return Observable.create(new Func1<Observer<? super ActionEvent>, Subscription>() {
@Override
public Subscription call(final Observer<ActionEvent> observer) {
public Subscription call(final Observer<? super ActionEvent> observer) {
final ActionListener listener = new ActionListener() {
@Override
public void actionPerformed(ActionEvent e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ public enum ComponentEventSource { ; // no instances
* @see SwingObservable.fromComponentEvents
*/
public static Observable<ComponentEvent> fromComponentEventsOf(final Component component) {
return Observable.create(new Func1<Observer<ComponentEvent>, Subscription>() {
return Observable.create(new Func1<Observer<? super ComponentEvent>, Subscription>() {
@Override
public Subscription call(final Observer<ComponentEvent> observer) {
public Subscription call(final Observer<? super ComponentEvent> observer) {
final ComponentListener listener = new ComponentListener() {
@Override
public void componentHidden(ComponentEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ public enum KeyEventSource { ; // no instances
* @see SwingObservable.fromKeyEvents(Component)
*/
public static Observable<KeyEvent> fromKeyEventsOf(final Component component) {
return Observable.create(new Func1<Observer<KeyEvent>, Subscription>() {
return Observable.create(new Func1<Observer<? super KeyEvent>, Subscription>() {
@Override
public Subscription call(final Observer<KeyEvent> observer) {
public Subscription call(final Observer<? super KeyEvent> observer) {
final KeyListener listener = new KeyListener() {
@Override
public void keyPressed(KeyEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public enum MouseEventSource { ; // no instances
* @see SwingObservable.fromMouseEvents
*/
public static Observable<MouseEvent> fromMouseEventsOf(final Component component) {
return Observable.create(new Func1<Observer<MouseEvent>, Subscription>() {
return Observable.create(new Func1<Observer<? super MouseEvent>, Subscription>() {
@Override
public Subscription call(final Observer<MouseEvent> observer) {
public Subscription call(final Observer<? super MouseEvent> observer) {
final MouseListener listener = new MouseListener() {
@Override
public void mouseClicked(MouseEvent event) {
Expand Down Expand Up @@ -78,9 +78,9 @@ public void call() {
* @see SwingObservable.fromMouseMotionEvents
*/
public static Observable<MouseEvent> fromMouseMotionEventsOf(final Component component) {
return Observable.create(new Func1<Observer<MouseEvent>, Subscription>() {
return Observable.create(new Func1<Observer<? super MouseEvent>, Subscription>() {
@Override
public Subscription call(final Observer<MouseEvent> observer) {
public Subscription call(final Observer<? super MouseEvent> observer) {
final MouseMotionListener listener = new MouseMotionListener() {
@Override
public void mouseDragged(MouseEvent event) {
Expand Down
Loading