Skip to content

Commit faa17cf

Browse files
committed
Merge pull request #4 from zsxwing/pr166
Update the signature of flatMapWith and fix the Subject doc
2 parents b48ccc7 + 6d329f0 commit faa17cf

File tree

4 files changed

+47
-28
lines changed

4 files changed

+47
-28
lines changed

examples/src/test/scala/rx/lang/scala/examples/ExperimentalAPIExamples.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ object ExperimentalAPIExamples {
128128
@Test def flatMapWithMaxConcurrentExample(): Unit = {
129129
(1 to 1000000).toObservable
130130
.doOnNext(v => println(s"Emitted Value: $v"))
131-
.flatMap(maxConcurrent = 10, (v: Int) => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()))
131+
.flatMap(maxConcurrent = 10, v => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()))
132132
.toBlocking.foreach(v => System.out.println("Received: " + v))
133133
}
134134

@@ -137,13 +137,23 @@ object ExperimentalAPIExamples {
137137
.doOnNext(v => println(s"Emitted Value: $v"))
138138
.flatMap(
139139
maxConcurrent = 10,
140-
(v: Int) => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()),
140+
v => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()),
141141
e => Observable.just(-1).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler()),
142142
() => Observable.just(Int.MaxValue).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler())
143143
)
144144
.toBlocking.foreach(v => System.out.println("Received: " + v))
145145
}
146146

