Description
I think I've found a bug, or at least a missing feature, in System.Interactive.Async version 6.0.3, tested on .NET 9.0.9 (on Windows).
When using AsyncEnumerableEx.Merge, it seems that the async enumeration of the resulting IAsyncEnumerable sequence cannot be aborted as long as at least one of the underlying sequences is still active.
The following is a complete repro sample. It is based on an infinite sequence that produces a single item, then waits indefinitely until canceled. The sequence is enumerated by an await foreach loop, producing a derived sequence that optionally throws an exception or yields the item. The outermost code again loops through the resulting sequence, breaking the loop after the first item or exception.
When the infinite sequence is wrapped by Merge (shouldMerge = true), the program will hang, i.e., breaking the loop doesn't work and the program will continue waiting for the infinite sequence. The exception, if thrown, isn't observed at all.
When Merge is not used (shouldMerge = false), the program will run correctly and terminate, either after the first item or the exception.
using System.Runtime.CompilerServices;

internal static class Program
{
    public static async Task Main(string[] args)
    {
        const bool shouldMerge = true;
        const bool shouldThrow = true;

        var inner = Enumerate(shouldMerge, shouldThrow, CancellationToken.None);
        try
        {
            await foreach (var message in inner)
            {
                Console.WriteLine(message);
                break;
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }

        Console.WriteLine("After");
    }

    private static async IAsyncEnumerable<string> Enumerate(bool shouldMerge, bool shouldThrow, [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var infinite = shouldMerge
            ? AsyncEnumerableEx.Merge(Infinite(cancellationToken))
            : Infinite(cancellationToken);

        await foreach (var item in infinite.WithCancellation(cancellationToken))
        {
            if (shouldThrow)
                throw new Exception("Test");

            yield return item;
        }
    }

    private static async IAsyncEnumerable<string> Infinite([EnumeratorCancellation] CancellationToken cancellationToken)
    {
        yield return "first item";

        while (true)
        {
            await Task.Delay(1000, cancellationToken);
        }
    }
}
A zipped solution can be downloaded here:
2025-11-01 AsyncEnumerableCatch.zip
As I'm not an expert in IAsyncEnumerable or Ix.NET, it might well be that I'm missing something. If so, please let me know. :)