diff --git a/.travis.yml b/.travis.yml
index 32db2f3b..7fbc0039 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,7 +6,7 @@ matrix:
- jdk: oraclejdk7
scala: 2.11.8
- jdk: oraclejdk8
- scala: 2.12.0-M4
+ scala: 2.12.0-M5
script:
- sbt ++$TRAVIS_SCALA_VERSION test examples/test doc
after_success:
diff --git a/build.sbt b/build.sbt
index 2a7922c3..01e22d45 100644
--- a/build.sbt
+++ b/build.sbt
@@ -14,15 +14,19 @@ scalacOptions in ThisBuild := Seq("-feature", "-unchecked", "-deprecation", "-en
scalaVersion in ThisBuild := "2.11.8"
-crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M4")
+crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M5")
parallelExecution in Test := false
-libraryDependencies ++= Seq(
- "io.reactivex" % "rxjava" % "1.1.6",
- "org.mockito" % "mockito-core" % "1.9.5" % "test",
- "junit" % "junit" % "4.11" % "test",
- "org.scalatest" %% "scalatest" % "2.2.6" % "test")
+libraryDependencies ++= {
+ // Scalatest 3.0.0 supports 2.12.0-M5 but it drops Scala 2.10 support. So just use 2.2.6 for Scala 2.10.
+ val scalatestVersion = if (scalaVersion.value.startsWith("2.10")) "2.2.6" else "3.0.0"
+ Seq(
+ "io.reactivex" % "rxjava" % "1.1.9",
+ "org.mockito" % "mockito-core" % "1.9.5" % "test",
+ "junit" % "junit" % "4.11" % "test",
+ "org.scalatest" %% "scalatest" % scalatestVersion % "test")
+}
// Set up the doc mappings
// See http://stackoverflow.com/questions/16934488/how-to-link-classes-from-jdk-into-scaladoc-generated-doc
diff --git a/examples/src/test/scala/examples/TestSchedulerExample.scala b/examples/src/test/scala/examples/TestSchedulerExample.scala
index dff84be2..a85c9151 100644
--- a/examples/src/test/scala/examples/TestSchedulerExample.scala
+++ b/examples/src/test/scala/examples/TestSchedulerExample.scala
@@ -21,7 +21,7 @@ import org.mockito.Mockito._
import org.scalatest.junit.JUnitSuite
import rx.lang.scala._
import rx.lang.scala.schedulers.TestScheduler
-import rx.observers.TestObserver
+import rx.observers.TestSubscriber
import rx.lang.scala.JavaConversions._
class TestSchedulerExample extends JUnitSuite {
@@ -34,7 +34,7 @@ class TestSchedulerExample extends JUnitSuite {
val o = Observable.interval(1 second, scheduler)
// Wrap Java Observer in Scala Observer, then subscribe
- val sub = o.subscribe(toScalaObserver(new TestObserver(observer)))
+ val sub = o.subscribe(toScalaSubscriber(new TestSubscriber(observer)))
verify(observer, never).onNext(0L)
verify(observer, never).onCompleted()
diff --git a/src/main/scala/rx/lang/scala/Observable.scala b/src/main/scala/rx/lang/scala/Observable.scala
index a14c4613..873c0a3f 100644
--- a/src/main/scala/rx/lang/scala/Observable.scala
+++ b/src/main/scala/rx/lang/scala/Observable.scala
@@ -3433,6 +3433,27 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.distinctUntilChanged[U](keySelector))
}
+ /**
+ * $experimental Returns an [[Observable]] that emits all items emitted by the source [[Observable]] that are distinct from their
+ * immediate predecessors when compared with each other via the provided comparator function.
+ *
+ *
+ *
+ * ===Backpressure:===
+ * The operator doesn't interfere with backpressure which is determined by the source [[Observable]]'s backpressure behavior.
+ *
+ * $noDefaultScheduler
+ *
+ * @param comparator the function that receives the previous item and the current item and is
+ * expected to return true if the two are equal, thus skipping the current value.
+ * @return an [[Observable]] that emits those items from the source Observable that are distinct from their immediate predecessors
+ * @see ReactiveX operators documentation: Distinct
+ */
+ @Experimental
+ def distinctUntilChanged[U](comparator: (T, T) => Boolean): Observable[T] = {
+ toScalaObservable[T](asJavaObservable.distinctUntilChanged(comparator))
+ }
+
/**
* Returns an Observable that forwards all distinct items emitted from the source Observable.
*
diff --git a/src/main/scala/rx/lang/scala/observers/TestSubscriber.scala b/src/main/scala/rx/lang/scala/observers/TestSubscriber.scala
index 83d36fb6..f75c6c8f 100644
--- a/src/main/scala/rx/lang/scala/observers/TestSubscriber.scala
+++ b/src/main/scala/rx/lang/scala/observers/TestSubscriber.scala
@@ -16,7 +16,7 @@
package rx.lang.scala.observers
import java.util.concurrent.TimeUnit
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import rx.{Subscriber => JSubscriber, Observer => JObserver, Subscription => JSubscription}
import rx.annotations.Experimental
@@ -48,7 +48,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* @return a sequence of the `Throwable`s that were passed to the [[Subscriber.onError]] method
*/
def getOnErrorEvents: Seq[Throwable] = {
- jTestSubscriber.getOnErrorEvents()
+ jTestSubscriber.getOnErrorEvents().asScala
}
/**
@@ -57,7 +57,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* @return a sequence of items observed by this [[Subscriber]], in the order in which they were observed
*/
def getOnNextEvents: Seq[T] = {
- jTestSubscriber.getOnNextEvents()
+ jTestSubscriber.getOnNextEvents().asScala
}
/**
@@ -259,6 +259,40 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
def assertValue(value: T): Unit = {
jTestSubscriber.assertValue(value)
}
+
+ /**
+ * $experimental Assert that the [[TestSubscriber]] contains the given first and optional rest values exactly
+ * and if so, clears the internal list of values.
+ * {{{
+ * val ts = TestSubscriber()
+ *
+ * ts.onNext(1)
+ *
+ * ts.assertValuesAndClear(1)
+ *
+ * ts.onNext(2)
+ * ts.onNext(3)
+ *
+ * ts.assertValuesAndClear(2, 3) // no mention of 1
+ * }}}
+ *
+ * @param expectedFirstValue the expected first value
+ * @param expectedRestValues the optional rest values
+ */
+ @Experimental
+ def assertValuesAndClear(expectedFirstValue: T, expectedRestValues: T*): Unit = {
+ jTestSubscriber.assertValuesAndClear(expectedFirstValue, expectedRestValues: _*)
+ }
+
+ /**
+ * $experimental Returns the number of times onCompleted was called on this [[TestSubscriber]].
+ *
+ * @return the number of times onCompleted was called on this [[TestSubscriber]].
+ */
+ @Experimental
+ def completions: Int = {
+ jTestSubscriber.getCompletions
+ }
}
/**
diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/CompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/CompletenessKit.scala
index 986520c1..d07de648 100644
--- a/src/test/scala-2.11/rx/lang/scala/completeness/CompletenessKit.scala
+++ b/src/test/scala-2.11/rx/lang/scala/completeness/CompletenessKit.scala
@@ -106,6 +106,8 @@ trait CompletenessKit extends JUnitSuite {
.filter(! _.contains("access$000"))
// Ignore constructors
.filter(! _.startsWith(""))
+ // TODO Skips withLatestFrom right now due to https://github.com/ReactiveX/RxJava/pull/4447
+ .filter(! _.startsWith("withLatestFrom(Observable[T1]"))
}
/**
diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala
index 83133dc8..abd7ce52 100644
--- a/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala
+++ b/src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala
@@ -105,6 +105,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R])",
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Func1[_ >: Throwable, _ <: Observable[_ <: R]], Func0[_ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R], Throwable => Observable[R], () => Observable[R])",
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "groupBy(T => K, T => V)",
+ "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R], Func1[Action1[K], Map[K, Object]])" -> "[TODO]",
"mergeWith(Observable[_ <: T])" -> "merge(Observable[U])",
"ofType(Class[R])" -> "[use `filter(_.isInstanceOf[Class])`]",
"onBackpressureBuffer(Long, Action0)" -> "onBackpressureBuffer(Long, => Unit)",
@@ -179,6 +180,8 @@ class ObservableCompletenessKit extends CompletenessKit {
"toSortedList(Int)" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer], Int)" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
+ "sorted()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
+ "sorted(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"window(Int)" -> "tumbling(Int)",
"window(Int, Int)" -> "sliding(Int, Int)",
"window(Long, TimeUnit)" -> "tumbling(Duration)",
@@ -202,6 +205,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
"combineLatestDelayError(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
+ "concat(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.concat`]",
"concatDelayError(Observable[_ <: Observable[_ <: T]])" -> "[use `delayError.concat`]",
"concatDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.delayError.concat`]",
"concatEager(Observable[_ <: Observable[_ <: T]])" -> "concatEager(<:<[Observable[T], Observable[Observable[U]]])",
@@ -215,6 +219,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"from(Future[_ <: T])" -> fromFuture,
"from(Future[_ <: T], Long, TimeUnit)" -> fromFuture,
"from(Future[_ <: T], Scheduler)" -> fromFuture,
+ "fromAsync(Action1[AsyncEmitter[T]], BackpressureMode)" -> "[TODO]",
"just(T)" -> "just(T*)",
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
"merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])",
@@ -237,8 +242,11 @@ class ObservableCompletenessKit extends CompletenessKit {
"using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource])" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)",
"using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource], Boolean)" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)",
"withLatestFrom(Observable[_ <: U], Func2[_ >: T, _ >: U, _ <: R])" -> "withLatestFrom(Observable[U])((T, U) => R)",
+ "withLatestFrom(Array[Observable[_]], FuncN[R])" -> "[TODO]",
+ "withLatestFrom(Iterable[Observable[_]], FuncN[R])" -> "[TODO]",
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method `zip` and `map`]",
"zip(Observable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
+ "zip(Array[Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
"zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
"zipWith(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U])((T, U) => R)",
"zipWith(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U])((T, U) => R)"
@@ -273,5 +281,8 @@ class ObservableCompletenessKit extends CompletenessKit {
).drop(1).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
// concatEager 2-9
"concatEager(" + _ + ")" -> "[unnecessary because we can use `concatEager` instead or `Observable(o1, o2, ...).concatEager`]"
+ ).drop(1).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
+ // concatDelayError 2-9
+ "concatDelayError(" + _ + ")" -> "[unnecessary because we can use `Observable(o1, o2, ...).delayError.concat`]"
).drop(1).toMap
}
diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/TestSchedulerCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/TestSchedulerCompletenessKit.scala
index 42fde123..0ab31d79 100644
--- a/src/test/scala-2.11/rx/lang/scala/completeness/TestSchedulerCompletenessKit.scala
+++ b/src/test/scala-2.11/rx/lang/scala/completeness/TestSchedulerCompletenessKit.scala
@@ -23,5 +23,8 @@ class TestSchedulerCompletenessKit extends CompletenessKit {
override val rxScalaType = typeOf[rx.lang.scala.schedulers.TestScheduler]
// There are two methods without "()", so using `isOmittingParenthesesForArity0Method = true` by default is simpler
- override protected def correspondenceChanges: Map[String, String] = Map("triggerActions()" -> "triggerActions()")
+ override protected def correspondenceChanges: Map[String, String] = Map(
+ "triggerActions()" -> "triggerActions()",
+ "when(Func1[Observable[Observable[Completable]], Completable])" -> "[TODO]"
+ )
}
diff --git a/src/test/scala-2.11/rx/lang/scala/completeness/TestSubscriberCompletenessKit.scala b/src/test/scala-2.11/rx/lang/scala/completeness/TestSubscriberCompletenessKit.scala
index 11b2c27b..237feb05 100644
--- a/src/test/scala-2.11/rx/lang/scala/completeness/TestSubscriberCompletenessKit.scala
+++ b/src/test/scala-2.11/rx/lang/scala/completeness/TestSubscriberCompletenessKit.scala
@@ -28,10 +28,12 @@ class TestSubscriberCompletenessKit extends CompletenessKit {
"assertError(Class[_ <: Throwable])" -> "assertError(Class[_ <: Throwable])",
"assertReceivedOnNext(List[T])" -> "assertValues(T*)",
"getLastSeenThread()" -> "getLastSeenThread",
- "getOnCompletedEvents()" -> "assertCompleted()",
"getOnErrorEvents()" -> "getOnErrorEvents",
"getOnNextEvents()" -> "getOnNextEvents",
"isUnsubscribed()" -> "isUnsubscribed",
+ "getCompletions()" -> "completions",
+ "getValueCount()" -> "[TODO]",
+ "awaitValueCount(Int, Long, TimeUnit)" -> "[TODO]",
"create()" -> "apply()",
"create(Long)" -> "apply(Long)",
diff --git a/src/test/scala/rx/lang/scala/SubscriberTests.scala b/src/test/scala/rx/lang/scala/SubscriberTests.scala
index e0b32fe6..445ce130 100644
--- a/src/test/scala/rx/lang/scala/SubscriberTests.scala
+++ b/src/test/scala/rx/lang/scala/SubscriberTests.scala
@@ -15,6 +15,8 @@
*/
package rx.lang.scala
+import rx.lang.scala.observers.TestSubscriber
+
import scala.collection.mutable
import org.junit.Test
@@ -182,4 +184,19 @@ class SubscriberTests extends JUnitSuite {
val zeros = new Array[Int](10).toList
assertEquals(zeros, l)
}
+
+ @Test def testIssue202() {
+ // https://github.com/ReactiveX/RxScala/issues/202
+ val subject = Subject[Option[Unit]]()
+ val testSubscriber = TestSubscriber[Option[Unit]]()
+ subject.filter(_.isDefined).subscribe(testSubscriber)
+ testSubscriber.assertNoValues()
+ subject.onNext(None)
+ testSubscriber.assertNoValues()
+ subject.onNext(Some(()))
+ testSubscriber.assertValuesAndClear(Some(()))
+ testSubscriber.assertNoValues()
+ subject.onNext(None)
+ testSubscriber.assertNoValues()
+ }
}