Skip to content

Commit 89e630a

Browse files
committed
Merged master into buffer-operation branch
2 parents 35b3c69 + ef77169 commit 89e630a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+3148
-1255
lines changed

CHANGES.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# RxJava Releases #
22

3+
### Version 0.9.1 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.9.1%22)) ###
4+
5+
* [Pull 303](https://github.com/Netflix/RxJava/pull/303) CombineLatest
6+
* [Pull 290](https://github.com/Netflix/RxJava/pull/290) Zip overload with FuncN
7+
* [Pull 302](https://github.com/Netflix/RxJava/pull/302) NPE fix when no package on class
8+
* [Pull 284](https://github.com/Netflix/RxJava/pull/284) GroupBy fixes (items still [oustanding](https://github.com/Netflix/RxJava/issues/282))
9+
* [Pull 288](https://github.com/Netflix/RxJava/pull/288) PublishSubject concurrent modification fixes
10+
* [Issue 198](https://github.com/Netflix/RxJava/issues/198) Throw if no onError handler specified
11+
* [Issue 278](https://github.com/Netflix/RxJava/issues/278) Subscribe argument validation
12+
* Javadoc improvements and many new marble diagrams
13+
314
### Version 0.9.0 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.9.0%22)) ###
415

516
This release includes breaking changes that move all blocking operators (such as `single`, `last`, `forEach`) to `BlockingObservable`.

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.9.1-SNAPSHOT
1+
version=0.9.2-SNAPSHOT

language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,21 @@ import static org.mockito.Matchers.*;
1919
import static org.mockito.Mockito.*;
2020

2121
import java.util.Arrays;
22+
import java.util.Collection;
23+
import java.util.Map;
2224

2325
import org.junit.Before;
2426
import org.junit.Test;
2527
import static org.junit.Assert.*;
28+
2629
import org.mockito.Mock;
2730
import org.mockito.MockitoAnnotations;
2831

2932
import rx.Notification;
3033
import rx.Observable;
3134
import rx.Observer;
3235
import rx.Subscription;
36+
import rx.observables.GroupedObservable;
3337
import rx.subscriptions.Subscriptions;
3438
import rx.util.functions.Func1;
3539

@@ -280,6 +284,29 @@ def class ObservableTests {
280284
Observable.from(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) });
281285
verify(a, times(1)).received(true);
282286
}
287+
288+
@Test
289+
public void testGroupBy() {
290+
int count=0;
291+
292+
Observable.from("one", "two", "three", "four", "five", "six")
293+
.groupBy({String s -> s.length()})
294+
.mapMany({
295+
groupObservable ->
296+
297+
return groupObservable.map({
298+
s ->
299+
return "Value: " + s + " Group: " + groupObservable.getKey();
300+
});
301+
}).toBlockingObservable().forEach({
302+
s ->
303+
println(s);
304+
count++;
305+
})
306+
307+
assertEquals(6, count);
308+
}
309+
283310

284311
def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {
285312

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1667 additions & 889 deletions
Large diffs are not rendered by default.

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 293 additions & 142 deletions
Large diffs are not rendered by default.

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,31 @@
2020
import rx.Subscription;
2121
import rx.util.functions.Func1;
2222

23+
/**
24+
* A ConnectableObservable resembles an ordinary {@link Observable}, except that it does not begin
25+
* emitting items when it is subscribed to, but only when its {@link #connect} method is called. In
26+
* this way you can wait for all intended {@link Observer}s to {@link Observable#subscribe} to the
27+
* Observable before the Observable begins emitting items.
28+
* <p>
29+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/publishConnect.png">
30+
* <p>
31+
* For more information see
32+
* <a href="https://github.com/Netflix/RxJava/wiki/Connectable-Observable-Operators">Connectable
33+
* Observable Operators</a> at the RxJava Wiki
34+
*
35+
* @param <T>
36+
*/
37+
2338
public abstract class ConnectableObservable<T> extends Observable<T> {
2439

2540
protected ConnectableObservable(Func1<Observer<T>, Subscription> onSubscribe) {
2641
super(onSubscribe);
2742
}
2843

44+
/**
45+
* Call a ConnectableObservable's connect() method to instruct it to begin emitting the
46+
* items from its underlying {@link Observable} to its {@link Observer}s.
47+
*/
2948
public abstract Subscription connect();
3049

3150
}

rxjava-core/src/main/java/rx/observables/GroupedObservable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
import rx.util.functions.Func1;
2222

2323
/**
24-
* An {@link Observable} that has been grouped by a key whose value can be obtained using {@link #getKey()} <p>
24+
* An {@link Observable} that has been grouped by a key whose value can be obtained using
25+
* {@link #getKey()} <p>
2526
*
2627
* @see Observable#groupBy(Observable, Func1)
2728
*

rxjava-core/src/main/java/rx/operators/AtomicObserver.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import rx.Observer;
77
import rx.plugins.RxJavaPlugins;
88
import rx.util.CompositeException;
9+
import rx.util.OnErrorNotImplementedException;
910

1011
/**
1112
* Wrapper around Observer to ensure compliance with Rx contract.
@@ -70,13 +71,28 @@ public void onError(Exception e) {
7071
try {
7172
actual.onError(e);
7273
} catch (Exception e2) {
73-
// if the onError itself fails then pass to the plugin
74-
// see https://github.com/Netflix/RxJava/issues/216 for further discussion
75-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
76-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
77-
// and throw exception despite that not being proper for Rx
78-
// https://github.com/Netflix/RxJava/issues/198
79-
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
74+
if (e2 instanceof OnErrorNotImplementedException) {
75+
/**
76+
* onError isn't implemented so throw
77+
*
78+
* https://github.com/Netflix/RxJava/issues/198
79+
*
80+
* Rx Design Guidelines 5.2
81+
*
82+
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
83+
* to rethrow the exception on the thread that the message comes out from the observable sequence.
84+
* The OnCompleted behavior in this case is to do nothing."
85+
*/
86+
throw (OnErrorNotImplementedException) e2;
87+
} else {
88+
// if the onError itself fails then pass to the plugin
89+
// see https://github.com/Netflix/RxJava/issues/216 for further discussion
90+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
91+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
92+
// and throw exception despite that not being proper for Rx
93+
// https://github.com/Netflix/RxJava/issues/198
94+
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
95+
}
8096
}
8197
// auto-unsubscribe
8298
subscription.unsubscribe();

rxjava-core/src/main/java/rx/operators/OperationAll.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212
import static org.mockito.Mockito.verify;
1313
import static org.mockito.Mockito.verifyNoMoreInteractions;
1414

15+
/**
16+
* Returns an Observable that emits a Boolean that indicates whether all items emitted by an
17+
* Observable satisfy a condition.
18+
* <p>
19+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/all.png">
20+
*/
1521
public class OperationAll {
1622

1723
public static <T> Func1<Observer<Boolean>, Subscription> all(Observable<T> sequence, Func1<T, Boolean> predicate) {

rxjava-core/src/main/java/rx/operators/OperationBuffer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -398,9 +398,9 @@ public void onNext(T args) {
398398
* @param <T> The type of object all internal {@link Buffer} objects record.
399399
*/
400400
private interface BufferCreator<T> {
401-
/**
402-
* Signifies a onNext event.
403-
*/
401+
/**
402+
* Signifies a onNext event.
403+
*/
404404
void onValuePushed();
405405

406406
/**

rxjava-core/src/main/java/rx/operators/OperationCache.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,17 @@
3333
import rx.util.functions.Func1;
3434

3535
/**
36-
* Similar to {@link Observable#replay()} except that this auto-subscribes to the source sequence.
36+
* This method has similar behavior to {@link Observable#replay()} except that this auto-subscribes
37+
* to the source Observable rather than returning a connectable Observable.
3738
* <p>
38-
* This is useful when returning an Observable that you wish to cache responses but can't control the
39+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/cache.png">
40+
* <p>
41+
* This is useful with an Observable that you want to cache responses when you can't control the
3942
* subscribe/unsubscribe behavior of all the Observers.
4043
* <p>
41-
* NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not
42-
* use this on infinite or very large sequences that will use up memory. This is similar to
43-
* the {@link Observable#toList()} operator in this caution.
44-
*
44+
* NOTE: You sacrifice the ability to unsubscribe from the origin when you use this operator, so be
45+
* careful not to use this operator on Observables that emit infinite or very large numbers of
46+
* items, as this will use up memory.
4547
*/
4648
public class OperationCache {
4749

0 commit comments

Comments
 (0)