Skip to content

Commit e18d821

Browse files
Add CompositeSubscription
- also a utility method for creating a Subscription around a Future
1 parent f3427b0 commit e18d821

File tree

3 files changed

+143
-0
lines changed

3 files changed

+143
-0
lines changed

rxjava-core/src/main/java/rx/subscriptions/BooleanSubscription.java

+15
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package rx.subscriptions;
217

318
import java.util.concurrent.atomic.AtomicBoolean;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subscriptions;
17+
18+
import java.util.List;
19+
import java.util.concurrent.ConcurrentLinkedQueue;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
22+
import org.slf4j.Logger;
23+
import org.slf4j.LoggerFactory;
24+
25+
import rx.Subscription;
26+
import rx.util.functions.Functions;
27+
28+
/**
29+
* Subscription that represents a group of Subscriptions that are unsubscribed together.
30+
*
31+
* @see Rx.Net equivalent CompositeDisposable at http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx
32+
*/
33+
public class CompositeSubscription implements Subscription {
34+
35+
private static final Logger logger = LoggerFactory.getLogger(Functions.class);
36+
37+
/*
38+
* The reason 'synchronized' is used on 'add' and 'unsubscribe' is because AtomicBoolean/ConcurrentLinkedQueue are both being modified so it needs to be done atomically.
39+
*
40+
* TODO evaluate whether use of synchronized is a performance issue here and if it's worth using an atomic state machine or other non-locking approach
41+
*/
42+
private AtomicBoolean unsubscribed = new AtomicBoolean(false);
43+
private final ConcurrentLinkedQueue<Subscription> subscriptions = new ConcurrentLinkedQueue<Subscription>();
44+
45+
public CompositeSubscription(List<Subscription> subscriptions) {
46+
this.subscriptions.addAll(subscriptions);
47+
}
48+
49+
public CompositeSubscription(Subscription... subscriptions) {
50+
for (Subscription s : subscriptions) {
51+
this.subscriptions.add(s);
52+
}
53+
}
54+
55+
public boolean isUnsubscribed() {
56+
return unsubscribed.get();
57+
}
58+
59+
public synchronized void add(Subscription s) {
60+
if (unsubscribed.get()) {
61+
s.unsubscribe();
62+
} else {
63+
subscriptions.add(s);
64+
}
65+
}
66+
67+
@Override
68+
public synchronized void unsubscribe() {
69+
if (unsubscribed.compareAndSet(false, true)) {
70+
for (Subscription s : subscriptions) {
71+
try {
72+
s.unsubscribe();
73+
} catch (Exception e) {
74+
logger.error("Failed to unsubscribe.", e);
75+
}
76+
}
77+
}
78+
}
79+
80+
}

rxjava-core/src/main/java/rx/subscriptions/Subscriptions.java

+48
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,22 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package rx.subscriptions;
217

18+
import java.util.concurrent.Future;
19+
320
import rx.Subscription;
421
import rx.util.functions.Action0;
522
import rx.util.functions.FuncN;
@@ -31,6 +48,37 @@ public void unsubscribe() {
3148
};
3249
}
3350

51+
/**
52+
* A {@link Subscription} that wraps a {@link Future} and cancels it when unsubscribed.
53+
*
54+
*
55+
* @param f
56+
* {@link Future}
57+
* @return {@link Subscription}
58+
*/
59+
public static Subscription create(final Future<?> f) {
60+
return new Subscription() {
61+
62+
@Override
63+
public void unsubscribe() {
64+
f.cancel(true);
65+
}
66+
67+
};
68+
}
69+
70+
/**
71+
* A {@link Subscription} that groups multiple Subscriptions together and unsubscribes from all of them together.
72+
*
73+
* @param subscriptions
74+
* Subscriptions to group together
75+
* @return {@link Subscription}
76+
*/
77+
78+
public static CompositeSubscription create(Subscription... subscriptions) {
79+
return new CompositeSubscription(subscriptions);
80+
}
81+
3482
/**
3583
* A {@link Subscription} implemented via an anonymous function (such as closures from other languages).
3684
*

0 commit comments

Comments
 (0)