From a385e1a474a05af5173d3a6c5f380b0f87b50dff Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Sat, 20 Feb 2016 15:13:12 -0800 Subject: [PATCH] Upgrade to RxJava 1.1.1 and add missing methods --- build.sbt | 2 +- .../src/test/scala/examples/RxScalaDemo.scala | 24 +++++- src/main/scala/rx/lang/scala/Observable.scala | 81 ++++++++++++++++++- .../observables/ConnectableObservable.scala | 44 +++++++++- .../ObservableCompletenessKit.scala | 7 +- 5 files changed, 150 insertions(+), 8 deletions(-) diff --git a/build.sbt b/build.sbt index 5fbe0589..4ea1df5e 100644 --- a/build.sbt +++ b/build.sbt @@ -19,7 +19,7 @@ crossScalaVersions in ThisBuild := Seq("2.10.5", "2.11.6") parallelExecution in Test := false libraryDependencies ++= Seq( - "io.reactivex" % "rxjava" % "1.1.0", + "io.reactivex" % "rxjava" % "1.1.1", "org.mockito" % "mockito-core" % "1.9.5" % "test", "junit" % "junit" % "4.11" % "test", "org.scalatest" %% "scalatest" % "2.2.2" % "test") diff --git a/examples/src/test/scala/examples/RxScalaDemo.scala b/examples/src/test/scala/examples/RxScalaDemo.scala index 50eab72b..e1f677a2 100644 --- a/examples/src/test/scala/examples/RxScalaDemo.scala +++ b/examples/src/test/scala/examples/RxScalaDemo.scala @@ -877,8 +877,8 @@ class RxScalaDemo extends JUnitSuite { // unsubscribed } - @Test def finallyDoExample(): Unit = { - val o = List("red", "green", "blue").toObservable.finallyDo { println("finally") } + @Test def doAfterTerminateExample(): Unit = { + val o = List("red", "green", "blue").toObservable.doAfterTerminate { println("finally") } o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted")) // red // green @@ -1758,4 +1758,24 @@ class RxScalaDemo extends JUnitSuite { override def onCompleted(): Unit = println("onCompleted") }) } + + def autoConnectExample(): Unit = { + val o = Observable.just(1, 2, 3).doOnSubscribe { + println("Start to emit items") + }.publish.autoConnect + println("1st Observer is subscribing") + o.subscribe(println(_)) + } + + def autoConnectExample2(): Unit = { + val o = Observable.just(1, 2, 3).doOnSubscribe { + println("Start to emit items") + }.publish.autoConnect(3) + println("1st Observer is subscribing") + o.subscribe(i => println(s"s1: $i")) + println("2nd Observer is subscribing") + o.subscribe(i => println(s"s2: $i")) + println("3rd Observer is subscribing") + o.subscribe(i => println(s"s3: $i")) + } } diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala index 2315957d..f4b8a2f5 100644 --- a/src/main/scala/rx/lang/scala/Observable.scala +++ b/src/main/scala/rx/lang/scala/Observable.scala @@ -1067,8 +1067,24 @@ trait Observable[+T] * an function to be invoked when the source Observable finishes * @return an Observable that emits the same items as the source Observable, then invokes the function */ + @deprecated("Use [[Observable.doAfterTerminate]] instead", "0.26.1") def finallyDo(action: => Unit): Observable[T] = { - toScalaObservable[T](asJavaObservable.finallyDo(() => action)) + toScalaObservable[T](asJavaObservable.doAfterTerminate(() => action)) + } + + /** + * Registers an function to be called when this [[Observable]] invokes either [[Observer.onCompleted onCompleted]] or [[Observer.onError onError]]. + * + * + * + * $noDefaultScheduler + * + * @param action an function to be invoked when the source [[Observable]] finishes + * @return an [[Observable]] that emits the same items as the source [[Observable]], then invokes the `action` + * @see ReactiveX operators documentation: Do + */ + def doAfterTerminate(action: => Unit): Observable[T] = { + toScalaObservable[T](asJavaObservable.doAfterTerminate(() => action)) } /** @@ -1326,6 +1342,27 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.observeOn(scheduler)) } + /** + * Return an [[Observable]] to perform its emissions and notifications on a specified [[Scheduler]], + * asynchronously with a bounded buffer and optionally delays [[Observer.onError onError]] notifications. + * + * + * + * ===Scheduler:=== + * you specify which [[Scheduler]] this operator will use + * + * @param scheduler the [[Scheduler]] to notify [[Observer]]s on + * @param delayError indicates if the [[Observer.onError onError]] notification may not cut ahead of onNext notification on the + * other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order + * as was received from upstream + * @return the source [[Observable]] that its [[Observer]]s are notified on the specified [[Scheduler]] + * @see ReactiveX operators documentation: ObserveOn + * @see RxJava Threading Examples + */ + def observeOn(scheduler: Scheduler, delayError: Boolean): Observable[T] = { + toScalaObservable[T](asJavaObservable.observeOn(scheduler, delayError)) + } + /** * Returns an Observable that reverses the effect of [[rx.lang.scala.Observable.materialize]] by * transforming the [[rx.lang.scala.Notification]] objects emitted by the source Observable into the items @@ -1809,8 +1846,46 @@ trait Observable[+T] * @see RxJava wiki: cache * @since 0.20 */ + @deprecated("Use [[Observable.cacheWithInitialCapacity]] instead", "0.26.1") def cache(capacity: Int): Observable[T] = { - toScalaObservable[T](asJavaObservable.cache(capacity)) + toScalaObservable[T](asJavaObservable.cacheWithInitialCapacity(capacity)) + } + + /** + * Caches emissions from the source [[Observable]] and replays them in order to any subsequent [[Subscriber]]s. + * This method has similar behavior to [[Observable.replay:* replay]] except that this auto-subscribes to the source + * [[Observable]] rather than returning a [[rx.lang.scala.observables.ConnectableObservable ConnectableObservable]] + * for which you must call [[rx.lang.scala.observables.ConnectableObservable.connect connect]] to activate the subscription. + * + * + * + * This is useful when you want an [[Observable]] to cache responses and you can't control the + * subscribe/unsubscribe behavior of all the [[Subscriber]]s. + * + * When you call this method, it does not yet subscribe to the source Observable and so does not yet + * begin caching items. This only happens when the first [[Subscriber]] calls the resulting [[Observable]]'s `subscribe` method. + * + * **Note:** You sacrifice the ability to unsubscribe from the origin when you use the this method. + * So be careful not to use this method on [[Observable]]s that emit an infinite or very large number + * of items that will use up memory. + * + * ===Backpressure Support:=== + * This operator does not support upstream backpressure as it is purposefully requesting and caching + * everything emitted. + * + * $noDefaultScheduler + * + * **Note:** The `capacity` hint is not an upper bound on cache size. For that, consider + * [[Observable.replay(bufferSize:Int):* replay(Int)]] in combination with + * [[rx.lang.scala.observables.ConnectableObservable.autoConnect:* ConnectableObservable.autoConnect]] or similar. + * + * @param capacity hint for number of items to cache (for optimizing underlying data structure) + * @return an [[Observable]] that, when first subscribed to, caches all of its items and notifications for the + * benefit of subsequent [[Subscriber]]s + * @see ReactiveX operators documentation: Replay + */ + def cacheWithInitialCapacity(capacity: Int): Observable[T] = { + toScalaObservable[T](asJavaObservable.cacheWithInitialCapacity(capacity)) } /** @@ -3809,7 +3884,7 @@ trait Observable[+T] *

* *

- * This differs from `finallyDo` in that this happens **before** `onCompleted/onError` are emitted. + * This differs from [[Observable.doAfterTerminate doAfterTerminate]] in that this happens **before** onCompleted/onError` are emitted. * * @param onTerminate the action to invoke when the source Observable calls `onCompleted` or `onError` * @return the source Observable with the side-effecting behavior applied diff --git a/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala b/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala index bba27ecb..e9242c6d 100644 --- a/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala +++ b/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala @@ -16,9 +16,24 @@ package rx.lang.scala.observables -import rx.lang.scala.{Observable, Subscription} +import rx.annotations.Beta +import rx.lang.scala.{Observable, Subscription, Subscriber} import rx.lang.scala.JavaConversions._ +/** + * A [[ConnectableObservable]] resembles an ordinary [[Observable]], except that it does not begin + * emitting items when it is subscribed to, but only when its [[ConnectableObservable.connect connect]] method is called. + * In this way you can wait for all intended [[Subscriber]]s to subscribe to the [[Observable]] + * before the [[Observable]] begins emitting items. + * + * + * + * @define beta + * BETA + * + * @see RxJava Wiki: Connectable Observable Operators + * @tparam T the type of items emitted + */ class ConnectableObservable[+T] private[scala](val asJavaObservable: rx.observables.ConnectableObservable[_ <: T]) extends Observable[T] { @@ -35,4 +50,31 @@ class ConnectableObservable[+T] private[scala](val asJavaObservable: rx.observab * @return a [[rx.lang.scala.Observable]] */ def refCount: Observable[T] = toScalaObservable[T](asJavaObservable.refCount()) + + /** + * $beta Return an [[Observable]] that automatically connects to this [[ConnectableObservable]] + * when the first [[Subscriber]] subscribes. + * + * @return an [[Observable]] that automatically connects to this [[ConnectableObservable]] + * when the first [[Subscriber]] subscribes + */ + @Beta + def autoConnect: Observable[T] = { + toScalaObservable(asJavaObservable.autoConnect()) + } + + /** + * $beta Return an [[Observable]] that automatically connects to this [[ConnectableObservable]] + * when the specified number of [[Subscriber]]s subscribe to it. + * + * @param numberOfSubscribers the number of [[Subscriber]]s to await before calling connect + * on the [[ConnectableObservable]]. A non-positive value indicates + * an immediate connection. + * @return an [[Observable]] that automatically connects to this [[ConnectableObservable]] + * when the specified number of [[Subscriber]]s subscribe to it + */ + @Beta + def autoConnect(numberOfSubscribers: Int): Observable[T] = { + toScalaObservable(asJavaObservable.autoConnect(numberOfSubscribers)) + } } diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala index f996f1c1..e3756f2d 100644 --- a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala +++ b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala @@ -70,6 +70,7 @@ class ObservableCompletenessKit extends CompletenessKit { "delay(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]])" -> "delay(() => Observable[Any], T => Observable[Any])", "delay(Func1[_ >: T, _ <: Observable[U]])" -> "delay(T => Observable[Any])", "delaySubscription(Func0[_ <: Observable[U]])" -> "delaySubscription(() => Observable[Any])", + "delaySubscription(Observable[U])" -> "[use `delaySubscription(() => Observable[Any])`]", "dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])", "doOnCompleted(Action0)" -> "doOnCompleted(=> Unit)", "doOnEach(Action1[Notification[_ >: T]])" -> "[use `doOnEach(T => Unit, Throwable => Unit, () => Unit)`]", @@ -78,7 +79,7 @@ class ObservableCompletenessKit extends CompletenessKit { "doOnTerminate(Action0)" -> "doOnTerminate(=> Unit)", "elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)", "extend(Func1[_ >: OnSubscribe[T], _ <: R])" -> "[use Scala implicit feature to extend `Observable`]", - "finallyDo(Action0)" -> "finallyDo(=> Unit)", + "doAfterTerminate(Action0)" -> "doAfterTerminate(=> Unit)", "first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate, "firstOrDefault(T)" -> "firstOrElse(=> U)", "firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]", @@ -166,6 +167,7 @@ class ObservableCompletenessKit extends CompletenessKit { "toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]])" -> "toMultiMap(T => K, T => V, => M)", "toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]], Func1[_ >: K, _ <: Collection[V]])" -> commentForToMultimapWithCollectionFactory, "toSingle()" -> "[TODO]", + "toCompletable()" -> "[TODO]", "toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]", "toSortedList(Int)" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]", "toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]", @@ -188,6 +190,7 @@ class ObservableCompletenessKit extends CompletenessKit { "create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)", "combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])", "combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Seq[Observable[T]])(Seq[T] => R)", + "combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "[use `combineLatest(iter.toSeq)(Seq[T] => R)`]", "concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])", "concatEager(Observable[_ <: Observable[_ <: T]])" -> "concatEager(<:<[Observable[T], Observable[Observable[U]]])", "concatEager(Observable[_ <: Observable[_ <: T]], Int)" -> "concatEager(Int)(<:<[Observable[T], Observable[Observable[U]]])", @@ -211,6 +214,8 @@ class ObservableCompletenessKit extends CompletenessKit { "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])", "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])", "mergeDelayError(Observable[_ <: Observable[_ <: T]], Int)" -> "flattenDelayError(Int)(<:<[Observable[T], Observable[Observable[U]]])", + "mergeDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.flattenDelayError`]", + "mergeDelayError(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `iter.toObservable.flattenDelayError(Int)`]", "sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "sequenceEqual(Observable[U])", "sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "sequenceEqualWith(Observable[U])((U, U) => Boolean)", "range(Int, Int)" -> commentForRange,