Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 64 additions & 37 deletions Rx.NET/Source/src/System.Reactive/EventPatternSourceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,66 @@ namespace System.Reactive
/// <typeparam name="TEventArgs">The type of the event data generated by the event.</typeparam>
public abstract class EventPatternSourceBase<TSender, TEventArgs>
{
private sealed class Observer : ObserverBase<EventPattern<TSender, TEventArgs>>, ISafeObserver<EventPattern<TSender, TEventArgs>>
{
private bool _isDone;
private bool _isAdded;
private readonly Delegate _handler;
private readonly object _gate = new object();
private readonly Action<TSender, TEventArgs> _invoke;
private readonly EventPatternSourceBase<TSender, TEventArgs> _sourceBase;

public Observer(EventPatternSourceBase<TSender, TEventArgs> sourceBase, Delegate handler, Action<TSender, TEventArgs> invoke)
{
_handler = handler;
_invoke = invoke;
_sourceBase = sourceBase;
}

protected override void OnNextCore(EventPattern<TSender, TEventArgs> value)
{
_sourceBase._invokeHandler(_invoke, value);
}

protected override void OnErrorCore(Exception error)
{
Remove();
error.Throw();
}

protected override void OnCompletedCore()
{
Remove();
}

private void Remove()
{
lock (_gate)
{
if (_isAdded)
{
_sourceBase.Remove(_handler);
}
else
{
_isDone = true;
}
}
}

public void SetResource(IDisposable resource)
{
lock (_gate)
{
if (!_isDone)
{
_sourceBase.Add(_handler, resource);
_isAdded = true;
}
}
}
}

private readonly IObservable<EventPattern<TSender, TEventArgs>> _source;
private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;
private readonly Action<Action<TSender, TEventArgs>, /*object,*/ EventPattern<TSender, TEventArgs>> _invokeHandler;
Expand Down Expand Up @@ -50,50 +110,18 @@ protected void Add(Delegate handler, Action<TSender, TEventArgs> invoke)
throw new ArgumentNullException(nameof(invoke));
}

var gate = new object();
var isAdded = false;
var isDone = false;

var remove = new Action(() =>
{
lock (gate)
{
if (isAdded)
{
Remove(handler);
}
else
{
isDone = true;
}
}
});

var observer = new Observer(this, handler, invoke);
//
// [OK] Use of unsafe Subscribe: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
//
var d = _source.Subscribe/*Unsafe*/(
x => _invokeHandler(invoke, /*this,*/ x),
ex => { remove(); ex.Throw(); },
remove
);

lock (gate)
{
if (!isDone)
{
Add(handler, d);
isAdded = true;
}
}
observer.SetResource(_source.Subscribe(observer));
}

private void Add(Delegate handler, IDisposable disposable)
{
lock (_subscriptions)
{
var l = new Stack<IDisposable>();
if (!_subscriptions.TryGetValue(handler, out l))
if (!_subscriptions.TryGetValue(handler, out var l))
{
_subscriptions[handler] = l = new Stack<IDisposable>();
}
Expand All @@ -118,8 +146,7 @@ protected void Remove(Delegate handler)

lock (_subscriptions)
{
var l = new Stack<IDisposable>();
if (_subscriptions.TryGetValue(handler, out l))
if (_subscriptions.TryGetValue(handler, out var l))
{
d = l.Pop();

Expand Down