Skip to content

Commit 15f1cc6

Browse files
authored
Merge pull request #1359 from dotnet/dev/bartde/rx_nullable_part17
Enable #nullable for some straightforward operators.
2 parents 26bb960 + 48cc060 commit 15f1cc6

File tree

8 files changed

+41
-51
lines changed

8 files changed

+41
-51
lines changed

Rx.NET/Source/src/System.Reactive/Linq/Observable/Amb.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
// The .NET Foundation licenses this file to you under the MIT License.
33
// See the LICENSE file in the project root for more information.
44

5-
#nullable disable
6-
75
using System.Threading;
86

97
namespace System.Reactive.Linq.ObservableImpl

Rx.NET/Source/src/System.Reactive/Linq/Observable/AmbMany.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
// The .NET Foundation licenses this file to you under the MIT License.
33
// See the LICENSE file in the project root for more information.
44

5-
#nullable disable
6-
75
using System.Collections.Generic;
86
using System.Linq;
97
using System.Reactive.Disposables;
@@ -57,18 +55,22 @@ protected override IDisposable Run(IObserver<T> observer)
5755
internal sealed class AmbCoordinator<T> : IDisposable
5856
{
5957
private readonly IObserver<T> _downstream;
60-
private readonly InnerObserver[] _observers;
58+
private readonly InnerObserver?[] _observers;
6159
private int _winner;
6260

6361
internal AmbCoordinator(IObserver<T> downstream, int n)
6462
{
6563
_downstream = downstream;
66-
var o = new InnerObserver[n];
64+
65+
var o = new InnerObserver?[n];
66+
6767
for (var i = 0; i < n; i++)
6868
{
6969
o[i] = new InnerObserver(this, i);
7070
}
71+
7172
_observers = o;
73+
7274
Volatile.Write(ref _winner, -1);
7375
}
7476

@@ -98,10 +100,12 @@ internal void Subscribe(IObservable<T>[] sources)
98100
for (var i = 0; i < _observers.Length; i++)
99101
{
100102
var inner = Volatile.Read(ref _observers[i]);
103+
101104
if (inner == null)
102105
{
103106
break;
104107
}
108+
105109
inner.Run(sources[i]);
106110
}
107111
}
@@ -125,8 +129,10 @@ private bool TryWin(int index)
125129
Interlocked.Exchange(ref _observers[i], null)?.Dispose();
126130
}
127131
}
132+
128133
return true;
129134
}
135+
130136
return false;
131137
}
132138

Rx.NET/Source/src/System.Reactive/Linq/Observable/Case.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@
22
// The .NET Foundation licenses this file to you under the MIT License.
33
// See the LICENSE file in the project root for more information.
44

5-
#nullable disable
6-
75
using System.Collections.Generic;
86

