Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ scalacOptions in ThisBuild := Seq("-feature", "-unchecked", "-deprecation", "-en

scalaVersion in ThisBuild := "2.11.8"

crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M4")
crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M5")

parallelExecution in Test := false

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.1.6",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.6" % "test")
libraryDependencies ++= {
// Scalatest 3.0.0 supports 2.12.0-M5 but it drops Scala 2.10 support. So just use 2.2.6 for Scala 2.10.
val scalatestVersion = if (scalaVersion.value.startsWith("2.10")) "2.2.6" else "3.0.0"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 😃

Seq(
"io.reactivex" % "rxjava" % "1.1.9",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % scalatestVersion % "test")
}

// Set up the doc mappings
// See http://stackoverflow.com/questions/16934488/how-to-link-classes-from-jdk-into-scaladoc-generated-doc
Expand Down
4 changes: 2 additions & 2 deletions examples/src/test/scala/examples/TestSchedulerExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.mockito.Mockito._
import org.scalatest.junit.JUnitSuite
import rx.lang.scala._
import rx.lang.scala.schedulers.TestScheduler
import rx.observers.TestObserver
import rx.observers.TestSubscriber
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestObserver is deprecated.

import rx.lang.scala.JavaConversions._

class TestSchedulerExample extends JUnitSuite {
Expand All @@ -34,7 +34,7 @@ class TestSchedulerExample extends JUnitSuite {
val o = Observable.interval(1 second, scheduler)

// Wrap Java Observer in Scala Observer, then subscribe
val sub = o.subscribe(toScalaObserver(new TestObserver(observer)))
val sub = o.subscribe(toScalaSubscriber(new TestSubscriber(observer)))

verify(observer, never).onNext(0L)
verify(observer, never).onCompleted()
Expand Down
21 changes: 21 additions & 0 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3433,6 +3433,27 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.distinctUntilChanged[U](keySelector))
}

/**
* $experimental Returns an [[Observable]] that emits all items emitted by the source [[Observable]] that are distinct from their
* immediate predecessors when compared with each other via the provided comparator function.
*
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/distinctUntilChanged.png" alt="" />
*
* ===Backpressure:===
* The operator doesn't interfere with backpressure which is determined by the source [[Observable]]'s backpressure behavior.
*
* $noDefaultScheduler
*
* @param comparator the function that receives the previous item and the current item and is
* expected to return true if the two are equal, thus skipping the current value.
* @return an [[Observable]] that emits those items from the source Observable that are distinct from their immediate predecessors
* @see <a href="http://reactivex.io/documentation/operators/distinct.html">ReactiveX operators documentation: Distinct</a>
*/
@Experimental
def distinctUntilChanged[U](comparator: (T, T) => Boolean): Observable[T] = {
toScalaObservable[T](asJavaObservable.distinctUntilChanged(comparator))
}

/**
* Returns an Observable that forwards all distinct items emitted from the source Observable.
*
Expand Down
40 changes: 37 additions & 3 deletions src/main/scala/rx/lang/scala/observers/TestSubscriber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package rx.lang.scala.observers

import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JavaConversions is deprecated in Scala 2.12.0-M5

import scala.concurrent.duration.Duration
import rx.{Subscriber => JSubscriber, Observer => JObserver, Subscription => JSubscription}
import rx.annotations.Experimental
Expand Down Expand Up @@ -48,7 +48,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* @return a sequence of the `Throwable`s that were passed to the [[Subscriber.onError]] method
*/
def getOnErrorEvents: Seq[Throwable] = {
jTestSubscriber.getOnErrorEvents()
jTestSubscriber.getOnErrorEvents().asScala
}

/**
Expand All @@ -57,7 +57,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* @return a sequence of items observed by this [[Subscriber]], in the order in which they were observed
*/
def getOnNextEvents: Seq[T] = {
jTestSubscriber.getOnNextEvents()
jTestSubscriber.getOnNextEvents().asScala
}

/**
Expand Down Expand Up @@ -259,6 +259,40 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
def assertValue(value: T): Unit = {
jTestSubscriber.assertValue(value)
}

/**
* $experimental Assert that the [[TestSubscriber]] contains the given first and optional rest values exactly
* and if so, clears the internal list of values.
* {{{
* val ts = TestSubscriber()
*
* ts.onNext(1)
*
* ts.assertValuesAndClear(1)
*
* ts.onNext(2)
* ts.onNext(3)
*
* ts.assertValuesAndClear(2, 3) // no mention of 1
* }}}
*
* @param expectedFirstValue the expected first value
* @param expectedRestValues the optional rest values
*/
@Experimental
def assertValuesAndClear(expectedFirstValue: T, expectedRestValues: T*): Unit = {
jTestSubscriber.assertValuesAndClear(expectedFirstValue, expectedRestValues: _*)
}

