Skip to content

Commit 0e16542

Browse files
authored
Merge pull request #197 from zsxwing/rxjava-1.1.6
Bump to RxJava 1.1.6
2 parents bee74d0 + 0b8e56d commit 0e16542

File tree

3 files changed

+40
-1
lines changed

3 files changed

+40
-1
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M4")
1919
parallelExecution in Test := false
2020

2121
libraryDependencies ++= Seq(
22-
"io.reactivex" % "rxjava" % "1.1.5",
22+
"io.reactivex" % "rxjava" % "1.1.6",
2323
"org.mockito" % "mockito-core" % "1.9.5" % "test",
2424
"junit" % "junit" % "4.11" % "test",
2525
"org.scalatest" %% "scalatest" % "2.2.6" % "test")

examples/src/test/scala/examples/RxScalaDemo.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1896,4 +1896,23 @@ class RxScalaDemo extends JUnitSuite {
18961896
println(s"onTerminateDetach: isUnsubscribed=${s.isUnsubscribed}, weakReference=${weakReference.get}")
18971897
}
18981898
}
1899+
1900+
@Test def rebatchRequestsExample(): Unit = {
1901+
val o = (1 to 100).toObservable
1902+
o.doOnRequest(r => println(s"Requesting $r via rebatchRequests"))
1903+
.rebatchRequests(10)
1904+
.doOnRequest(r => println(s"Requesting $r via Subscriber"))
1905+
.subscribe(new Subscriber[Int]() {
1906+
override def onStart(): Unit = request(1)
1907+
1908+
override def onNext(value: Int): Unit = {
1909+
println(s"Receive $value")
1910+
request(1)
1911+
}
1912+
1913+
override def onError(error: Throwable): Unit = error.printStackTrace()
1914+
1915+
override def onCompleted(): Unit = println("Done")
1916+
})
1917+
}
18991918
}

src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1361,6 +1361,26 @@ trait Observable[+T]
13611361
toScalaObservable[T](asJavaObservable.unsubscribeOn(scheduler))
13621362
}
13631363

1364+
/**
1365+
* $experimental Returns an [[Observable]] that requests `n` initially from the upstream and then 75% of `n` subsequently after 75% of `n` values have
1366+
* been emitted to the downstream.
1367+
*
1368+
* This operator allows preventing the downstream to trigger unbounded mode via `request(Long.MaxValue)` or compensate for the per-item
1369+
* overhead of small and frequent requests.
1370+
*
1371+
* ===Backpressure:===
1372+
* The operator expects backpressure from upstream and honors backpressure from downstream.</dd>
1373+
*
1374+
* $noDefaultScheduler
1375+
*
1376+
* @param n the initial request amount, further request will happen after 75% of this value
1377+
* @return the [[Observable]] that rebatches request amounts from downstream
1378+
*/
1379+
@Experimental
1380+
def rebatchRequests(n: Int): Observable[T] = {
1381+
toScalaObservable[T](asJavaObservable.rebatchRequests(n))
1382+
}
1383+
13641384
/**
13651385
* Asynchronously notify [[rx.lang.scala.Observer]]s on the specified [[rx.lang.scala.Scheduler]].
13661386
*

0 commit comments

Comments
 (0)