Skip to content

Buffer with Observable boundary. #733

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -3076,6 +3076,35 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> combineLates
return create(OperationCombineLatest.combineLatest(o1, o2, o3, o4, o5, o6, o7, o8, o9, combineFunction));
}

/**
* Create an Observable that emits non-overlapping buffered items once the boundary observable emits an item.
* <p>
* Completion of either this or the boundary observable causes the returned observable
* to emit the latest buffer and complete.
* @param <B> the boundary value type (ignored)
* @param boundary the boundary observable
* @return an Observable that emits buffered items once the boundary observable emits an item.
* @see #buffer(rx.Observable, int)
*/
public <B> Observable<List<T>> buffer(Observable<B> boundary) {
return create(OperationBuffer.bufferWithBoundaryObservable(this, boundary));
}

/**
* Create an Observable that emits non-overlapping buffered items once the boundary observable emits an item.
* <p>
* Completion of either this or the boundary observable causes the returned observable
* to emit the latest buffer and complete.
* @param <B> the boundary value type (ignored)
* @param boundary the boundary observable
* @param initialCapacity the initial capacity of each buffer chunk
* @return an Observable that emits buffered items once the boundary observable emits an item.
* @see #buffer(rx.Observable, int)
*/
public <B> Observable<List<T>> buffer(Observable<B> boundary, int initialCapacity) {
return create(OperationBuffer.bufferWithBoundaryObservable(this, boundary, initialCapacity));
}

/**
* Creates an Observable that emits buffers of items it collects from the
* source Observable. The resulting Observable emits connected,
Expand Down
136 changes: 136 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package rx.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -397,4 +398,139 @@ public void unsubscribe() {
}
}
}

/**
* Create a buffer operator with the given observable sequence as the buffer boundary.
*/
public static <T, B> OnSubscribeFunc<List<T>> bufferWithBoundaryObservable(Observable<? extends T> source, Observable<B> boundary) {
return new BufferWithObservableBoundary<T, B>(source, boundary, 16);
}
/**
* Create a buffer operator with the given observable sequence as the buffer boundary and
* with the given initial capacity for buffers.
*/
public static <T, B> OnSubscribeFunc<List<T>> bufferWithBoundaryObservable(Observable<? extends T> source, Observable<B> boundary, int initialCapacity) {
if (initialCapacity <= 0) {
throw new IllegalArgumentException("initialCapacity > 0 required");
}
return new BufferWithObservableBoundary<T, B>(source, boundary, initialCapacity);
}

/**
* Buffer until an element is emitted from a helper observable.
* @param <T> the buffered value type
*/
private static final class BufferWithObservableBoundary<T, B> implements OnSubscribeFunc<List<T>> {
final Observable<? extends T> source;
final Observable<B> boundary;
final int initialCapacity;

public BufferWithObservableBoundary(Observable<? extends T> source, Observable<B> boundary, int initialCapacity) {
this.source = source;
this.boundary = boundary;
this.initialCapacity = initialCapacity;
}

@Override
public Subscription onSubscribe(Observer<? super List<T>> t1) {
CompositeSubscription csub = new CompositeSubscription();

SourceObserver<T> so = new SourceObserver<T>(t1, initialCapacity, csub);
csub.add(source.subscribe(so));
csub.add(boundary.subscribe(new BoundaryObserver<B>(so)));

return csub;
}
/**
* Observes the source.
*/
private static final class SourceObserver<T> implements Observer<T> {
final Observer<? super List<T>> observer;
/** The buffer, if null, that indicates a terminal state. */
List<T> buffer;
final int initialCapacity;
final Object guard;
final Subscription cancel;
public SourceObserver(Observer<? super List<T>> observer, int initialCapacity, Subscription cancel) {
this.observer = observer;
this.initialCapacity = initialCapacity;
this.guard = new Object();
this.cancel = cancel;
buffer = new ArrayList<T>(initialCapacity);
}

@Override
public void onNext(T args) {
synchronized (guard) {
buffer.add(args);
}
}

@Override
public void onError(Throwable e) {
synchronized (guard) {
if (buffer == null) {
return;
}
buffer = null;
}
observer.onError(e);
cancel.unsubscribe();
}

@Override
public void onCompleted() {
emitAndComplete();
cancel.unsubscribe();
}
void emitAndReplace() {
List<T> buf;
synchronized (guard) {
if (buffer == null) {
return;
}
buf = buffer;
buffer = new ArrayList<T>(initialCapacity);
}
observer.onNext(buf);
}
void emitAndComplete() {
List<T> buf;
synchronized (guard) {
if (buffer == null) {
return;
}
buf = buffer;
buffer = null;
}
observer.onNext(buf);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

observer.onNext, onCompleted, onError can be called from different Observables, so I suppose we need to wrap it by a SynchronizedObserver.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Observer is mostly used from the boundary observable but we can move the onXXX functions into the guard block.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to Rx contract, I think all operators should support the following observer.

        Observer<Integer> o = new Observer<Integer>() {
            private int count = 0;

            @Override
            public void onCompleted() {
                count++;
            }

            @Override
            public void onError(Throwable e) {
                count++;
            }

            @Override
            public void onNext(Integer args) {
                count++;
            }
        };

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct.

observer.onCompleted();
}
}
/**
* Observes the boundary.
*/
private static final class BoundaryObserver<T> implements Observer<T> {
final SourceObserver so;

public BoundaryObserver(SourceObserver so) {
this.so = so;
}

@Override
public void onNext(T args) {
so.emitAndReplace();
}

@Override
public void onError(Throwable e) {
so.onError(e);
}

@Override
public void onCompleted() {
so.onCompleted();
}
}
}
}
133 changes: 131 additions & 2 deletions rxjava-core/src/test/java/rx/operators/OperationBufferTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.*;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
Expand Down Expand Up @@ -383,4 +383,133 @@ public void testBufferStopsWhenUnsubscribed1() {

inOrder.verifyNoMoreInteractions();
}

