Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 18 additions & 28 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Collect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,38 @@ public Collect(IObservable<TSource> source, Func<TResult> getInitialCollector, F
_getNewCollector = getNewCollector;
}

protected override PushToPullSink<TSource, TResult> Run(IDisposable subscription)
{
var sink = new _(this, subscription);
sink.Initialize();
return sink;
}
protected override PushToPullSink<TSource, TResult> Run() => new _(_merge, _getNewCollector, _getInitialCollector());

private sealed class _ : PushToPullSink<TSource, TResult>
{
// CONSIDER: This sink has a parent reference that can be considered for removal.
readonly object _gate;
readonly Func<TResult, TSource, TResult> _merge;
readonly Func<TResult, TResult> _getNewCollector;

private readonly Collect<TSource, TResult> _parent;

public _(Collect<TSource, TResult> parent, IDisposable subscription)
: base(subscription)
public _(Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector, TResult collector)
{
_parent = parent;
_gate = new object();
_merge = merge;
_getNewCollector = getNewCollector;
_collector = collector;
}

private object _gate;
private TResult _collector;
private bool _hasFailed;
private Exception _error;
private bool _hasCompleted;
private bool _done;

public void Initialize()
{
_gate = new object();
_collector = _parent._getInitialCollector();
}

public override void OnNext(TSource value)
{
lock (_gate)
{
try
{
_collector = _parent._merge(_collector, value);
_collector = _merge(_collector, value);
}
catch (Exception ex)
{
_error = ex;
_hasFailed = true;

Dispose();
}
Expand All @@ -75,7 +63,6 @@ public override void OnError(Exception error)
lock (_gate)
{
_error = error;
_hasFailed = true;
}
}

Expand All @@ -93,18 +80,21 @@ public override bool TryMoveNext(out TResult current)
{
lock (_gate)
{
if (_hasFailed)
var error = _error;
if (error != null)
{
current = default(TResult);
_error.Throw();
current = default;
_collector = default;
error.Throw();
}
else
{
if (_hasCompleted)
{
if (_done)
{
current = default(TResult);
current = default;
_collector = default;
return false;
}

Expand All @@ -117,7 +107,7 @@ public override bool TryMoveNext(out TResult current)

try
{
_collector = _parent._getNewCollector(current);
_collector = _getNewCollector(current);
}
catch
{
Expand Down
7 changes: 3 additions & 4 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Latest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@ public Latest(IObservable<TSource> source)
{
}

protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
protected override PushToPullSink<TSource, TSource> Run()
{
return new _(subscription);
return new _();
}

private sealed class _ : PushToPullSink<TSource, TSource>
{
private readonly object _gate;
private readonly SemaphoreSlim _semaphore;

public _(IDisposable subscription)
: base(subscription)
public _()
{
_gate = new object();
_semaphore = new SemaphoreSlim(0, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@ public MostRecent(IObservable<TSource> source, TSource initialValue)
_initialValue = initialValue;
}

protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
protected override PushToPullSink<TSource, TSource> Run()
{
return new _(_initialValue, subscription);
return new _(_initialValue);
}

private sealed class _ : PushToPullSink<TSource, TSource>
{
public _(TSource initialValue, IDisposable subscription)
: base(subscription)
public _(TSource initialValue)
{
_kind = NotificationKind.OnNext;
_value = initialValue;
Expand Down
7 changes: 3 additions & 4 deletions Rx.NET/Source/src/System.Reactive/Linq/Observable/Next.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,17 @@ public Next(IObservable<TSource> source)
{
}

protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
protected override PushToPullSink<TSource, TSource> Run()
{
return new _(subscription);
return new _();
}

private sealed class _ : PushToPullSink<TSource, TSource>
{
private readonly object _gate;
private readonly SemaphoreSlim _semaphore;

public _(IDisposable subscription)
: base(subscription)
public _()
{
_gate = new object();
_semaphore = new SemaphoreSlim(0, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,17 @@ public PushToPullAdapter(IObservable<TSource> source)

public IEnumerator<TResult> GetEnumerator()
{
var d = new SingleAssignmentDisposable();
var res = Run(d);
d.Disposable = _source.SubscribeSafe(res);
var res = Run();
res.SetUpstream(_source.SubscribeSafe(res));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please evaluate inheriting PushToPullSink from Sink and using base.Run here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sink is not good here as there is no downstream IObserver to talk to.

return res;
}

protected abstract PushToPullSink<TSource, TResult> Run(IDisposable subscription);
protected abstract PushToPullSink<TSource, TResult> Run();
}

internal abstract class PushToPullSink<TSource, TResult> : IObserver<TSource>, IEnumerator<TResult>, IDisposable
{
private readonly IDisposable _subscription;

public PushToPullSink(IDisposable subscription)
{
_subscription = subscription;
}
private IDisposable _upstream;

public abstract void OnNext(TSource value);
public abstract void OnError(Exception error);
Expand All @@ -59,7 +53,7 @@ public bool MoveNext()
else
{
_done = true;
_subscription.Dispose();
Dispose();
}
}

Expand All @@ -81,7 +75,12 @@ public void Reset()

public void Dispose()
{
_subscription.Dispose();
Disposable.TryDispose(ref _upstream);
}

public void SetUpstream(IDisposable d)
{
Disposable.SetSingle(ref _upstream, d);
}
}
}