Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ matrix:
- jdk: oraclejdk7
scala: 2.11.8
- jdk: oraclejdk8
scala: 2.12.0-M4
scala: 2.12.0-M5
script:
- sbt ++$TRAVIS_SCALA_VERSION test examples/test doc
after_success:
Expand Down
16 changes: 10 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ scalacOptions in ThisBuild := Seq("-feature", "-unchecked", "-deprecation", "-en

scalaVersion in ThisBuild := "2.11.8"

crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M4")
crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M5")

parallelExecution in Test := false

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.1.6",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.6" % "test")
libraryDependencies ++= {
// Scalatest 3.0.0 supports 2.12.0-M5 but it drops Scala 2.10 support. So just use 2.2.6 for Scala 2.10.
val scalatestVersion = if (scalaVersion.value.startsWith("2.10")) "2.2.6" else "3.0.0"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice 😃

Seq(
"io.reactivex" % "rxjava" % "1.1.9",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % scalatestVersion % "test")
}

// Set up the doc mappings
// See http://stackoverflow.com/questions/16934488/how-to-link-classes-from-jdk-into-scaladoc-generated-doc
Expand Down
4 changes: 2 additions & 2 deletions examples/src/test/scala/examples/TestSchedulerExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.mockito.Mockito._
import org.scalatest.junit.JUnitSuite
import rx.lang.scala._
import rx.lang.scala.schedulers.TestScheduler
import rx.observers.TestObserver
import rx.observers.TestSubscriber
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestObserver is deprecated.

import rx.lang.scala.JavaConversions._

class TestSchedulerExample extends JUnitSuite {
Expand All @@ -34,7 +34,7 @@ class TestSchedulerExample extends JUnitSuite {
val o = Observable.interval(1 second, scheduler)

// Wrap Java Observer in Scala Observer, then subscribe
val sub = o.subscribe(toScalaObserver(new TestObserver(observer)))
val sub = o.subscribe(toScalaSubscriber(new TestSubscriber(observer)))

verify(observer, never).onNext(0L)
verify(observer, never).onCompleted()
Expand Down
21 changes: 21 additions & 0 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3433,6 +3433,27 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.distinctUntilChanged[U](keySelector))
}

/**
* $experimental Returns an [[Observable]] that emits all items emitted by the source [[Observable]] that are distinct from their
* immediate predecessors when compared with each other via the provided comparator function.
*
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/distinctUntilChanged.png" alt="" />
*
* ===Backpressure:===
* The operator doesn't interfere with backpressure which is determined by the source [[Observable]]'s backpressure behavior.
*
* $noDefaultScheduler
*
* @param comparator the function that receives the previous item and the current item and is
* expected to return true if the two are equal, thus skipping the current value.
* @return an [[Observable]] that emits those items from the source Observable that are distinct from their immediate predecessors
* @see <a href="http://reactivex.io/documentation/operators/distinct.html">ReactiveX operators documentation: Distinct</a>
*/
@Experimental
def distinctUntilChanged[U](comparator: (T, T) => Boolean): Observable[T] = {
toScalaObservable[T](asJavaObservable.distinctUntilChanged(comparator))
}

/**
* Returns an Observable that forwards all distinct items emitted from the source Observable.
*
Expand Down
40 changes: 37 additions & 3 deletions src/main/scala/rx/lang/scala/observers/TestSubscriber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package rx.lang.scala.observers

import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JavaConversions is deprecated in Scala 2.12.0-M5

import scala.concurrent.duration.Duration
import rx.{Subscriber => JSubscriber, Observer => JObserver, Subscription => JSubscription}
import rx.annotations.Experimental
Expand Down Expand Up @@ -48,7 +48,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* @return a sequence of the `Throwable`s that were passed to the [[Subscriber.onError]] method
*/
def getOnErrorEvents: Seq[Throwable] = {
jTestSubscriber.getOnErrorEvents()
jTestSubscriber.getOnErrorEvents().asScala
}

/**
Expand All @@ -57,7 +57,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
* @return a sequence of items observed by this [[Subscriber]], in the order in which they were observed
*/
def getOnNextEvents: Seq[T] = {
jTestSubscriber.getOnNextEvents()
jTestSubscriber.getOnNextEvents().asScala
}

/**
Expand Down Expand Up @@ -259,6 +259,40 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
def assertValue(value: T): Unit = {
jTestSubscriber.assertValue(value)
}

/**
* $experimental Assert that the [[TestSubscriber]] contains the given first and optional rest values exactly
* and if so, clears the internal list of values.
* {{{
* val ts = TestSubscriber()
*
* ts.onNext(1)
*
* ts.assertValuesAndClear(1)
*
* ts.onNext(2)
* ts.onNext(3)
*
* ts.assertValuesAndClear(2, 3) // no mention of 1
* }}}
*
* @param expectedFirstValue the expected first value
* @param expectedRestValues the optional rest values
*/
@Experimental
def assertValuesAndClear(expectedFirstValue: T, expectedRestValues: T*): Unit = {
jTestSubscriber.assertValuesAndClear(expectedFirstValue, expectedRestValues: _*)
}

