Skip to content

Commit 0d00933

Browse files
committed
Use IAsyncEnumerable returned by ChannelReader.ReadAllAsync()
1 parent 23bc5d2 commit 0d00933

File tree

6 files changed

+25
-87
lines changed

6 files changed

+25
-87
lines changed

src/SignalR/samples/SignalRSamples/SignalRSamples.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
<PropertyGroup>
44
<TargetFramework>netcoreapp3.0</TargetFramework>
5+
<LangVersion>8.0</LangVersion>
56
</PropertyGroup>
67

78
<ItemGroup>

src/SignalR/server/Core/src/Internal/AsyncEnumerableAdapters.cs

Lines changed: 9 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ public static IAsyncEnumerable<object> MakeCancelableAsyncEnumerable<T>(IAsyncEn
1717
return new CancelableAsyncEnumerable<T>(asyncEnumerable, cancellationToken);
1818
}
1919

20-
public static IAsyncEnumerable<object> GetAsyncEnumerableFromChannel<T>(ChannelReader<T> channel, CancellationToken cancellationToken = default)
20+
public static IAsyncEnumerable<object> MakeCancelableAsyncEnumerableFromChannel<T>(ChannelReader<T> channel, CancellationToken cancellationToken = default)
2121
{
22-
return new ChannelAsyncEnumerable<T>(channel, cancellationToken);
22+
return MakeCancelableAsyncEnumerable(channel.ReadAllAsync(), cancellationToken);
2323
}
2424

2525
/// <summary>Converts an IAsyncEnumerable of T to an IAsyncEnumerable of object.</summary>
@@ -36,15 +36,19 @@ public CancelableAsyncEnumerable(IAsyncEnumerable<T> asyncEnumerable, Cancellati
3636

3737
public IAsyncEnumerator<object> GetAsyncEnumerator(CancellationToken cancellationToken = default)
3838
{
39+
// Assume that this will be iterated through with await foreach which always passes a default token.
40+
// Instead use the token from the ctor.
3941
Debug.Assert(cancellationToken == default);
40-
return new CancelableAsyncEnumerator(_asyncEnumerable.GetAsyncEnumerator(_cancellationToken));
42+
43+
var enumeratorOfT = _asyncEnumerable.GetAsyncEnumerator(_cancellationToken);
44+
return enumeratorOfT as IAsyncEnumerator<object> ?? new BoxedAsyncEnumerator(enumeratorOfT);
4145
}
4246

43-
private class CancelableAsyncEnumerator : IAsyncEnumerator<object>
47+
private class BoxedAsyncEnumerator : IAsyncEnumerator<object>
4448
{
4549
private IAsyncEnumerator<T> _asyncEnumerator;
4650

47-
public CancelableAsyncEnumerator(IAsyncEnumerator<T> asyncEnumerator)
51+
public BoxedAsyncEnumerator(IAsyncEnumerator<T> asyncEnumerator)
4852
{
4953
_asyncEnumerator = asyncEnumerator;
5054
}
@@ -62,74 +66,5 @@ public ValueTask DisposeAsync()
6266
}
6367
}
6468
}
65-
66-
/// <summary>Provides an IAsyncEnumerable of object for the data in a channel.</summary>
67-
private class ChannelAsyncEnumerable<T> : IAsyncEnumerable<object>
68-
{
69-
private readonly ChannelReader<T> _channel;
70-
private readonly CancellationToken _cancellationToken;
71-
72-
public ChannelAsyncEnumerable(ChannelReader<T> channel, CancellationToken cancellationToken)
73-
{
74-
_channel = channel;
75-
_cancellationToken = cancellationToken;
76-
}
77-
78-
public IAsyncEnumerator<object> GetAsyncEnumerator(CancellationToken cancellationToken = default)
79-
{
80-
Debug.Assert(cancellationToken == default);
81-
return new ChannelAsyncEnumerator(_channel, _cancellationToken);
82-
}
83-
84-
private class ChannelAsyncEnumerator : IAsyncEnumerator<object>
85-
{
86-
/// <summary>The channel being enumerated.</summary>
87-
private readonly ChannelReader<T> _channel;
88-
/// <summary>Cancellation token used to cancel the enumeration.</summary>
89-
private readonly CancellationToken _cancellationToken;
90-
/// <summary>The current element of the enumeration.</summary>
91-
private T _current;
92-
93-
public ChannelAsyncEnumerator(ChannelReader<T> channel, CancellationToken cancellationToken)
94-
{
95-
_channel = channel;
96-
_cancellationToken = cancellationToken;
97-
}
98-
99-
public object Current => _current;
100-
101-
public ValueTask<bool> MoveNextAsync()
102-
{
103-
var result = _channel.ReadAsync(_cancellationToken);
104-
105-
if (result.IsCompletedSuccessfully)
106-
{
107-
_current = result.Result;
108-
return new ValueTask<bool>(true);
109-
}
110-
111-
return new ValueTask<bool>(MoveNextAsyncAwaited(result));
112-
}
113-
114-
private async Task<bool> MoveNextAsyncAwaited(ValueTask<T> channelReadTask)
115-
{
116-
try
117-
{
118-
_current = await channelReadTask;
119-
}
120-
catch (ChannelClosedException ex) when (ex.InnerException == null)
121-
{
122-
return false;
123-
}
124-
125-
return true;
126-
}
127-
128-
public ValueTask DisposeAsync()
129-
{
130-
return default;
131-
}
132-
}
133-
}
13469
}
13570
}

src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ private ValueTask CleanupInvocation(HubConnectionContext connection, HubMethodIn
397397
return scope.DisposeAsync();
398398
}
399399

