Skip to content

Commit 24c8bc9

Browse files
committed
Use async iterators in Scan.
1 parent a43c59f commit 24c8bc9

File tree

1 file changed

+126
-0
lines changed
  • Ix.NET/Source/System.Interactive.Async/System/Linq/Operators

1 file changed

+126
-0
lines changed

Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Scan.cs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,42 @@ namespace System.Linq
1111
{
1212
public static partial class AsyncEnumerableEx
1313
{
14+
// NB: Implementations of Scan never yield the first element, unlike the behavior of Aggregate on a sequence with one
15+
// element, which returns the first element (or the seed if given an empty sequence). This is compatible with Rx
16+
// but one could argue whether it was the right default.
17+
1418
public static IAsyncEnumerable<TSource> Scan<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator)
1519
{
1620
if (source == null)
1721
throw Error.ArgumentNull(nameof(source));
1822
if (accumulator == null)
1923
throw Error.ArgumentNull(nameof(accumulator));
2024

25+
#if USE_ASYNC_ITERATOR
26+
return AsyncEnumerable.Create(Core);
27+
28+
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
29+
{
30+
await using (var e = source.GetAsyncEnumerator(cancellationToken).ConfigureAwait(false))
31+
{
32+
if (!await e.MoveNextAsync())
33+
{
34+
yield break;
35+
}
36+
37+
TSource res = e.Current;
38+
39+
while (await e.MoveNextAsync())
40+
{
41+
res = accumulator(res, e.Current);
42+
43+
yield return res;
44+
}
45+
}
46+
}
47+
#else
2148
return new ScanAsyncEnumerable<TSource>(source, accumulator);
49+
#endif
2250
}
2351

2452
public static IAsyncEnumerable<TAccumulate> Scan<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
@@ -28,7 +56,23 @@ public static IAsyncEnumerable<TAccumulate> Scan<TSource, TAccumulate>(this IAsy
2856
if (accumulator == null)
2957
throw Error.ArgumentNull(nameof(accumulator));
3058

59+
#if USE_ASYNC_ITERATOR
60+
return AsyncEnumerable.Create(Core);
61+
62+
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
63+
{
64+
TAccumulate res = seed;
65+
66+
await foreach (TSource item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
67+
{
68+
res = accumulator(res, item);
69+
70+
yield return res;
71+
}
72+
}
73+
#else
3174
return new ScanAsyncEnumerable<TSource, TAccumulate>(source, seed, accumulator);
75+
#endif
3276
}
3377

3478
public static IAsyncEnumerable<TSource> Scan<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator)
@@ -38,7 +82,31 @@ public static IAsyncEnumerable<TSource> Scan<TSource>(this IAsyncEnumerable<TSou
3882
if (accumulator == null)
3983
throw Error.ArgumentNull(nameof(accumulator));
4084

85+
#if USE_ASYNC_ITERATOR
86+
return AsyncEnumerable.Create(Core);
87+
88+
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
89+
{
90+
await using (var e = source.GetAsyncEnumerator(cancellationToken).ConfigureAwait(false))
91+
{
92+
if (!await e.MoveNextAsync())
93+
{
94+
yield break;
95+
}
96+
97+
TSource res = e.Current;
98+
99+
while (await e.MoveNextAsync())
100+
{
101+
res = await accumulator(res, e.Current).ConfigureAwait(false);
102+
103+
yield return res;
104+
}
105+
}
106+
}
107+
#else
41108
return new ScanAsyncEnumerableWithTask<TSource>(source, accumulator);
109+
#endif
42110
}
43111

44112
#if !NO_DEEP_CANCELLATION
@@ -49,7 +117,31 @@ public static IAsyncEnumerable<TSource> Scan<TSource>(this IAsyncEnumerable<TSou
49117
if (accumulator == null)
50118
throw Error.ArgumentNull(nameof(accumulator));
51119

120+
#if USE_ASYNC_ITERATOR
121+
return AsyncEnumerable.Create(Core);
122+
123+
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
124+
{
125+
await using (var e = source.GetAsyncEnumerator(cancellationToken).ConfigureAwait(false))
126+
{
127+
if (!await e.MoveNextAsync())
128+
{
129+
yield break;
130+
}
131+
132+
TSource res = e.Current;
133+
134+
while (await e.MoveNextAsync())
135+
{
136+
res = await accumulator(res, e.Current, cancellationToken).ConfigureAwait(false);
137+
138+
yield return res;
139+
}
140+
}
141+
}
142+
#else
52143
return new ScanAsyncEnumerableWithTaskAndCancellation<TSource>(source, accumulator);
144+
#endif
53145
}
54146
#endif
55147

@@ -60,7 +152,23 @@ public static IAsyncEnumerable<TAccumulate> Scan<TSource, TAccumulate>(this IAsy
60152
if (accumulator == null)
61153
throw Error.ArgumentNull(nameof(accumulator));
62154

155+
#if USE_ASYNC_ITERATOR
156+
return AsyncEnumerable.Create(Core);
157+
158+
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
159+
{
160+
TAccumulate res = seed;
161+
162+
await foreach (TSource item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
163+
{
164+
res = await accumulator(res, item).ConfigureAwait(false);
165+
166+
yield return res;
167+
}
168+
}
169+
#else
63170
return new ScanAsyncEnumerableWithTask<TSource, TAccumulate>(source, seed, accumulator);
171+
#endif
64172
}
65173

66174
#if !NO_DEEP_CANCELLATION
@@ -71,10 +179,27 @@ public static IAsyncEnumerable<TAccumulate> Scan<TSource, TAccumulate>(this IAsy
71179
if (accumulator == null)
72180
throw Error.ArgumentNull(nameof(accumulator));
73181

182+
#if USE_ASYNC_ITERATOR
183+
return AsyncEnumerable.Create(Core);
184+
185+
async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
186+
{
187+
TAccumulate res = seed;
188+
189+
await foreach (TSource item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
190+
{
191+
res = await accumulator(res, item, cancellationToken).ConfigureAwait(false);
192+
193+
yield return res;
194+
}
195+
}
196+
#else
74197
return new ScanAsyncEnumerableWithTaskAndCancellation<TSource, TAccumulate>(source, seed, accumulator);
198+
#endif
75199
}
76200
#endif
77201

202+
#if !USE_ASYNC_ITERATOR
78203
private sealed class ScanAsyncEnumerable<TSource> : AsyncIterator<TSource>
79204
{
80205
private readonly Func<TSource, TSource, TSource> _accumulator;
@@ -496,6 +621,7 @@ protected override async ValueTask<bool> MoveNextCore()
496621
return false;
497622
}
498623
}
624+
#endif
499625
#endif
500626
}
501627
}

0 commit comments

Comments
 (0)