Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ crossScalaVersions in ThisBuild := Seq("2.10.5", "2.11.6")
parallelExecution in Test := false

libraryDependencies ++= Seq(
"io.reactivex" % "rxjava" % "1.1.0",
"io.reactivex" % "rxjava" % "1.1.1",
"org.mockito" % "mockito-core" % "1.9.5" % "test",
"junit" % "junit" % "4.11" % "test",
"org.scalatest" %% "scalatest" % "2.2.2" % "test")
Expand Down
24 changes: 22 additions & 2 deletions examples/src/test/scala/examples/RxScalaDemo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -877,8 +877,8 @@ class RxScalaDemo extends JUnitSuite {
// unsubscribed
}

@Test def finallyDoExample(): Unit = {
val o = List("red", "green", "blue").toObservable.finallyDo { println("finally") }
@Test def doAfterTerminateExample(): Unit = {
val o = List("red", "green", "blue").toObservable.doAfterTerminate { println("finally") }
o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted"))
// red
// green
Expand Down Expand Up @@ -1758,4 +1758,24 @@ class RxScalaDemo extends JUnitSuite {
override def onCompleted(): Unit = println("onCompleted")
})
}

def autoConnectExample(): Unit = {
val o = Observable.just(1, 2, 3).doOnSubscribe {
println("Start to emit items")
}.publish.autoConnect
println("1st Observer is subscribing")
o.subscribe(println(_))
}

def autoConnectExample2(): Unit = {
val o = Observable.just(1, 2, 3).doOnSubscribe {
println("Start to emit items")
}.publish.autoConnect(3)
println("1st Observer is subscribing")
o.subscribe(i => println(s"s1: $i"))
println("2nd Observer is subscribing")
o.subscribe(i => println(s"s2: $i"))
println("3rd Observer is subscribing")
o.subscribe(i => println(s"s3: $i"))
}
}
81 changes: 78 additions & 3 deletions src/main/scala/rx/lang/scala/Observable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1067,8 +1067,24 @@ trait Observable[+T]
* an function to be invoked when the source Observable finishes
* @return an Observable that emits the same items as the source Observable, then invokes the function
*/
@deprecated("Use [[Observable.doAfterTerminate]] instead", "0.26.1")
def finallyDo(action: => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.finallyDo(() => action))
toScalaObservable[T](asJavaObservable.doAfterTerminate(() => action))
}

/**
* Registers an function to be called when this [[Observable]] invokes either [[Observer.onCompleted onCompleted]] or [[Observer.onError onError]].
*
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/finallyDo.png" alt="">
*
* $noDefaultScheduler
*
* @param action an function to be invoked when the source [[Observable]] finishes
* @return an [[Observable]] that emits the same items as the source [[Observable]], then invokes the `action`
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
def doAfterTerminate(action: => Unit): Observable[T] = {
toScalaObservable[T](asJavaObservable.doAfterTerminate(() => action))
}

/**
Expand Down Expand Up @@ -1326,6 +1342,27 @@ trait Observable[+T]
toScalaObservable[T](asJavaObservable.observeOn(scheduler))
}

/**
* Return an [[Observable]] to perform its emissions and notifications on a specified [[Scheduler]],
* asynchronously with a bounded buffer and optionally delays [[Observer.onError onError]] notifications.
*
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
*
* ===Scheduler:===
* you specify which [[Scheduler]] this operator will use
*
* @param scheduler the [[Scheduler]] to notify [[Observer]]s on
* @param delayError indicates if the [[Observer.onError onError]] notification may not cut ahead of onNext notification on the
* other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order
* as was received from upstream
* @return the source [[Observable]] that its [[Observer]]s are notified on the specified [[Scheduler]]
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
*/
def observeOn(scheduler: Scheduler, delayError: Boolean): Observable[T] = {
toScalaObservable[T](asJavaObservable.observeOn(scheduler, delayError))
}