147+
@Test def flatMapWithMaxConcurrentExample3() {
148+
(1 to 1000000).toObservable
149+
.doOnNext(v => println(s"Emitted Value: $v"))
150+
.flatMapWith(
151+
maxConcurrent = 10,
152+
v => Observable.just(v).doOnNext(_ => Thread.sleep(1)).subscribeOn(IOScheduler())
153+
)(_ * _).subscribeOn(IOScheduler())
154+
.toBlocking.foreach(v => System.out.println("Received: " + v))
155+
}
156+
147157
@Test def onBackpressureDropDoExample(): Unit = {
148158
Observable[Int](subscriber => {
149159
(1 to 200).foreach(subscriber.onNext)

src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1073,7 +1073,7 @@ trait Observable[+T]
10731073
* source Observable and the collection Observable
10741074
*/
10751075
@Beta
1076-
def flatMapWith[U, R](maxConcurrent: Int)(collectionSelector: T => Observable[U])(resultSelector: (T, U) => R): Observable[R] = {
1076+
def flatMapWith[U, R](maxConcurrent: Int, collectionSelector: T => Observable[U])(resultSelector: (T, U) => R): Observable[R] = {
10771077
val jCollectionSelector = new Func1[T, rx.Observable[_ <: U]] {
10781078
override def call(t: T): rx.Observable[_ <: U] = collectionSelector(t).asJavaObservable
10791079
}

src/main/scala/rx/lang/scala/Subject.scala

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -65,67 +65,76 @@ trait Subject[T] extends Observable[T] with Observer[T] {
6565
}
6666

6767
/**
68-
* $experimental Check if the Subject has terminated with an exception.
69-
* <p>The operation is threadsafe.
68+
* $experimental Check if the [[Subject]] has terminated with an exception.
7069
*
71-
* @return `true` if the subject has received a throwable through { @code onError}.
70+
* The operation is threadsafe.
71+
*
72+
* @return `true` if the [[Subject]] has received a throwable through `onError`.
7273
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
7374
*/
7475
@Experimental
7576
def hasThrowable: Boolean = asJavaSubject.hasThrowable
7677

7778
/**
78-
* $experimental Check if the Subject has terminated normally.
79-
* <p>The operation is threadsafe.
79+
* $experimental Check if the [[Subject]] has terminated normally.
80+
*
81+
* The operation is threadsafe.
8082
*
81-
* @return `true` if the subject completed normally via { @code onCompleted}
83+
* @return `true` if the [[Subject]] completed normally via `onCompleted`
8284
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
8385
*/
8486
@Experimental
8587
def hasCompleted: Boolean = asJavaSubject.hasCompleted
8688

8789
/**
88-
* $experimental Returns the Throwable that terminated the Subject.
89-
* <p>The operation is threadsafe.
90+
* $experimental Returns the `Throwable` that terminated the [[Subject]].
91+
*
92+
* The operation is threadsafe.
9093
*
91-
* @return the Throwable that terminated the Subject or { @code null} if the subject hasn't terminated yet or
92-
* if it terminated normally.
94+
* @return the `Throwable` that terminated the [[Subject]] or `null` if the subject hasn't terminated yet or
95+
* if it terminated normally.
9396
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
9497
*/
9598
@Experimental
9699
def getThrowable: Throwable = asJavaSubject.getThrowable
97100

98101
/**
99-
* $experimental Check if the Subject has any value.
100-
* <p>Use the `#getValue()` method to retrieve such a value.
101-
* <p>Note that unless `#hasCompleted()` or `#hasThrowable()` returns true, the value
102-
* retrieved by `getValue()` may get outdated.
103-
* <p>The operation is threadsafe.
102+
* $experimental Check if the [[Subject]] has any value.
103+
*
104+
* Use the [[Subject.getValue]] method to retrieve such a value.
105+
*
106+
* Note that unless [[Subject.hasCompleted]] or [[Subject.hasThrowable]] returns true, the value
107+
* retrieved by [[Subject.getValue]] may get outdated.
104108
*
105-
* @return { @code true} if and only if the subject has some value but not an error
109+
* The operation is threadsafe.
110+
*
111+
* @return `true` if and only if the [[Subject]] has some value but not an error
106112
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
107113
*/
108114
@Experimental
109115
def hasValue: Boolean = asJavaSubject.hasValue
110116

111117
/**
112-
* $experimental Returns the current or latest value of the Subject if there is such a value and
118+
* $experimental Returns the current or latest value of the [[Subject]] if there is such a value and
113119
* the subject hasn't terminated with an exception.
114-
* <p>The method can return `null` for various reasons. Use `#hasValue()`, `#hasThrowable()`
115-
* and `#hasCompleted()` to determine if such `null` is a valid value, there was an
116-
* exception or the Subject terminated without receiving any value.
117-
* <p>The operation is threadsafe.
118120
*
119-
* @return the current value or { @code null} if the Subject doesn't have a value, has terminated with an
120-
* exception or has an actual { @code null} as a value.
121+
* The method can return `null` for various reasons. Use [[Subject.hasValue]], [[Subject.hasThrowable]]
122+
* and [[Subject.hasCompleted]] to determine if such `null` is a valid value, there was an
123+
* exception or the [[Subject]] terminated without receiving any value.
124+
*
125+
* The operation is threadsafe.
126+
*
127+
* @return the current value or `null` if the [[Subject]] doesn't have a value, has terminated with an
128+
* exception or has an actual `null` as a value.
121129
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)
122130
*/
123131
@Experimental
124132
def getValue: T = asJavaSubject.getValue.asInstanceOf[T]
125133

126134
/**
127135
* $experimental Returns a snapshot of the currently buffered non-terminal events.
128-
* <p>The operation is threadsafe.
136+
*
137+
* The operation is threadsafe.
129138
*
130139
* @return a snapshot of the currently buffered non-terminal events.
131140
* @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number)

src/test/scala/rx/lang/scala/CompletenessTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ class CompletenessTest extends JUnitSuite {
121121
"limit(Int)" -> "take(Int)",
122122
"flatMap(Func1[_ >: T, _ <: Observable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R])" -> "flatMapWith(T => Observable[U])((T, U) => R)",
123123
"flatMapIterable(Func1[_ >: T, _ <: Iterable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R])" -> "flatMapIterableWith(T => Iterable[U])((T, U) => R)",
124-
"flatMap(Func1[_ >: T, _ <: Observable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R], Int)" -> "flatMapWith(Int)(T => Observable[U])((T, U) => R)",
124+
"flatMap(Func1[_ >: T, _ <: Observable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R], Int)" -> "flatMapWith(Int, T => Observable[U])((T, U) => R)",
125125
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R])",
126126
"flatMap(Func1[_ >: T, _ <: Observable[_ <: R]], Func1[_ >: Throwable, _ <: Observable[_ <: R]], Func0[_ <: Observable[_ <: R]], Int)" -> "flatMap(Int, T => Observable[R], Throwable => Observable[R], () => Observable[R])",
127127
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "groupBy(T => K, T => V)",

0 commit comments

Comments
 (0)