/**
* $experimental Returns the number of times onCompleted was called on this [[TestSubscriber]].
*
* @return the number of times onCompleted was called on this [[TestSubscriber]].
*/
@Experimental
def getCompletions: Int = {
jTestSubscriber.getCompletions
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getXxx method names are not so Scala-typical... Why not just completions?

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ trait CompletenessKit extends JUnitSuite {
.filter(! _.contains("access$000"))
// Ignore constructors
.filter(! _.startsWith("<init>"))
// TODO Skips withLatestFrom right now due to https://github.com/ReactiveX/RxJava/pull/4447
.filter(! _.startsWith("withLatestFrom(Observable[T1]"))
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R])",
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Func1[_ >: Throwable, _ <: Observable[_ <: R]], Func0[_ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R], Throwable => Observable[R], () => Observable[R])",
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "groupBy(T => K, T => V)",
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R], Func1[Action1[K], Map[K, Object]])" -> "[TODO]",
"mergeWith(Observable[_ <: T])" -> "merge(Observable[U])",
"ofType(Class[R])" -> "[use `filter(_.isInstanceOf[Class])`]",
"onBackpressureBuffer(Long, Action0)" -> "onBackpressureBuffer(Long, => Unit)",
Expand Down Expand Up @@ -179,6 +180,8 @@ class ObservableCompletenessKit extends CompletenessKit {
"toSortedList(Int)" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer], Int)" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"sorted()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"sorted(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
"window(Int)" -> "tumbling(Int)",
"window(Int, Int)" -> "sliding(Int, Int)",
"window(Long, TimeUnit)" -> "tumbling(Duration)",
Expand All @@ -202,6 +205,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
"combineLatestDelayError(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
"concat(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.concat`]",
"concatDelayError(Observable[_ <: Observable[_ <: T]])" -> "[use `delayError.concat`]",
"concatDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.delayError.concat`]",
"concatEager(Observable[_ <: Observable[_ <: T]])" -> "concatEager(<:<[Observable[T], Observable[Observable[U]]])",
Expand All @@ -215,6 +219,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"from(Future[_ <: T])" -> fromFuture,
"from(Future[_ <: T], Long, TimeUnit)" -> fromFuture,
"from(Future[_ <: T], Scheduler)" -> fromFuture,
"fromAsync(Action1[AsyncEmitter[T]], BackpressureMode)" -> "[TODO]",
"just(T)" -> "just(T*)",
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
"merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])",
Expand All @@ -237,8 +242,11 @@ class ObservableCompletenessKit extends CompletenessKit {
"using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource])" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)",
"using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource], Boolean)" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)",
"withLatestFrom(Observable[_ <: U], Func2[_ >: T, _ >: U, _ <: R])" -> "withLatestFrom(Observable[U])((T, U) => R)",
"withLatestFrom(Array[Observable[_]], FuncN[R])" -> "[TODO]",
"withLatestFrom(Iterable[Observable[_]], FuncN[R])" -> "[TODO]",
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method `zip` and `map`]",
"zip(Observable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
"zip(Array[Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
"zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
"zipWith(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U])((T, U) => R)",
"zipWith(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U])((T, U) => R)"
Expand Down Expand Up @@ -273,5 +281,8 @@ class ObservableCompletenessKit extends CompletenessKit {
).drop(1).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
// concatEager 2-9
"concatEager(" + _ + ")" -> "[unnecessary because we can use `concatEager` instead or `Observable(o1, o2, ...).concatEager`]"
).drop(1).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
// concatDelayError 2-9
"concatDelayError(" + _ + ")" -> "[unnecessary because we can use `Observable(o1, o2, ...).delayError.concat`]"
).drop(1).toMap
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,8 @@ class TestSchedulerCompletenessKit extends CompletenessKit {
override val rxScalaType = typeOf[rx.lang.scala.schedulers.TestScheduler]

// There are two methods without "()", so using `isOmittingParenthesesForArity0Method = true` by default is simpler
override protected def correspondenceChanges: Map[String, String] = Map("triggerActions()" -> "triggerActions()")
override protected def correspondenceChanges: Map[String, String] = Map(
"triggerActions()" -> "triggerActions()",
"when(Func1[Observable[Observable[Completable]], Completable])" -> "[TODO]"
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ class TestSubscriberCompletenessKit extends CompletenessKit {
"assertError(Class[_ <: Throwable])" -> "assertError(Class[_ <: Throwable])",
"assertReceivedOnNext(List[T])" -> "assertValues(T*)",
"getLastSeenThread()" -> "getLastSeenThread",
"getOnCompletedEvents()" -> "assertCompleted()",
"getOnErrorEvents()" -> "getOnErrorEvents",
"getOnNextEvents()" -> "getOnNextEvents",
"isUnsubscribed()" -> "isUnsubscribed",
"getCompletions()" -> "getCompletions",
"getValueCount()" -> "[TODO]",
"awaitValueCount(Int, Long, TimeUnit)" -> "[TODO]",

"create()" -> "apply()",
"create(Long)" -> "apply(Long)",
Expand Down
17 changes: 17 additions & 0 deletions src/test/scala/rx/lang/scala/SubscriberTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package rx.lang.scala

import rx.lang.scala.observers.TestSubscriber

import scala.collection.mutable

import org.junit.Test
Expand Down Expand Up @@ -182,4 +184,19 @@ class SubscriberTests extends JUnitSuite {
val zeros = new Array[Int](10).toList
assertEquals(zeros, l)
}

@Test def testIssue202() {
// https://github.com/ReactiveX/RxScala/issues/202
val subject = Subject[Option[Unit]]()
val testSubscriber = TestSubscriber[Option[Unit]]()
subject.filter(_.isDefined).subscribe(testSubscriber)
testSubscriber.assertNoValues()
subject.onNext(None)
testSubscriber.assertNoValues()
subject.onNext(Some(()))
testSubscriber.assertValuesAndClear(Some(()))
testSubscriber.assertNoValues()
subject.onNext(None)
testSubscriber.assertNoValues()
}
}