Skip to content

Commit 30ec23a

Browse files
authored
Merge pull request #220 from dhoepelman/synconsubscribe
Implement SyncOnSubscribe and AsyncOnSubscribe
2 parents 9dc9ba3 + b515226 commit 30ec23a

File tree

9 files changed

+420
-70
lines changed

9 files changed

+420
-70
lines changed

examples/src/test/scala/examples/RxScalaDemo.scala

Lines changed: 79 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,14 @@ import scala.concurrent.duration.DurationLong
3131
import scala.concurrent.ExecutionContext
3232
import scala.language.postfixOps
3333
import scala.language.implicitConversions
34-
3534
import org.junit.Assert.assertEquals
3635
import org.junit.Assert.assertTrue
3736
import org.junit.Assert.assertFalse
3837
import org.junit.Ignore
3938
import org.junit.Test
4039
import org.scalatest.junit.JUnitSuite
41-
4240
import rx.lang.scala._
41+
import rx.lang.scala.observables.{AsyncOnSubscribe, SyncOnSubscribe}
4342
import rx.lang.scala.schedulers._
4443

4544
/**
@@ -742,44 +741,68 @@ class RxScalaDemo extends JUnitSuite {
742741
}
743742

744743
/**
745-
* This is the good way of doing it: If the consumer unsubscribes, no more elements are
746-
* calculated.
744+
* Using AsyncOnSubscribe of SyncOnSubscribe ensures that your Observable
745+
* does not calculate elements after the consumer unsubscribes and correctly does not overload
746+
* the subscriber with data (responds to backpressure).
747+
*
748+
* Here we create a SyncOnSubscribe without state, this means that it performs the same action every time downstream can handle more data.
749+
*/
750+
@Test def createExample(): Unit = {
751+
val o = Observable.create(SyncOnSubscribe.stateless(
752+
next = () => Notification.OnNext(math.random),
753+
onUnsubscribe = () => println("I have stopped generating random numbers for this subscriber")
754+
))
755+
o.take(10).foreach(r => println(s"Next random number: $r"))
756+
}
757+
758+
/**
759+
* You can also add state to (A)SyncOnSubscribe, you generate a state on each subscription and can alter that state in each next call
760+
* Here we use it to count to a specific number
747761
*/
748-
@Test def createExampleGood() {
749-
val o = Observable[String](subscriber => {
750-
var i = 0
751-
while (i < 2 && !subscriber.isUnsubscribed) {
752-
subscriber.onNext(calculateElement(i))
753-
i += 1
754-
}
755-
if (!subscriber.isUnsubscribed) subscriber.onCompleted()
756-
})
762+
@Test def createExampleWithState() {
763+
// Starts with state `0`
764+
val o = Observable.create(SyncOnSubscribe(() => 0)(i => {
765+
if(i < 2)
766+
// Check if the state has reached 2 yet, if not, we emit the current state and add 1 to the state
767+
(Notification.OnNext(i), i+1)
768+
else
769+
// Otherwise we signal completion
770+
(Notification.OnCompleted, i)
771+
}))
757772
o.take(1).subscribe(println(_))
758773
}
759774

760-
@Test def createExampleGood2() {
775+
/**
776+
* This example shows how to read a (potentially blocking) data source step-by-step (line by line) using SyncOnSubscribe.
777+
*/
778+
@Test def createExampleFromInputStream() {
761779
import scala.io.{Codec, Source}
762780

763781
val rxscalaURL = "http://reactivex.io/rxscala/"
764-
val rxscala = Observable[String](subscriber => {
765-
try {
782+
// We use the `singleState` helper here, since we only want to generate one state per subscriber
783+
// and do not need to modify it afterwards
784+
val rxscala = Observable.create(SyncOnSubscribe.singleState(
785+
// This is our `generator`, which generates a state
786+
generator = () => {
766787
val input = new java.net.URL(rxscalaURL).openStream()
767-
subscriber.add(Subscription {
768-
input.close()
769-
})
770-
val iter = Source.fromInputStream(input)(Codec.UTF8).getLines()
771-
while(iter.hasNext && !subscriber.isUnsubscribed) {
772-
val line = iter.next()
773-
subscriber.onNext(line)
774-
}
775-
if (!subscriber.isUnsubscribed) {
776-
subscriber.onCompleted()
777-
}
778-
}
779-
catch {
780-
case e: Throwable => if (!subscriber.isUnsubscribed) subscriber.onError(e)
788+
(input, Source.fromInputStream(input)(Codec.UTF8).getLines())
789+
})(
790+
// This is our `next` function, which gets called whenever the subscriber can handle more data
791+
next = {
792+
case (_, lines) => {
793+
if(lines.hasNext)
794+
// Here we provide the next line
795+
Notification.OnNext(lines.next())
796+
else
797+
// Here we signal that the stream has completed
798+
Notification.OnCompleted
799+
}
800+
},
801+
// This is our `onUnsubscribe` function, which gets called after the subscriber unsubscribes, usually to perform cleanup
802+
onUnsubscribe = {
803+
case (input, _) => scala.util.Try { input.close() }
781804
}
782-
}).subscribeOn(IOScheduler())
805+
)).subscribeOn(IOScheduler())
783806

784807
val count = rxscala.flatMap(_.split("\\W+").toSeq.toObservable)
785808
.map(_.toLowerCase)
@@ -788,45 +811,35 @@ class RxScalaDemo extends JUnitSuite {
788811
println(s"RxScala appears ${count.toBlocking.single} times in ${rxscalaURL}")
789812
}
790813

791-
@Test def createExampleWithBackpressure() {
792-
val o = Observable {
793-
subscriber: Subscriber[String] => {
794-
var emitted = 0
795-
subscriber.setProducer(n => {
796-
val intN = if (n >= 10) 10 else n.toInt
797-
var i = 0
798-
while(i < intN && emitted < 10 && !subscriber.isUnsubscribed) {
799-
emitted += 1
800-
subscriber.onNext(s"item ${emitted}")
801-
i += 1
802-
}
803-
if (emitted == 10 && !subscriber.isUnsubscribed) {
804-
subscriber.onCompleted()
805-
}
806-
})
807-
}
808-
}.subscribeOn(IOScheduler()) // Use `subscribeOn` to make sure `Producer` will run in the same Scheduler
809-
o.observeOn(ComputationScheduler()).subscribe(new Subscriber[String] {
810-
override def onStart() {
811-
println("Request a new one at the beginning")
812-
request(1)
813-
}
814-
815-
override def onNext(v: String) {
816-
println("Received " + v)
817-
println("Request a new one after receiving " + v)
818-
request(1)
819-
}
814+
/** This example show how to generate an Observable using AsyncOnSubscribe, which can be more efficient than SyncOnSubscribe.
815+
* Using AsyncOnSubscribe has the same advantages as SyncOnSubscribe, and furthermore it allows us to generate more than one
816+
* result at a time (which can be more efficient) and allows us to generate the results asynchronously.
817+
*/
818+
@Test def createExampleAsyncOnUnsubscribe(): Unit = {
819+
// We are going to count to this number
820+
val countTo = 200L
820821

821-
override def onError(e: Throwable) {
822-
e.printStackTrace()
822+
val o = Observable.create(AsyncOnSubscribe(() => 0L)(
823+
(count, demand) => {
824+
// Stop counting if we're past the number we were going to count to
825+
if(count > countTo)
826+
(Notification.OnCompleted, count)
827+
else {
828+
// Generate an observable that contains [count,count+demand) and thus contains exactly `demand` items
829+
val to = math.min(count + demand, countTo+1)
830+
val range = count until to
831+
val resultObservable = Observable.from(range)
832+
println(s"Currently at $count, received a demand of $demand. Next range [$count,$to)")
833+
(Notification.OnNext(resultObservable), to)
834+
}
823835
}
824-
825-
override def onCompleted() {
826-
println("Done")
836+
))
837+
o.subscribe(new Subscriber[Long] {
838+
override def onStart(): Unit = request(10)
839+
override def onNext(i: Long): Unit = {
840+
request(scala.util.Random.nextInt(10)+1)
827841
}
828842
})
829-
Thread.sleep(10000)
830843
}
831844

832845
def output(s: String): Unit = println(s)

src/main/scala/rx/lang/scala/Notification.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ sealed trait Notification[+T] {
6363
}
6464

6565
def apply(observer: Observer[T]): Unit = accept(observer)
66+
67+
def map[U](f: T => U): Notification[U] = {
68+
this match {
69+
case Notification.OnNext(value) => Notification.OnNext(f(value))
70+
case Notification.OnError(error) => Notification.OnError(error)
71+
case Notification.OnCompleted => Notification.OnCompleted
72+
}
73+
}
6674
}
6775

6876
/**

src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ package rx.lang.scala
1919
import rx.annotations.{Beta, Experimental}
2020
import rx.exceptions.OnErrorNotImplementedException
2121
import rx.functions.FuncN
22-
import rx.lang.scala.observables.{ConnectableObservable, ErrorDelayingObservable}
22+
import rx.lang.scala.observables.{AsyncOnSubscribe, ConnectableObservable, ErrorDelayingObservable, SyncOnSubscribe}
23+
2324
import scala.concurrent.duration
2425
import java.util
2526

@@ -4919,6 +4920,52 @@ object Observable {
49194920
}
49204921
*/
49214922

4923+
/**
4924+
* Returns an Observable that respects the back-pressure semantics. When the returned Observable is
4925+
* subscribed to it will initiate the given [[observables.SyncOnSubscribe]]'s life cycle for
4926+
* generating events.
4927+
*
4928+
* Note: the `SyncOnSubscribe` provides a generic way to fulfill data by iterating
4929+
* over a (potentially stateful) function (e.g. reading data off of a channel, a parser). If your
4930+
* data comes directly from an asynchronous/potentially concurrent source then consider using [[observables.AsyncOnSubscribe]].
4931+
*
4932+
* $supportBackpressure
4933+
*
4934+
* $noDefaultScheduler
4935+
*
4936+
* @tparam T the type of the items that this Observable emits
4937+
* @tparam S the state type
4938+
* @param syncOnSubscribe
4939+
* an implementation of `SyncOnSubscribe`. There are many creation methods on the object for convenience.
4940+
* @return an Observable that, when a [[Subscriber]] subscribes to it, will use the specified `SyncOnSubscribe` to generate events
4941+
* @see [[observables.SyncOnSubscribe.stateful]]
4942+
* @see [[observables.SyncOnSubscribe.singleState]]
4943+
* @see [[observables.SyncOnSubscribe.stateless]]
4944+
*/
4945+
@Experimental
4946+
def create[S,T](syncOnSubscribe: SyncOnSubscribe[S,T]): Observable[T] = toScalaObservable[T](rx.Observable.create(syncOnSubscribe))
4947+
4948+
/**
4949+
* Returns an Observable that respects the back-pressure semantics. When the returned Observable is
4950+
* subscribed to it will initiate the given [[observables.AsyncOnSubscribe]]'s life cycle for
4951+
* generating events.
4952+
*
4953+
* $supportBackpressure
4954+
*
4955+
* $noDefaultScheduler
4956+
*
4957+
* @tparam T the type of the items that this Observable emits
4958+
* @tparam S the state type
4959+
* @param asyncOnSubscribe
4960+
* an implementation of `AsyncOnSubscribe`. There are many creation methods on the object for convenience.
4961+
* @return an Observable that, when a [[Subscriber]] subscribes to it, will use the specified `AsyncOnSubscribe` to generate events
4962+
* @see [[observables.AsyncOnSubscribe.stateful]]
4963+
* @see [[observables.AsyncOnSubscribe.singleState]]
4964+
* @see [[observables.AsyncOnSubscribe.stateless]]
4965+
*/
4966+
@Experimental
4967+
def create[S,T](asyncOnSubscribe: AsyncOnSubscribe[S,T]): Observable[T] = toScalaObservable[T](rx.Observable.create(asyncOnSubscribe))
4968+
49224969
/**
49234970
* Returns an Observable that will execute the specified function when someone subscribes to it.
49244971
*
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package rx.lang.scala.observables
2+
3+
import rx.annotations.Experimental
4+
import rx.lang.scala.{Notification, Observable}
5+
6+
/**
7+
* An utility class to create `Observable`s that start acting when subscribed to and responds
8+
* correctly to back pressure requests from subscribers.
9+
*
10+
* Semantics:
11+
* * `generator` is called to provide an initial state on each new subscription
12+
* * `next` is called with the last state and a `requested` amount of items to provide a new state
13+
* and an `Observable` that (potentially asynchronously) emits up to `requested` items.
14+
* * `onUnsubscribe` is called with the state provided by the last `next` call when the observer unsubscribes
15+
*/
16+
object AsyncOnSubscribe {
17+
18+
/**
19+
* Alias for [[AsyncOnSubscribe.stateful]]
20+
* @see [[AsyncOnSubscribe.stateful]]
21+
*/
22+
@Experimental
23+
def apply[S,T](generator: () => S)(next: (S, Long) => (Notification[Observable[T]], S), onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] =
24+
stateful[S, T](generator)(next, onUnsubscribe)
25+
26+
/**
27+
* Generates a stateful [[AsyncOnSubscribe]]
28+
*
29+
* @tparam T the type of the generated values
30+
* @tparam S the type of the associated state with each Subscriber
31+
* @param generator generates the initial state value
32+
* @param next produces observables which contain data for the stream
33+
* @param onUnsubscribe clean up behavior
34+
*/
35+
@Experimental
36+
def stateful[S, T](generator: () => S)(next: (S, Long) => (Notification[Observable[T]], S), onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] = {
37+
// The anonymous class shadows these names
38+
val nextF = next
39+
val onUnsubscribeF = onUnsubscribe
40+
41+
new rx.observables.AsyncOnSubscribe[S,T] {
42+
import rx.lang.scala.JavaConversions._
43+
override def generateState(): S = generator()
44+
override def next(state: S, requested: Long, observer: rx.Observer[rx.Observable[_ <: T]]): S =
45+
nextF(state, requested) match {
46+
case (notification, nextState) =>
47+
toJavaNotification(notification.map(toJavaObservable)).accept(observer)
48+
nextState
49+
}
50+
override def onUnsubscribe(state: S): Unit = onUnsubscribeF(state)
51+
}
52+
}
53+
54+
/**
55+
* Generates a [[AsyncOnSubscribe]] which does not generate a new state in `next`
56+
*
57+
* @tparam T the type of the generated values
58+
* @tparam S the type of the associated state with each Subscriber
59+
* @param generator generates the state value
60+
* @param next produces observables which contain data for the stream
61+
* @param onUnsubscribe clean up behavior
62+
*/
63+
@Experimental
64+
def singleState[S, T](generator: () => S)(next: (S, Long) => Notification[Observable[T]], onUnsubscribe: S => Unit = (_:S) => ()): AsyncOnSubscribe[S,T] =
65+
stateful[S, T](generator)((s,r) => (next(s,r), s), onUnsubscribe)
66+
67+
/**
68+
* Generates a stateless [[AsyncOnSubscribe]], useful when the state is closed over in `next` or the `SyncOnSubscribe` inherently does not have a state
69+
*
70+
* @tparam T the type of the generated values
71+
* @param next produces observables which contain data for the stream
72+
* @param onUnsubscribe clean up behavior
73+
*/
74+
@Experimental
75+
def stateless[T](next: Long => Notification[Observable[T]], onUnsubscribe: () => Unit = () => ()) =
76+
stateful[Unit, T](() => ())((_,r) => (next(r), ()), _ => onUnsubscribe())
77+
78+
}

0 commit comments

Comments
 (0)