Skip to content
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 29 additions & 48 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ public S(TParent parent, IObserver<TSource> observer)
_scheduler = parent._scheduler;
}

private IDisposable _sourceSubscription;

protected IStopwatch _watch;
protected TimeSpan _delay;
protected bool _ready;
Expand All @@ -62,30 +60,21 @@ public S(TParent parent, IObserver<TSource> observer)

public override void Run(TParent parent)
{
_active = false;
_running = false;
_queue = new Queue<System.Reactive.TimeInterval<TSource>>();
_hasCompleted = false;
_completeAt = default(TimeSpan);
_hasFailed = false;
_exception = default(Exception);

_watch = _scheduler.StartStopwatch();

RunCore(parent);

Disposable.SetSingle(ref _sourceSubscription, parent._source.SubscribeSafe(this));
base.Run(parent._source);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest removing those init-to-default lines above this and move _queue = ... into the constructor/field initializer.

}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing)
{
Disposable.TryDispose(ref _sourceSubscription);
Disposable.TryDispose(ref _cancelable);
}

base.Dispose(disposing);
}

protected abstract void RunCore(TParent parent);
Expand All @@ -112,7 +101,7 @@ public override void OnNext(TSource value)

public override void OnError(Exception error)
{
Disposable.TryDispose(ref _sourceSubscription);
DisposeUpstream();

var shouldRun = false;

Expand All @@ -134,7 +123,7 @@ public override void OnError(Exception error)

public override void OnCompleted()
{
Disposable.TryDispose(ref _sourceSubscription);
DisposeUpstream();

var shouldRun = false;

Expand Down Expand Up @@ -276,11 +265,9 @@ public L(TParent parent, IObserver<TSource> observer)
_scheduler = parent._scheduler;
}

private IDisposable _sourceSubscription;

protected IStopwatch _watch;
protected TimeSpan _delay;
protected Queue<System.Reactive.TimeInterval<TSource>> _queue;
protected Queue<System.Reactive.TimeInterval<TSource>> _queue = new Queue<System.Reactive.TimeInterval<TSource>>();

private CancellationTokenSource _stop;
private bool _hasCompleted;
Expand All @@ -290,27 +277,21 @@ public L(TParent parent, IObserver<TSource> observer)

public override void Run(TParent parent)
{
_queue = new Queue<System.Reactive.TimeInterval<TSource>>();
_hasCompleted = false;
_completeAt = default(TimeSpan);
_hasFailed = false;
_exception = default(Exception);

_watch = _scheduler.StartStopwatch();

RunCore(parent);

Disposable.SetSingle(ref _sourceSubscription, parent._source.SubscribeSafe(this));
base.Run(parent._source);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, no need to initialize fields to their default value.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the default value initializations.

}

protected override void Dispose(bool disposing)
{
base.Dispose(disposing);

if (disposing)
{
Disposable.TryDispose(ref _sourceSubscription);
Disposable.TryDispose(ref _cancelable);
}
base.Dispose(disposing);
}

protected abstract void RunCore(TParent parent);
Expand All @@ -325,7 +306,6 @@ protected void ScheduleDrain()

public override void OnNext(TSource value)
{

lock (_gate)
{
var next = _watch.Elapsed.Add(_delay);
Expand All @@ -338,7 +318,7 @@ public override void OnNext(TSource value)

public override void OnError(Exception error)
{
Disposable.TryDispose(ref _sourceSubscription);
DisposeUpstream();

lock (_gate)
{
Expand All @@ -353,8 +333,7 @@ public override void OnError(Exception error)

public override void OnCompleted()
{
Disposable.TryDispose(ref _sourceSubscription);

DisposeUpstream();

lock (_gate)
{
Expand Down Expand Up @@ -490,10 +469,10 @@ protected override void RunCore(Absolute parent)
{
_ready = false;

Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(parent._dueTime, Start));
Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Start()));
}

private void Start()
private IDisposable Start()
{
var next = default(TimeSpan);
var shouldRun = false;
Expand Down Expand Up @@ -526,6 +505,8 @@ private void Start()
{
Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule((Base<Absolute>.S)this, next, (@this, a) => DrainQueue(a)));
}

return Disposable.Empty;
}
}

Expand All @@ -541,10 +522,10 @@ protected override void RunCore(Absolute parent)
// ScheduleDrain might have already set a newer disposable
// using TrySetSerial would cancel it, stopping the emission
// and hang the consumer
Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(parent._dueTime, Start));
Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Start()));
}

private void Start()
private IDisposable Start()
{
lock (_gate)
{
Expand All @@ -561,6 +542,8 @@ private void Start()
}

ScheduleDrain();

return Disposable.Empty;
}
}
}
Expand Down Expand Up @@ -674,9 +657,9 @@ public override void OnNext(TSource value)
return;
}

var d = new SingleAssignmentDisposable();
_delays.Add(d);
d.Disposable = delay.SubscribeSafe(new DelayObserver(this, value, d));
var observer = new DelayObserver(this, value);
_delays.Add(observer);
observer.SetResource(delay.SubscribeSafe(observer));
}

public override void OnError(Exception error)
Expand Down Expand Up @@ -706,45 +689,43 @@ private void CheckDone()
}
}

private sealed class DelayObserver : IObserver<TDelay>
private sealed class DelayObserver : SafeObserver<TDelay>
{
private readonly _ _parent;
private readonly TSource _value;
private readonly IDisposable _self;

public DelayObserver(_ parent, TSource value, IDisposable self)
public DelayObserver(_ parent, TSource value)
{
_parent = parent;
_value = value;
_self = self;
}

public void OnNext(TDelay value)
public override void OnNext(TDelay value)
{
lock (_parent._gate)
{
_parent.ForwardOnNext(_value);

_parent._delays.Remove(_self);
_parent._delays.Remove(this);
_parent.CheckDone();
}
}

public void OnError(Exception error)
public override void OnError(Exception error)
{
lock (_parent._gate)
{
_parent.ForwardOnError(error);
}
}

public void OnCompleted()
public override void OnCompleted()
{
lock (_parent._gate)
{
_parent.ForwardOnNext(_value);

_parent._delays.Remove(_self);
_parent._delays.Remove(this);
_parent.CheckDone();
}
}
Expand Down