@Test
public void bufferWithBONormal1() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);
InOrder inOrder = Mockito.inOrder(o);

source.buffer(boundary).subscribe(o);

source.onNext(1);
source.onNext(2);
source.onNext(3);

boundary.onNext(1);

inOrder.verify(o, times(1)).onNext(Arrays.asList(1, 2, 3));

source.onNext(4);
source.onNext(5);

boundary.onNext(2);

inOrder.verify(o, times(1)).onNext(Arrays.asList(4, 5));

source.onNext(6);
boundary.onCompleted();

inOrder.verify(o, times(1)).onNext(Arrays.asList(6));

inOrder.verify(o).onCompleted();

verify(o, never()).onError(any(Throwable.class));
}
@Test
public void bufferWithBOEmptyLastViaBoundary() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);
InOrder inOrder = Mockito.inOrder(o);

source.buffer(boundary).subscribe(o);

boundary.onCompleted();

inOrder.verify(o, times(1)).onNext(Arrays.asList());

inOrder.verify(o).onCompleted();

verify(o, never()).onError(any(Throwable.class));
}
@Test
public void bufferWithBOEmptyLastViaSource() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);
InOrder inOrder = Mockito.inOrder(o);

source.buffer(boundary).subscribe(o);

source.onCompleted();

inOrder.verify(o, times(1)).onNext(Arrays.asList());

inOrder.verify(o).onCompleted();

verify(o, never()).onError(any(Throwable.class));
}
@Test
public void bufferWithBOEmptyLastViaBoth() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);
InOrder inOrder = Mockito.inOrder(o);

source.buffer(boundary).subscribe(o);

source.onCompleted();
boundary.onCompleted();

inOrder.verify(o, times(1)).onNext(Arrays.asList());

inOrder.verify(o).onCompleted();

verify(o, never()).onError(any(Throwable.class));
}

@Test
public void bufferWithBOSourceThrows() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);

source.buffer(boundary).subscribe(o);
source.onNext(1);
source.onError(new OperationReduceTest.CustomException());

verify(o).onError(any(OperationReduceTest.CustomException.class));
verify(o, never()).onCompleted();
verify(o, never()).onNext(any());
}

@Test
public void bufferWithBOBoundaryThrows() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();

@SuppressWarnings("unchecked")
Observer<Object> o = mock(Observer.class);

source.buffer(boundary).subscribe(o);

source.onNext(1);
boundary.onError(new OperationReduceTest.CustomException());

verify(o).onError(any(OperationReduceTest.CustomException.class));
verify(o, never()).onCompleted();
verify(o, never()).onNext(any());
}
}