Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ crossScalaVersions in ThisBuild := Seq("2.10.6", "2.11.8", "2.12.0-M4")
parallelExecution in Test := false

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.1.5",
"io.reactivex" % "rxjava" % "1.1.6",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.6" % "test")
Expand Down
19 changes: 19 additions & 0 deletions examples/src/test/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1896,4 +1896,23 @@ class RxScalaDemo extends JUnitSuite {
println(s"onTerminateDetach: isUnsubscribed=${s.isUnsubscribed}, weakReference=${weakReference.get}")
}
}

@Test def rebatchRequestsExample(): Unit = {
val o = (1 to 100).toObservable
o.doOnRequest(r => println(s"Requesting $r via rebatchRequests"))
.rebatchRequests(10)
.doOnRequest(r => println(s"Requesting $r via Subscriber"))
.subscribe(new Subscriber[Int]() {
override def onStart(): Unit = request(1)

override def onNext(value: Int): Unit = {
println(s"Receive $value")
request(1)
}

override def onError(error: Throwable): Unit = error.printStackTrace()

override def onCompleted(): Unit = println("Done")
})
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice example 👍

}
20 changes: 20 additions & 0 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,26 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.unsubscribeOn(scheduler))
}

/**
* $experimental Returns an [[Observable]] that requests `n` initially from the upstream and then 75% of `n` subsequently after 75% of `n` values have
* been emitted to the downstream.
*
* This operator allows preventing the downstream to trigger unbounded mode via `request(Long.MaxValue)` or compensate for the per-item
* overhead of small and frequent requests.
*
* ===Backpressure:===
* The operator expects backpressure from upstream and honors backpressure from downstream.</dd>
*
* $noDefaultScheduler
*
* @param n the initial request amount, further request will happen after 75% of this value
* @return the [[Observable]] that rebatches request amounts from downstream
*/
@Experimental
def rebatchRequests(n: Int): Observable[T] = {
toScalaObservable[T](asJavaObservable.rebatchRequests(n))
}

/**
* Asynchronously notify [[rx.lang.scala.Observer]]s on the specified [[rx.lang.scala.Scheduler]].
*
Expand Down