Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,35 @@ public async Task ToAsyncEnumerable_Observable_Cancel()
stop.WaitOne();
}

[Fact]
public async Task ToAsyncEnumerable_Observable_Cancel_InFlight()
{
var xs = new MyObservable<int>(obs =>
{
var cts = new CancellationTokenSource();

Task.Run(async () =>
{
for (var i = 0; !cts.IsCancellationRequested; i++)
{
await Task.Delay(10);
obs.OnNext(i);
}
});

return new MyDisposable(cts.Cancel);
}).ToAsyncEnumerable();

using var c = new CancellationTokenSource();

await using var e = xs.GetAsyncEnumerator(c.Token);

var task = e.MoveNextAsync();
c.Cancel();

await AssertThrowsAsync<TaskCanceledException>(task.AsTask());
}

[Fact]
public async Task ToAsyncEnumerable_Observable6_Async()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,32 @@ private void Dispose()

private void DisposeSubscription() => Interlocked.Exchange(ref _subscription, null)?.Dispose();

private void OnCanceled(object? state) => Dispose();
private void OnCanceled(object? state)
{
var cancelledTcs = default(TaskCompletionSource<bool>);

Dispose();

while (true)
{
var signal = Volatile.Read(ref _signal);

if (signal != null)
{
if (signal.TrySetCanceled(_cancellationToken))
return;
}

if (cancelledTcs == null)
{
cancelledTcs = new TaskCompletionSource<bool>();
cancelledTcs.TrySetCanceled(_cancellationToken);
}

if (Interlocked.CompareExchange(ref _signal, cancelledTcs, signal) == signal)
return;
}
}

private Task Resume()
{
Expand Down