Skip to content

Remove XResource classes #4126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public interface CompletableTransformer extends Function<Completable, Completabl
* @return the source or its wrapper Completable
* @throws NullPointerException if source is null
*/
static Completable wrap(CompletableConsumable source) {
public static Completable wrap(CompletableConsumable source) {
Objects.requireNonNull(source, "source is null");
if (source instanceof Completable) {
return (Completable)source;
Expand Down Expand Up @@ -175,10 +175,8 @@ public static Completable concat(Publisher<? extends CompletableConsumable> sour
* when the Completable is subscribed to.
* @return the created Completable instance
* @throws NullPointerException if onSubscribe is null
* @deprecated
*/
@SchedulerSupport(SchedulerSupport.NONE)
@Deprecated // FIXME temporary
public static Completable create(CompletableConsumable onSubscribe) {
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
if (onSubscribe instanceof Completable) {
Expand Down Expand Up @@ -798,7 +796,6 @@ public final Throwable get(long timeout, TimeUnit unit) {
* @throws NullPointerException if onLift is null
*/
@SchedulerSupport(SchedulerSupport.NONE)
@Deprecated // FIXME temporary
public final Completable lift(final CompletableOperator onLift) {
Objects.requireNonNull(onLift, "onLift is null");
return new CompletableLift(this, onLift);
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/io/reactivex/disposables/CompositeDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
import java.util.*;

import io.reactivex.exceptions.CompositeException;
import io.reactivex.internal.disposables.DisposableContainer;
import io.reactivex.internal.functions.Objects;
import io.reactivex.internal.util.*;

/**
* A disposable container that can hold onto multiple other disposables.
*/
public final class CompositeDisposable implements Disposable {
public final class CompositeDisposable implements Disposable, DisposableContainer {

OpenHashSet<Disposable> resources;

Expand Down Expand Up @@ -156,6 +157,18 @@ public void clear() {
dispose(set);
}

public int size() {
if (disposed) {
return 0;
}
synchronized (this) {
if (disposed) {
return 0;
}
return resources.size();
}
}

void dispose(OpenHashSet<Disposable> set) {
if (set == null) {
return;
Expand Down
24 changes: 4 additions & 20 deletions src/main/java/io/reactivex/disposables/Disposables.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@

package io.reactivex.disposables;

import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.functions.Functions;
import java.util.concurrent.Future;

import org.reactivestreams.Subscription;

import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.functions.Functions;

/**
* Utility class to help create disposables by wrapping
* other types.
Expand Down Expand Up @@ -52,21 +53,4 @@ public static Disposable empty() {
public static Disposable disposed() {
return EmptyDisposable.INSTANCE;
}

private static final Consumer<Disposable> DISPOSER = new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
d.dispose();
}
};

/**
* Returns a consumer that calls dispose on the received Disposable.
* @return the consumer that calls dispose on the received Disposable.
* @deprecated that generic resource management will be removed
*/
@Deprecated
public static Consumer<Disposable> consumeAndDispose() {
return DISPOSER;
}
}
3 changes: 3 additions & 0 deletions src/main/java/io/reactivex/disposables/FutureDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import java.util.concurrent.Future;

final class FutureDisposable extends ReferenceDisposable<Future<?>> {
/** */
private static final long serialVersionUID = 6545242830671168775L;

private final boolean allowInterrupt;

FutureDisposable(Future<?> run, boolean allowInterrupt) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public boolean isDisposed() {
}

static final class InnerDisposable extends ReferenceDisposable<RefCountDisposable> {
/** */
private static final long serialVersionUID = -6066815451193282256L;

InnerDisposable(RefCountDisposable parent) {
super(parent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import java.util.concurrent.atomic.AtomicReference;

abstract class ReferenceDisposable<T> extends AtomicReference<T> implements Disposable {
/** */
private static final long serialVersionUID = 6537757548749041217L;

ReferenceDisposable(T value) {
super(Objects.requireNonNull(value, "value is null"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
package io.reactivex.disposables;

final class RunnableDisposable extends ReferenceDisposable<Runnable> {
/** */
private static final long serialVersionUID = -8219729196779211169L;

RunnableDisposable(Runnable value) {
super(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import org.reactivestreams.Subscription;

final class SubscriptionDisposable extends ReferenceDisposable<Subscription> {
/** */
private static final long serialVersionUID = -707001650852963139L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are these for? We shouldn't be encouraging people to serialize these instances.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eclipse complains about serializable classes (i.e., extending AtomicX) need these fields, even though we don't care about serialization. A small inconvenience.


SubscriptionDisposable(Subscription value) {
super(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,20 @@
import java.util.concurrent.atomic.AtomicReferenceArray;

import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;

/**
* A composite resource with a fixed number of slots.
* A composite disposable with a fixed number of slots.
*
* <p>Note that since the implementation leaks the methods of AtomicReferenceArray, one must be
* careful to only call setResource, replaceResource and dispose on it. All other methods may lead to undefined behavior
* and should be used by internal means only.
*
* @param <T> the resource tpye
* @deprecated Use more type-specific and inlined resource management
*/
@Deprecated
public final class ArrayCompositeResource<T> extends AtomicReferenceArray<Object> implements Disposable {
public final class ArrayCompositeDisposable extends AtomicReferenceArray<Disposable> implements Disposable {
/** */
private static final long serialVersionUID = 2746389416410565408L;

final Consumer<? super T> disposer;

static final Object DISPOSED = new Object();

public ArrayCompositeResource(int capacity, Consumer<? super T> disposer) {
public ArrayCompositeDisposable(int capacity) {
super(capacity);
this.disposer = disposer;
}

/**
Expand All @@ -48,17 +38,16 @@ public ArrayCompositeResource(int capacity, Consumer<? super T> disposer) {
* @param resource
* @return true if the resource has ben set, false if the composite has been disposed
*/
@SuppressWarnings("unchecked")
public boolean setResource(int index, T resource) {
public boolean setResource(int index, Disposable resource) {
for (;;) {
Object o = get(index);
if (o == DISPOSED) {
disposer.accept(resource);
Disposable o = get(index);
if (o == DisposableHelper.DISPOSED) {
resource.dispose();;
return false;
}
if (compareAndSet(index, o, resource)) {
if (o != null) {
disposer.accept((T)o);
o.dispose();
}
return true;
}
Expand All @@ -71,31 +60,29 @@ public boolean setResource(int index, T resource) {
* @param resource
* @return the old resource, can be null
*/
@SuppressWarnings("unchecked")
public T replaceResource(int index, T resource) {
public Disposable replaceResource(int index, Disposable resource) {
for (;;) {
Object o = get(index);
if (o == DISPOSED) {
disposer.accept(resource);
Disposable o = get(index);
if (o == DisposableHelper.DISPOSED) {
resource.dispose();
return null;
}
if (compareAndSet(index, o, resource)) {
return (T)o;
return o;
}
}
}

@Override
@SuppressWarnings("unchecked")
public void dispose() {
if (get(0) != DISPOSED) {
if (get(0) != DisposableHelper.DISPOSED) {
int s = length();
for (int i = 0; i < s; i++) {
Object o = get(i);
if (o != DISPOSED) {
o = getAndSet(i, DISPOSED);
if (o != DISPOSED && o != null) {
disposer.accept((T)o);
Disposable o = get(i);
if (o != DisposableHelper.DISPOSED) {
o = getAndSet(i, DisposableHelper.DISPOSED);
if (o != DisposableHelper.DISPOSED && o != null) {
o.dispose();
}
}
}
Expand All @@ -104,6 +91,6 @@ public void dispose() {

@Override
public boolean isDisposed() {
return get(0) == DISPOSED;
return get(0) == DisposableHelper.DISPOSED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@

package io.reactivex.internal.disposables;

public interface CompositeResource<T> {
import io.reactivex.disposables.Disposable;

/**
* Common interface to add and remove disposables from a container.
*/
public interface DisposableContainer {

boolean add(T resource);
boolean add(Disposable d);

boolean remove(T resource);
boolean remove(Disposable d);

boolean delete(T resource);
boolean delete(Disposable d);
}
Loading