/**
* Returns an Observable that reverses the effect of [[rx.lang.scala.Observable.materialize]] by
* transforming the [[rx.lang.scala.Notification]] objects emitted by the source Observable into the items
Expand Down Expand Up @@ -1809,8 +1846,46 @@ trait Observable[+T]
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Observable-Utility-Operators#cache">RxJava wiki: cache</a>
* @since 0.20
*/
@deprecated("Use [[Observable.cacheWithInitialCapacity]] instead", "0.26.1")
def cache(capacity: Int): Observable[T] = {
toScalaObservable[T](asJavaObservable.cache(capacity))
toScalaObservable[T](asJavaObservable.cacheWithInitialCapacity(capacity))
}

/**
* Caches emissions from the source [[Observable]] and replays them in order to any subsequent [[Subscriber]]s.
* This method has similar behavior to [[Observable.replay:* replay]] except that this auto-subscribes to the source
* [[Observable]] rather than returning a [[rx.lang.scala.observables.ConnectableObservable ConnectableObservable]]
* for which you must call [[rx.lang.scala.observables.ConnectableObservable.connect connect]] to activate the subscription.
*
* <img width="640" height="410" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/cache.png" alt="">
*
* This is useful when you want an [[Observable]] to cache responses and you can't control the
* subscribe/unsubscribe behavior of all the [[Subscriber]]s.
*
* When you call this method, it does not yet subscribe to the source Observable and so does not yet
* begin caching items. This only happens when the first [[Subscriber]] calls the resulting [[Observable]]'s `subscribe` method.
*
* **Note:** You sacrifice the ability to unsubscribe from the origin when you use the this method.
* So be careful not to use this method on [[Observable]]s that emit an infinite or very large number
* of items that will use up memory.
*
* ===Backpressure Support:===
* This operator does not support upstream backpressure as it is purposefully requesting and caching
* everything emitted.
*
* $noDefaultScheduler
*
* **Note:** The `capacity` hint is not an upper bound on cache size. For that, consider
* [[Observable.replay(bufferSize:Int):* replay(Int)]] in combination with
* [[rx.lang.scala.observables.ConnectableObservable.autoConnect:* ConnectableObservable.autoConnect]] or similar.
*
* @param capacity hint for number of items to cache (for optimizing underlying data structure)
* @return an [[Observable]] that, when first subscribed to, caches all of its items and notifications for the
* benefit of subsequent [[Subscriber]]s
* @see <a href="http://reactivex.io/documentation/operators/replay.html">ReactiveX operators documentation: Replay</a>
*/
def cacheWithInitialCapacity(capacity: Int): Observable[T] = {
toScalaObservable[T](asJavaObservable.cacheWithInitialCapacity(capacity))
}