97
namespace System.Reactive.Linq.ObservableImpl
108
{
119
internal sealed class Case<TValue, TResult> : Producer<TResult, Case<TValue, TResult>._>, IEvaluatableObservable<TResult>
10+
where TValue : notnull
1211
{
1312
private readonly Func<TValue> _selector;
1413
private readonly IDictionary<TValue, IObservable<TResult>> _sources;

Rx.NET/Source/src/System.Reactive/Linq/Observable/Catch.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
// The .NET Foundation licenses this file to you under the MIT License.
33
// See the LICENSE file in the project root for more information.
44

5-
#nullable disable
6-
75
using System.Collections.Generic;
86
using System.Reactive.Disposables;
97
using System.Threading;
@@ -30,7 +28,7 @@ public _(IObserver<TSource> observer)
3028
{
3129
}
3230

33-
protected override IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source)
31+
protected override IEnumerable<IObservable<TSource>>? Extract(IObservable<TSource> source)
3432
{
3533
if (source is Catch<TSource> @catch)
3634
{
@@ -40,7 +38,7 @@ protected override IEnumerable<IObservable<TSource>> Extract(IObservable<TSource
4038
return null;
4139
}
4240

43-
private Exception _lastException;
41+
private Exception? _lastException;
4442

4543
public override void OnError(Exception error)
4644
{
@@ -99,7 +97,7 @@ public _(Func<TException, IObservable<TSource>> handler, IObserver<TSource> obse
9997
}
10098

10199
private bool _once;
102-
private IDisposable _subscription;
100+
private IDisposable? _subscription;
103101

104102
public override void Run(IObservable<TSource> source)
105103
{
@@ -112,6 +110,7 @@ protected override void Dispose(bool disposing)
112110
{
113111
Disposable.Dispose(ref _subscription);
114112
}
113+
115114
base.Dispose(disposing);
116115
}
117116

Rx.NET/Source/src/System.Reactive/Linq/Observable/Concat.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
// The .NET Foundation licenses this file to you under the MIT License.
33
// See the LICENSE file in the project root for more information.
44

5-
#nullable disable
6-
75
using System.Collections.Generic;
86

97
namespace System.Reactive.Linq.ObservableImpl

Rx.NET/Source/src/System.Reactive/Linq/Observable/ConcatMany.cs

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
// The .NET Foundation licenses this file to you under the MIT License.
33
// See the LICENSE file in the project root for more information.
44

5-
#nullable disable
6-
75
using System.Collections.Concurrent;
86
using System.Reactive.Disposables;
97
using System.Threading;
@@ -25,9 +23,12 @@ public IDisposable Subscribe(IObserver<T> observer)
2523
{
2624
throw new ArgumentNullException(nameof(observer));
2725
}
26+
2827
var parent = new ConcatManyOuterObserver(observer);
28+
2929
var d = _sources.SubscribeSafe(parent);
3030
parent.OnSubscribe(d);
31+
3132
return parent;
3233
}
3334

@@ -36,9 +37,10 @@ internal sealed class ConcatManyOuterObserver : IObserver<IObservable<T>>, IDisp
3637
private readonly IObserver<T> _downstream;
3738
private readonly ConcurrentQueue<IObservable<T>> _queue;
3839
private readonly InnerObserver _innerObserver;
39-
private IDisposable _upstream;
40+
41+
private IDisposable? _upstream;
4042
private int _trampoline;
41-
private Exception _error;
43+
private Exception? _error;
4244
private bool _done;
4345
private int _active;
4446

@@ -176,7 +178,7 @@ internal sealed class InnerObserver : IObserver<T>, IDisposable
176178
{
177179
private readonly ConcatManyOuterObserver _parent;
178180

179-
internal IDisposable Upstream;
181+
internal IDisposable? Upstream;
180182

181183
internal InnerObserver(ConcatManyOuterObserver parent)
182184
{
@@ -191,14 +193,16 @@ internal bool SetDisposable(SingleAssignmentDisposable sad)
191193
internal bool Finish()
192194
{
193195
var sad = Volatile.Read(ref Upstream);
196+
194197
if (sad != BooleanDisposable.True)
195198
{
196199
if (Interlocked.CompareExchange(ref Upstream, null, sad) == sad)
197200
{
198-
sad.Dispose();
201+
sad!.Dispose(); // NB: Cannot be null when we get here; SetDisposable is called before Inner[Error|Completed] calls Finish.
199202
return true;
200203
}
201204
}
205+
202206
return false;
203207
}
204208

@@ -207,20 +211,11 @@ public void Dispose()
207211
Disposable.Dispose(ref Upstream);
208212
}
209213

210-
public void OnCompleted()
211-
{
212-
_parent.InnerComplete();
213-
}
214+
public void OnCompleted() => _parent.InnerComplete();
214215

215-
public void OnError(Exception error)
216-
{
217-
_parent.InnerError(error);
218-
}
216+
public void OnError(Exception error) => _parent.InnerError(error);
219217

220-
public void OnNext(T value)
221-
{
222-
_parent.InnerNext(value);
223-
}
218+
public void OnNext(T value) => _parent.InnerNext(value);
224219
}
225220
}
226221
}

Rx.NET/Source/src/System.Reactive/Linq/Observable/SequenceEqual.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
// The .NET Foundation licenses this file to you under the MIT License.
33
// See the LICENSE file in the project root for more information.
44

5-
#nullable disable
6-
75
using System.Collections.Generic;
86
using System.Reactive.Disposables;
97
using System.Threading;
@@ -48,7 +46,7 @@ public _(IEqualityComparer<TSource> comparer, IObserver<bool> observer)
4846
private bool _donel;
4947
private bool _doner;
5048

51-
private IDisposable _second;
49+
private IDisposable? _second;
5250

5351
public void Run(Observable parent)
5452
{
@@ -62,6 +60,7 @@ protected override void Dispose(bool disposing)
6260
{
6361
Disposable.Dispose(ref _second);
6462
}
63+
6564
base.Dispose(disposing);
6665
}
6766

@@ -242,7 +241,7 @@ public _(IEqualityComparer<TSource> comparer, IObserver<bool> observer)
242241
_comparer = comparer;
243242
}
244243