/**
* $experimental Returns the number of times onCompleted was called on this [[TestSubscriber]].
*
* @return the number of times onCompleted was called on this [[TestSubscriber]].
*/
@Experimental
def getCompletions: Int = {
jTestSubscriber.getCompletions
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getXxx method names are not so Scala-typical... Why not just completions?

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ trait CompletenessKit extends JUnitSuite {
.filter(! _.contains("access$000"))
// Ignore constructors
.filter(! _.startsWith("<init>"))
// TODO Skips withLatestFrom right now due to https://github.com/ReactiveX/RxJava/pull/4447
.filter(! _.startsWith("withLatestFrom(Observable[T1]"))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R])",
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Func1[_ >: Throwable, _ <: Observable[_ <: R]], Func0[_ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R], Throwable => Observable[R], () => Observable[R])",
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "groupBy(T => K, T => V)",
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R], Func1[Action1[K], Map[K, Object]])" -> "[TODO]",
"mergeWith(Observable[_ <: T])" -> "merge(Observable[U])",
"ofType(Class[R])" -> "[use `filter(_.isInstanceOf[Class])`]",
"onBackpressureBuffer(Long, Action0)" -> "onBackpressureBuffer(Long, => Unit)",
Expand Down Expand Up @@ -179,6 +180,8 @@ class ObservableCompletenessKit extends CompletenessKit {
"toSortedList(Int)" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer], Int)" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"sorted()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"sorted(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"window(Int)" -> "tumbling(Int)",
"window(Int, Int)" -> "sliding(Int, Int)",
"window(Long, TimeUnit)" -> "tumbling(Duration)",
Expand All @@ -202,6 +205,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
"combineLatestDelayError(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
"concat(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.concat`]",
"concatDelayError(Observable[_ <: Observable[_ <: T]])" -> "[use `delayError.concat`]",
"concatDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.delayError.concat`]",
"concatEager(Observable[_ <: Observable[_ <: T]])" -> "concatEager(<:<[Observable[T], Observable[Observable[U]]])",
Expand All @@ -215,6 +219,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"from(Future[_ <: T])" -> fromFuture,
"from(Future[_ <: T], Long, TimeUnit)" -> fromFuture,
"from(Future[_ <: T], Scheduler)" -> fromFuture,
"fromAsync(Action1[AsyncEmitter[T]], BackpressureMode)" -> "[TODO]",
"just(T)" -> "just(T*)",
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
"merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])",
Expand All @@ -237,8 +242,11 @@ class ObservableCompletenessKit extends CompletenessKit {
"using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource])" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)",
"using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource], Boolean)" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)",
"withLatestFrom(Observable[_ <: U], Func2[_ >: T, _ >: U, _ <: R])" -> "withLatestFrom(Observable[U])((T, U) => R)",
"withLatestFrom(Array[Observable[_]], FuncN[R])" -> "[TODO]",
"withLatestFrom(Iterable[Observable[_]], FuncN[R])" -> "[TODO]",
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method `zip` and `map`]",
"zip(Observable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
"zip(Array[Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
"zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
"zipWith(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U])((T, U) => R)",
"zipWith(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U])((T, U) => R)"
Expand Down Expand Up @@ -273,5 +281,8 @@ class ObservableCompletenessKit extends CompletenessKit {
).drop(1).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
// concatEager 2-9
"concatEager(" + _ + ")" -> "[unnecessary because we can use `concatEager` instead or `Observable(o1, o2, ...).concatEager`]"
).drop(1).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
// concatDelayError 2-9
"concatDelayError(" + _ + ")" -> "[unnecessary because we can use `Observable(o1, o2, ...).delayError.concat`]"
).drop(1).toMap
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,8 @@ class TestSchedulerCompletenessKit extends CompletenessKit {
override val rxScalaType = typeOf[rx.lang.scala.schedulers.TestScheduler]

// There are two methods without "()", so using `isOmittingParenthesesForArity0Method = true` by default is simpler
override protected def correspondenceChanges: Map[String, String] = Map("triggerActions()" -> "triggerActions()")
override protected def correspondenceChanges: Map[String, String] = Map(
"triggerActions()" -> "triggerActions()",
"when(Func1[Observable[Observable[Completable]], Completable])" -> "[TODO]"
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ class TestSubscriberCompletenessKit extends CompletenessKit {
"assertError(Class[_ <: Throwable])" -> "assertError(Class[_ <: Throwable])",
"assertReceivedOnNext(List[T])" -> "assertValues(T*)",
"getLastSeenThread()" -> "getLastSeenThread",
"getOnCompletedEvents()" -> "assertCompleted()",
"getOnErrorEvents()" -> "getOnErrorEvents",
"getOnNextEvents()" -> "getOnNextEvents",
"isUnsubscribed()" -> "isUnsubscribed",
"getCompletions()" -> "getCompletions",
"getValueCount()" -> "[TODO]",
"awaitValueCount(Int, Long, TimeUnit)" -> "[TODO]",

"create()" -> "apply()",
"create(Long)" -> "apply(Long)",
Expand Down
17 changes: 17 additions & 0 deletions src/test/scala/rx/lang/scala/SubscriberTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package rx.lang.scala

import rx.lang.scala.observers.TestSubscriber

import scala.collection.mutable

import org.junit.Test
Expand Down Expand Up @@ -182,4 +184,19 @@ class SubscriberTests extends JUnitSuite {
val zeros = new Array[Int](10).toList
assertEquals(zeros, l)
}

@Test def testIssue202() {
// https://github.com/ReactiveX/RxScala/issues/202
val subject = Subject[Option[Unit]]()
val testSubscriber = TestSubscriber[Option[Unit]]()
subject.filter(_.isDefined).subscribe(testSubscriber)
testSubscriber.assertNoValues()
subject.onNext(None)
testSubscriber.assertNoValues()
subject.onNext(Some(()))
testSubscriber.assertValuesAndClear(Some(()))
testSubscriber.assertNoValues()
subject.onNext(None)
testSubscriber.assertNoValues()
}
}