Skip to content

Commit 7625a29

Browse files
committed
Merge pull request #151 from zsxwing/rxjava-1.0.8
Update to Rxjava 1.0.8 & add onBackpressureDropDo to ExperimentalAPIs
2 parents 743b293 + 4dd7329 commit 7625a29

File tree

5 files changed

+39
-3
lines changed

5 files changed

+39
-3
lines changed

README.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ Since RxScala is part of the RxJava family the communication channels are simila
8181

8282
| RxScala version | Compatible RxJava version |
8383
| ------------------- | ------------------------- |
84+
| 0.24.* | 1.0.* |
8485
| 0.23.*<sup>[1]</sup> | 1.0.* |
8586
| 0.22.0 | 1.0.0-rc.5 |
8687
| 0.21.1 | 1.0.0-rc.3 |
@@ -89,12 +90,13 @@ Since RxScala is part of the RxJava family the communication channels are simila
8990
[1] You can use any release of RxScala 0.23 with any release of RxJava 1.0. E.g, use RxScala 0.23.0 with RxJava 1.0.1 <br/>
9091
[2] You should use the same version of RxScala with RxJava. E.g, use RxScala 0.20.1 with RxJava 0.20.1
9192

92-
From 0.23.2, RxScala adds ExperimentalAPIs, which contains APIs using RxJava Beta/Experimental APIs. As these APIs are unstable in RxJava,
93+
From 0.24.0, RxScala adds ExperimentalAPIs, which contains APIs using RxJava Beta/Experimental APIs. As these APIs are unstable in RxJava,
9394
if you `import ExperimentalAPIs`, you should use the corresponding version of RxJava as the following table:
9495

9596
| RxScala version | Compatible RxJava version |
9697
| ------------------- | ------------------------- |
97-
| 0.23.2 | 1.0.7 |
98+
| 0.24.1 | 1.0.8+ |
99+
| 0.24.0 | 1.0.7+ |
98100

99101
## Full Documentation
100102

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ scalaVersion in ThisBuild := "2.11.2"
1717
crossScalaVersions in ThisBuild := Seq("2.10.4", "2.11.2")
1818

1919
libraryDependencies ++= Seq(
20-
"io.reactivex" % "rxjava" % "1.0.7",
20+
"io.reactivex" % "rxjava" % "1.0.8",
2121
"org.mockito" % "mockito-core" % "1.9.5" % "test",
2222
"junit" % "junit" % "4.11" % "test",
2323
"org.scalatest" %% "scalatest" % "2.2.2" % "test")

examples/src/test/scala/rx/lang/scala/examples/ExperimentalAPIExamples.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,4 +143,16 @@ object ExperimentalAPIExamples {
143143
10)
144144
.toBlocking.foreach(v => System.out.println("Received: " + v))
145145
}
146+
147+
@Test def onBackpressureDropDoExample(): Unit = {
148+
Observable[Int](subscriber => {
149+
(1 to 200).foreach(subscriber.onNext)
150+
}).onBackpressureDropDo {
151+
t => println(s"Dropping $t")
152+
}.observeOn(IOScheduler()).subscribe {
153+
v =>
154+
Thread.sleep(10) // A slow consumer
155+
println(s"process $v")
156+
}
157+
}
146158
}

src/main/scala/rx/lang/scala/ExperimentalAPIs.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,27 @@ class ExperimentalObservable[+T](private val o: Observable[T]) {
241241
}
242242
toScalaObservable[R](o.asJavaObservable.flatMap[R](jOnNext, jOnError, jOnCompleted, maxConcurrent))
243243
}
244+
245+
/**
246+
* Instructs an [[Observable]] that is emitting items faster than its observer can consume them to discard,
247+
* rather than emit, those items that its observer is not prepared to observe.
248+
*
249+
* <img width="640" height="245" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.drop.png" alt="">
250+
*
251+
* If the downstream request count hits `0` then the [[Observable]] will refrain from calling `onNext` until
252+
* the observer invokes `request(n)` again to increase the request count.
253+
*
254+
* $noDefaultScheduler
255+
*
256+
* @param onDrop the action to invoke for each item dropped. `onDrop` action should be fast and should never block.
257+
* @return an new [[Observable]] that will drop `onNext` notifications on overflow
258+
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
259+
*/
260+
def onBackpressureDropDo(onDrop: T => Unit): Observable[T] = {
261+
toScalaObservable[T](o.asJavaObservable.onBackpressureDrop(new Action1[T] {
262+
override def call(t: T) = onDrop(t)
263+
}))
264+
}
244265
}
245266

246267
object ExperimentalAPIs {

src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class CompletenessTest extends JUnitSuite {
6161
"onBackpressureBlock(Int)",
6262
"onBackpressureBuffer(Long)",
6363
"onBackpressureBuffer(Long, Action0)",
64+
"onBackpressureDrop(Action1[_ >: T])",
6465
"takeUntil(Func1[_ >: T, Boolean])",
6566
"using(Func0[Resource], Func1[_ >: Resource, _ <: Observable[_ <: T]], Action1[_ >: Resource], Boolean)",
6667
"withLatestFrom(Observable[_ <: U], Func2[_ >: T, _ >: U, _ <: R])",

0 commit comments

Comments
 (0)