Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 17 additions & 25 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,16 +192,10 @@ private void CreateTimer()
if (isShift)
_nextShift += _timeShift;

m.Disposable = _scheduler.Schedule(new State { isSpan = isSpan, isShift = isShift }, ts, Tick);
m.Disposable = _scheduler.Schedule((@this: this, isSpan, isShift), ts, (_, tuple) => tuple.@this.Tick(tuple.isSpan, tuple.isShift));
}

private struct State
{
public bool isSpan;
public bool isShift;
}

private IDisposable Tick(IScheduler self, State state)
private IDisposable Tick(bool isSpan, bool isShift)
{
lock (_gate)
{
Expand All @@ -211,13 +205,13 @@ private IDisposable Tick(IScheduler self, State state)
// took a breaking change in v2 to ensure consistency across overloads. For more info,
// see the comment in Tick for Window.
//
if (state.isSpan)
if (isSpan)
{
var s = _q.Dequeue();
ForwardOnNext(s);
}

if (state.isShift)
if (isShift)
{
CreateWindow();
}
Expand Down Expand Up @@ -296,7 +290,7 @@ public void Run(TimeHopping parent)
{
_list = new List<TSource>();

Disposable.SetSingle(ref _periodicDisposable, parent._scheduler.SchedulePeriodic(parent._timeSpan, Tick));
Disposable.SetSingle(ref _periodicDisposable, parent._scheduler.SchedulePeriodic(this, parent._timeSpan, @this => @this.Tick()));
base.Run(parent._source);
}

Expand Down Expand Up @@ -408,10 +402,10 @@ private void CreateTimer(int id)
var m = new SingleAssignmentDisposable();
Disposable.TrySetSerial(ref _timerSerial, m);

m.Disposable = _parent._scheduler.Schedule(id, _parent._timeSpan, Tick);
m.Disposable = _parent._scheduler.Schedule((@this: this, id), _parent._timeSpan, (_, tuple) => tuple.@this.Tick(tuple.id));
}

private IDisposable Tick(IScheduler self, int id)
private IDisposable Tick(int id)
{
var d = Disposable.Empty;

Expand Down Expand Up @@ -548,9 +542,9 @@ private void CreateBufferClose()
return;
}

var closingSubscription = new SingleAssignmentDisposable();
Disposable.TrySetSerial(ref _bufferClosingSerialDisposable, closingSubscription);
closingSubscription.Disposable = bufferClose.SubscribeSafe(new BufferClosingObserver(this, closingSubscription));
var closingObserver = new BufferClosingObserver(this);
Disposable.TrySetSerial(ref _bufferClosingSerialDisposable, closingObserver);
closingObserver.SetResource(bufferClose.SubscribeSafe(closingObserver));
}

private void CloseBuffer(IDisposable closingSubscription)
Expand All @@ -567,30 +561,28 @@ private void CloseBuffer(IDisposable closingSubscription)
_bufferGate.Wait(this, @this => @this.CreateBufferClose());
}

private sealed class BufferClosingObserver : IObserver<TBufferClosing>
private sealed class BufferClosingObserver : SafeObserver<TBufferClosing>
{
private readonly _ _parent;
private readonly IDisposable _self;

public BufferClosingObserver(_ parent, IDisposable self)
public BufferClosingObserver(_ parent)
{
_parent = parent;
_self = self;
}

public void OnNext(TBufferClosing value)
public override void OnNext(TBufferClosing value)
{
_parent.CloseBuffer(_self);
_parent.CloseBuffer(this);
}

public void OnError(Exception error)
public override void OnError(Exception error)
{
_parent.OnError(error);
}

public void OnCompleted()
public override void OnCompleted()
{
_parent.CloseBuffer(_self);
_parent.CloseBuffer(this);
}
}

Expand Down