Skip to content

Commit 103c60e

Browse files
authored
Merge pull request #1158 from danielcweber/Fix1157
Cancel in-flight TaskCompletionSources in ToAsyncEnumerable(Observable).
2 parents 06a94bc + fa1c97f commit 103c60e

File tree

2 files changed

+55
-1
lines changed

2 files changed

+55
-1
lines changed

Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToAsyncEnumerable.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,35 @@ public async Task ToAsyncEnumerable_Observable_Cancel()
393393
stop.WaitOne();
394394
}
395395

396+
[Fact]
397+
public async Task ToAsyncEnumerable_Observable_Cancel_InFlight()
398+
{
399+
var xs = new MyObservable<int>(obs =>
400+
{
401+
var cts = new CancellationTokenSource();
402+
403+
Task.Run(async () =>
404+
{
405+
for (var i = 0; !cts.IsCancellationRequested; i++)
406+
{
407+
await Task.Delay(10);
408+
obs.OnNext(i);
409+
}
410+
});
411+
412+
return new MyDisposable(cts.Cancel);
413+
}).ToAsyncEnumerable();
414+
415+
using var c = new CancellationTokenSource();
416+
417+
await using var e = xs.GetAsyncEnumerator(c.Token);
418+
419+
var task = e.MoveNextAsync();
420+
c.Cancel();
421+
422+
await AssertThrowsAsync<TaskCanceledException>(task.AsTask());
423+
}
424+
396425
[Fact]
397426
public async Task ToAsyncEnumerable_Observable6_Async()
398427
{

Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToAsyncEnumerable.Observable.cs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,32 @@ private void Dispose()
169169

170170
private void DisposeSubscription() => Interlocked.Exchange(ref _subscription, null)?.Dispose();
171171

172-
private void OnCanceled(object? state) => Dispose();
172+
private void OnCanceled(object? state)
173+
{
174+
var cancelledTcs = default(TaskCompletionSource<bool>);
175+
176+
Dispose();
177+
178+
while (true)
179+
{
180+
var signal = Volatile.Read(ref _signal);
181+
182+
if (signal != null)
183+
{
184+
if (signal.TrySetCanceled(_cancellationToken))
185+
return;
186+
}
187+
188+
if (cancelledTcs == null)
189+
{
190+
cancelledTcs = new TaskCompletionSource<bool>();
191+
cancelledTcs.TrySetCanceled(_cancellationToken);
192+
}
193+
194+
if (Interlocked.CompareExchange(ref _signal, cancelledTcs, signal) == signal)
195+
return;
196+
}
197+
}
173198

174199
private Task Resume()
175200
{

0 commit comments

Comments
 (0)