Skip to content

Commit 2bc482e

Browse files
committed
Add ErrorDelayingObservable
1 parent da75c60 commit 2bc482e

File tree

4 files changed

+224
-139
lines changed

4 files changed

+224
-139
lines changed

examples/src/test/scala/examples/RxScalaDemo.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1511,9 +1511,9 @@ class RxScalaDemo extends JUnitSuite {
15111511
if (i == 2) Observable.error(new IOException("Oops")) else Observable.just(i)
15121512
}.subscribe(println(_), _.printStackTrace)
15131513

1514-
println("=== concatMapDelayError ===")
1514+
println("=== delayError.concatMap ===")
15151515
(1 to 10).toObservable
1516-
.concatMapDelayError{ i =>
1516+
.delayError.concatMap { i =>
15171517
if (i == 2) Observable.error(new IOException("Oops")) else Observable.just(i)
15181518
}.subscribe(println(_), _.printStackTrace)
15191519
}
@@ -1565,7 +1565,7 @@ class RxScalaDemo extends JUnitSuite {
15651565

15661566
Thread.sleep(2000)
15671567

1568-
println("=== switchDelayError ===")
1568+
println("=== delayError.switch ===")
15691569
Observable.interval(300 millis).take(3).map { n =>
15701570
if (n == 0) {
15711571
Observable.error(new RuntimeException("Oops!"))
@@ -1575,7 +1575,7 @@ class RxScalaDemo extends JUnitSuite {
15751575
.take(3)
15761576
.doOnSubscribe(println(s"subscribe to o$n"))
15771577
}
1578-
}.switchDelayError.subscribe(println(_), _.printStackTrace())
1578+
}.delayError.switch.subscribe(println(_), _.printStackTrace())
15791579
}
15801580

