Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;
using BenchmarkDotNet.Attributes;

namespace Benchmarks.System.Reactive
{
[MemoryDiagnoser]
public class BufferCountBenchmark
{
IList<int> _store;

[Benchmark]
public void Exact()
{
Observable.Range(1, 1000)
.Buffer(1)
.Subscribe(v => Volatile.Write(ref _store, v));
}

[Benchmark]
public void Skip()
{
Observable.Range(1, 1000)
.Buffer(1, 2)
.Subscribe(v => Volatile.Write(ref _store, v));
}

[Benchmark]
public void Overlap()
{
Observable.Range(1, 1000)
.Buffer(2, 1)
.Subscribe(v => Volatile.Write(ref _store, v));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ static void Main()
var switcher = new BenchmarkSwitcher(new[] {
typeof(ZipBenchmark),
typeof(CombineLatestBenchmark),
typeof(SwitchBenchmark)
typeof(SwitchBenchmark),
typeof(BufferCountBenchmark)
});

switcher.Run();
Expand Down
191 changes: 171 additions & 20 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,45 +10,196 @@ namespace System.Reactive.Linq.ObservableImpl
{
internal static class Buffer<TSource>
{
internal sealed class Count : Producer<IList<TSource>, Count._>
internal sealed class CountExact : Producer<IList<TSource>, CountExact.ExactSink>
{
readonly IObservable<TSource> _source;

readonly int _count;

public CountExact(IObservable<TSource> source, int count)
{
_source = source;
_count = count;
}

protected override ExactSink CreateSink(IObserver<IList<TSource>> observer) => new ExactSink(observer, _count);

protected override void Run(ExactSink sink) => sink.Run(_source);

internal sealed class ExactSink : Sink<TSource, IList<TSource>>
{
readonly int _count;

int _index;

IList<TSource> _buffer;

internal ExactSink(IObserver<IList<TSource>> observer, int count) : base(observer)
{
_count = count;
}

public override void OnNext(TSource value)
{
var buffer = _buffer;
if (buffer == null)
{
buffer = new List<TSource>();
_buffer = buffer;
}

buffer.Add(value);

var idx = _index + 1;
if (idx == _count)
{
_buffer = null;
_index = 0;
ForwardOnNext(buffer);
}
else
{
_index = idx;
}
}

public override void OnError(Exception error)
{
_buffer = null;
ForwardOnError(error);
}

public override void OnCompleted()
{
var buffer = _buffer;
_buffer = null;

if (buffer != null)
{
ForwardOnNext(buffer);
}
ForwardOnCompleted();
}
}
}

internal sealed class CountSkip : Producer<IList<TSource>, CountSkip.SkipSink>
{
readonly IObservable<TSource> _source;

readonly int _count;

readonly int _skip;

public CountSkip(IObservable<TSource> source, int count, int skip)
{
_source = source;
_count = count;
_skip = skip;
}

protected override SkipSink CreateSink(IObserver<IList<TSource>> observer) => new SkipSink(observer, _count, _skip);

protected override void Run(SkipSink sink) => sink.Run(_source);

internal sealed class SkipSink : Sink<TSource, IList<TSource>>
{
readonly int _count;

readonly int _skip;

int _index;

IList<TSource> _buffer;

internal SkipSink(IObserver<IList<TSource>> observer, int count, int skip) : base(observer)
{
_count = count;
_skip = skip;
}

public override void OnNext(TSource value)
{
var idx = _index;
var buffer = _buffer;
if (idx == 0)
{
buffer = new List<TSource>();
_buffer = buffer;
}

buffer?.Add(value);

if (++idx == _count)
{
_buffer = null;
ForwardOnNext(buffer);
}

if (idx == _skip)
{
_index = 0;
}
else
{
_index = idx;
}
}

public override void OnError(Exception error)
{
_buffer = null;
ForwardOnError(error);
}

public override void OnCompleted()
{
var buffer = _buffer;
_buffer = null;

if (buffer != null)
{
ForwardOnNext(buffer);
}
ForwardOnCompleted();
}
}
}

internal sealed class CountOverlap : Producer<IList<TSource>, CountOverlap.OverlapSink>
{
private readonly IObservable<TSource> _source;
private readonly int _count;
private readonly int _skip;

public Count(IObservable<TSource> source, int count, int skip)
public CountOverlap(IObservable<TSource> source, int count, int skip)
{
_source = source;
_count = count;
_skip = skip;
}

protected override _ CreateSink(IObserver<IList<TSource>> observer) => new _(this, observer);
protected override OverlapSink CreateSink(IObserver<IList<TSource>> observer) => new OverlapSink(observer, _count, _skip);

protected override void Run(_ sink) => sink.Run(_source);
protected override void Run(OverlapSink sink) => sink.Run(_source);

internal sealed class _ : Sink<TSource, IList<TSource>>
internal sealed class OverlapSink : Sink<TSource, IList<TSource>>
{
private readonly Queue<IList<TSource>> _queue = new Queue<IList<TSource>>();
private readonly Queue<IList<TSource>> _queue;

private readonly int _count;
private readonly int _skip;

public _(Count parent, IObserver<IList<TSource>> observer)
: base(observer)
{
_count = parent._count;
_skip = parent._skip;
}

private int _n;
int _index;
int _n;

public override void Run(IObservable<TSource> source)
public OverlapSink(IObserver<IList<TSource>> observer, int count, int skip)
: base(observer)
{
_n = 0;

_queue = new Queue<IList<TSource>>();
_count = count;
_skip = skip;
CreateWindow();
base.Run(source);
}

private void CreateWindow()
Expand Down Expand Up @@ -77,8 +228,8 @@ public override void OnNext(TSource value)

public override void OnError(Exception error)
{
while (_queue.Count > 0)
_queue.Dequeue().Clear();
// just drop the ILists on the GC floor, no reason to clear them
_queue.Clear();

ForwardOnError(error);
}
Expand Down
18 changes: 11 additions & 7 deletions Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,21 @@ public virtual IObservable<TSource> AsObservable<TSource>(IObservable<TSource> s

public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count)
{
return Buffer_<TSource>(source, count, count);
return new Buffer<TSource>.CountExact(source, count);
}

public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip)
{
return Buffer_<TSource>(source, count, skip);
}

private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, int count, int skip)
{
return new Buffer<TSource>.Count(source, count, skip);
if (count > skip)
{
return new Buffer<TSource>.CountOverlap(source, count, skip);
}
else if (count < skip)
{
return new Buffer<TSource>.CountSkip(source, count, skip);
}
// count == skip
return new Buffer<TSource>.CountExact(source, count);
}

#endregion
Expand Down