Skip to content

Commit ad99fb5

Browse files
authored
Rework SelectMany (#604)
1 parent 77f5113 commit ad99fb5

File tree

2 files changed

+64
-74
lines changed

2 files changed

+64
-74
lines changed

Rx.NET/Source/src/System.Reactive/Internal/Sink.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ protected void SetUpstream(IDisposable upstream)
5656
{
5757
Disposable.SetSingle(ref _upstream, upstream);
5858
}
59+
60+
protected void DisposeUpstream()
61+
{
62+
Disposable.TryDispose(ref _upstream);
63+
}
5964
}
6065

6166
/// <summary>

Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs

Lines changed: 59 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -856,28 +856,15 @@ public ObservableSelector(IObservable<TSource> source, Func<TSource, IObservable
856856
internal class _ : Sink<TSource, TResult>
857857
{
858858
protected readonly object _gate = new object();
859-
private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();
860-
private readonly CompositeDisposable _group = new CompositeDisposable();
861-
862859
private readonly Func<TSource, IObservable<TResult>> _selector;
860+
private readonly CompositeDisposable _group = new CompositeDisposable();
861+
862+
private bool _isStopped;
863863

864864
public _(ObservableSelector parent, IObserver<TResult> observer)
865865
: base(observer)
866866
{
867867
_selector = parent._selector;
868-
869-
_group.Add(_sourceSubscription);
870-
}
871-
872-
private bool _isStopped;
873-
874-
public override void Run(IObservable<TSource> source)
875-
{
876-
_isStopped = false;
877-
878-
_sourceSubscription.Disposable = source.SubscribeSafe(this);
879-
880-
SetUpstream(_group);
881868
}
882869

883870
public override void OnNext(TSource value)
@@ -913,14 +900,22 @@ public override void OnCompleted()
913900
Final();
914901
}
915902

903+
protected override void Dispose(bool disposing)
904+
{
905+
base.Dispose(disposing);
906+
907+
if (disposing)
908+
_group.Dispose();
909+
}
910+
916911
protected void Final()
917912
{
918913
_isStopped = true;
919-
if (_group.Count == 1)
914+
if (_group.Count == 0)
920915
{
921916
//
922917
// Notice there can be a race between OnCompleted of the source and any
923-
// of the inner sequences, where both see _group.Count == 1, and one is
918+
// of the inner sequences, where both see _group.Count == 0, and one is
924919
// waiting for the lock. There won't be a double OnCompleted observation
925920
// though, because the call to Dispose silences the observer by swapping
926921
// in a NopObserver<T>.
@@ -932,46 +927,45 @@ protected void Final()
932927
}
933928
else
934929
{
935-
_sourceSubscription.Dispose();
930+
DisposeUpstream();
936931
}
937932
}
938933

939934
protected void SubscribeInner(IObservable<TResult> inner)
940935
{
941-
var innerSubscription = new SingleAssignmentDisposable();
942-
_group.Add(innerSubscription);
943-
innerSubscription.Disposable = inner.SubscribeSafe(new InnerObserver(this, innerSubscription));
936+
var innerObserver = new InnerObserver(this);
937+
938+
_group.Add(innerObserver);
939+
innerObserver.SetResource(inner.SubscribeSafe(innerObserver));
944940
}
945941

946-
private sealed class InnerObserver : IObserver<TResult>
942+
private sealed class InnerObserver : SafeObserver<TResult>
947943
{
948944
private readonly _ _parent;
949-
private readonly IDisposable _self;
950945

951-
public InnerObserver(_ parent, IDisposable self)
946+
public InnerObserver(_ parent)
952947
{
953948
_parent = parent;
954-
_self = self;
955949
}
956950

957-
public void OnNext(TResult value)
951+
public override void OnNext(TResult value)
958952
{
959953
lock (_parent._gate)
960954
_parent.ForwardOnNext(value);
961955
}
962956

963-
public void OnError(Exception error)
957+
public override void OnError(Exception error)
964958
{
965959
lock (_parent._gate)
966960
{
967961
_parent.ForwardOnError(error);
968962
}
969963
}
970964

971-
public void OnCompleted()
965+
public override void OnCompleted()
972966
{
973-
_parent._group.Remove(_self);
974-
if (_parent._isStopped && _parent._group.Count == 1)
967+
_parent._group.Remove(this);
968+
if (_parent._isStopped && _parent._group.Count == 0)
975969
{
976970
//
977971
// Notice there can be a race between OnCompleted of the source and any
@@ -1090,29 +1084,17 @@ public ObservableSelectorIndexed(IObservable<TSource> source, Func<TSource, int,
10901084
internal class _ : Sink<TSource, TResult>
10911085
{
10921086
private readonly object _gate = new object();
1093-
private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();
10941087
private readonly CompositeDisposable _group = new CompositeDisposable();
10951088

10961089
protected readonly Func<TSource, int, IObservable<TResult>> _selector;
10971090

1091+
private int _index;
1092+
private bool _isStopped;
1093+
10981094
public _(ObservableSelectorIndexed parent, IObserver<TResult> observer)
10991095
: base(observer)
11001096
{
11011097
_selector = parent._selector;
1102-
1103-
_group.Add(_sourceSubscription);
1104-
}
1105-
1106-
private bool _isStopped;
1107-
private int _index;
1108-
1109-
public override void Run(IObservable<TSource> source)
1110-
{
1111-
_isStopped = false;
1112-
1113-
_sourceSubscription.Disposable = source.SubscribeSafe(this);
1114-
1115-
SetUpstream(_group);
11161098
}
11171099

11181100
public override void OnNext(TSource value)
@@ -1148,10 +1130,18 @@ public override void OnCompleted()
11481130
Final();
11491131
}
11501132

1133+
protected override void Dispose(bool disposing)
1134+
{
1135+
base.Dispose(disposing);
1136+
1137+
if (disposing)
1138+
_group.Dispose();
1139+
}
1140+
11511141
protected void Final()
11521142
{
11531143
_isStopped = true;
1154-
if (_group.Count == 1)
1144+
if (_group.Count == 0)
11551145
{
11561146
//
11571147
// Notice there can be a race between OnCompleted of the source and any
@@ -1167,46 +1157,45 @@ protected void Final()
11671157
}
11681158
else
11691159
{
1170-
_sourceSubscription.Dispose();
1160+
DisposeUpstream();
11711161
}
11721162
}
11731163

11741164
protected void SubscribeInner(IObservable<TResult> inner)
11751165
{
1176-
var innerSubscription = new SingleAssignmentDisposable();
1177-
_group.Add(innerSubscription);
1178-
innerSubscription.Disposable = inner.SubscribeSafe(new InnerObserver(this, innerSubscription));
1166+
var innerObserver = new InnerObserver(this);
1167+
1168+
_group.Add(innerObserver);
1169+
innerObserver.SetResource(inner.SubscribeSafe(innerObserver));
11791170
}
11801171

1181-
private sealed class InnerObserver : IObserver<TResult>
1172+
private sealed class InnerObserver : SafeObserver<TResult>
11821173
{
11831174
private readonly _ _parent;
1184-
private readonly IDisposable _self;
11851175

1186-
public InnerObserver(_ parent, IDisposable self)
1176+
public InnerObserver(_ parent)
11871177
{
11881178
_parent = parent;
1189-
_self = self;
11901179
}
11911180

1192-
public void OnNext(TResult value)
1181+
public override void OnNext(TResult value)
11931182
{
11941183
lock (_parent._gate)
11951184
_parent.ForwardOnNext(value);
11961185
}
11971186

1198-
public void OnError(Exception error)
1187+
public override void OnError(Exception error)
11991188
{
12001189
lock (_parent._gate)
12011190
{
12021191
_parent.ForwardOnError(error);
12031192
}
12041193
}
12051194

1206-
public void OnCompleted()
1195+
public override void OnCompleted()
12071196
{
1208-
_parent._group.Remove(_self);
1209-
if (_parent._isStopped && _parent._group.Count == 1)
1197+
_parent._group.Remove(this);
1198+
if (_parent._isStopped && _parent._group.Count == 0)
12101199
{
12111200
//
12121201
// Notice there can be a race between OnCompleted of the source and any
@@ -1242,8 +1231,6 @@ public ObservableSelectorsIndexed(IObservable<TSource> source, Func<TSource, int
12421231
new internal sealed class _ : ObservableSelectorIndexed._
12431232
{
12441233
private readonly object _gate = new object();
1245-
private readonly SingleAssignmentDisposable _sourceSubscription = new SingleAssignmentDisposable();
1246-
private readonly CompositeDisposable _group = new CompositeDisposable();
12471234

12481235
private readonly Func<Exception, IObservable<TResult>> _selectorOnError;
12491236
private readonly Func<IObservable<TResult>> _selectorOnCompleted;
@@ -1253,8 +1240,6 @@ public _(ObservableSelectorsIndexed parent, IObserver<TResult> observer)
12531240
{
12541241
_selectorOnError = parent._selectorOnError;
12551242
_selectorOnCompleted = parent._selectorOnCompleted;
1256-
1257-
_group.Add(_sourceSubscription);
12581243
}
12591244

12601245
public override void OnError(Exception error)
@@ -1497,7 +1482,7 @@ public TaskSelector(IObservable<TSource> source, Func<TSource, CancellationToken
14971482
internal sealed class _ : Sink<TSource, TResult>
14981483
{
14991484
private readonly object _gate = new object();
1500-
private readonly CancellationDisposable _cancel = new CancellationDisposable();
1485+
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
15011486

15021487
private readonly Func<TSource, CancellationToken, Task<TResult>> _selector;
15031488

@@ -1520,7 +1505,7 @@ protected override void Dispose(bool disposing)
15201505
{
15211506
if (disposing)
15221507
{
1523-
_cancel.Dispose();
1508+
_cts.Cancel();
15241509
}
15251510
base.Dispose(disposing);
15261511
}
@@ -1531,7 +1516,7 @@ public override void OnNext(TSource value)
15311516
try
15321517
{
15331518
Interlocked.Increment(ref _count);
1534-
task = _selector(value, _cancel.Token);
1519+
task = _selector(value, _cts.Token);
15351520
}
15361521
catch (Exception ex)
15371522
{
@@ -1549,7 +1534,7 @@ public override void OnNext(TSource value)
15491534
}
15501535
else
15511536
{
1552-
task.ContinueWith(OnCompletedTask);
1537+
task.ContinueWith((closureTask, @thisObject) => ((_)@thisObject).OnCompletedTask(closureTask), this);
15531538
}
15541539
}
15551540

@@ -1575,7 +1560,7 @@ private void OnCompletedTask(Task<TResult> task)
15751560
break;
15761561
case TaskStatus.Canceled:
15771562
{
1578-
if (!_cancel.IsDisposed)
1563+
if (!_cts.IsCancellationRequested)
15791564
{
15801565
lock (_gate)
15811566
{
@@ -1626,7 +1611,7 @@ public TaskSelectorIndexed(IObservable<TSource> source, Func<TSource, int, Cance
16261611
internal sealed class _ : Sink<TSource, TResult>
16271612
{
16281613
private readonly object _gate = new object();
1629-
private readonly CancellationDisposable _cancel = new CancellationDisposable();
1614+
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
16301615

16311616
private readonly Func<TSource, int, CancellationToken, Task<TResult>> _selector;
16321617

@@ -1650,7 +1635,7 @@ protected override void Dispose(bool disposing)
16501635
{
16511636
if (disposing)
16521637
{
1653-
_cancel.Dispose();
1638+
_cts.Cancel();
16541639
}
16551640
base.Dispose(disposing);
16561641
}
@@ -1661,7 +1646,7 @@ public override void OnNext(TSource value)
16611646
try
16621647
{
16631648
Interlocked.Increment(ref _count);
1664-
task = _selector(value, checked(_index++), _cancel.Token);
1649+
task = _selector(value, checked(_index++), _cts.Token);
16651650
}
16661651
catch (Exception ex)
16671652
{
@@ -1679,7 +1664,7 @@ public override void OnNext(TSource value)
16791664
}
16801665
else
16811666
{
1682-
task.ContinueWith(OnCompletedTask);
1667+
task.ContinueWith((closureTask, @thisObject) => ((_)@thisObject).OnCompletedTask(closureTask), this);
16831668
}
16841669
}
16851670

@@ -1705,7 +1690,7 @@ private void OnCompletedTask(Task<TResult> task)
17051690
break;
17061691
case TaskStatus.Canceled:
17071692
{
1708-
if (!_cancel.IsDisposed)
1693+
if (!_cts.IsCancellationRequested)
17091694
{
17101695
lock (_gate)
17111696
{

0 commit comments

Comments
 (0)