Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ crossScalaVersions in ThisBuild := Seq("2.10.5", "2.11.6")
parallelExecution in Test := false

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.0.12",
"io.reactivex" % "rxjava" % "1.0.16",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test")
22 changes: 1 addition & 21 deletions examples/src/test/scala/examples/ExperimentalAPIExamples.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,6 @@ object ExperimentalAPIExamples {
)
}

@Test def onBackpressureBlockExample(): Unit = {
Observable[Int](subscriber => {
(1 to 200).foreach(subscriber.onNext)
}).doOnNext(v => println(s"emit $v")).onBackpressureBlock.observeOn(IOScheduler()).subscribe {
v =>
Thread.sleep(10) // A slow consumer
println(s"process $v")
}
}

@Test def onBackpressureBlockExample2(): Unit = {
Observable[Int](subscriber => {
(1 to 200).foreach(subscriber.onNext)
}).doOnNext(v => println(s"emit $v")).onBackpressureBlock(10).observeOn(IOScheduler()).subscribe {
v =>
Thread.sleep(10) // A slow consumer
println(s"process $v")
}
}

@Test def doOnRequestExample(): Unit = {
(1 to 300).toObservable.doOnRequest(request => println(s"request $request")).subscribe(new Subscriber[Int]() {
override def onStart(): Unit = request(1)
Expand Down Expand Up @@ -112,7 +92,7 @@ object ExperimentalAPIExamples {

@Test def withLatestFromExample2(): Unit = {
val a = Observable.interval(1 second).take(7)
val b = Observable.timer(3 seconds, 250 millis)
val b = Observable.interval(3 seconds, 250 millis)
a.withLatestFrom(b)((x, y) => (x, y)).toBlocking.foreach {
case (x, y) => println(s"a: $x b: $y")
}
Expand Down
80 changes: 78 additions & 2 deletions examples/src/test/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -895,10 +895,10 @@ class RxScalaDemo extends JUnitSuite {

@Test def timeoutExample2(): Unit = {
val firstTimeoutSelector = () => {
Observable.timer(10 seconds, 10 seconds, ComputationScheduler()).take(1)
Observable.interval(10 seconds, 10 seconds, ComputationScheduler()).take(1)
}
val timeoutSelector = (t: Long) => {
Observable.timer(
Observable.interval(
(500 - t * 100) max 1 millis,
(500 - t * 100) max 1 millis,
ComputationScheduler()).take(1)
Expand Down Expand Up @@ -1682,4 +1682,80 @@ class RxScalaDemo extends JUnitSuite {
val o2 = Observable.error(new RuntimeException("Oops"))
(o1 ++ o2).subscribe(v => println(v), e => e.printStackTrace(), () => println("completed"))
}

@Test def concatEagerExample(): Unit = {
val o1 = Observable.interval(100 millis).take(3).map(l => s"o1 emit $l").doOnSubscribe(println("subscribe to o1"))
val o2 = Observable.interval(100 millis).take(3).map(l => s"o2 emit $l").doOnSubscribe(println("subscribe to o2"))
o1.concatEager(o2).subscribe(println(_))
}

@Test def concatEagerExample2(): Unit = {
(0 until 10).map { i =>
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
}.toObservable.concatEager.subscribe(println(_))
}

@Test def concatEagerExample3(): Unit = {
(0 until 10).map { i =>
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
}.toObservable.concatEager(capacityHint = 3).subscribe(println(_))
}

@Test def concatMapEagerExample(): Unit = {
(0 until 10).toObservable.concatMapEager { i =>
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
}.subscribe(println(_))
}

@Test def concatMapEagerExample2(): Unit = {
(0 until 10).toObservable.concatMapEager(i => {
Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
}, capacityHint = 10).subscribe(println(_))
}

@Test def flattenDelayErrorExample() {
val o1 = Observable.just(1).delay(200 millis).
flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1"))
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
Observable.just(o1, o2, o3, o4).flattenDelayError.subscribe(println(_), _.printStackTrace())
}

@Test def flattenDelayErrorExample2() {
val o1 = Observable.just(1).delay(200 millis).
flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1"))
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
Observable.just(o1, o2, o3, o4).flattenDelayError(2).subscribe(println(_), _.printStackTrace())
}

@Test def blockingObservableSubscribeExample(): Unit = {
val b = Observable.just(1, 2, 3).toBlocking
println("---b.subscribe()---")
b.subscribe()
println("---b.subscribe(onNext)---")
b.subscribe(println(_))
println("---b.subscribe(onNext, onError)---")
b.subscribe(println(_), _.printStackTrace())
println("---b.subscribe(onNext, onError, onCompleted)---")
b.subscribe(println(_), _.printStackTrace(), () => println("onCompleted"))
println("---b.subscribe(Observer)---")
b.subscribe(new Observer[Int] {
override def onNext(v: Int): Unit = println(v)

override def onError(e: Throwable): Unit = e.printStackTrace()

override def onCompleted(): Unit = println("onCompleted")
})
println("---b.subscribe(Subscriber)---")
b.subscribe(new Subscriber[Int] {
override def onNext(v: Int): Unit = println(v)

override def onError(e: Throwable): Unit = e.printStackTrace()

override def onCompleted(): Unit = println("onCompleted")
})
}
}
Loading