diff --git a/build.sbt b/build.sbt
index 5fbe0589..4ea1df5e 100644
--- a/build.sbt
+++ b/build.sbt
@@ -19,7 +19,7 @@ crossScalaVersions in ThisBuild := Seq("2.10.5", "2.11.6")
parallelExecution in Test := false
libraryDependencies ++= Seq(
- "io.reactivex" % "rxjava" % "1.1.0",
+ "io.reactivex" % "rxjava" % "1.1.1",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test")
diff --git a/examples/src/test/scala/examples/RxScalaDemo.scala b/examples/src/test/scala/examples/RxScalaDemo.scala
index 50eab72b..e1f677a2 100644
--- a/examples/src/test/scala/examples/RxScalaDemo.scala
+++ b/examples/src/test/scala/examples/RxScalaDemo.scala
@@ -877,8 +877,8 @@ class RxScalaDemo extends JUnitSuite {
// unsubscribed
}
- @Test def finallyDoExample(): Unit = {
- val o = List("red", "green", "blue").toObservable.finallyDo { println("finally") }
+ @Test def doAfterTerminateExample(): Unit = {
+ val o = List("red", "green", "blue").toObservable.doAfterTerminate { println("finally") }
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
// red
// green
@@ -1758,4 +1758,24 @@ class RxScalaDemo extends JUnitSuite {
override def onCompleted(): Unit = println("onCompleted")
})
}
+
+ def autoConnectExample(): Unit = {
+ val o = Observable.just(1, 2, 3).doOnSubscribe {
+ println("Start to emit items")
+ }.publish.autoConnect
+ println("1st Observer is subscribing")
+ o.subscribe(println(_))
+ }
+
+ def autoConnectExample2(): Unit = {
+ val o = Observable.just(1, 2, 3).doOnSubscribe {
+ println("Start to emit items")
+ }.publish.autoConnect(3)
+ println("1st Observer is subscribing")
+ o.subscribe(i => println(s"s1: $i"))
+ println("2nd Observer is subscribing")
+ o.subscribe(i => println(s"s2: $i"))
+ println("3rd Observer is subscribing")
+ o.subscribe(i => println(s"s3: $i"))
+ }
}
diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala
index 2315957d..f4b8a2f5 100644
--- a/src/main/scala/rx/lang/scala/Observable.scala
+++ b/src/main/scala/rx/lang/scala/Observable.scala
@@ -1067,8 +1067,24 @@ trait Observable[+T]
* an function to be invoked when the source Observable finishes
* @return an Observable that emits the same items as the source Observable, then invokes the function
*/
+ @deprecated("Use [[Observable.doAfterTerminate]] instead", "0.26.1")
def finallyDo(action: => Unit): Observable[T] = {
- toScalaObservable[T](asJavaObservable.finallyDo(() => action))
+ toScalaObservable[T](asJavaObservable.doAfterTerminate(() => action))
+ }
+
+ /**
+ * Registers an function to be called when this [[Observable]] invokes either [[Observer.onCompleted onCompleted]] or [[Observer.onError onError]].
+ *
+ *
+ *
+ * $noDefaultScheduler
+ *
+ * @param action an function to be invoked when the source [[Observable]] finishes
+ * @return an [[Observable]] that emits the same items as the source [[Observable]], then invokes the `action`
+ * @see ReactiveX operators documentation: Do
+ */
+ def doAfterTerminate(action: => Unit): Observable[T] = {
+ toScalaObservable[T](asJavaObservable.doAfterTerminate(() => action))
}
/**
@@ -1326,6 +1342,27 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.observeOn(scheduler))
}
+ /**
+ * Return an [[Observable]] to perform its emissions and notifications on a specified [[Scheduler]],
+ * asynchronously with a bounded buffer and optionally delays [[Observer.onError onError]] notifications.
+ *
+ *
+ *
+ * ===Scheduler:===
+ * you specify which [[Scheduler]] this operator will use
+ *
+ * @param scheduler the [[Scheduler]] to notify [[Observer]]s on
+ * @param delayError indicates if the [[Observer.onError onError]] notification may not cut ahead of onNext notification on the
+ * other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order
+ * as was received from upstream
+ * @return the source [[Observable]] that its [[Observer]]s are notified on the specified [[Scheduler]]
+ * @see ReactiveX operators documentation: ObserveOn
+ * @see RxJava Threading Examples
+ */
+ def observeOn(scheduler: Scheduler, delayError: Boolean): Observable[T] = {
+ toScalaObservable[T](asJavaObservable.observeOn(scheduler, delayError))
+ }
+
/**
* Returns an Observable that reverses the effect of [[rx.lang.scala.Observable.materialize]] by
* transforming the [[rx.lang.scala.Notification]] objects emitted by the source Observable into the items
@@ -1809,8 +1846,46 @@ trait Observable[+T]
* @see RxJava wiki: cache
* @since 0.20
*/
+ @deprecated("Use [[Observable.cacheWithInitialCapacity]] instead", "0.26.1")
def cache(capacity: Int): Observable[T] = {
- toScalaObservable[T](asJavaObservable.cache(capacity))
+ toScalaObservable[T](asJavaObservable.cacheWithInitialCapacity(capacity))
+ }
+
+ /**
+ * Caches emissions from the source [[Observable]] and replays them in order to any subsequent [[Subscriber]]s.
+ * This method has similar behavior to [[Observable.replay:* replay]] except that this auto-subscribes to the source
+ * [[Observable]] rather than returning a [[rx.lang.scala.observables.ConnectableObservable ConnectableObservable]]
+ * for which you must call [[rx.lang.scala.observables.ConnectableObservable.connect connect]] to activate the subscription.
+ *
+ *
+ *
+ * This is useful when you want an [[Observable]] to cache responses and you can't control the
+ * subscribe/unsubscribe behavior of all the [[Subscriber]]s.
+ *
+ * When you call this method, it does not yet subscribe to the source Observable and so does not yet
+ * begin caching items. This only happens when the first [[Subscriber]] calls the resulting [[Observable]]'s `subscribe` method.
+ *
+ * **Note:** You sacrifice the ability to unsubscribe from the origin when you use the this method.
+ * So be careful not to use this method on [[Observable]]s that emit an infinite or very large number
+ * of items that will use up memory.
+ *
+ * ===Backpressure Support:===
+ * This operator does not support upstream backpressure as it is purposefully requesting and caching
+ * everything emitted.
+ *
+ * $noDefaultScheduler
+ *
+ * **Note:** The `capacity` hint is not an upper bound on cache size. For that, consider
+ * [[Observable.replay(bufferSize:Int):* replay(Int)]] in combination with
+ * [[rx.lang.scala.observables.ConnectableObservable.autoConnect:* ConnectableObservable.autoConnect]] or similar.
+ *
+ * @param capacity hint for number of items to cache (for optimizing underlying data structure)
+ * @return an [[Observable]] that, when first subscribed to, caches all of its items and notifications for the
+ * benefit of subsequent [[Subscriber]]s
+ * @see ReactiveX operators documentation: Replay
+ */
+ def cacheWithInitialCapacity(capacity: Int): Observable[T] = {
+ toScalaObservable[T](asJavaObservable.cacheWithInitialCapacity(capacity))
}
/**
@@ -3809,7 +3884,7 @@ trait Observable[+T]
*
*
*
- * This differs from `finallyDo` in that this happens **before** `onCompleted/onError` are emitted.
+ * This differs from [[Observable.doAfterTerminate doAfterTerminate]] in that this happens **before** onCompleted/onError` are emitted.
*
* @param onTerminate the action to invoke when the source Observable calls `onCompleted` or `onError`
* @return the source Observable with the side-effecting behavior applied
diff --git a/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala b/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala
index bba27ecb..e9242c6d 100644
--- a/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala
+++ b/src/main/scala/rx/lang/scala/observables/ConnectableObservable.scala
@@ -16,9 +16,24 @@
package rx.lang.scala.observables
-import rx.lang.scala.{Observable, Subscription}
+import rx.annotations.Beta
+import rx.lang.scala.{Observable, Subscription, Subscriber}
import rx.lang.scala.JavaConversions._
+/**
+ * A [[ConnectableObservable]] resembles an ordinary [[Observable]], except that it does not begin
+ * emitting items when it is subscribed to, but only when its [[ConnectableObservable.connect connect]] method is called.
+ * In this way you can wait for all intended [[Subscriber]]s to subscribe to the [[Observable]]
+ * before the [[Observable]] begins emitting items.
+ *
+ *
+ *
+ * @define beta
+ * BETA
+ *
+ * @see RxJava Wiki: Connectable Observable Operators
+ * @tparam T the type of items emitted
+ */
class ConnectableObservable[+T] private[scala](val asJavaObservable: rx.observables.ConnectableObservable[_ <: T])
extends Observable[T] {
@@ -35,4 +50,31 @@ class ConnectableObservable[+T] private[scala](val asJavaObservable: rx.observab
* @return a [[rx.lang.scala.Observable]]
*/
def refCount: Observable[T] = toScalaObservable[T](asJavaObservable.refCount())
+
+ /**
+ * $beta Return an [[Observable]] that automatically connects to this [[ConnectableObservable]]
+ * when the first [[Subscriber]] subscribes.
+ *
+ * @return an [[Observable]] that automatically connects to this [[ConnectableObservable]]
+ * when the first [[Subscriber]] subscribes
+ */
+ @Beta
+ def autoConnect: Observable[T] = {
+ toScalaObservable(asJavaObservable.autoConnect())
+ }
+
+ /**
+ * $beta Return an [[Observable]] that automatically connects to this [[ConnectableObservable]]
+ * when the specified number of [[Subscriber]]s subscribe to it.
+ *
+ * @param numberOfSubscribers the number of [[Subscriber]]s to await before calling connect
+ * on the [[ConnectableObservable]]. A non-positive value indicates
+ * an immediate connection.
+ * @return an [[Observable]] that automatically connects to this [[ConnectableObservable]]
+ * when the specified number of [[Subscriber]]s subscribe to it
+ */
+ @Beta
+ def autoConnect(numberOfSubscribers: Int): Observable[T] = {
+ toScalaObservable(asJavaObservable.autoConnect(numberOfSubscribers))
+ }
}
diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala
index f996f1c1..e3756f2d 100644
--- a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala
+++ b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala
@@ -70,6 +70,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"delay(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]])" -> "delay(() => Observable[Any], T => Observable[Any])",
"delay(Func1[_ >: T, _ <: Observable[U]])" -> "delay(T => Observable[Any])",
"delaySubscription(Func0[_ <: Observable[U]])" -> "delaySubscription(() => Observable[Any])",
+ "delaySubscription(Observable[U])" -> "[use `delaySubscription(() => Observable[Any])`]",
"dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])",
"doOnCompleted(Action0)" -> "doOnCompleted(=> Unit)",
"doOnEach(Action1[Notification[_ >: T]])" -> "[use `doOnEach(T => Unit, Throwable => Unit, () => Unit)`]",
@@ -78,7 +79,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"doOnTerminate(Action0)" -> "doOnTerminate(=> Unit)",
"elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)",
"extend(Func1[_ >: OnSubscribe[T], _ <: R])" -> "[use Scala implicit feature to extend `Observable`]",
- "finallyDo(Action0)" -> "finallyDo(=> Unit)",
+ "doAfterTerminate(Action0)" -> "doAfterTerminate(=> Unit)",
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
"firstOrDefault(T)" -> "firstOrElse(=> U)",
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
@@ -166,6 +167,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]])" -> "toMultiMap(T => K, T => V, => M)",
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]], Func1[_ >: K, _ <: Collection[V]])" -> commentForToMultimapWithCollectionFactory,
"toSingle()" -> "[TODO]",
+ "toCompletable()" -> "[TODO]",
"toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Int)" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
@@ -188,6 +190,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)",
"combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])",
"combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Seq[Observable[T]])(Seq[T] => R)",
+ "combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "[use `combineLatest(iter.toSeq)(Seq[T] => R)`]",
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
"concatEager(Observable[_ <: Observable[_ <: T]])" -> "concatEager(<:<[Observable[T], Observable[Observable[U]]])",
"concatEager(Observable[_ <: Observable[_ <: T]], Int)" -> "concatEager(Int)(<:<[Observable[T], Observable[Observable[U]]])",
@@ -211,6 +214,8 @@ class ObservableCompletenessKit extends CompletenessKit {
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])",
"mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])",
"mergeDelayError(Observable[_ <: Observable[_ <: T]], Int)" -> "flattenDelayError(Int)(<:<[Observable[T], Observable[Observable[U]]])",
+ "mergeDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.flattenDelayError`]",
+ "mergeDelayError(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `iter.toObservable.flattenDelayError(Int)`]",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "sequenceEqual(Observable[U])",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "sequenceEqualWith(Observable[U])((U, U) => Boolean)",
"range(Int, Int)" -> commentForRange,