15811581
@Test def switchMapExample() {
@@ -1600,8 +1600,8 @@ class RxScalaDemo extends JUnitSuite {
16001600

16011601
Thread.sleep(2000)
16021602

1603-
println("=== switchMapDelayError ===")
1604-
Observable.interval(300 millis).take(3).switchMapDelayError { n =>
1603+
println("=== delayError.switchMap ===")
1604+
Observable.interval(300 millis).take(3).delayError.switchMap { n =>
16051605
if (n == 0) {
16061606
Observable.error(new RuntimeException("Oops!"))
16071607
} else {
@@ -1820,7 +1820,7 @@ class RxScalaDemo extends JUnitSuite {
18201820
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
18211821
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
18221822
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
1823-
Observable.just(o1, o2, o3, o4).flattenDelayError.subscribe(println(_), _.printStackTrace())
1823+
Observable.just(o1, o2, o3, o4).delayError.flatten.subscribe(println(_), _.printStackTrace())
18241824
}
18251825

18261826
@Test def flattenDelayErrorExample2() {
@@ -1829,7 +1829,7 @@ class RxScalaDemo extends JUnitSuite {
18291829
val o2 = Observable.interval(100 millis).map(l => s"o2 emit $l").take(3).doOnSubscribe(println(s"subscribe to o2"))
18301830
val o3 = Observable.interval(100 millis).map(l => s"o3 emit $l").take(3).doOnSubscribe(println(s"subscribe to o3"))
18311831
val o4 = Observable.interval(100 millis).map(l => s"o4 emit $l").take(3).doOnSubscribe(println(s"subscribe to o4"))
1832-
Observable.just(o1, o2, o3, o4).flattenDelayError(2).subscribe(println(_), _.printStackTrace())
1832+
Observable.just(o1, o2, o3, o4).delayError.flatten(2).subscribe(println(_), _.printStackTrace())
18331833
}
18341834

18351835
@Test def blockingObservableSubscribeExample(): Unit = {
@@ -1885,7 +1885,7 @@ class RxScalaDemo extends JUnitSuite {
18851885
val o2 = Observable.just(1, 2, 3)
18861886
val os = Observable.just(o1, o2)
18871887
os.concat.subscribe(i => println("concat: " + i), e => println("concat: " + e.getMessage))
1888-
os.concatDelayError.subscribe(i => println("concatDelayError: " + i), e => println("concatDelayError: " + e.getMessage))
1888+
os.delayError.concat.subscribe(i => println("concatDelayError: " + i), e => println("concatDelayError: " + e.getMessage))
18891889
}
18901890

18911891
def onTerminateDetachExample(): Unit = {

src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 14 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ package rx.lang.scala
1919
import rx.annotations.{Beta, Experimental}
2020
import rx.exceptions.OnErrorNotImplementedException
2121
import rx.functions.FuncN
22-
import rx.lang.scala.observables.ConnectableObservable
22+
import rx.lang.scala.observables.{ConnectableObservable, ErrorDelayingObservable}
2323
import scala.concurrent.duration
2424
import java.util
25+
2526
import collection.JavaConversions._
2627
import scala.collection.generic.CanBuildFrom
2728
import scala.annotation.unchecked.uncheckedVariance
@@ -316,26 +317,6 @@ trait Observable[+T]
316317
toScalaObservable[U](o5)
317318
}
318319

319-
/**
320-
* $experimental Concatenates the [[Observable]] sequence of [[Observable]]s into a single sequence by subscribing to
321-
* each inner [[Observable]], one after the other, one at a time and delays any errors till the all inner and the
322-
* outer [[Observable]]s terminate.
323-
*
324-
* $supportBackpressure
325-
*
326-
* $noDefaultScheduler
327-
*
328-
* @return the new [[Observable]] with the concatenating behavior
329-
*/
330-
@Experimental
331-
def concatDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
332-
val o2: Observable[Observable[U]] = this
333-
val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
334-
val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
335-
val o5 = rx.Observable.concatDelayError[U](o4)
336-
toScalaObservable[U](o5)
337-
}
338-
339320
/**
340321
* Returns a new Observable that emits items resulting from applying a function that you supply to each item
341322
* emitted by the source Observable, where that function returns an Observable, and then emitting the items
@@ -355,27 +336,6 @@ trait Observable[+T]
355336
}))
356337
}
357338

358-
/**
359-
* $experimental Maps each of the items into an [[Observable]], subscribes to them one after the other,
360-
* one at a time and emits their values in order while delaying any error from either this or any of the inner [[Observable]]s
361-
* till all of them terminate.
362-
*
363-
* $supportBackpressure
364-
*
365-
* $noDefaultScheduler
366-
*
367-
* @param f the function that maps the items of this [[Observable]] into the inner [[Observable]]s.
368-
* @return the new [[Observable]] instance with the concatenation behavior
369-
*/
370-
@Experimental
371-
def concatMapDelayError[R](f: T => Observable[R]): Observable[R] = {
372-
toScalaObservable[R](asJavaObservable.concatMapDelayError[R](new Func1[T, rx.Observable[_ <: R]] {
373-
def call(t1: T): rx.Observable[_ <: R] = {
374-
f(t1).asJavaObservable
375-
}
376-
}))
377-
}
378-
379339
/**
380340
* $experimental Concatenates `this` and `that` source [[Observable]]s eagerly into a single stream of values.
381341
*
@@ -2682,27 +2642,6 @@ trait Observable[+T]
26822642
}))
26832643
}
26842644

2685-
/**
2686-
* $experimental Returns a new [[Observable]] by applying a function that you supply to each item emitted by the source
2687-
* [[Observable]] that returns an [[Observable]], and then emitting the items emitted by the most recently emitted
2688-
* of these [[Observable]]s and delays any error until all [[Observable]]s terminate.
2689-
*
2690-
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="" />
2691-
*
2692-
* $noDefaultScheduler
2693-
*
2694-
* @param f a function that, when applied to an item emitted by the source [[Observable]], returns an [[Observable]]
2695-
* @return an [[Observable]] that emits the items emitted by the [[Observable]] returned from applying `f` to the most
2696-
* recently emitted item emitted by the source [[Observable]]
2697-
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
2698-
*/
2699-
@Experimental
2700-
def switchMapDelayError[R](f: T => Observable[R]): Observable[R] = {
2701-
toScalaObservable[R](asJavaObservable.switchMapDelayError[R](new Func1[T, rx.Observable[_ <: R]] {
2702-
def call(t: T): rx.Observable[_ <: R] = f(t).asJavaObservable
2703-
}))
2704-
}
2705-
27062645
/**
27072646
* $experimental Returns an [[Observable]] that emits the items emitted by the source [[Observable]] or the items of an alternate
27082647
* [[Observable]] if the source [[Observable]] is empty.
@@ -2744,33 +2683,6 @@ trait Observable[+T]
27442683
}
27452684
// Naming: We follow C# (switch), not Java (switchOnNext), because Java just had to avoid clash with keyword
27462685

2747-
2748-
/**
2749-
* $experimental Converts this [[Observable]] that emits [[Observable]]s into an [[Observable]] that emits the items emitted by the
2750-
* most recently emitted of those [[Observable]]s and delays any exception until all [[Observable]]s terminate.
2751-
*
2752-
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchDo.png" alt="" />
2753-
*
2754-
* It subscribes to an [[Observable]] that emits [[Observable]]s. Each time it observes one of
2755-
* these emitted [[Observable]]s, the [[Observable]] returned by this method begins emitting the items
2756-
* emitted by that [[Observable]]. When a new [[Observable]] is emitted, it stops emitting items
2757-
* from the earlier-emitted [[Observable]] and begins emitting items from the new one.
2758-
*
2759-
* $noDefaultScheduler
2760-
*
2761-
* @return an [[Observable]] that emits the items emitted by the [[Observable]] most recently emitted by the source
2762-
* [[Observable]]
2763-
* @see <a href="http://reactivex.io/documentation/operators/switch.html">ReactiveX operators documentation: Switch</a>
2764-
*/
2765-
@Experimental
2766-
def switchDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
2767-
val o2: Observable[Observable[U]] = this
2768-
val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
2769-
val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
2770-
val o5 = rx.Observable.switchOnNextDelayError[U](o4)
2771-
toScalaObservable[U](o5)
2772-
}
2773-
27742686
/**
27752687
* Flattens two Observables into one Observable, without any transformation.
27762688
*
@@ -2790,7 +2702,7 @@ trait Observable[+T]
27902702
toScalaObservable[U](rx.Observable.merge(thisJava, thatJava))
27912703
}
27922704

2793-
/**
2705+
/**
27942706
* This behaves like [[rx.lang.scala.Observable.merge]] except that if any of the merged Observables
27952707
* notify of an error via [[rx.lang.scala.Observer.onError onError]], `mergeDelayError` will
27962708
* refrain from propagating that error notification until all of the merged Observables have
@@ -2809,6 +2721,7 @@ trait Observable[+T]
28092721
* @return an Observable that emits items that are the result of flattening the items emitted by
28102722
* `this` and `that`
28112723
*/
2724+
@deprecated("Use [[[rx.lang.scala.observables.ErrorDelayingObservable.merge delayError.merge]]] instead", "0.26.2")
28122725
def mergeDelayError[U >: T](that: Observable[U]): Observable[U] = {
28132726
toScalaObservable[U](rx.Observable.mergeDelayError[U](this.asJavaObservable, that.asJavaObservable))
28142727
}
@@ -2881,6 +2794,7 @@ trait Observable[+T]
28812794
* @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this`
28822795
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
28832796
*/
2797+
@deprecated("Use [[[rx.lang.scala.observables.ErrorDelayingObservable.flatten[U](implicit* delayError.flatten]]] instead", "0.26.2")
28842798
def flattenDelayError[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
28852799
val o2: Observable[Observable[U]] = this
28862800
val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
@@ -2889,37 +2803,6 @@ trait Observable[+T]
28892803
toScalaObservable[U](o5)
28902804
}
28912805

2892-
/**
2893-
* $experimental Flattens an [[Observable]] that emits [[Observable]]s into one [[Observable]], in a way that allows an [[Observer]] to
2894-
* receive all successfully emitted items from all of the source [[Observable]]s without being interrupted by
2895-
* an error notification from one of them, while limiting the
2896-
* number of concurrent subscriptions to these [[Observable]]s.
2897-
*
2898-
* This behaves like `flatten` except that if any of the merged [[Observable]]s notify of an
2899-
* error via `onError`, `flattenDelayError` will refrain from propagating that
2900-
* error notification until all of the merged [[Observable]]s have finished emitting items.
2901-
*
2902-
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
2903-
*
2904-
* Even if multiple merged [[Observable]]s send `onError` notifications, `flattenDelayError` will only
2905-
* invoke the `onError` method of its `Observer`s once.
2906-
*
2907-
* $noDefaultScheduler
2908-
*
2909-
* @param maxConcurrent the maximum number of [[Observable]]s that may be subscribed to concurrently
2910-
* @return an [[Observable]] that emits all of the items emitted by the [[Observable]]s emitted by `this`
2911-
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
2912-
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
2913-
*/
2914-
@Experimental
2915-
def flattenDelayError[U](maxConcurrent: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
2916-
val o2: Observable[Observable[U]] = this
2917-
val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable)
2918-
val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable
2919-
val o5 = rx.Observable.mergeDelayError[U](o4, maxConcurrent)
2920-
toScalaObservable[U](o5)
2921-
}
2922-
29232806
/**
29242807
* Combines two observables, emitting a pair of the latest values of each of
29252808
* the source observables each time an event is received from one of the source observables.
@@ -3890,6 +3773,15 @@ trait Observable[+T]
38903773
new BlockingObservable[T](this)
38913774
}
38923775

3776+
/**
3777+
* $experimental Converts an [[Observable]] into a [[rx.lang.scala.observables.ErrorDelayingObservable ErrorDelayingObservable]]
3778+
* that provides operators which delay errors when composing multiple [[Observable]]s.
3779+
*/
3780+
@Experimental
3781+
def delayError: ErrorDelayingObservable[T] = {
3782+
new ErrorDelayingObservable[T](this)
3783+
}
3784+
38933785
/** Tests whether a predicate holds for some of the elements of this `Observable`.
38943786
*
38953787
* @param p the predicate used to test elements.

0 commit comments

Comments
 (0)