Skip to content

Commit 3c3f3ff

Browse files
authored
Merge pull request #204 from zsxwing/rxjava-1.1.9
Upgrade to RxJava 1.1.9 and add missing operators
2 parents c17b434 + bff43b0 commit 3c3f3ff

File tree

10 files changed

+108
-14
lines changed

10 files changed

+108
-14
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ matrix:
66
- jdk: oraclejdk7
77
scala: 2.11.8
88
- jdk: oraclejdk8
9-
scala: 2.12.0-M4
9+
scala: 2.12.0-M5
1010
script:
1111
- sbt ++$TRAVIS_SCALA_VERSION test examples/test doc
1212
after_success:

build.sbt

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,19 @@ scalacOptions in ThisBuild := Seq("-feature", "-unchecked", "-deprecation", "-en
1414

1515
scalaVersion in ThisBuild := "2.11.8"
1616

17-
crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M4")
17+
crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M5")
1818

1919
parallelExecution in Test := false
2020

21-
libraryDependencies ++= Seq(
22-
"io.reactivex" % "rxjava" % "1.1.6",
23-
"org.mockito" % "mockito-core" % "1.9.5" % "test",
24-
"junit" % "junit" % "4.11" % "test",
25-
"org.scalatest" %% "scalatest" % "2.2.6" % "test")
21+
libraryDependencies ++= {
22+
// Scalatest 3.0.0 supports 2.12.0-M5 but it drops Scala 2.10 support. So just use 2.2.6 for Scala 2.10.
23+
val scalatestVersion = if (scalaVersion.value.startsWith("2.10")) "2.2.6" else "3.0.0"
24+
Seq(
25+
"io.reactivex" % "rxjava" % "1.1.9",
26+
"org.mockito" % "mockito-core" % "1.9.5" % "test",
27+
"junit" % "junit" % "4.11" % "test",
28+
"org.scalatest" %% "scalatest" % scalatestVersion % "test")
29+
}
2630

2731
// Set up the doc mappings
2832
// See http://stackoverflow.com/questions/16934488/how-to-link-classes-from-jdk-into-scaladoc-generated-doc

examples/src/test/scala/examples/TestSchedulerExample.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.mockito.Mockito._
2121
import org.scalatest.junit.JUnitSuite
2222
import rx.lang.scala._
2323
import rx.lang.scala.schedulers.TestScheduler
24-
import rx.observers.TestObserver
24+
import rx.observers.TestSubscriber
2525
import rx.lang.scala.JavaConversions._
2626

2727
class TestSchedulerExample extends JUnitSuite {
@@ -34,7 +34,7 @@ class TestSchedulerExample extends JUnitSuite {
3434
val o = Observable.interval(1 second, scheduler)
3535

3636
// Wrap Java Observer in Scala Observer, then subscribe
37-
val sub = o.subscribe(toScalaObserver(new TestObserver(observer)))
37+
val sub = o.subscribe(toScalaSubscriber(new TestSubscriber(observer)))
3838

3939
verify(observer, never).onNext(0L)
4040
verify(observer, never).onCompleted()

src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3433,6 +3433,27 @@ trait Observable[+T]
34333433
toScalaObservable[T](asJavaObservable.distinctUntilChanged[U](keySelector))
34343434
}
34353435

3436+
/**
3437+
* $experimental Returns an [[Observable]] that emits all items emitted by the source [[Observable]] that are distinct from their
3438+
* immediate predecessors when compared with each other via the provided comparator function.
3439+
*
3440+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/distinctUntilChanged.png" alt="" />
3441+
*
3442+
* ===Backpressure:===
3443+
* The operator doesn't interfere with backpressure which is determined by the source [[Observable]]'s backpressure behavior.
3444+
*
3445+
* $noDefaultScheduler
3446+
*
3447+
* @param comparator the function that receives the previous item and the current item and is
3448+
* expected to return true if the two are equal, thus skipping the current value.
3449+
* @return an [[Observable]] that emits those items from the source Observable that are distinct from their immediate predecessors
3450+
* @see <a href="http://reactivex.io/documentation/operators/distinct.html">ReactiveX operators documentation: Distinct</a>
3451+
*/
3452+
@Experimental
3453+
def distinctUntilChanged[U](comparator: (T, T) => Boolean): Observable[T] = {
3454+
toScalaObservable[T](asJavaObservable.distinctUntilChanged(comparator))
3455+
}
3456+
34363457
/**
34373458
* Returns an Observable that forwards all distinct items emitted from the source Observable.
34383459
*

src/main/scala/rx/lang/scala/observers/TestSubscriber.scala

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
package rx.lang.scala.observers
1717

1818
import java.util.concurrent.TimeUnit
19-
import scala.collection.JavaConversions._
19+
import scala.collection.JavaConverters._
2020
import scala.concurrent.duration.Duration
2121
import rx.{Subscriber => JSubscriber, Observer => JObserver, Subscription => JSubscription}
2222
import rx.annotations.Experimental
@@ -48,7 +48,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
4848
* @return a sequence of the `Throwable`s that were passed to the [[Subscriber.onError]] method
4949
*/
5050
def getOnErrorEvents: Seq[Throwable] = {
51-
jTestSubscriber.getOnErrorEvents()
51+
jTestSubscriber.getOnErrorEvents().asScala
5252
}
5353

5454
/**
@@ -57,7 +57,7 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
5757
* @return a sequence of items observed by this [[Subscriber]], in the order in which they were observed
5858
*/
5959
def getOnNextEvents: Seq[T] = {
60-
jTestSubscriber.getOnNextEvents()
60+
jTestSubscriber.getOnNextEvents().asScala
6161
}
6262

6363
/**
@@ -259,6 +259,40 @@ class TestSubscriber[T] private[scala](jTestSubscriber: JTestSubscriber[T]) exte
259259
def assertValue(value: T): Unit = {
260260
jTestSubscriber.assertValue(value)
261261
}
262+
263+
/**
264+
* $experimental Assert that the [[TestSubscriber]] contains the given first and optional rest values exactly
265+
* and if so, clears the internal list of values.
266+
* {{{
267+
* val ts = TestSubscriber()
268+
*
269+
* ts.onNext(1)
270+
*
271+
* ts.assertValuesAndClear(1)
272+
*
273+
* ts.onNext(2)
274+
* ts.onNext(3)
275+
*
276+
* ts.assertValuesAndClear(2, 3) // no mention of 1
277+
* }}}
278+
*
279+
* @param expectedFirstValue the expected first value
280+
* @param expectedRestValues the optional rest values
281+
*/
282+
@Experimental
283+
def assertValuesAndClear(expectedFirstValue: T, expectedRestValues: T*): Unit = {
284+
jTestSubscriber.assertValuesAndClear(expectedFirstValue, expectedRestValues: _*)
285+
}
286+
287+
/**
288+
* $experimental Returns the number of times onCompleted was called on this [[TestSubscriber]].
289+
*
290+
* @return the number of times onCompleted was called on this [[TestSubscriber]].
291+
*/
292+
@Experimental
293+
def completions: Int = {
294+
jTestSubscriber.getCompletions
295+
}
262296
}
263297

264298
/**

src/test/scala-2.11/rx/lang/scala/completeness/CompletenessKit.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ trait CompletenessKit extends JUnitSuite {
106106
.filter(! _.contains("access$000"))
107107
// Ignore constructors
108108
.filter(! _.startsWith("<init>"))
109+
// TODO Skips withLatestFrom right now due to https://github.com/ReactiveX/RxJava/pull/4447
110+
.filter(! _.startsWith("withLatestFrom(Observable[T1]"))
109111
}
110112

111113
/**

src/test/scala-2.11/rx/lang/scala/completeness/ObservableCompletenessKit.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ class ObservableCompletenessKit extends CompletenessKit {
105105
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R])",
106106
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Func1[_ >: Throwable, _ <: Observable[_ <: R]], Func0[_ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R], Throwable => Observable[R], () => Observable[R])",
107107
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "groupBy(T => K, T => V)",
108+
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R], Func1[Action1[K], Map[K, Object]])" -> "[TODO]",
108109
"mergeWith(Observable[_ <: T])" -> "merge(Observable[U])",
109110
"ofType(Class[R])" -> "[use `filter(_.isInstanceOf[Class])`]",
110111
"onBackpressureBuffer(Long, Action0)" -> "onBackpressureBuffer(Long, => Unit)",
@@ -179,6 +180,8 @@ class ObservableCompletenessKit extends CompletenessKit {
179180
"toSortedList(Int)" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
180181
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
181182
"toSortedList(Func2[_ >: T, _ >: T, Integer], Int)" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
183+
"sorted()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
184+
"sorted(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
182185
"window(Int)" -> "tumbling(Int)",
183186
"window(Int, Int)" -> "sliding(Int, Int)",
184187
"window(Long, TimeUnit)" -> "tumbling(Duration)",
@@ -202,6 +205,7 @@ class ObservableCompletenessKit extends CompletenessKit {
202205
"combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
203206
"combineLatestDelayError(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Iterable[Observable[T]])(Seq[T] => R)",
204207
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
208+
"concat(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.concat`]",
205209
"concatDelayError(Observable[_ <: Observable[_ <: T]])" -> "[use `delayError.concat`]",
206210
"concatDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.delayError.concat`]",
207211
"concatEager(Observable[_ <: Observable[_ <: T]])" -> "concatEager(<:<[Observable[T], Observable[Observable[U]]])",
@@ -215,6 +219,7 @@ class ObservableCompletenessKit extends CompletenessKit {
215219
"from(Future[_ <: T])" -> fromFuture,
216220
"from(Future[_ <: T], Long, TimeUnit)" -> fromFuture,
217221
"from(Future[_ <: T], Scheduler)" -> fromFuture,
222+
"fromAsync(Action1[AsyncEmitter[T]], BackpressureMode)" -> "[TODO]",
218223
"just(T)" -> "just(T*)",
219224
"merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])",
220225
"merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])",
@@ -237,8 +242,11 @@ class ObservableCompletenessKit extends CompletenessKit {
237242
"using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource])" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)",
238243
"using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource], Boolean)" -> "using(=> Resource)(Resource => Observable[T], Resource => Unit, Boolean)",
239244
"withLatestFrom(Observable[_ <: U], Func2[_ >: T, _ >: U, _ <: R])" -> "withLatestFrom(Observable[U])((T, U) => R)",
245+
"withLatestFrom(Array[Observable[_]], FuncN[R])" -> "[TODO]",
246+
"withLatestFrom(Iterable[Observable[_]], FuncN[R])" -> "[TODO]",
240247
"zip(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "[use instance method `zip` and `map`]",
241248
"zip(Observable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
249+
"zip(Array[Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
242250
"zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]",
243251
"zipWith(Observable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Observable[U])((T, U) => R)",
244252
"zipWith(Iterable[_ <: T2], Func2[_ >: T, _ >: T2, _ <: R])" -> "zipWith(Iterable[U])((T, U) => R)"
@@ -273,5 +281,8 @@ class ObservableCompletenessKit extends CompletenessKit {
273281
).drop(1).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
274282
// concatEager 2-9
275283
"concatEager(" + _ + ")" -> "[unnecessary because we can use `concatEager` instead or `Observable(o1, o2, ...).concatEager`]"
284+
).drop(1).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map(
285+
// concatDelayError 2-9
286+
"concatDelayError(" + _ + ")" -> "[unnecessary because we can use `Observable(o1, o2, ...).delayError.concat`]"
276287
).drop(1).toMap
277288
}

src/test/scala-2.11/rx/lang/scala/completeness/TestSchedulerCompletenessKit.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,8 @@ class TestSchedulerCompletenessKit extends CompletenessKit {
2323
override val rxScalaType = typeOf[rx.lang.scala.schedulers.TestScheduler]
2424

2525
// There are two methods without "()", so using `isOmittingParenthesesForArity0Method = true` by default is simpler
26-
override protected def correspondenceChanges: Map[String, String] = Map("triggerActions()" -> "triggerActions()")
26+
override protected def correspondenceChanges: Map[String, String] = Map(
27+
"triggerActions()" -> "triggerActions()",
28+
"when(Func1[Observable[Observable[Completable]], Completable])" -> "[TODO]"
29+
)
2730
}

src/test/scala-2.11/rx/lang/scala/completeness/TestSubscriberCompletenessKit.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ class TestSubscriberCompletenessKit extends CompletenessKit {
2828
"assertError(Class[_ <: Throwable])" -> "assertError(Class[_ <: Throwable])",
2929
"assertReceivedOnNext(List[T])" -> "assertValues(T*)",
3030
"getLastSeenThread()" -> "getLastSeenThread",
31-
"getOnCompletedEvents()" -> "assertCompleted()",
3231
"getOnErrorEvents()" -> "getOnErrorEvents",
3332
"getOnNextEvents()" -> "getOnNextEvents",
3433
"isUnsubscribed()" -> "isUnsubscribed",
34+
"getCompletions()" -> "completions",
35+
"getValueCount()" -> "[TODO]",
36+
"awaitValueCount(Int, Long, TimeUnit)" -> "[TODO]",
3537

3638
"create()" -> "apply()",
3739
"create(Long)" -> "apply(Long)",

src/test/scala/rx/lang/scala/SubscriberTests.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package rx.lang.scala
1717

18+
import rx.lang.scala.observers.TestSubscriber
19+
1820
import scala.collection.mutable
1921

2022
import org.junit.Test
@@ -182,4 +184,19 @@ class SubscriberTests extends JUnitSuite {
182184
val zeros = new Array[Int](10).toList
183185
assertEquals(zeros, l)
184186
}
187+
188+
@Test def testIssue202() {
189+
// https://github.com/ReactiveX/RxScala/issues/202
190+
val subject = Subject[Option[Unit]]()
191+
val testSubscriber = TestSubscriber[Option[Unit]]()
192+
subject.filter(_.isDefined).subscribe(testSubscriber)
193+
testSubscriber.assertNoValues()
194+
subject.onNext(None)
195+
testSubscriber.assertNoValues()
196+
subject.onNext(Some(()))
197+
testSubscriber.assertValuesAndClear(Some(()))
198+
testSubscriber.assertNoValues()
199+
subject.onNext(None)
200+
testSubscriber.assertNoValues()
201+
}
185202
}

0 commit comments

Comments
 (0)