245-
private IEnumerator<TSource> _enumerator;
244+
private IEnumerator<TSource>? _enumerator;
246245

247246
private static readonly IEnumerator<TSource> DisposedEnumerator = MakeDisposedEnumerator();
248247

@@ -286,6 +285,7 @@ protected override void Dispose(bool disposing)
286285
{
287286
Interlocked.Exchange(ref _enumerator, DisposedEnumerator)?.Dispose();
288287
}
288+
289289
base.Dispose(disposing);
290290
}
291291

@@ -295,7 +295,7 @@ public override void OnNext(TSource value)
295295

296296
try
297297
{
298-
if (_enumerator.MoveNext())
298+
if (_enumerator!.MoveNext()) // NB: Non-null after Run is called.
299299
{
300300
var current = _enumerator.Current;
301301
equal = _comparer.Equals(value, current);
@@ -319,7 +319,7 @@ public override void OnCompleted()
319319
bool hasNext;
320320
try
321321
{
322-
hasNext = _enumerator.MoveNext();
322+
hasNext = _enumerator!.MoveNext(); // NB: Non-null after Run is called.
323323
}
324324
catch (Exception exception)
325325
{

Rx.NET/Source/src/System.Reactive/Linq/Observable/WithLatestFrom.cs

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
// The .NET Foundation licenses this file to you under the MIT License.
33
// See the LICENSE file in the project root for more information.
44

5-
#nullable disable
6-
75
using System.Reactive.Disposables;
86

97
namespace System.Reactive.Linq.ObservableImpl
@@ -27,6 +25,9 @@ public WithLatestFrom(IObservable<TFirst> first, IObservable<TSecond> second, Fu
2725

2826
internal sealed class _ : IdentitySink<TResult>
2927
{
28+
private readonly object _gate = new object();
29+
private readonly object _latestGate = new object();
30+
3031
private readonly Func<TFirst, TSecond, TResult> _resultSelector;
3132

3233
public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)
@@ -35,19 +36,13 @@ public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> obser
3536
_resultSelector = resultSelector;
3637
}
3738

38-
private object _gate;
3939
private volatile bool _hasLatest;
40-
private TSecond _latest;
41-
42-
private object _latestGate;
40+
private TSecond? _latest;
4341

44-
private IDisposable _secondDisposable;
42+
private IDisposable? _secondDisposable;
4543

4644
public void Run(IObservable<TFirst> first, IObservable<TSecond> second)
4745
{
48-
_gate = new object();
49-
_latestGate = new object();
50-
5146
var fstO = new FirstObserver(this);
5247
var sndO = new SecondObserver(this);
5348

@@ -61,6 +56,7 @@ protected override void Dispose(bool disposing)
6156
{
6257
Disposable.Dispose(ref _secondDisposable);
6358
}
59+
6460
base.Dispose(disposing);
6561
}
6662

@@ -93,12 +89,11 @@ public void OnNext(TFirst value)
9389
{
9490
if (_parent._hasLatest) // Volatile read
9591
{
96-
9792
TSecond latest;
9893

9994
lock (_parent._latestGate)
10095
{
101-
latest = _parent._latest;
96+
latest = _parent._latest!; // NB: Not null when hasLatest is true.
10297
}
10398

10499
TResult res;

0 commit comments

Comments
 (0)