Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ Since RxScala is part of the RxJava family the communication channels are simila

| RxScala version | Compatible RxJava version |
| ------------------- | ------------------------- |
| 0.24.* | 1.0.* |
| 0.23.*<sup>[1]</sup> | 1.0.* |
| 0.22.0 | 1.0.0-rc.5 |
| 0.21.1 | 1.0.0-rc.3 |
Expand All @@ -89,12 +90,13 @@ Since RxScala is part of the RxJava family the communication channels are simila
[1] You can use any release of RxScala 0.23 with any release of RxJava 1.0. E.g, use RxScala 0.23.0 with RxJava 1.0.1 <br/>
[2] You should use the same version of RxScala with RxJava. E.g, use RxScala 0.20.1 with RxJava 0.20.1

From 0.23.2, RxScala adds ExperimentalAPIs, which contains APIs using RxJava Beta/Experimental APIs. As these APIs are unstable in RxJava,
From 0.24.0, RxScala adds ExperimentalAPIs, which contains APIs using RxJava Beta/Experimental APIs. As these APIs are unstable in RxJava,
if you `import ExperimentalAPIs`, you should use the corresponding version of RxJava as the following table:

| RxScala version | Compatible RxJava version |
| ------------------- | ------------------------- |
| 0.23.2 | 1.0.7 |
| 0.24.1 | 1.0.8+ |
| 0.24.0 | 1.0.7+ |

## Full Documentation

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ scalaVersion in ThisBuild := "2.11.2"
crossScalaVersions in ThisBuild := Seq("2.10.4", "2.11.2")

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.0.7",
"io.reactivex" % "rxjava" % "1.0.8",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test")
Original file line number Diff line number Diff line change
Expand Up @@ -143,4 +143,16 @@ object ExperimentalAPIExamples {
10)
.toBlocking.foreach(v => System.out.println("Received: " + v))
}

@Test def onBackpressureDropDoExample(): Unit = {
Observable[Int](subscriber => {
(1 to 200).foreach(subscriber.onNext)
}).onBackpressureDropDo {
t => println(s"Dropping $t")
}.observeOn(IOScheduler()).subscribe {
v =>
Thread.sleep(10) // A slow consumer
println(s"process $v")
}
}
}
21 changes: 21 additions & 0 deletions src/main/scala/rx/lang/scala/ExperimentalAPIs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,27 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
}
toScalaObservable[R](o.asJavaObservable.flatMap[R](jOnNext, jOnError, jOnCompleted, maxConcurrent))
}

/**
* Instructs an [[Observable]] that is emitting items faster than its observer can consume them to discard,
* rather than emit, those items that its observer is not prepared to observe.
*
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.drop.png" alt="">
*
* If the downstream request count hits `0` then the [[Observable]] will refrain from calling `onNext` until
* the observer invokes `request(n)` again to increase the request count.
*
* $noDefaultScheduler
*
* @param onDrop the action to invoke for each item dropped. `onDrop` action should be fast and should never block.
* @return an new [[Observable]] that will drop `onNext` notifications on overflow
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
*/
def onBackpressureDropDo(onDrop: T => Unit): Observable[T] = {
toScalaObservable[T](o.asJavaObservable.onBackpressureDrop(new Action1[T] {
override def call(t: T) = onDrop(t)
}))
}
}

object ExperimentalAPIs {
Expand Down
1 change: 1 addition & 0 deletions src/test/scala/rx/lang/scala/CompletenessTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class CompletenessTest extends JUnitSuite {
"onBackpressureBlock(Int)",
"onBackpressureBuffer(Long)",
"onBackpressureBuffer(Long, Action0)",
"onBackpressureDrop(Action1[_ >: T])",
"takeUntil(Func1[_ >: T, Boolean])",
"using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource], Boolean)",
"withLatestFrom(Observable[_ <: U], Func2[_ >: T, _ >: U, _ <: R])",
Expand Down