diff --git a/.travis.yml b/.travis.yml
index 385d43dd..2162c962 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,7 +2,7 @@ language: scala
jdk:
- oraclejdk7
script:
-- sbt +test +examples/test
+- sbt +test +examples/test +doc
after_success:
- "./publishViaTravis.sh"
env:
diff --git a/build.sbt b/build.sbt
index fed1abcf..5ea3bd7d 100644
--- a/build.sbt
+++ b/build.sbt
@@ -19,7 +19,64 @@ crossScalaVersions in ThisBuild := Seq("2.10.5", "2.11.6")
parallelExecution in Test := false
libraryDependencies ++= Seq(
- "io.reactivex" % "rxjava" % "1.0.12",
+ "io.reactivex" % "rxjava" % "1.0.17",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test")
+
+// Set up the doc mappings
+// See http://stackoverflow.com/questions/16934488/how-to-link-classes-from-jdk-into-scaladoc-generated-doc
+autoAPIMappings := true
+
+val externalJavadocMap = Map(
+ "rxjava" -> "http://reactivex.io/RxJava/javadoc/index.html"
+)
+
+/*
+ * The rt.jar file is located in the path stored in the sun.boot.class.path system property.
+ * See the Oracle documentation at http://docs.oracle.com/javase/6/docs/technotes/tools/findingclasses.html.
+ */
+val rtJar: String = System.getProperty("sun.boot.class.path").split(java.io.File.pathSeparator).collectFirst {
+ case str: String if str.endsWith(java.io.File.separator + "rt.jar") => str
+}.get // fail hard if not found
+
+val javaApiUrl: String = "http://docs.oracle.com/javase/8/docs/api/index.html"
+
+val allExternalJavadocLinks: Seq[String] = javaApiUrl +: externalJavadocMap.values.toSeq
+
+import scala.util.matching.Regex
+import scala.util.matching.Regex.Match
+
+def javadocLinkRegex(javadocURL: String): Regex = ("""\"(\Q""" + javadocURL + """\E)#([^"]*)\"""").r
+
+def hasJavadocLink(f: File): Boolean = allExternalJavadocLinks exists {
+ javadocURL: String =>
+ (javadocLinkRegex(javadocURL) findFirstIn IO.read(f)).nonEmpty
+}
+
+val fixJavaLinks: Match => String = m =>
+ m.group(1) + "?" + m.group(2).replace(".", "/") + ".html"
+
+apiMappings ++= {
+ // Lookup the path to jar from the classpath
+ val classpath = (fullClasspath in Compile).value
+ def findJar(nameBeginsWith: String): File = {
+ classpath.find { attributed: Attributed[File] => (attributed.data ** s"$nameBeginsWith*.jar").get.nonEmpty }.get.data // fail hard if not found
+ }
+ // Define external documentation paths
+ (externalJavadocMap map {
+ case (name, javadocURL) => findJar(name) -> url(javadocURL)
+ }) + (file(rtJar) -> url(javaApiUrl))
+}
+
+// Override the task to fix the links to JavaDoc
+doc in Compile <<= (doc in Compile) map { target: File =>
+ (target ** "*.html").get.filter(hasJavadocLink).foreach { f =>
+ val newContent: String = allExternalJavadocLinks.foldLeft(IO.read(f)) {
+ case (oldContent: String, javadocURL: String) =>
+ javadocLinkRegex(javadocURL).replaceAllIn(oldContent, fixJavaLinks)
+ }
+ IO.write(f, newContent)
+ }
+ target
+}
diff --git a/examples/src/test/scala/examples/ExperimentalAPIExamples.scala b/examples/src/test/scala/examples/ExperimentalAPIExamples.scala
index e5b2d583..5523462f 100644
--- a/examples/src/test/scala/examples/ExperimentalAPIExamples.scala
+++ b/examples/src/test/scala/examples/ExperimentalAPIExamples.scala
@@ -39,26 +39,6 @@ object ExperimentalAPIExamples {
)
}
- @Test def onBackpressureBlockExample(): Unit = {
- Observable[Int](subscriber => {
- (1 to 200).foreach(subscriber.onNext)
- }).doOnNext(v => println(s"emit $v")).onBackpressureBlock.observeOn(IOScheduler()).subscribe {
- v =>
- Thread.sleep(10) // A slow consumer
- println(s"process $v")
- }
- }
-
- @Test def onBackpressureBlockExample2(): Unit = {
- Observable[Int](subscriber => {
- (1 to 200).foreach(subscriber.onNext)
- }).doOnNext(v => println(s"emit $v")).onBackpressureBlock(10).observeOn(IOScheduler()).subscribe {
- v =>
- Thread.sleep(10) // A slow consumer
- println(s"process $v")
- }
- }
-
@Test def doOnRequestExample(): Unit = {
(1 to 300).toObservable.doOnRequest(request => println(s"request $request")).subscribe(new Subscriber[Int]() {
override def onStart(): Unit = request(1)
@@ -112,7 +92,7 @@ object ExperimentalAPIExamples {
@Test def withLatestFromExample2(): Unit = {
val a = Observable.interval(1 second).take(7)
- val b = Observable.timer(3 seconds, 250 millis)
+ val b = Observable.interval(3 seconds, 250 millis)
a.withLatestFrom(b)((x, y) => (x, y)).toBlocking.foreach {
case (x, y) => println(s"a: $x b: $y")
}
diff --git a/examples/src/test/scala/examples/RxScalaDemo.scala b/examples/src/test/scala/examples/RxScalaDemo.scala
index fb418dc2..50eab72b 100644
--- a/examples/src/test/scala/examples/RxScalaDemo.scala
+++ b/examples/src/test/scala/examples/RxScalaDemo.scala
@@ -895,10 +895,10 @@ class RxScalaDemo extends JUnitSuite {
@Test def timeoutExample2(): Unit = {
val firstTimeoutSelector = () => {
- Observable.timer(10 seconds, 10 seconds, ComputationScheduler()).take(1)
+ Observable.interval(10 seconds, 10 seconds, ComputationScheduler()).take(1)
}
val timeoutSelector = (t: Long) => {
- Observable.timer(
+ Observable.interval(
(500 - t * 100) max 1 millis,
(500 - t * 100) max 1 millis,
ComputationScheduler()).take(1)
@@ -1682,4 +1682,80 @@ class RxScalaDemo extends JUnitSuite {
val o2 = Observable.error(new RuntimeException("Oops"))
(o1 ++ o2).subscribe(v => println(v), e => e.printStackTrace(), () => println("completed"))
}
+
+ @Test def concatEagerExample(): Unit = {
+ val o1 = Observable.interval(100 millis).take(3).map(l => s"o1 emit $l").doOnSubscribe(println("subscribe to o1"))
+ val o2 = Observable.interval(100 millis).take(3).map(l => s"o2 emit $l").doOnSubscribe(println("subscribe to o2"))
+ o1.concatEager(o2).subscribe(println(_))
+ }
+
+ @Test def concatEagerExample2(): Unit = {
+ (0 until 10).map { i =>
+ Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
+ }.toObservable.concatEager.subscribe(println(_))
+ }
+
+ @Test def concatEagerExample3(): Unit = {
+ (0 until 10).map { i =>
+ Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
+ }.toObservable.concatEager(capacityHint = 3).subscribe(println(_))
+ }
+
+ @Test def concatMapEagerExample(): Unit = {
+ (0 until 10).toObservable.concatMapEager { i =>
+ Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
+ }.subscribe(println(_))
+ }
+
+ @Test def concatMapEagerExample2(): Unit = {
+ (0 until 10).toObservable.concatMapEager(capacityHint = 10, i => {
+ Observable.interval(100 millis).take(3).map(l => s"o$i emit $l").doOnSubscribe(println(s"subscribe to o$i"))
+ }).subscribe(println(_))
+ }
+
+ @Test def flattenDelayErrorExample() {
+ val o1 = Observable.just(1).delay(200 millis).
+ flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1"))
+ val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
+ val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
+ val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
+ Observable.just(o1, o2, o3, o4).flattenDelayError.subscribe(println(_), _.printStackTrace())
+ }
+
+ @Test def flattenDelayErrorExample2() {
+ val o1 = Observable.just(1).delay(200 millis).
+ flatMap(i => Observable.error(new RuntimeException("Oops!"))).doOnSubscribe(println(s"subscribe to o1"))
+ val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
+ val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
+ val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
+ Observable.just(o1, o2, o3, o4).flattenDelayError(2).subscribe(println(_), _.printStackTrace())
+ }
+
+ @Test def blockingObservableSubscribeExample(): Unit = {
+ val b = Observable.just(1, 2, 3).toBlocking
+ println("---b.subscribe()---")
+ b.subscribe()
+ println("---b.subscribe(onNext)---")
+ b.subscribe(println(_))
+ println("---b.subscribe(onNext, onError)---")
+ b.subscribe(println(_), _.printStackTrace())
+ println("---b.subscribe(onNext, onError, onCompleted)---")
+ b.subscribe(println(_), _.printStackTrace(), () => println("onCompleted"))
+ println("---b.subscribe(Observer)---")
+ b.subscribe(new Observer[Int] {
+ override def onNext(v: Int): Unit = println(v)
+
+ override def onError(e: Throwable): Unit = e.printStackTrace()
+
+ override def onCompleted(): Unit = println("onCompleted")
+ })
+ println("---b.subscribe(Subscriber)---")
+ b.subscribe(new Subscriber[Int] {
+ override def onNext(v: Int): Unit = println(v)
+
+ override def onError(e: Throwable): Unit = e.printStackTrace()
+
+ override def onCompleted(): Unit = println("onCompleted")
+ })
+ }
}
diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala
index 77909d40..95071cc5 100644
--- a/src/main/scala/rx/lang/scala/Observable.scala
+++ b/src/main/scala/rx/lang/scala/Observable.scala
@@ -132,7 +132,7 @@ trait Observable[+T]
* $noDefaultScheduler
*
* @return $subscribeAllReturn
- * @throws OnErrorNotImplementedException if the [[Observable]] tries to call `onError`
+ * @throws rx.exceptions.OnErrorNotImplementedException if the [[Observable]] tries to call `onError`
* @see ReactiveX operators documentation: Subscribe
*/
def subscribe(): Subscription = {
@@ -212,7 +212,7 @@ trait Observable[+T]
*
* @param onNext $subscribeCallbacksParamOnNext
* @return $subscribeAllReturn
- * @throws OnErrorNotImplementedException if the [[Observable]] tries to call `onError`
+ * @throws rx.exceptions.OnErrorNotImplementedException if the [[Observable]] tries to call `onError`
* @see ReactiveX operators documentation: Subscribe
*/
def subscribe(onNext: T => Unit): Subscription = {
@@ -331,6 +331,143 @@ trait Observable[+T]
}))
}
+ /**
+ * $experimental Concatenates `this` and `that` source [[Observable]]s eagerly into a single stream of values.
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * source [[Observable]]s. The operator buffers the values emitted by these [[Observable]]s and then drains them
+ * in order, each one after the previous one completes.
+ *
+ * ===Backpressure:===
+ * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
+ * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
+ *
+ * $noDefaultScheduler
+ *
+ * @param that the source to concat with.
+ * @return an [[Observable]] that emits items all of the items emitted by `this` and `that`, one after the other,
+ * without interleaving them.
+ * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
+ */
+ @Experimental
+ def concatEager[U >: T](that: Observable[U]): Observable[U] = {
+ val o1: rx.Observable[_ <: U] = this.asJavaObservable
+ val o2: rx.Observable[_ <: U] = that.asJavaObservable
+ toScalaObservable(rx.Observable.concatEager(o1, o2))
+ }
+
+ /**
+ * $experimental Concatenates an [[Observable]] sequence of [[Observable]]s eagerly into a single stream of values.
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source [[Observable]]s as they are observed. The operator buffers the values emitted by these
+ * [[Observable]]s and then drains them in order, each one after the previous one completes.
+ *
+ * ===Backpressure:===
+ * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
+ * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
+ *
+ * $noDefaultScheduler
+ *
+ * @return an [[Observable]] that emits items all of the items emitted by the [[Observable]]s emitted by
+ * `this`, one after the other, without interleaving them
+ * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
+ */
+ @Experimental
+ def concatEager[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
+ val o2: Observable[Observable[U]] = this
+ val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
+ val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
+ val o5 = rx.Observable.concatEager[U](o4)
+ toScalaObservable[U](o5)
+ }
+
+ /**
+ * $experimental Concatenates an [[Observable]] sequence of [[Observable]]s eagerly into a single stream of values.
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * emitted source [[Observable]]s as they are observed. The operator buffers the values emitted by these
+ * [[Observable]]s and then drains them in order, each one after the previous one completes.
+ *
+ * ===Backpressure:===
+ * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
+ * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
+ *
+ * $noDefaultScheduler
+ *
+ * @param capacityHint hints about the number of expected values in an [[Observable]]
+ * @return an [[Observable]] that emits items all of the items emitted by the [[Observable]]s emitted by
+ * `this`, one after the other, without interleaving them
+ * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
+ */
+ @Experimental
+ def concatEager[U](capacityHint: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
+ val o2: Observable[Observable[U]] = this
+ val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
+ val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
+ val o5 = rx.Observable.concatEager[U](o4, capacityHint)
+ toScalaObservable[U](o5)
+ }
+
+ /**
+ * $experimental Maps a sequence of values into [[Observable]]s and concatenates these [[Observable]]s eagerly into a single
+ * Observable.
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * source [[Observable]]s. The operator buffers the values emitted by these [[Observable]]s and then drains them in
+ * order, each one after the previous one completes.
+ *
+ * ===Backpressure:===
+ * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
+ * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
+ *
+ * $noDefaultScheduler
+ *
+ * @param f the function that maps a sequence of values into a sequence of [[Observable]]s that will be
+ * eagerly concatenated
+ * @return an [[Observable]] that emits items all of the items emitted by the [[Observable]]s returned by
+ * `f`, one after the other, without interleaving them
+ * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
+ */
+ @Experimental
+ def concatMapEager[R](f: T => Observable[R]): Observable[R] = {
+ toScalaObservable[R](asJavaObservable.concatMapEager[R](new Func1[T, rx.Observable[_ <: R]] {
+ def call(t1: T): rx.Observable[_ <: R] = {
+ f(t1).asJavaObservable
+ }
+ }))
+ }
+
+ /**
+ * $experimental Maps a sequence of values into [[Observable]]s and concatenates these [[Observable]]s eagerly into a single
+ * Observable.
+ *
+ * Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
+ * source [[Observable]]s. The operator buffers the values emitted by these [[Observable]]s and then drains them in
+ * order, each one after the previous one completes.
+ *
+ * ===Backpressure:===
+ * Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
+ * are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.
+ *
+ * $noDefaultScheduler
+ *
+ * @param f the function that maps a sequence of values into a sequence of [[Observable]]s that will be
+ * eagerly concatenated
+ * @param capacityHint hints about the number of expected values in an [[Observable]]
+ * @return an [[Observable]] that emits items all of the items emitted by the [[Observable]]s returned by
+ * `f`, one after the other, without interleaving them
+ * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
+ */
+ @Experimental
+ def concatMapEager[R](capacityHint: Int, f: T => Observable[R]): Observable[R] = {
+ toScalaObservable[R](asJavaObservable.concatMapEager[R](new Func1[T, rx.Observable[_ <: R]] {
+ def call(t1: T): rx.Observable[_ <: R] = {
+ f(t1).asJavaObservable
+ }
+ }, capacityHint))
+ }
+
/**
* Wraps this Observable in another Observable that ensures that the resulting
* Observable is chronologically well-behaved.
@@ -1420,7 +1557,7 @@ trait Observable[+T]
* @return an Observable that emits items that are the results of invoking the selector on items emitted by
* a `ConnectableObservable` that shares a single subscription to the source Observable, and
* replays no more than `bufferSize` items that were emitted within the window defined by `time`
- * @throws IllegalArgumentException if `bufferSize` is less than zero
+ * @throws java.lang.IllegalArgumentException if `bufferSize` is less than zero
*/
def replay[R](selector: Observable[T] => Observable[R], bufferSize: Int, time: Duration, scheduler: Scheduler): Observable[R] = {
val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[T]]
@@ -1518,7 +1655,7 @@ trait Observable[+T]
* @param scheduler the scheduler that is used as a time source for the window
* @return a `ConnectableObservable` that shares a single subscription to the source Observable and
* replays at most `bufferSize` items that were emitted during the window defined by `time`
- *@throws IllegalArgumentException if `bufferSize` is less than zero
+ *@throws java.lang.IllegalArgumentException if `bufferSize` is less than zero
*/
def replay(bufferSize: Int, time: Duration, scheduler: Scheduler): ConnectableObservable[T] = {
new ConnectableObservable[T](asJavaObservable.replay(bufferSize, time.length, time.unit, scheduler))
@@ -1952,7 +2089,7 @@ trait Observable[+T]
* @param n number of items to drop from the end of the source sequence
* @return an Observable that emits the items emitted by the source Observable except for the dropped ones
* at the end
- * @throws IndexOutOfBoundsException if `n` is less than zero
+ * @throws java.lang.IndexOutOfBoundsException if `n` is less than zero
*/
def dropRight(n: Int): Observable[T] = {
toScalaObservable(asJavaObservable.skipLast(n))
@@ -2153,7 +2290,7 @@ trait Observable[+T]
* @param time the length of the time window
* @return an Observable that emits at most `count` items from the source Observable that were emitted
* in a specified window of time before the Observable completed
- * @throws IllegalArgumentException if `count` is less than zero
+ * @throws java.lang.IllegalArgumentException if `count` is less than zero
*/
def takeRight(count: Int, time: Duration): Observable[T] = {
toScalaObservable[T](asJavaObservable.takeLast(count, time.length, time.unit))
@@ -2172,7 +2309,7 @@ trait Observable[+T]
* @return an Observable that emits at most `count` items from the source Observable that were emitted
* in a specified window of time before the Observable completed, where the timing information is
* provided by the given `scheduler`
- * @throws IllegalArgumentException if `count` is less than zero
+ * @throws java.lang.IllegalArgumentException if `count` is less than zero
*/
def takeRight(count: Int, time: Duration, scheduler: Scheduler): Observable[T] = {
toScalaObservable[T](asJavaObservable.takeLast(count, time.length, time.unit, scheduler.asJavaScheduler))
@@ -2474,7 +2611,7 @@ trait Observable[+T]
*
* @param maxConcurrent the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits items that are the result of flattening the Observables emitted by the `source` Observable
- * @throws IllegalArgumentException if `maxConcurrent` is less than or equal to 0
+ * @throws java.lang.IllegalArgumentException if `maxConcurrent` is less than or equal to 0
*/
def flatten[U](maxConcurrent: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
val o2: Observable[Observable[U]] = this
@@ -2485,27 +2622,24 @@ trait Observable[+T]
}
/**
- * This behaves like `flatten` except that if any of the merged Observables
- * notify of an error via [[rx.lang.scala.Observer.onError onError]], this method will
- * refrain from propagating that error notification until all of the merged Observables have
- * finished emitting items.
+ * Flattens an [[Observable]] that emits [[Observable]]s into one [[Observable]], in a way that allows an [[Observer]] to
+ * receive all successfully emitted items from all of the source [[Observable]]s without being interrupted by
+ * an error notification from one of them, while limiting the
+ * number of concurrent subscriptions to these [[Observable]]s.
*
- *
+ * This behaves like `flatten` except that if any of the merged [[Observable]]s notify of an
+ * error via `onError`, `flattenDelayError` will refrain from propagating that
+ * error notification until all of the merged [[Observable]]s have finished emitting items.
*
- * Even if multiple merged Observables send `onError` notifications, this method will only invoke the `onError` method of its
- * Observers once.
+ *
*
- * This method allows an Observer to receive all successfully emitted items from all of the
- * source Observables without being interrupted by an error notification from one of them.
- *
- * This operation is only available if `this` is of type `Observable[Observable[U]]` for some `U`,
- * otherwise you'll get a compilation error.
+ * Even if multiple merged [[Observable]]s send `onError` notifications, `flattenDelayError` will only
+ * invoke the `onError` method of its `Observer`s once.
*
- * @return an Observable that emits items that are the result of flattening the items emitted by
- * the Observables emitted by the this Observable
+ * $noDefaultScheduler
*
- * @usecase def flattenDelayError[U]: Observable[U]
- * @inheritdoc
+ * @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this`
+ * @see ReactiveX operators documentation: Merge
*/
def flattenDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
val o2: Observable[Observable[U]] = this
@@ -2515,6 +2649,37 @@ trait Observable[+T]
toScalaObservable[U](o5)
}
+ /**
+ * $experimental Flattens an [[Observable]] that emits [[Observable]]s into one [[Observable]], in a way that allows an [[Observer]] to
+ * receive all successfully emitted items from all of the source [[Observable]]s without being interrupted by
+ * an error notification from one of them, while limiting the
+ * number of concurrent subscriptions to these [[Observable]]s.
+ *
+ * This behaves like `flatten` except that if any of the merged [[Observable]]s notify of an
+ * error via `onError`, `flattenDelayError` will refrain from propagating that
+ * error notification until all of the merged [[Observable]]s have finished emitting items.
+ *
+ *
+ *
+ * Even if multiple merged [[Observable]]s send `onError` notifications, `flattenDelayError` will only
+ * invoke the `onError` method of its `Observer`s once.
+ *
+ * $noDefaultScheduler
+ *
+ * @param maxConcurrent the maximum number of [[Observable]]s that may be subscribed to concurrently
+ * @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this`
+ * @see ReactiveX operators documentation: Merge
+ * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
+ */
+ @Experimental
+ def flattenDelayError[U](maxConcurrent: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
+ val o2: Observable[Observable[U]] = this
+ val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
+ val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
+ val o5 = rx.Observable.mergeDelayError[U](o4, maxConcurrent)
+ toScalaObservable[U](o5)
+ }
+
/**
* Combines two observables, emitting a pair of the latest values of each of
* the source observables each time an event is received from one of the source observables.
@@ -3060,8 +3225,8 @@ trait Observable[+T]
*
*
* @return an Observable that emits the single item emitted by the source Observable
- * @throws IllegalArgumentException if the source emits more than one item
- * @throws NoSuchElementException if the source emits no items
+ * @throws java.lang.IllegalArgumentException if the source emits more than one item
+ * @throws java.util.NoSuchElementException if the source emits no items
* @see RxJava Wiki: single()
* @see "MSDN: Observable.singleAsync()"
*/
@@ -3078,7 +3243,7 @@ trait Observable[+T]
*
* @return an Observable that emits an `Option` with the single item emitted by the source Observable, or
* `None` if the source Observable is empty
- * @throws IllegalArgumentException if the source Observable emits more than one item
+ * @throws java.lang.IllegalArgumentException if the source Observable emits more than one item
*/
def singleOption: Observable[Option[T]] = {
val jObservableOption = map(Some(_)).asJavaObservable.asInstanceOf[rx.Observable[Option[T]]]
@@ -3096,7 +3261,7 @@ trait Observable[+T]
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
* @return an Observable that emits the single item emitted by the source Observable, or a default item if
* the source Observable is empty
- * @throws IllegalArgumentException if the source Observable emits more than one item
+ * @throws java.lang.IllegalArgumentException if the source Observable emits more than one item
*/
def singleOrElse[U >: T](default: => U): Observable[U] = {
singleOption.map {
@@ -3383,7 +3548,7 @@ trait Observable[+T]
* @param count the number of times the source Observable items are repeated,
* a count of 0 will yield an empty sequence
* @return an Observable that repeats the sequence of items emitted by the source Observable at most `count` times
- * @throws IllegalArgumentException if `count` is less than zero
+ * @throws java.lang.IllegalArgumentException if `count` is less than zero
* @see RxJava Wiki: repeat()
* @see MSDN: Observable.Repeat
*/
@@ -3833,7 +3998,7 @@ trait Observable[+T]
* the zero-based index of the item to retrieve
* @return an Observable that emits a single item: the item at the specified position in the sequence of
* those emitted by the source Observable
- * @throws IndexOutOfBoundsException
+ * @throws java.lang.IndexOutOfBoundsException
* if index is greater than or equal to the number of items emitted by the source
* Observable, or index is less than 0
*/
@@ -3853,7 +4018,7 @@ trait Observable[+T]
* the default item
* @return an Observable that emits the item at the specified position in the sequence emitted by the source
* Observable, or the default item if that index is outside the bounds of the source sequence
- * @throws IndexOutOfBoundsException
+ * @throws java.lang.IndexOutOfBoundsException
* if `index` is less than 0
*/
def elementAtOrDefault[U >: T](index: Int, default: U): Observable[U] = {
@@ -4053,8 +4218,8 @@ trait Observable[+T]
* $noDefaultScheduler
*
* @param onNext function to execute for each item.
- * @throws IllegalArgumentException if `onNext` is null
- * @throws OnErrorNotImplementedException if the [[Observable]] tries to call `onError`
+ * @throws java.lang.IllegalArgumentException if `onNext` is null
+ * @throws rx.exceptions.OnErrorNotImplementedException if the [[Observable]] tries to call `onError`
* @since 0.19
* @see ReactiveX operators documentation: Subscribe
*/
@@ -4071,7 +4236,7 @@ trait Observable[+T]
*
* @param onNext function to execute for each item.
* @param onError function to execute when an error is emitted.
- * @throws IllegalArgumentException if `onNext` is null, or if `onError` is null
+ * @throws java.lang.IllegalArgumentException if `onNext` is null, or if `onError` is null
* @since 0.19
* @see ReactiveX operators documentation: Subscribe
*/
@@ -4089,7 +4254,7 @@ trait Observable[+T]
* @param onNext function to execute for each item.
* @param onError function to execute when an error is emitted.
* @param onComplete function to execute when completion is signalled.
- * @throws IllegalArgumentException if `onNext` is null, or if `onError` is null, or if `onComplete` is null
+ * @throws java.lang.IllegalArgumentException if `onNext` is null, or if `onError` is null, or if `onComplete` is null
* @since 0.19
* @see ReactiveX operators documentation: Subscribe
*/
@@ -4536,6 +4701,7 @@ trait Observable[+T]
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
+ @deprecated("The operator doesn't work properly with [[Observable.subscribeOn]]` and is prone to deadlocks.It will be removed / unavailable in future", "0.25.1")
def onBackpressureBlock(maxQueueLength: Int): Observable[T] = {
asJavaObservable.onBackpressureBlock(maxQueueLength)
}
@@ -4559,6 +4725,7 @@ trait Observable[+T]
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
+ @deprecated("The operator doesn't work properly with [[Observable.subscribeOn]]` and is prone to deadlocks.It will be removed / unavailable in future", "0.25.1")
def onBackpressureBlock: Observable[T] = {
asJavaObservable.onBackpressureBlock()
}
@@ -5028,6 +5195,53 @@ object Observable {
toScalaObservable[java.lang.Long](rx.Observable.interval(period.length, period.unit, scheduler)).map(_.longValue())
}
+ /**
+ * Returns an [[Observable]] that emits a `0L` after the `initialDelay` and ever increasing numbers
+ * after each `period` of time thereafter.
+ *
+ *
+ *
+ * ===Backpressure Support:===
+ * This operator does not support backpressure as it uses time. If the downstream needs a slower rate
+ * it should slow the timer or use something like [[Observable.onBackpressureDrop:* onBackpressureDrop]].
+ *
+ * ===Scheduler:===
+ * `interval` operates by default on the `computation` [[Scheduler]].
+ *
+ * @param initialDelay the initial delay time to wait before emitting the first value of 0L
+ * @param period the period of time between emissions of the subsequent numbers
+ * @return an [[Observable]] that emits a `0L` after the `initialDelay` and ever increasing numbers after
+ * each `period` of time thereafter
+ * @see ReactiveX operators documentation: Interval
+ */
+ def interval(initialDelay: Duration, period: Duration): Observable[Long] = {
+ toScalaObservable[java.lang.Long](rx.Observable.interval(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS)).map(_.longValue())
+ }
+
+ /**
+ * Returns an [[Observable]] that emits a `0L` after the `initialDelay` and ever increasing numbers
+ * after each `period` of time thereafter, on a specified [[Scheduler]].
+ *
+ *
+ *
+ * ===Backpressure Support:===
+ * This operator does not support backpressure as it uses time. If the downstream needs a slower rate
+ * it should slow the timer or use something like [[Observable.onBackpressureDrop:* onBackpressureDrop]].
+ *
+ * ===Scheduler:===
+ * you specify which [[Scheduler]] this operator will use.
+ *
+ * @param initialDelay the initial delay time to wait before emitting the first value of `0L`
+ * @param period the period of time between emissions of the subsequent numbers
+ * @param scheduler the [[Scheduler]] on which the waiting happens and items are emitted
+ * @return an [[Observable]] that emits a `0L` after the `initialDelay` and ever increasing numbers after
+ * each `period` of time thereafter, while running on the given [[Scheduler]]
+ * @see ReactiveX operators documentation: Interval
+ */
+ def interval(initialDelay: Duration, period: Duration, scheduler: Scheduler): Observable[Long] = {
+ toScalaObservable[java.lang.Long](rx.Observable.interval(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS, scheduler)).map(_.longValue())
+ }
+
/**
* Return an Observable that emits a 0L after the `initialDelay` and ever increasing
* numbers after each `period` of time thereafter, on a specified Scheduler.
@@ -5041,6 +5255,7 @@ object Observable {
* @return an Observable that emits a 0L after the `initialDelay` and ever increasing
* numbers after each `period` of time thereafter, while running on the given `scheduler`
*/
+ @deprecated("Use [[Observable$.interval(initialDelay:scala\\.concurrent\\.duration\\.Duration,period:scala\\.concurrent\\.duration\\.Duration)* interval]] instead.", "0.25.1")
def timer(initialDelay: Duration, period: Duration): Observable[Long] = {
toScalaObservable[java.lang.Long](rx.Observable.timer(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS)).map(_.longValue())
}
@@ -5060,6 +5275,7 @@ object Observable {
* @return an Observable that emits a 0L after the `initialDelay` and ever increasing
* numbers after each `period` of time thereafter, while running on the given `scheduler`
*/
+ @deprecated("Use [[Observable$.interval(initialDelay:scala\\.concurrent\\.duration\\.Duration,period:scala\\.concurrent\\.duration\\.Duration,scheduler:rx\\.lang\\.scala\\.Scheduler)* interval]] instead.", "0.25.1")
def timer(initialDelay: Duration, period: Duration, scheduler: Scheduler): Observable[Long] = {
toScalaObservable[java.lang.Long](rx.Observable.timer(initialDelay.toNanos, period.toNanos, duration.NANOSECONDS, scheduler)).map(_.longValue())
}
diff --git a/src/main/scala/rx/lang/scala/Subject.scala b/src/main/scala/rx/lang/scala/Subject.scala
index 757b33e5..d9cbf874 100644
--- a/src/main/scala/rx/lang/scala/Subject.scala
+++ b/src/main/scala/rx/lang/scala/Subject.scala
@@ -73,6 +73,7 @@ trait Subject[T] extends Observable[T] with Observer[T] {
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
*/
@Experimental
+ @deprecated("this method will be moved to each Subject class individually", "0.25.1")
def hasThrowable: Boolean = asJavaSubject.hasThrowable
/**
@@ -84,6 +85,7 @@ trait Subject[T] extends Observable[T] with Observer[T] {
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
*/
@Experimental
+ @deprecated("this method will be moved to each Subject class individually", "0.25.1")
def hasCompleted: Boolean = asJavaSubject.hasCompleted
/**
@@ -96,6 +98,7 @@ trait Subject[T] extends Observable[T] with Observer[T] {
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
*/
@Experimental
+ @deprecated("this method will be moved to each Subject class individually", "0.25.1")
def getThrowable: Throwable = asJavaSubject.getThrowable
/**
@@ -112,6 +115,7 @@ trait Subject[T] extends Observable[T] with Observer[T] {
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
*/
@Experimental
+ @deprecated("this method will be moved to each Subject class individually", "0.25.1")
def hasValue: Boolean = asJavaSubject.hasValue
/**
@@ -129,6 +133,7 @@ trait Subject[T] extends Observable[T] with Observer[T] {
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
*/
@Experimental
+ @deprecated("this method will be moved to each Subject class individually", "0.25.1")
def getValue: T = asJavaSubject.getValue.asInstanceOf[T]
/**
@@ -140,6 +145,7 @@ trait Subject[T] extends Observable[T] with Observer[T] {
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
*/
@Experimental
+ @deprecated("this method will be moved to each Subject class individually", "0.25.1")
def getValues: Seq[T] = asJavaSubject.getValues.map(_.asInstanceOf[T])
}
diff --git a/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala b/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala
index dc398e15..11a9bcd1 100644
--- a/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala
+++ b/src/main/scala/rx/lang/scala/observables/BlockingObservable.scala
@@ -17,8 +17,9 @@ package rx.lang.scala.observables
import scala.collection.JavaConverters._
import scala.concurrent.{Future, Promise}
+import rx.annotations.Experimental
import rx.lang.scala.ImplicitFunctionConversions._
-import rx.lang.scala.Observable
+import rx.lang.scala.{Observable, Observer, Subscriber}
import rx.observables.{BlockingObservable => JBlockingObservable}
@@ -26,6 +27,9 @@ import rx.observables.{BlockingObservable => JBlockingObservable}
* An Observable that provides blocking operators.
*
* You can obtain a BlockingObservable from an Observable using [[rx.lang.scala.Observable.toBlocking]]
+ *
+ * @define experimental
+ * EXPERIMENTAL
*/
// constructor is private because users should use Observable.toBlocking
class BlockingObservable[+T] private[scala] (val o: Observable[T])
@@ -34,20 +38,18 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T])
// This is def because "field definition is not allowed in value class"
private def asJava: JBlockingObservable[_ <: T] = o.asJavaObservable.toBlocking
/**
- * Invoke a method on each item emitted by the {@link Observable}; block until the Observable
+ * Invoke a method on each item emitted by the [[Observable]]; block until the Observable
* completes.
*
- * NOTE: This will block even if the Observable is asynchronous.
+ * NOTE: This will block even if the [[Observable]] is asynchronous.
*
- * This is similar to {@link Observable#subscribe(Observer)}, but it blocks. Because it blocks it does
- * not need the {@link Observer#onCompleted()} or {@link Observer#onError(Throwable)} methods.
+ * This is similar to [[Observable.subscribe()*]], but it blocks. Because it blocks it does
+ * not need the [[Observer.onCompleted]] or [[Observer.onError]] methods.
*
*
*
- * @param f
- * the {@link Action1} to invoke for every item emitted by the {@link Observable}
- * @throws RuntimeException
- * if an error occurs
+ * @param f the action to invoke for every item emitted by the [[Observable]]
+ * @throws java.lang.RuntimeException if an error occurs
*/
def foreach(f: T => Unit): Unit = {
asJava.forEach(f)
@@ -64,8 +66,7 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T])
*
*
* @return the last item emitted by the source [[Observable]]
- * @throws NoSuchElementException
- * if source contains no elements
+ * @throws java.util.NoSuchElementException if source contains no elements
* @see RxJava Wiki: last()
* @see MSDN: Observable.Last
*/
@@ -103,8 +104,7 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T])
* `NoSuchElementException` if source contains no elements.
*
* @return the first item emitted by the source [[Observable]]
- * @throws NoSuchElementException
- * if source contains no elements
+ * @throws java.util.NoSuchElementException if source contains no elements
* @see RxJava Wiki: first()
* @see MSDN: Observable.First
*/
@@ -117,8 +117,7 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T])
* `NoSuchElementException` if source contains no elements.
*
* @return the first item emitted by the source [[Observable]]
- * @throws NoSuchElementException
- * if source contains no elements
+ * @throws java.util.NoSuchElementException if source contains no elements
* @see RxJava Wiki: first()
* @see MSDN: Observable.First
* @see [[BlockingObservable.first]]
@@ -182,8 +181,8 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T])
*
*
* @return an Observable that emits the single item emitted by the source Observable
- * @throws IllegalArgumentException if the source emits more than one item
- * @throws NoSuchElementException if the source emits no items
+ * @throws java.lang.IllegalArgumentException if the source emits more than one item
+ * @throws java.util.NoSuchElementException if the source emits no items
*/
def single: T = {
asJava.single(): T // useless ascription because of compiler bug
@@ -196,7 +195,7 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T])
*
* @return an `Option` with the single item emitted by the source Observable, or
* `None` if the source Observable is empty
- * @throws IllegalArgumentException if the source Observable emits more than one item
+ * @throws java.lang.IllegalArgumentException if the source Observable emits more than one item
*/
def singleOption: Option[T] = {
o.singleOption.toBlocking.single
@@ -213,7 +212,7 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T])
* This is a by-name parameter, so it is only evaluated if the source Observable doesn't emit anything.
* @return the single item emitted by the source Observable, or a default item if
* the source Observable is empty
- * @throws IllegalArgumentException if the source Observable emits more than one item
+ * @throws java.lang.IllegalArgumentException if the source Observable emits more than one item
*/
def singleOrElse[U >: T](default: => U): U = {
singleOption getOrElse default
@@ -265,6 +264,70 @@ class BlockingObservable[+T] private[scala] (val o: Observable[T])
o.single.subscribe(t => p.success(t), e => p.failure(e))
p.future
}
+
+ /**
+ * $experimental Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
+ */
+ @Experimental
+ def subscribe(): Unit = {
+ asJava.subscribe()
+ }
+
+ /**
+ * $experimental Subscribes to the source and calls the given functions on the current thread, or
+ * rethrows any exception wrapped into [[rx.exceptions.OnErrorNotImplementedException]].
+ *
+ * @param onNext this function will be called whenever the Observable emits an item
+ */
+ @Experimental
+ def subscribe(onNext: T => Unit): Unit = {
+ asJava.subscribe(onNext)
+ }
+
+ /**
+ * $experimental Subscribes to the source and calls the given functions on the current thread.
+ *
+ * @param onNext this function will be called whenever the Observable emits an item
+ * @param onError this function will be called if an error occurs
+ */
+ @Experimental
+ def subscribe(onNext: T => Unit, onError: Throwable => Unit): Unit = {
+ asJava.subscribe(onNext, onError)
+ }
+
+ /**
+ * $experimental Subscribes to the source and calls the given functions on the current thread.
+ *
+ * @param onNext this function will be called whenever the Observable emits an item
+ * @param onError this function will be called if an error occurs
+ * @param onCompleted this function will be called when this Observable has finished emitting items
+ */
+ @Experimental
+ def subscribe(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Unit = {
+ asJava.subscribe(onNext, onError, onCompleted)
+ }
+
+ /**
+ * $experimental Subscribes to the source and calls back the [[Observer]] methods on the current thread.
+ *
+ * @param observer the [[Observer]] to call event methods on
+ */
+ @Experimental
+ def subscribe(observer: Observer[T]): Unit = {
+ asJava.subscribe(observer.asJavaObserver)
+ }
+
+ /**
+ * $experimental Subscribes to the source and calls the [[Subscriber]] methods on the current thread.
+ *
+ * The unsubscription and backpressure is composed through.
+ *
+ * @param subscriber the [[Subscriber]] to forward events and calls to in the current thread
+ */
+ @Experimental
+ def subscribe(subscriber: Subscriber[T]): Unit = {
+ asJava.subscribe(subscriber.asJavaSubscriber)
+ }
}
// Cannot yet have inner class because of this error message:
diff --git a/src/main/scala/rx/lang/scala/observers/TestSubscriber.scala b/src/main/scala/rx/lang/scala/observers/TestSubscriber.scala
index d59f00b3..83d36fb6 100644
--- a/src/main/scala/rx/lang/scala/observers/TestSubscriber.scala
+++ b/src/main/scala/rx/lang/scala/observers/TestSubscriber.scala
@@ -73,7 +73,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
/**
* Assert that a single terminal event occurred, either `onCompleted` or `onError`.
*
- * @throws AssertionError if not exactly one terminal event notification was received
+ * @throws java.lang.AssertionError if not exactly one terminal event notification was received
*/
@throws[AssertionError]
def assertTerminalEvent(): Unit = {
@@ -83,7 +83,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
/**
* Assert that this [[Subscriber]] is unsubscribed.
*
- * @throws AssertionError if this [[Subscriber]] is not unsubscribed
+ * @throws java.lang.AssertionError if this [[Subscriber]] is not unsubscribed
*/
@throws[AssertionError]
def assertUnsubscribed(): Unit = {
@@ -93,7 +93,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
/**
* Assert that this [[Subscriber]] has received no `onError` notifications.
*
- * @throws AssertionError if this [[Subscriber]] has received one or more `onError` notifications
+ * @throws java.lang.AssertionError if this [[Subscriber]] has received one or more `onError` notifications
*/
@throws[AssertionError]
def assertNoErrors(): Unit = {
@@ -104,7 +104,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* Blocks until this [[Subscriber]] receives a notification that the [[Observable]] is complete
* (either an `onCompleted` or `onError` notification).
*
- * @throws RuntimeException if the Subscriber is interrupted before the Observable is able to complete
+ * @throws java.lang.RuntimeException if the Subscriber is interrupted before the Observable is able to complete
*/
@throws[RuntimeException]
def awaitTerminalEvent(): Unit = {
@@ -116,7 +116,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* (either an `onCompleted` or `onError` notification), or until a timeout expires.
*
* @param timeout the duration of the timeout
- * @throws RuntimeException if the Subscriber is interrupted before the Observable is able to complete
+ * @throws java.lang.RuntimeException if the Subscriber is interrupted before the Observable is able to complete
*/
@throws[RuntimeException]
def awaitTerminalEvent(timeout: Duration): Unit = {
@@ -148,7 +148,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
/**
* $experimental Assert if there is exactly a single completion event.
*
- * @throws AssertionError if there were zero, or more than one, onCompleted events
+ * @throws java.lang.AssertionError if there were zero, or more than one, onCompleted events
* @since (if this graduates from "Experimental" replace this parenthetical with the release number)
*/
@Experimental
@@ -160,7 +160,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
/**
* $experimental Assert if there is no completion event.
*
- * @throws AssertionError if there were one or more than one onCompleted events
+ * @throws java.lang.AssertionError if there were one or more than one onCompleted events
* @since (if this graduates from "Experimental" replace this parenthetical with the release number)
*/
@Experimental
@@ -173,7 +173,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* $experimental Assert if there is exactly one error event which is a subclass of the given class.
*
* @param clazz the class to check the error against.
- * @throws AssertionError if there were zero, or more than one, onError events, or if the single onError
+ * @throws java.lang.AssertionError if there were zero, or more than one, onError events, or if the single onError
* event did not carry an error of a subclass of the given class
* @since (if this graduates from "Experimental" replace this parenthetical with the release number)
*/
@@ -187,7 +187,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* $experimental Assert there is a single onError event with the exact exception.
*
* @param throwable the throwable to check
- * @throws AssertionError if there were zero, or more than one, onError events, or if the single onError
+ * @throws java.lang.AssertionError if there were zero, or more than one, onError events, or if the single onError
* event did not carry an error that matches the specified throwable
* @since (if this graduates from "Experimental" replace this parenthetical with the release number)
*/
@@ -200,7 +200,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
/**
* $experimental Assert for no onError and onCompleted events.
*
- * @throws AssertionError if there was either an onError or onCompleted event
+ * @throws java.lang.AssertionError if there was either an onError or onCompleted event
* @since (if this graduates from "Experimental" replace this parenthetical with the release number)
*/
@Experimental
@@ -212,7 +212,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
/**
* $experimental Assert if there are no onNext events received.
*
- * @throws AssertionError if there were any onNext events
+ * @throws java.lang.AssertionError if there were any onNext events
* @since (if this graduates from "Experimental" replace this parenthetical with the release number)
*/
@Experimental
@@ -225,7 +225,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* $experimental Assert if the given number of onNext events are received.
*
* @param count the expected number of onNext events
- * @throws AssertionError if there were more or fewer onNext events than specified by `count`
+ * @throws java.lang.AssertionError if there were more or fewer onNext events than specified by `count`
* @since (if this graduates from "Experimental" replace this parenthetical with the release number)
*/
@Experimental
@@ -238,7 +238,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* $experimental Assert if the received onNext events, in order, are the specified items.
*
* @param values the items to check
- * @throws AssertionError if the items emitted do not exactly match those specified by `values`
+ * @throws java.lang.AssertionError if the items emitted do not exactly match those specified by `values`
* @since (if this graduates from "Experimental" replace this parenthetical with the release number)
*/
@Experimental
@@ -251,7 +251,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* $experimental Assert if there is only a single received onNext event and that it marks the emission of a specific item.
*
* @param value the item to check
- * @throws AssertionError if the [[Observable]] does not emit only the single item specified by `value`
+ * @throws java.lang.AssertionError if the [[Observable]] does not emit only the single item specified by `value`
* @since (if this graduates from "Experimental" replace this parenthetical with the release number)
*/
@Experimental
diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/BlockingObservableCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/BlockingObservableCompletenessKit.scala
index ff29324a..3aa5545d 100644
--- a/src/test/scala-2.11/rx/lang/scala/completeness/BlockingObservableCompletenessKit.scala
+++ b/src/test/scala-2.11/rx/lang/scala/completeness/BlockingObservableCompletenessKit.scala
@@ -35,6 +35,7 @@ class BlockingObservableCompletenessKit extends CompletenessKit {
"single(Func1[_ >: T, Boolean])" -> "[use `Observable.filter(p).toBlocking.single`]",
"singleOrDefault(T)" -> "singleOrElse(=> U)",
"singleOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `Observable.filter(p).toBlocking.singleOrElse(=> U)`]",
+ "subscribe()" -> "subscribe()",
"getIterator()" -> "[use `toIterable.toIterator`]"
)
}
diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala
index 1a2289fc..f996f1c1 100644
--- a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala
+++ b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala
@@ -61,6 +61,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"cast(Class[R])" -> "[RxJava needs this one because `rx.Observable` is invariant. `Observable` in RxScala is covariant and does not need this operator.]",
"collect(Func0[R], Action2[R, _ >: T])" -> "[TODO: See https://github.com/ReactiveX/RxScala/issues/63]",
"compose(Transformer[_ >: T, _ <: R])" -> "[use extension methods instead]",
+ "concatMapEager(Func1[_ >: T, _ <: Observable[_ <: R]], Int)" -> "concatMapEager(Int, T => Observable[R])",
"concatWith(Observable[_ <: T])" -> "[use `o1 ++ o2`]",
"contains(Any)" -> "contains(U)",
"count()" -> "length",
@@ -76,6 +77,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"doOnUnsubscribe(Action0)" -> "doOnUnsubscribe(=> Unit)",
"doOnTerminate(Action0)" -> "doOnTerminate(=> Unit)",
"elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)",
+ "extend(Func1[_ >: OnSubscribe[T], _ <: R])" -> "[use Scala implicit feature to extend `Observable`]",
"finallyDo(Action0)" -> "finallyDo(=> Unit)",
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
"firstOrDefault(T)" -> "firstOrElse(=> U)",
@@ -163,6 +165,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V])" -> "toMultiMap(T => K, T => V)",
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]])" -> "toMultiMap(T => K, T => V, => M)",
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]], Func1[_ >: K, _ <: Collection[V]])" -> commentForToMultimapWithCollectionFactory,
+ "toSingle()" -> "[TODO]",
"toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Int)" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
@@ -186,8 +189,13 @@ class ObservableCompletenessKit extends CompletenessKit {
"combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])",
"combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Seq[Observable[T]])(Seq[T] => R)",
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
+ "concatEager(Observable[_ <: Observable[_ <: T]])" -> "concatEager(<:<[Observable[T], Observable[Observable[U]]])",
+ "concatEager(Observable[_ <: Observable[_ <: T]], Int)" -> "concatEager(Int)(<:<[Observable[T], Observable[Observable[U]]])",
+ "concatEager(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.concatEager`]",
+ "concatEager(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `iter.toObservable.concatEager(Int)`]",
"defer(Func0[Observable[T]])" -> "defer(=> Observable[T])",
"from(Array[T])" -> "from(Iterable[T])",
+ "fromCallable(Callable[_ <: T])" -> "[use `Observable.defer(Observable.just(expensiveComputation))`]",
"from(Iterable[_ <: T])" -> "from(Iterable[T])",
"from(Future[_ <: T])" -> fromFuture,
"from(Future[_ <: T], Long, TimeUnit)" -> fromFuture,
@@ -198,9 +206,11 @@ class ObservableCompletenessKit extends CompletenessKit {
"merge(Observable[_ <: Observable[_ <: T]], Int)" -> "flatten(Int)(<:<[Observable[T], Observable[Observable[U]]])",
"merge(Array[Observable[_ <: T]])" -> "[use `Observable.from(array).flatten`]",
"merge(Iterable[_ <: Observable[_ <: T]])" -> "[use `Observable.from(iter).flatten`]",
+ "merge(Array[Observable[_ <: T]], Int)" -> "[use `Observable.from(array).flatten(n)`]",
"merge(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `Observable.from(iter).flatten(n)`]",
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])",
"mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])",
+ "mergeDelayError(Observable[_ <: Observable[_ <: T]], Int)" -> "flattenDelayError(Int)(<:<[Observable[T], Observable[Observable[U]]])",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "sequenceEqual(Observable[U])",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "sequenceEqualWith(Observable[U])((U, U) => Boolean)",
"range(Int, Int)" -> commentForRange,
@@ -242,5 +252,8 @@ class ObservableCompletenessKit extends CompletenessKit {
}).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
// amb 2-9
"amb(" + _ + ")" -> "[unnecessary because we can use `o1 amb o2` instead or `amb(List(o1, o2, o3, ...)`]"
+ ).drop(1).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
+ // concatEager 2-9
+ "concatEager(" + _ + ")" -> "[unnecessary because we can use `concatEager` instead or `Observable(o1, o2, ...).concatEager`]"
).drop(1).toMap
}
diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/TestSubscriberCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/TestSubscriberCompletenessKit.scala
index 1f445c54..11b2c27b 100644
--- a/src/test/scala-2.11/rx/lang/scala/completeness/TestSubscriberCompletenessKit.scala
+++ b/src/test/scala-2.11/rx/lang/scala/completeness/TestSubscriberCompletenessKit.scala
@@ -31,6 +31,12 @@ class TestSubscriberCompletenessKit extends CompletenessKit {
"getOnCompletedEvents()" -> "assertCompleted()",
"getOnErrorEvents()" -> "getOnErrorEvents",
"getOnNextEvents()" -> "getOnNextEvents",
- "isUnsubscribed()" -> "isUnsubscribed"
+ "isUnsubscribed()" -> "isUnsubscribed",
+
+ "create()" -> "apply()",
+ "create(Long)" -> "apply(Long)",
+ "create(Observer[T])" -> "apply(Observer[T])",
+ "create(Observer[T], Long)" -> "apply(Observer[T], Long)",
+ "create(Subscriber[T])" -> "apply(Subscriber[T])"
)
}