Skip to content

Commit 93c36e0

Browse files
authored
Less allocations in SubscribeOnCtxObservable (#642)
1 parent e0aec89 commit 93c36e0

File tree

1 file changed

+35
-13
lines changed

1 file changed

+35
-13
lines changed

Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,27 +102,49 @@ public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> sou
102102

103103
sealed class SubscribeOnCtxObservable<TSource> : ObservableBase<TSource>
104104
{
105-
readonly IObservable<TSource> source;
105+
private sealed class Subscription : IDisposable
106+
{
107+
private readonly IObservable<TSource> _source;
108+
private readonly IObserver<TSource> _observer;
109+
private readonly SynchronizationContext _context;
106110

107-
readonly SynchronizationContext context;
111+
private IDisposable _cancel;
112+
113+
public Subscription(IObservable<TSource> source, SynchronizationContext context, IObserver<TSource> observer)
114+
{
115+
_source = source;
116+
_context = context;
117+
_observer = observer;
118+
119+
context.PostWithStartComplete(
120+
@this =>
121+
{
122+
if (!Disposable.GetIsDisposed(ref @this._cancel))
123+
{
124+
Disposable.SetSingle(ref @this._cancel, new ContextDisposable(@this._context, @this._source.SubscribeSafe(@this._observer)));
125+
}
126+
},
127+
this);
128+
}
129+
130+
public void Dispose()
131+
{
132+
Disposable.TryDispose(ref _cancel);
133+
}
134+
}
135+
136+
readonly IObservable<TSource> _source;
137+
readonly SynchronizationContext _context;
108138

109139
public SubscribeOnCtxObservable(IObservable<TSource> source, SynchronizationContext context)
110140
{
111-
this.source = source;
112-
this.context = context;
141+
this._source = source;
142+
this._context = context;
113143
}
114144

115145
protected override IDisposable SubscribeCore(IObserver<TSource> observer)
116146
{
117-
var subscription = new SingleAssignmentDisposable();
118-
context.PostWithStartComplete(() =>
119-
{
120-
if (!subscription.IsDisposed)
121-
{
122-
subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer));
123-
}
124-
});
125-
return subscription;
147+
return new Subscription(_source, _context, observer);
126148
}
127149
}
128150

0 commit comments

Comments
 (0)