400-
private async Task StreamResultsAsync(string invocationId, HubConnectionContext connection, IAsyncEnumerable<object> enumerable, IServiceScope scope,
400+
private async Task StreamResultsAsync(string invocationId, HubConnectionContext connection, IAsyncEnumerable<object> enumerable, IServiceScope scope,
401401
IHubActivator<THub> hubActivator, THub hub, CancellationTokenSource streamCts, HubMethodInvocationMessage hubMethodInvocationMessage)
402402
{
403403
string error = null;

src/SignalR/server/Core/src/Internal/HubMethodDescriptor.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@ internal class HubMethodDescriptor
1919
.GetRuntimeMethods()
2020
.Single(m => m.Name.Equals(nameof(AsyncEnumerableAdapters.MakeCancelableAsyncEnumerable)) && m.IsGenericMethod);
2121

22-
private static readonly MethodInfo GetAsyncEnumerableFromChannelMethod = typeof(AsyncEnumerableAdapters)
22+
private static readonly MethodInfo MakeCancelableAsyncEnumerableFromChannelMethod = typeof(AsyncEnumerableAdapters)
2323
.GetRuntimeMethods()
24-
.Single(m => m.Name.Equals(nameof(AsyncEnumerableAdapters.GetAsyncEnumerableFromChannel)) && m.IsGenericMethod);
24+
.Single(m => m.Name.Equals(nameof(AsyncEnumerableAdapters.MakeCancelableAsyncEnumerableFromChannel)) && m.IsGenericMethod);
2525

26-
private readonly MethodInfo _convertToEnumerableMethodInfo;
27-
private Func<object, CancellationToken, IAsyncEnumerable<object>> _convertToEnumerable;
26+
private readonly MethodInfo _makeCancelableEnumerableMethodInfo;
27+
private Func<object, CancellationToken, IAsyncEnumerable<object>> _makeCancelableEnumerable;
2828

2929
public HubMethodDescriptor(ObjectMethodExecutor methodExecutor, IEnumerable<IAuthorizeData> policies)
3030
{
@@ -46,14 +46,14 @@ public HubMethodDescriptor(ObjectMethodExecutor methodExecutor, IEnumerable<IAut
4646
if (openReturnType == typeof(IAsyncEnumerable<>))
4747
{
4848
StreamReturnType = returnType.GetGenericArguments()[0];
49-
_convertToEnumerableMethodInfo = MakeCancelableAsyncEnumerableMethod;
49+
_makeCancelableEnumerableMethodInfo = MakeCancelableAsyncEnumerableMethod;
5050
break;
5151
}
5252

5353
if (openReturnType == typeof(ChannelReader<>))
5454
{
5555
StreamReturnType = returnType.GetGenericArguments()[0];
56-
_convertToEnumerableMethodInfo = GetAsyncEnumerableFromChannelMethod;
56+
_makeCancelableEnumerableMethodInfo = MakeCancelableAsyncEnumerableFromChannelMethod;
5757
break;
5858
}
5959
}
@@ -110,19 +110,19 @@ public HubMethodDescriptor(ObjectMethodExecutor methodExecutor, IEnumerable<IAut
110110
public IAsyncEnumerable<object> FromReturnedStream(object stream, CancellationToken cancellationToken)
111111
{
112112
// there is the potential for compile to be called times but this has no harmful effect other than perf
113-
if (_convertToEnumerable == null)
113+
if (_makeCancelableEnumerable == null)
114114
{
115-
_convertToEnumerable = CompileConvertToEnumerable(_convertToEnumerableMethodInfo, StreamReturnType);
115+
_makeCancelableEnumerable = CompileConvertToEnumerable(_makeCancelableEnumerableMethodInfo, StreamReturnType);
116116
}
117117

118-
return _convertToEnumerable.Invoke(stream, cancellationToken);
118+
return _makeCancelableEnumerable.Invoke(stream, cancellationToken);
119119
}
120120

121121
private static Func<object, CancellationToken, IAsyncEnumerable<object>> CompileConvertToEnumerable(MethodInfo adapterMethodInfo, Type streamReturnType)
122122
{
123123
// This will call one of two adapter methods to wrap the passed in streamable value into an IAsyncEnumerable<object>:
124-
// - AsyncEnumerableAdapters.GetAsyncEnumerableFromAsyncEnumerable<T>(asyncEnumerable, cancellationToken);
125-
// - AsyncEnumerableAdapters.GetAsyncEnumerableFromChannel<T>(channelReader, cancellationToken);
124+
// - AsyncEnumerableAdapters.MakeCancelableAsyncEnumerable<T>(asyncEnumerable, cancellationToken);
125+
// - AsyncEnumerableAdapters.MakeCancelableAsyncEnumerableFromChannel<T>(channelReader, cancellationToken);
126126

127127
var parameters = new[]
128128
{

src/SignalR/server/Core/src/Microsoft.AspNetCore.SignalR.Core.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<Description>Real-time communication framework for ASP.NET Core.</Description>
55
<TargetFramework>netcoreapp3.0</TargetFramework>
66
<IsAspNetCoreApp>true</IsAspNetCoreApp>
77
<RootNamespace>Microsoft.AspNetCore.SignalR</RootNamespace>
8+
<LangVersion>8.0</LangVersion>
89
</PropertyGroup>
910

1011
<ItemGroup>

src/SignalR/server/SignalR/test/Microsoft.AspNetCore.SignalR.Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
<PropertyGroup>
44
<TargetFramework>netcoreapp3.0</TargetFramework>
5+
<LangVersion>8.0</LangVersion>
56
</PropertyGroup>
67

78

0 commit comments

Comments
 (0)