/**
Expand Down Expand Up @@ -3809,7 +3884,7 @@ trait Observable[+T]
* <p>
* <img width="640" height="305" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnTerminate.png" alt="" />
* <p>
* This differs from `finallyDo` in that this happens **before** `onCompleted/onError` are emitted.
* This differs from [[Observable.doAfterTerminate doAfterTerminate]] in that this happens **before** onCompleted/onError` are emitted.
*
* @param onTerminate the action to invoke when the source Observable calls `onCompleted` or `onError`
* @return the source Observable with the side-effecting behavior applied
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,24 @@

package rx.lang.scala.observables

import rx.lang.scala.{Observable, Subscription}
import rx.annotations.Beta
import rx.lang.scala.{Observable, Subscription, Subscriber}
import rx.lang.scala.JavaConversions._

/**
* A [[ConnectableObservable]] resembles an ordinary [[Observable]], except that it does not begin
* emitting items when it is subscribed to, but only when its [[ConnectableObservable.connect connect]] method is called.
* In this way you can wait for all intended [[Subscriber]]s to subscribe to the [[Observable]]
* before the [[Observable]] begins emitting items.
*
* <img width="640" height="510" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/publishConnect.png" alt="">
*
* @define beta
* <span class="badge badge-red" style="float: right;">BETA</span>
*
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators">RxJava Wiki: Connectable Observable Operators</a>
* @tparam T the type of items emitted
*/
class ConnectableObservable[+T] private[scala](val asJavaObservable: rx.observables.ConnectableObservable[_ <: T])
extends Observable[T] {

Expand All @@ -35,4 +50,31 @@ class ConnectableObservable[+T] private[scala](val asJavaObservable: rx.observab
* @return a [[rx.lang.scala.Observable]]
*/
def refCount: Observable[T] = toScalaObservable[T](asJavaObservable.refCount())

/**
* $beta Return an [[Observable]] that automatically connects to this [[ConnectableObservable]]
* when the first [[Subscriber]] subscribes.
*
* @return an [[Observable]] that automatically connects to this [[ConnectableObservable]]
* when the first [[Subscriber]] subscribes
*/
@Beta
def autoConnect: Observable[T] = {
toScalaObservable(asJavaObservable.autoConnect())
}

/**
* $beta Return an [[Observable]] that automatically connects to this [[ConnectableObservable]]
* when the specified number of [[Subscriber]]s subscribe to it.
*
* @param numberOfSubscribers the number of [[Subscriber]]s to await before calling connect
* on the [[ConnectableObservable]]. A non-positive value indicates
* an immediate connection.
* @return an [[Observable]] that automatically connects to this [[ConnectableObservable]]
* when the specified number of [[Subscriber]]s subscribe to it
*/
@Beta
def autoConnect(numberOfSubscribers: Int): Observable[T] = {
toScalaObservable(asJavaObservable.autoConnect(numberOfSubscribers))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"delay(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]])" -> "delay(() => Observable[Any], T => Observable[Any])",
"delay(Func1[_ >: T, _ <: Observable[U]])" -> "delay(T => Observable[Any])",
"delaySubscription(Func0[_ <: Observable[U]])" -> "delaySubscription(() => Observable[Any])",
"delaySubscription(Observable[U])" -> "[use `delaySubscription(() => Observable[Any])`]",
"dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])",
"doOnCompleted(Action0)" -> "doOnCompleted(=> Unit)",
"doOnEach(Action1[Notification[_ >: T]])" -> "[use `doOnEach(T => Unit, Throwable => Unit, () => Unit)`]",
Expand All @@ -78,7 +79,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"doOnTerminate(Action0)" -> "doOnTerminate(=> Unit)",
"elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)",
"extend(Func1[_ >: OnSubscribe[T], _ <: R])" -> "[use Scala implicit feature to extend `Observable`]",
"finallyDo(Action0)" -> "finallyDo(=> Unit)",
"doAfterTerminate(Action0)" -> "doAfterTerminate(=> Unit)",
"first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
"firstOrDefault(T)" -> "firstOrElse(=> U)",
"firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]",
Expand Down Expand Up @@ -166,6 +167,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]])" -> "toMultiMap(T => K, T => V, => M)",
"toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]], Func1[_ >: K, _ <: Collection[V]])" -> commentForToMultimapWithCollectionFactory,
"toSingle()" -> "[TODO]",
"toCompletable()" -> "[TODO]",
"toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Int)" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
Expand All @@ -188,6 +190,7 @@ class ObservableCompletenessKit extends CompletenessKit {
"create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)",
"combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])",
"combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Seq[Observable[T]])(Seq[T] => R)",
"combineLatest(Iterable[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "[use `combineLatest(iter.toSeq)(Seq[T] => R)`]",
"concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])",
"concatEager(Observable[_ <: Observable[_ <: T]])" -> "concatEager(<:<[Observable[T], Observable[Observable[U]]])",
"concatEager(Observable[_ <: Observable[_ <: T]], Int)" -> "concatEager(Int)(<:<[Observable[T], Observable[Observable[U]]])",
Expand All @@ -211,6 +214,8 @@ class ObservableCompletenessKit extends CompletenessKit {
"mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])",
"mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])",
"mergeDelayError(Observable[_ <: Observable[_ <: T]], Int)" -> "flattenDelayError(Int)(<:<[Observable[T], Observable[Observable[U]]])",
"mergeDelayError(Iterable[_ <: Observable[_ <: T]])" -> "[use `iter.toObservable.flattenDelayError`]",
"mergeDelayError(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `iter.toObservable.flattenDelayError(Int)`]",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "sequenceEqual(Observable[U])",
"sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "sequenceEqualWith(Observable[U])((U, U) => Boolean)",
"range(Int, Int)" -> commentForRange,
Expand Down