Skip to content

Commit b515226

Browse files
author
David Hoepelman
committed
Make examples doubleplusgood
1 parent 1508f3e commit b515226

File tree

2 files changed

+82
-69
lines changed

2 files changed

+82
-69
lines changed

examples/src/test/scala/examples/RxScalaDemo.scala

Lines changed: 79 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,14 @@ import scala.concurrent.duration.DurationLong
3131
import scala.concurrent.ExecutionContext
3232
import scala.language.postfixOps
3333
import scala.language.implicitConversions
34-
3534
import org.junit.Assert.assertEquals
3635
import org.junit.Assert.assertTrue
3736
import org.junit.Assert.assertFalse
3837
import org.junit.Ignore
3938
import org.junit.Test
4039
import org.scalatest.junit.JUnitSuite
41-
4240
import rx.lang.scala._
41+
import rx.lang.scala.observables.{AsyncOnSubscribe, SyncOnSubscribe}
4342
import rx.lang.scala.schedulers._
4443

4544
/**
@@ -742,44 +741,68 @@ class RxScalaDemo extends JUnitSuite {
742741
}
743742

744743
/**
745-
* This is the good way of doing it: If the consumer unsubscribes, no more elements are
746-
* calculated.
744+
* Using AsyncOnSubscribe of SyncOnSubscribe ensures that your Observable
745+
* does not calculate elements after the consumer unsubscribes and correctly does not overload
746+
* the subscriber with data (responds to backpressure).
747+
*
748+
* Here we create a SyncOnSubscribe without state, this means that it performs the same action every time downstream can handle more data.
749+
*/
750+
@Test def createExample(): Unit = {
751+
val o = Observable.create(SyncOnSubscribe.stateless(
752+
next = () => Notification.OnNext(math.random),
753+
onUnsubscribe = () => println("I have stopped generating random numbers for this subscriber")
754+
))
755+
o.take(10).foreach(r => println(s"Next random number: $r"))
756+
}
757+
758+
/**
759+
* You can also add state to (A)SyncOnSubscribe, you generate a state on each subscription and can alter that state in each next call
760+
* Here we use it to count to a specific number
747761
*/
748-
@Test def createExampleGood() {
749-
val o = Observable[String](subscriber => {
750-
var i = 0
751-
while (i < 2 && !subscriber.isUnsubscribed) {
752-
subscriber.onNext(calculateElement(i))
753-
i += 1
754-
}
755-
if (!subscriber.isUnsubscribed) subscriber.onCompleted()
756-
})
762+
@Test def createExampleWithState() {
763+
// Starts with state `0`
764+
val o = Observable.create(SyncOnSubscribe(() => 0)(i => {
765+
if(i < 2)
766+
// Check if the state has reached 2 yet, if not, we emit the current state and add 1 to the state
767+
(Notification.OnNext(i), i+1)
768+
else
769+
// Otherwise we signal completion
770+
(Notification.OnCompleted, i)
771+
}))
757772
o.take(1).subscribe(println(_))
758773
}
759774

760-
@Test def createExampleGood2() {
775+
/**
776+
* This example shows how to read a (potentially blocking) data source step-by-step (line by line) using SyncOnSubscribe.
777+
*/
778+
@Test def createExampleFromInputStream() {
761779
import scala.io.{Codec, Source}
762780

763781
val rxscalaURL = "http://reactivex.io/rxscala/"
764-
val rxscala = Observable[String](subscriber => {
765-
try {
782+
// We use the `singleState` helper here, since we only want to generate one state per subscriber
783+
// and do not need to modify it afterwards
784+
val rxscala = Observable.create(SyncOnSubscribe.singleState(
785+
// This is our `generator`, which generates a state
786+
generator = () => {
766787
val input = new java.net.URL(rxscalaURL).openStream()
767-
subscriber.add(Subscription {
768-
input.close()
769-
})
770-
val iter = Source.fromInputStream(input)(Codec.UTF8).getLines()
771-
while(iter.hasNext && !subscriber.isUnsubscribed) {
772-
val line = iter.next()
773-
subscriber.onNext(line)
774-
}
775-
if (!subscriber.isUnsubscribed) {
776-
subscriber.onCompleted()
777-
}
778-
}
779-
catch {
780-
case e: Throwable => if (!subscriber.isUnsubscribed) subscriber.onError(e)
788+
(input, Source.fromInputStream(input)(Codec.UTF8).getLines())
789+
})(
790+
// This is our `next` function, which gets called whenever the subscriber can handle more data
791+
next = {
792+
case (_, lines) => {
793+
if(lines.hasNext)
794+
// Here we provide the next line
795+
Notification.OnNext(lines.next())
796+
else
797+
// Here we signal that the stream has completed
798+
Notification.OnCompleted
799+
}
800+
},
801+
// This is our `onUnsubscribe` function, which gets called after the subscriber unsubscribes, usually to perform cleanup
802+
onUnsubscribe = {
803+
case (input, _) => scala.util.Try { input.close() }
781804
}
782-
}).subscribeOn(IOScheduler())
805+
)).subscribeOn(IOScheduler())
783806

784807
val count = rxscala.flatMap(_.split("\\W+").toSeq.toObservable)
785808
.map(_.toLowerCase)
@@ -788,45 +811,35 @@ class RxScalaDemo extends JUnitSuite {
788811
println(s"RxScala appears ${count.toBlocking.single} times in ${rxscalaURL}")
789812
}
790813

791-
@Test def createExampleWithBackpressure() {
792-
val o = Observable {
793-
subscriber: Subscriber[String] => {
794-
var emitted = 0
795-
subscriber.setProducer(n => {
796-
val intN = if (n >= 10) 10 else n.toInt
797-
var i = 0
798-
while(i < intN && emitted < 10 && !subscriber.isUnsubscribed) {
799-
emitted += 1
800-
subscriber.onNext(s"item ${emitted}")
801-
i += 1
802-
}
803-
if (emitted == 10 && !subscriber.isUnsubscribed) {
804-
subscriber.onCompleted()
805-
}
806-
})
807-
}
808-
}.subscribeOn(IOScheduler()) // Use `subscribeOn` to make sure `Producer` will run in the same Scheduler
809-
o.observeOn(ComputationScheduler()).subscribe(new Subscriber[String] {
810-
override def onStart() {
811-
println("Request a new one at the beginning")
812-
request(1)
813-
}
814-
815-
override def onNext(v: String) {
816-
println("Received " + v)
817-
println("Request a new one after receiving " + v)
818-
request(1)
819-
}
814+
/** This example show how to generate an Observable using AsyncOnSubscribe, which can be more efficient than SyncOnSubscribe.
815+
* Using AsyncOnSubscribe has the same advantages as SyncOnSubscribe, and furthermore it allows us to generate more than one
816+
* result at a time (which can be more efficient) and allows us to generate the results asynchronously.
817+
*/
818+
@Test def createExampleAsyncOnUnsubscribe(): Unit = {
819+
// We are going to count to this number
820+
val countTo = 200L
820821

821-
override def onError(e: Throwable) {
822-
e.printStackTrace()
822+
val o = Observable.create(AsyncOnSubscribe(() => 0L)(
823+
(count, demand) => {
824+
// Stop counting if we're past the number we were going to count to
825+
if(count > countTo)
826+
(Notification.OnCompleted, count)
827+
else {
828+
// Generate an observable that contains [count,count+demand) and thus contains exactly `demand` items
829+
val to = math.min(count + demand, countTo+1)
830+
val range = count until to
831+
val resultObservable = Observable.from(range)
832+
println(s"Currently at $count, received a demand of $demand. Next range [$count,$to)")
833+
(Notification.OnNext(resultObservable), to)
834+
}
823835
}
824-
825-
override def onCompleted() {
826-
println("Done")
836+
))
837+
o.subscribe(new Subscriber[Long] {
838+
override def onStart(): Unit = request(10)
839+
override def onNext(i: Long): Unit = {
840+
request(scala.util.Random.nextInt(10)+1)
827841
}
828842
})
829-
Thread.sleep(10000)
830843
}
831844

832845
def output(s: String): Unit = println(s)

src/test/scala/rx/lang/scala/observables/AsyncOnSubscribeTests.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ class AsyncOnSubscribeTests extends JUnitSuite {
1515
if(count > last)
1616
(Notification.OnCompleted, count)
1717
else {
18-
val max = math.max(count + demand, last)
19-
val next = Observable.from(count to max)
20-
(Notification.OnNext(next), max+1)
18+
val to = math.min(count+demand, last+1)
19+
val next = Observable.from(count until to)
20+
(Notification.OnNext(next), to)
2121
}
2222
))
2323
assertEquals((0L to last).toList, o.toBlocking.toList)

0 commit comments

Comments
 (0)