Skip to content

Commit 8d279c6

Browse files
authored
Merge pull request #888 from dotnet/IxAsyncCancellation
Initial work to support EnumeratorCancellation.
2 parents 161e4ad + dc1a66c commit 8d279c6

File tree

30 files changed

+515
-9
lines changed

30 files changed

+515
-9
lines changed

Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,15 @@ public static IAsyncEnumerable<TSource> Amb<TSource>(this IAsyncEnumerable<TSour
2525
if (second == null)
2626
throw Error.ArgumentNull(nameof(second));
2727

28+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
29+
return Core(first, second);
30+
31+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
32+
#else
2833
return AsyncEnumerable.Create(Core);
2934

3035
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
36+
#endif
3137
{
3238
IAsyncEnumerator<TSource>? firstEnumerator = null;
3339
IAsyncEnumerator<TSource>? secondEnumerator = null;

Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Buffer.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,15 @@ public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(this IAsyncEnumer
2626
if (count <= 0)
2727
throw Error.ArgumentOutOfRange(nameof(count));
2828

29+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
30+
return Core(source, count);
31+
32+
static async IAsyncEnumerable<IList<TSource>> Core(IAsyncEnumerable<TSource> source, int count, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
33+
#else
2934
return AsyncEnumerable.Create(Core);
3035

3136
async IAsyncEnumerator<IList<TSource>> Core(CancellationToken cancellationToken)
37+
#endif
3238
{
3339
var buffer = new List<TSource>(count);
3440

@@ -70,9 +76,15 @@ public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(this IAsyncEnumer
7076
if (skip <= 0)
7177
throw Error.ArgumentOutOfRange(nameof(skip));
7278

79+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
80+
return Core(source, count, skip);
81+
82+
static async IAsyncEnumerable<IList<TSource>> Core(IAsyncEnumerable<TSource> source, int count, int skip, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
83+
#else
7384
return AsyncEnumerable.Create(Core);
7485

7586
async IAsyncEnumerator<IList<TSource>> Core(CancellationToken cancellationToken)
87+
#endif
7688
{
7789
var buffers = new Queue<IList<TSource>>();
7890

Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Catch.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,15 @@ public static IAsyncEnumerable<TSource> Catch<TSource, TException>(this IAsyncEn
3333
if (handler == null)
3434
throw Error.ArgumentNull(nameof(handler));
3535

36+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
37+
return Core(source, handler);
38+
39+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TException, IAsyncEnumerable<TSource>> handler, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
40+
#else
3641
return AsyncEnumerable.Create(Core);
3742

3843
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
44+
#endif
3945
{
4046
// REVIEW: This implementation mirrors the Ix implementation, which does not protect GetEnumerator
4147
// using the try statement either. A more trivial implementation would use await foreach
@@ -96,9 +102,15 @@ public static IAsyncEnumerable<TSource> Catch<TSource, TException>(this IAsyncEn
96102
if (handler == null)
97103
throw Error.ArgumentNull(nameof(handler));
98104

105+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
106+
return Core(source, handler);
107+
108+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TException, ValueTask<IAsyncEnumerable<TSource>>> handler, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
109+
#else
99110
return AsyncEnumerable.Create(Core);
100111

101112
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
113+
#endif
102114
{
103115
// REVIEW: This implementation mirrors the Ix implementation, which does not protect GetEnumerator
104116
// using the try statement either. A more trivial implementation would use await foreach
@@ -160,9 +172,15 @@ public static IAsyncEnumerable<TSource> Catch<TSource, TException>(this IAsyncEn
160172
if (handler == null)
161173
throw Error.ArgumentNull(nameof(handler));
162174

175+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
176+
return Core(source, handler);
177+
178+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TException, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> handler, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
179+
#else
163180
return AsyncEnumerable.Create(Core);
164181

165182
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
183+
#endif
166184
{
167185
// REVIEW: This implementation mirrors the Ix implementation, which does not protect GetEnumerator
168186
// using the try statement either. A more trivial implementation would use await foreach

Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Concat.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,15 @@ public static IAsyncEnumerable<TSource> Concat<TSource>(this IAsyncEnumerable<IA
2222
if (sources == null)
2323
throw Error.ArgumentNull(nameof(sources));
2424

25+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
26+
return Core(sources);
27+
28+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<IAsyncEnumerable<TSource>> sources, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
29+
#else
2530
return AsyncEnumerable.Create(Core);
2631

2732
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
33+
#endif
2834
{
2935
await foreach (var source in sources.WithCancellation(cancellationToken).ConfigureAwait(false))
3036
{
@@ -48,9 +54,15 @@ public static IAsyncEnumerable<TSource> Concat<TSource>(this IEnumerable<IAsyncE
4854
if (sources == null)
4955
throw Error.ArgumentNull(nameof(sources));
5056

57+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
58+
return Core(sources);
59+
60+
static async IAsyncEnumerable<TSource> Core(IEnumerable<IAsyncEnumerable<TSource>> sources, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
61+
#else
5162
return AsyncEnumerable.Create(Core);
5263

5364
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
65+
#endif
5466
{
5567
foreach (var source in sources)
5668
{
@@ -74,9 +86,15 @@ public static IAsyncEnumerable<TSource> Concat<TSource>(params IAsyncEnumerable<
7486
if (sources == null)
7587
throw Error.ArgumentNull(nameof(sources));
7688

89+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
90+
return Core(sources);
91+
92+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource>[] sources, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
93+
#else
7794
return AsyncEnumerable.Create(Core);
7895

7996
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
97+
#endif
8098
{
8199
foreach (var source in sources)
82100
{

Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Defer.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,15 @@ public static IAsyncEnumerable<TSource> Defer<TSource>(Func<IAsyncEnumerable<TSo
2222
if (factory == null)
2323
throw Error.ArgumentNull(nameof(factory));
2424

25+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
26+
return Core(factory);
27+
28+
static async IAsyncEnumerable<TSource> Core(Func<IAsyncEnumerable<TSource>> factory, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
29+
#else
2530
return AsyncEnumerable.Create(Core);
2631

2732
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
33+
#endif
2834
{
2935
await foreach (var item in factory().WithCancellation(cancellationToken).ConfigureAwait(false))
3036
{
@@ -46,9 +52,15 @@ public static IAsyncEnumerable<TSource> Defer<TSource>(Func<Task<IAsyncEnumerabl
4652
if (factory == null)
4753
throw Error.ArgumentNull(nameof(factory));
4854

55+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
56+
return Core(factory);
57+
58+
static async IAsyncEnumerable<TSource> Core(Func<Task<IAsyncEnumerable<TSource>>> factory, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
59+
#else
4960
return AsyncEnumerable.Create(Core);
5061

5162
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
63+
#endif
5264
{
5365
await foreach (var item in (await factory().ConfigureAwait(false)).WithCancellation(cancellationToken).ConfigureAwait(false))
5466
{
@@ -73,9 +85,15 @@ public static IAsyncEnumerable<TSource> Defer<TSource>(Func<CancellationToken, T
7385
if (factory == null)
7486
throw Error.ArgumentNull(nameof(factory));
7587

88+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
89+
return Core(factory);
90+
91+
static async IAsyncEnumerable<TSource> Core(Func<CancellationToken, Task<IAsyncEnumerable<TSource>>> factory, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
92+
#else
7693
return AsyncEnumerable.Create(Core);
7794

7895
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
96+
#endif
7997
{
8098
await foreach (var item in (await factory(cancellationToken).ConfigureAwait(false)).WithCancellation(cancellationToken).ConfigureAwait(false))
8199
{

Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/DistinctUntilChanged.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,15 @@ private static IAsyncEnumerable<TSource> DistinctUntilChangedCore<TSource>(IAsyn
166166
{
167167
comparer ??= EqualityComparer<TSource>.Default;
168168

169+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
170+
return Core(source, comparer);
171+
172+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
173+
#else
169174
return AsyncEnumerable.Create(Core);
170175

171176
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
177+
#endif
172178
{
173179
await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
174180

@@ -201,9 +207,15 @@ private static IAsyncEnumerable<TSource> DistinctUntilChangedCore<TSource, TKey>
201207
{
202208
comparer ??= EqualityComparer<TKey>.Default;
203209

210+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
211+
return Core(source, keySelector, comparer);
212+
213+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
214+
#else
204215
return AsyncEnumerable.Create(Core);
205216

206217
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
218+
#endif
207219
{
208220
await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
209221

@@ -240,9 +252,15 @@ private static IAsyncEnumerable<TSource> DistinctUntilChangedCore<TSource, TKey>
240252
{
241253
comparer ??= EqualityComparer<TKey>.Default;
242254

255+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
256+
return Core(source, keySelector, comparer);
257+
258+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TSource> comparer, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
259+
#else
243260
return AsyncEnumerable.Create(Core);
244261

245262
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
263+
#endif
246264
{
247265
await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
248266

@@ -280,9 +298,15 @@ private static IAsyncEnumerable<TSource> DistinctUntilChangedCore<TSource, TKey>
280298
{
281299
comparer ??= EqualityComparer<TKey>.Default;
282300

301+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
302+
return Core(source, keySelector, comparer);
303+
304+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TSource> comparer, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
305+
#else
283306
return AsyncEnumerable.Create(Core);
284307

285308
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
309+
#endif
286310
{
287311
await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
288312

Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Do.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,9 +299,16 @@ public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSourc
299299

300300
private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception>? onError, Action? onCompleted)
301301
{
302+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
303+
return Core(source, onNext, onError, onCompleted);
304+
305+
// TODO: Can remove local function.
306+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception>? onError, Action? onCompleted, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
307+
#else
302308
return AsyncEnumerable.Create(Core);
303309

304310
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
311+
#endif
305312
{
306313
await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
307314

@@ -339,9 +346,16 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
339346

340347
private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Exception, Task>? onError, Func<Task>? onCompleted)
341348
{
349+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
350+
return Core(source, onNext, onError, onCompleted);
351+
352+
// TODO: Can remove local function.
353+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Exception, Task>? onError, Func<Task>? onCompleted, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
354+
#else
342355
return AsyncEnumerable.Create(Core);
343356

344357
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
358+
#endif
345359
{
346360
await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
347361

@@ -383,9 +397,16 @@ async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
383397
#if !NO_DEEP_CANCELLATION
384398
private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task>? onError, Func<CancellationToken, Task>? onCompleted)
385399
{
400+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
401+
return Core(source, onNext, onError, onCompleted);
402+
403+
// TODO: Can remove local function.
404+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task>? onError, Func<CancellationToken, Task>? onCompleted, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
405+
#else
386406
return AsyncEnumerable.Create(Core);
387407

388408
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
409+
#endif
389410
{
390411
await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
391412

Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Expand.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,15 @@ public static IAsyncEnumerable<TSource> Expand<TSource>(this IAsyncEnumerable<TS
2424
if (selector == null)
2525
throw Error.ArgumentNull(nameof(selector));
2626

27+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
28+
return Core(source, selector);
29+
30+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TSource>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
31+
#else
2732
return AsyncEnumerable.Create(Core);
2833

2934
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
35+
#endif
3036
{
3137
var queue = new Queue<IAsyncEnumerable<TSource>>();
3238

@@ -58,9 +64,15 @@ public static IAsyncEnumerable<TSource> Expand<TSource>(this IAsyncEnumerable<TS
5864
if (selector == null)
5965
throw Error.ArgumentNull(nameof(selector));
6066

67+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
68+
return Core(source, selector);
69+
70+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TSource>>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
71+
#else
6172
return AsyncEnumerable.Create(Core);
6273

6374
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
75+
#endif
6476
{
6577
var queue = new Queue<IAsyncEnumerable<TSource>>();
6678

@@ -93,9 +105,15 @@ public static IAsyncEnumerable<TSource> Expand<TSource>(this IAsyncEnumerable<TS
93105
if (selector == null)
94106
throw Error.ArgumentNull(nameof(selector));
95107

108+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
109+
return Core(source, selector);
110+
111+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
112+
#else
96113
return AsyncEnumerable.Create(Core);
97114

98115
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
116+
#endif
99117
{
100118
var queue = new Queue<IAsyncEnumerable<TSource>>();
101119

Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Finally.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,15 @@ public static IAsyncEnumerable<TSource> Finally<TSource>(this IAsyncEnumerable<T
2525
if (finallyAction == null)
2626
throw Error.ArgumentNull(nameof(finallyAction));
2727

28+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
29+
return Core(source, finallyAction);
30+
31+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Action finallyAction, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
32+
#else
2833
return AsyncEnumerable.Create(Core);
2934

3035
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
36+
#endif
3137
{
3238
try
3339
{
@@ -58,9 +64,15 @@ public static IAsyncEnumerable<TSource> Finally<TSource>(this IAsyncEnumerable<T
5864
if (finallyAction == null)
5965
throw Error.ArgumentNull(nameof(finallyAction));
6066

67+
#if HAS_ASYNC_ENUMERABLE_CANCELLATION
68+
return Core(source, finallyAction);
69+
70+
static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<Task> finallyAction, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
71+
#else
6172
return AsyncEnumerable.Create(Core);
6273

6374
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
75+
#endif
6476
{
6577
try
6678
{

0 commit comments

Comments
 (0)