Skip to content

ConnectableObservable autoConnect operator #3023

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3533,13 +3533,13 @@ public final Observable<T> cache() {
* <dd>{@code cache} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param capacity hint for number of items to cache (for optimizing underlying data structure)
* @param capacityHint hint for number of items to cache (for optimizing underlying data structure)
* @return an Observable that, when first subscribed to, caches all of its items and notifications for the
* benefit of subsequent subscribers
* @see <a href="http://reactivex.io/documentation/operators/replay.html">ReactiveX operators documentation: Replay</a>
*/
public final Observable<T> cache(int capacity) {
return CachedObservable.from(this, capacity);
public final Observable<T> cache(int capacityHint) {
return CachedObservable.from(this, capacityHint);
}

/**
Expand Down
55 changes: 55 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeAutoConnect.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable.OnSubscribe;
import rx.*;
import rx.functions.Action1;
import rx.observables.ConnectableObservable;

/**
* Wraps a ConnectableObservable and calls its connect() method once
* the specified number of Subscribers have subscribed.
*
* @param <T> the value type of the chain
*/
public final class OnSubscribeAutoConnect<T> implements OnSubscribe<T> {
final ConnectableObservable<? extends T> source;
final int numberOfSubscribers;
final Action1<? super Subscription> connection;
final AtomicInteger clients;

public OnSubscribeAutoConnect(ConnectableObservable<? extends T> source,
int numberOfSubscribers,
Action1<? super Subscription> connection) {
if (numberOfSubscribers <= 0) {
throw new IllegalArgumentException("numberOfSubscribers > 0 required");
}
this.source = source;
this.numberOfSubscribers = numberOfSubscribers;
this.connection = connection;
this.clients = new AtomicInteger();
}
@Override
public void call(Subscriber<? super T> child) {
source.unsafeSubscribe(child);
if (clients.incrementAndGet() == numberOfSubscribers) {
source.connect(connection);
}
}
}
61 changes: 56 additions & 5 deletions src/main/java/rx/observables/ConnectableObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
*/
package rx.observables;

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.internal.operators.OnSubscribeRefCount;
import rx.*;
import rx.annotations.Experimental;
import rx.functions.*;
import rx.internal.operators.*;

/**
* A {@code ConnectableObservable} resembles an ordinary {@link Observable}, except that it does not begin
Expand Down Expand Up @@ -80,4 +79,56 @@ public void call(Subscription t1) {
public Observable<T> refCount() {
return create(new OnSubscribeRefCount<T>(this));
}

/**
* Returns an Observable that automatically connects to this ConnectableObservable
* when the first Subscriber subscribes.
*
* @return an Observable that automatically connects to this ConnectableObservable
* when the first Subscriber subscribes
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public Observable<T> autoConnect() {
return autoConnect(1);
}
/**
* Returns an Observable that automatically connects to this ConnectableObservable
* when the specified number of Subscribers subscribe to it.
*
* @param numberOfSubscribers the number of subscribers to await before calling connect
* on the ConnectableObservable. A non-positive value indicates
* an immediate connection.
* @return an Observable that automatically connects to this ConnectableObservable
* when the specified number of Subscribers subscribe to it
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public Observable<T> autoConnect(int numberOfSubscribers) {
return autoConnect(numberOfSubscribers, Actions.empty());
}

/**
* Returns an Observable that automatically connects to this ConnectableObservable
* when the specified number of Subscribers subscribe to it and calls the
* specified callback with the Subscription associated with the established connection.
*
* @param numberOfSubscribers the number of subscribers to await before calling connect
* on the ConnectableObservable. A non-positive value indicates
* an immediate connection.
* @param connection the callback Action1 that will receive the Subscription representing the
* established connection
* @return an Observable that automatically connects to this ConnectableObservable
* when the specified number of Subscribers subscribe to it and calls the
* specified callback with the Subscription associated with the established connection
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public Observable<T> autoConnect(int numberOfSubscribers, Action1<? super Subscription> connection) {
if (numberOfSubscribers <= 0) {
this.connect(connection);
return this;
}
return create(new OnSubscribeAutoConnect<T>(this, numberOfSubscribers, connection));
}
}
175 changes: 175 additions & 0 deletions src/test/java/rx/observables/ConnectableObservableTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observables;

import java.util.concurrent.atomic.*;

import org.junit.*;

import rx.*;
import rx.functions.*;
import rx.observers.TestSubscriber;

public class ConnectableObservableTest {
@Test
public void testAutoConnect() {
final AtomicInteger run = new AtomicInteger();

ConnectableObservable<Integer> co = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.just(run.incrementAndGet());
}
}).publish();

Observable<Integer> source = co.autoConnect();

Assert.assertEquals(0, run.get());

TestSubscriber<Integer> ts1 = TestSubscriber.create();
source.subscribe(ts1);

ts1.assertCompleted();
ts1.assertNoErrors();
ts1.assertValue(1);

Assert.assertEquals(1, run.get());

TestSubscriber<Integer> ts2 = TestSubscriber.create();
source.subscribe(ts2);

ts2.assertNotCompleted();
ts2.assertNoErrors();
ts2.assertNoValues();

Assert.assertEquals(1, run.get());
}
@Test
public void testAutoConnect0() {
final AtomicInteger run = new AtomicInteger();

ConnectableObservable<Integer> co = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.just(run.incrementAndGet());
}
}).publish();

Observable<Integer> source = co.autoConnect(0);

Assert.assertEquals(1, run.get());

TestSubscriber<Integer> ts1 = TestSubscriber.create();
source.subscribe(ts1);

ts1.assertNotCompleted();
ts1.assertNoErrors();
ts1.assertNoValues();

Assert.assertEquals(1, run.get());

TestSubscriber<Integer> ts2 = TestSubscriber.create();
source.subscribe(ts2);

ts2.assertNotCompleted();
ts2.assertNoErrors();
ts2.assertNoValues();

Assert.assertEquals(1, run.get());
}
@Test
public void testAutoConnect2() {
final AtomicInteger run = new AtomicInteger();

ConnectableObservable<Integer> co = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.just(run.incrementAndGet());
}
}).publish();

Observable<Integer> source = co.autoConnect(2);

Assert.assertEquals(0, run.get());

TestSubscriber<Integer> ts1 = TestSubscriber.create();
source.subscribe(ts1);

ts1.assertNotCompleted();
ts1.assertNoErrors();
ts1.assertNoValues();

Assert.assertEquals(0, run.get());

TestSubscriber<Integer> ts2 = TestSubscriber.create();
source.subscribe(ts2);

Assert.assertEquals(1, run.get());

ts1.assertCompleted();
ts1.assertNoErrors();
ts1.assertValue(1);

ts2.assertCompleted();
ts2.assertNoErrors();
ts2.assertValue(1);

}

@Test
public void testAutoConnectUnsubscribe() {
final AtomicInteger run = new AtomicInteger();

ConnectableObservable<Integer> co = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return Observable.range(run.incrementAndGet(), 10);
}
}).publish();

final AtomicReference<Subscription> conn = new AtomicReference<Subscription>();

Observable<Integer> source = co.autoConnect(1, new Action1<Subscription>() {
@Override
public void call(Subscription t) {
conn.set(t);
}
});

Assert.assertEquals(0, run.get());

TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
@Override
public void onNext(Integer t) {
super.onNext(t);
Subscription s = conn.get();
if (s != null) {
s.unsubscribe();
} else {
onError(new NullPointerException("No connection reference"));
}
}
};

source.subscribe(ts);

ts.assertNotCompleted();
ts.assertNoErrors();
ts.assertValue(1);

Assert.assertTrue("Connection not unsubscribed?", conn.get().isUnsubscribed());
}
}