Skip to content

Commit fd936e7

Browse files
author
David Hoepelman
committed
Add FutureToObservable and rename ObservableExtensions to IterableToObservable
1 parent 30ec23a commit fd936e7

File tree

3 files changed

+129
-61
lines changed

3 files changed

+129
-61
lines changed

src/main/scala/rx/lang/scala/package.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
*/
1616
package rx.lang
1717

18-
import _root_.scala.util.{Try, Success, Failure}
18+
import _root_.scala.util.Try
19+
import _root_.scala.concurrent.{ExecutionContext, Future}
1920

2021
/**
2122
* This package contains all classes that RxScala users need.
@@ -24,11 +25,11 @@ import _root_.scala.util.{Try, Success, Failure}
2425
*/
2526
package object scala {
2627

27-
/**
28-
* Placeholder for extension methods into Observable[T] from other types
29-
*/
30-
implicit class ObservableExtensions[T](val source: Iterable[T]) extends AnyVal {
31-
def toObservable: Observable[T] = { Observable.from(source) }
28+
@deprecated("Use IterableToObservable", "0.26.5")
29+
type ObservableExtensions[T] = IterableToObservable[T]
30+
31+
implicit class IterableToObservable[T](val iterable: Iterable[T]) extends AnyVal {
32+
def toObservable: Observable[T] = Observable.from(iterable)
3233
}
3334

3435
implicit class TryToObservable[T](val tryT: Try[T]) extends AnyVal {
@@ -38,4 +39,8 @@ package object scala {
3839
implicit class OptionToObservable[T](val opt: Option[T]) extends AnyVal {
3940
def toObservable: Observable[T] = Observable.from(opt)
4041
}
42+
43+
implicit class FutureToObservable[T](val future: Future[T]) extends AnyVal {
44+
def toObservable(implicit ec: ExecutionContext): Observable[T] = Observable.from(future)
45+
}
4146
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package rx.lang.scala
2+
3+
import org.junit.Test
4+
import org.scalatest.junit.JUnitSuite
5+
import rx.lang.scala.observers.TestSubscriber
6+
7+
import scala.concurrent.Future
8+
import scala.util.{Failure, Success, Try}
9+
10+
class ScalaTypesConversionsTests extends JUnitSuite {
11+
12+
@Test
13+
def testIterableConversion() = {
14+
val it = Seq("1", "2", "3")
15+
val observer = TestSubscriber[String]()
16+
it.toObservable.subscribe(observer)
17+
18+
observer.assertValues("1", "2", "3")
19+
observer.assertNoErrors()
20+
observer.assertCompleted()
21+
}
22+
23+
@Test
24+
def testIterableEmptyConversion() = {
25+
val it = List[String]()
26+
val observer = TestSubscriber[String]()
27+
it.toObservable.subscribe(observer)
28+
29+
observer.assertNoValues()
30+
observer.assertNoErrors()
31+
observer.assertCompleted()
32+
}
33+
34+
@Test
35+
def testTrySuccessConversion() = {
36+
val success = Success("abc")
37+
val observer = TestSubscriber[String]()
38+
success.toObservable.subscribe(observer)
39+
40+
observer.assertValue("abc")
41+
observer.assertNoErrors()
42+
observer.assertCompleted()
43+
}
44+
45+
@Test
46+
def testTryFailureConversion() = {
47+
val error = new IllegalArgumentException("test error")
48+
val failure = Failure[String](error)
49+
val observer = TestSubscriber[String]()
50+
failure.toObservable.subscribe(observer)
51+
52+
observer.assertNoValues()
53+
observer.assertError(error)
54+
observer.assertNotCompleted()
55+
}
56+
57+
@Test
58+
def testOptionSomeConversion() = {
59+
val some = Option("abc")
60+
val observer = TestSubscriber[String]()
61+
some.toObservable.subscribe(observer)
62+
63+
observer.assertValue("abc")
64+
observer.assertNoErrors()
65+
observer.assertCompleted()
66+
}
67+
68+
@Test
69+
def testOptionNoneConversion() = {
70+
val some = Option.empty[String]
71+
val observer = TestSubscriber[String]()
72+
some.toObservable.subscribe(observer)
73+
74+
observer.assertNoValues()
75+
observer.assertNoErrors()
76+
observer.assertCompleted()
77+
}
78+
79+
@Test
80+
def testFutureSuccessfulConversion() = {
81+
import scala.concurrent.ExecutionContext.Implicits.global
82+
val fut = Future.successful("abc")
83+
val observer = TestSubscriber[String]()
84+
fut.toObservable.subscribe(observer)
85+
86+
observer.awaitTerminalEvent()
87+
observer.assertValue("abc")
88+
observer.assertNoErrors()
89+
observer.assertCompleted()
90+
}
91+
92+
@Test
93+
def testFutureSuccessfulConversion2() = {
94+
import scala.concurrent.ExecutionContext.Implicits.global
95+
val fut = Future { "abc" }
96+
val observer = TestSubscriber[String]()
97+
fut.toObservable.subscribe(observer)
98+
99+
observer.awaitTerminalEvent()
100+
observer.assertValue("abc")
101+
observer.assertNoErrors()
102+
observer.assertCompleted()
103+
}
104+
105+
@Test
106+
def testFutureFailedConversion() = {
107+
import scala.concurrent.ExecutionContext.Implicits.global
108+
val error = new IllegalArgumentException("test error")
109+
val fut = Future.failed[Unit](error)
110+
val observer = TestSubscriber[Unit]()
111+
fut.toObservable.subscribe(observer)
112+
113+
observer.awaitTerminalEvent()
114+
observer.assertNoValues()
115+
observer.assertError(error)
116+
observer.assertNotCompleted()
117+
}
118+
}

src/test/scala/rx/lang/scala/TryOptionConversionsTests.scala

Lines changed: 0 additions & 55 deletions
This file was deleted.

0 commit comments

Comments
 (0)