Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Commit 66ab939

Browse files
moozzykanalogrelay
authored andcommitted
Making HttpConnection restartable (C#) (#1147)
🎉
1 parent 75e102f commit 66ab939

File tree

26 files changed

+1146
-282
lines changed

26 files changed

+1146
-282
lines changed

SignalR.sln

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{DA69F624-539
99
EndProject
1010
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{83B2C3EB-A3D8-4E6F-9A3C-A380B005EF31}"
1111
ProjectSection(SolutionItems) = preProject
12+
build\dependencies.props = build\dependencies.props
1213
Directory.Build.props = Directory.Build.props
1314
Directory.Build.targets = Directory.Build.targets
1415
build\Key.snk = build\Key.snk
@@ -85,7 +86,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JwtSample", "samples\JwtSam
8586
EndProject
8687
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JwtClientSample", "samples\JwtClientSample\JwtClientSample.csproj", "{1A953296-E869-4DE2-A693-FD5FCDE27057}"
8788
EndProject
88-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.SignalR.Tests.Utils", "test\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj", "{0A0A6135-EA24-4307-95C2-CE1B7E164A5E}"
89+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.Tests.Utils", "test\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj", "{0A0A6135-EA24-4307-95C2-CE1B7E164A5E}"
8990
EndProject
9091
Global
9192
GlobalSection(SolutionConfigurationPlatforms) = preSolution

client-ts/package-lock.json

Lines changed: 14 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

samples/ClientSample/HubSample.cs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,22 @@ public static async Task<int> ExecuteAsync(string baseUrl)
3030
baseUrl = string.IsNullOrEmpty(baseUrl) ? "http://localhost:5000/default" : baseUrl;
3131

3232
Console.WriteLine("Connecting to {0}", baseUrl);
33-
HubConnection connection = await ConnectAsync(baseUrl);
34-
Console.WriteLine("Connected to {0}", baseUrl);
33+
var connection = new HubConnectionBuilder()
34+
.WithUrl(baseUrl)
35+
.WithConsoleLogger(LogLevel.Trace)
36+
.Build();
3537

3638
try
3739
{
40+
var closeTcs = new TaskCompletionSource<object>();
41+
connection.Closed += e => closeTcs.SetResult(null);
42+
// Set up handler
43+
connection.On<string>("Send", Console.WriteLine);
44+
45+
await ConnectAsync(connection);
46+
47+
Console.WriteLine("Connected to {0}", baseUrl);
48+
3849
var sendCts = new CancellationTokenSource();
3950

4051
Console.CancelKeyPress += async (sender, a) =>
@@ -45,13 +56,10 @@ public static async Task<int> ExecuteAsync(string baseUrl)
4556
await connection.DisposeAsync();
4657
};
4758

48-
// Set up handler
49-
connection.On<string>("Send", Console.WriteLine);
50-
51-
while (!connection.Closed.IsCompleted)
59+
while (!closeTcs.Task.IsCompleted)
5260
{
53-
var completedTask = await Task.WhenAny(Task.Run(() => Console.ReadLine()), connection.Closed);
54-
if (completedTask == connection.Closed)
61+
var completedTask = await Task.WhenAny(Task.Run(() => Console.ReadLine()), closeTcs.Task);
62+
if (completedTask == closeTcs.Task)
5563
{
5664
break;
5765
}
@@ -79,19 +87,15 @@ public static async Task<int> ExecuteAsync(string baseUrl)
7987
return 0;
8088
}
8189

82-
private static async Task<HubConnection> ConnectAsync(string baseUrl)
90+
private static async Task ConnectAsync(HubConnection connection)
8391
{
8492
// Keep trying to until we can start
8593
while (true)
8694
{
87-
var connection = new HubConnectionBuilder()
88-
.WithUrl(baseUrl)
89-
.WithConsoleLogger(LogLevel.Trace)
90-
.Build();
95+
9196
try
9297
{
9398
await connection.StartAsync();
94-
return connection;
9599
}
96100
catch (Exception)
97101
{

samples/ClientSample/RawSample.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ public static async Task<int> ExecuteAsync(string baseUrl)
3939
var connection = new HttpConnection(new Uri(baseUrl), loggerFactory);
4040
try
4141
{
42+
var closeTcs = new TaskCompletionSource<object>();
43+
connection.Closed += e => closeTcs.SetResult(null);
4244
connection.OnReceived(data => Console.Out.WriteLineAsync($"{Encoding.UTF8.GetString(data)}"));
43-
4445
await connection.StartAsync();
4546

4647
Console.WriteLine($"Connected to {baseUrl}");
@@ -51,7 +52,7 @@ public static async Task<int> ExecuteAsync(string baseUrl)
5152
await connection.DisposeAsync();
5253
};
5354

54-
while (!connection.Closed.IsCompleted)
55+
while (!closeTcs.Task.IsCompleted)
5556
{
5657
var line = await Task.Run(() => Console.ReadLine(), cts.Token);
5758

samples/JwtClientSample/Program.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ private async Task RunConnection(TransportType transportType)
3434
.WithJwtBearer(() => _tokens[userId])
3535
.Build();
3636

37+
var closedTcs = new TaskCompletionSource<object>();
38+
hubConnection.Closed += e => closedTcs.SetResult(null);
39+
3740
hubConnection.On<string, string>("Message", (sender, message) => Console.WriteLine($"[{userId}] {sender}: {message}"));
3841
await hubConnection.StartAsync();
3942
Console.WriteLine($"[{userId}] Connection Started");
@@ -43,7 +46,7 @@ private async Task RunConnection(TransportType transportType)
4346

4447
try
4548
{
46-
while (!hubConnection.Closed.IsCompleted)
49+
while (!closedTcs.Task.IsCompleted)
4750
{
4851
await Task.Delay(1000);
4952
ticks++;

src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,16 @@ public class HubConnection
3434
private HubProtocolReaderWriter _protocolReaderWriter;
3535

3636
private readonly object _pendingCallsLock = new object();
37-
private readonly CancellationTokenSource _connectionActive = new CancellationTokenSource();
3837
private readonly Dictionary<string, InvocationRequest> _pendingCalls = new Dictionary<string, InvocationRequest>();
3938
private readonly ConcurrentDictionary<string, List<InvocationHandler>> _handlers = new ConcurrentDictionary<string, List<InvocationHandler>>();
39+
private CancellationTokenSource _connectionActive;
4040

4141
private int _nextId = 0;
4242
private volatile bool _startCalled;
4343
private Timer _timeoutTimer;
4444
private bool _needKeepAlive;
4545

46-
public Task Closed { get; }
46+
public event Action<Exception> Closed;
4747

4848
/// <summary>
4949
/// Gets or sets the server timeout interval for the connection. Changes to this value
@@ -69,11 +69,7 @@ public HubConnection(IConnection connection, IHubProtocol protocol, ILoggerFacto
6969
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
7070
_logger = _loggerFactory.CreateLogger<HubConnection>();
7171
_connection.OnReceived((data, state) => ((HubConnection)state).OnDataReceivedAsync(data), this);
72-
Closed = _connection.Closed.ContinueWith(task =>
73-
{
74-
Shutdown(task.Exception);
75-
return task;
76-
}).Unwrap();
72+
_connection.Closed += e => Shutdown(e);
7773

7874
// Create the timer for timeout, but disabled by default (we enable it when started).
7975
_timeoutTimer = new Timer(state => ((HubConnection)state).TimeoutElapsed(), this, Timeout.Infinite, Timeout.Infinite);
@@ -122,12 +118,14 @@ private async Task StartAsyncCore()
122118
transferModeFeature.TransferMode = requestedTransferMode;
123119
await _connection.StartAsync();
124120
_needKeepAlive = _connection.Features.Get<IConnectionInherentKeepAliveFeature>() == null;
121+
125122
var actualTransferMode = transferModeFeature.TransferMode;
126123

127124
_protocolReaderWriter = new HubProtocolReaderWriter(_protocol, GetDataEncoder(requestedTransferMode, actualTransferMode));
128125

129126
_logger.HubProtocol(_protocol.Name);
130127

128+
_connectionActive = new CancellationTokenSource();
131129
using (var memoryStream = new MemoryStream())
132130
{
133131
NegotiationProtocol.WriteMessage(new NegotiationMessage(_protocol.Name), memoryStream);
@@ -151,13 +149,16 @@ private IDataEncoder GetDataEncoder(TransferMode requestedTransferMode, Transfer
151149
return new PassThroughEncoder();
152150
}
153151

152+
public async Task StopAsync() => await StopAsyncCore().ForceAsync();
153+
154+
private Task StopAsyncCore() => _connection.StopAsync();
155+
154156
public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();
155157

156158
private async Task DisposeAsyncCore()
157159
{
158160
_timeoutTimer.Dispose();
159161
await _connection.DisposeAsync();
160-
await Closed;
161162
}
162163

163164
// TODO: Client return values/tasks?
@@ -370,12 +371,12 @@ private async Task OnDataReceivedAsync(byte[] data)
370371
}
371372
}
372373

373-
private void Shutdown(Exception ex = null)
374+
private void Shutdown(Exception exception = null)
374375
{
375376
_logger.ShutdownConnection();
376-
if (ex != null)
377+
if (exception != null)
377378
{
378-
_logger.ShutdownWithError(ex);
379+
_logger.ShutdownWithError(exception);
379380
}
380381

381382
lock (_pendingCallsLock)
@@ -388,14 +389,23 @@ private void Shutdown(Exception ex = null)
388389
foreach (var outstandingCall in _pendingCalls.Values)
389390
{
390391
_logger.RemoveInvocation(outstandingCall.InvocationId);
391-
if (ex != null)
392+
if (exception != null)
392393
{
393-
outstandingCall.Fail(ex);
394+
outstandingCall.Fail(exception);
394395
}
395396
outstandingCall.Dispose();
396397
}
397398
_pendingCalls.Clear();
398399
}
400+
401+
try
402+
{
403+
Closed?.Invoke(exception);
404+
}
405+
catch (Exception ex)
406+
{
407+
_logger.ErrorDuringClosedEvent(ex);
408+
}
399409
}
400410

401411
private async Task DispatchInvocationAsync(InvocationMessage invocation, CancellationToken cancellationToken)

src/Microsoft.AspNetCore.SignalR.Client.Core/Internal/SignalRClientLoggerExtensions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ internal static class SignalRClientLoggerExtensions
8888
private static readonly Action<ILogger, Exception> _resettingKeepAliveTimer =
8989
LoggerMessage.Define(LogLevel.Trace, new EventId(25, nameof(ResettingKeepAliveTimer)), "Resetting keep-alive timer, received a message from the server.");
9090

91+
private static readonly Action<ILogger, Exception> _errorDuringClosedEvent =
92+
LoggerMessage.Define(LogLevel.Error, new EventId(26, nameof(ErrorDuringClosedEvent)), "An exception was thrown in the handler for the Closed event.");
93+
9194
// Category: Streaming and NonStreaming
9295
private static readonly Action<ILogger, string, Exception> _invocationCreated =
9396
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(0, nameof(InvocationCreated)), "Invocation {invocationId} created.");
@@ -292,5 +295,10 @@ public static void ResettingKeepAliveTimer(this ILogger logger)
292295
{
293296
_resettingKeepAliveTimer(logger, null);
294297
}
298+
299+
public static void ErrorDuringClosedEvent(this ILogger logger, Exception exception)
300+
{
301+
_errorDuringClosedEvent(logger, exception);
302+
}
295303
}
296304
}

src/Microsoft.AspNetCore.Sockets.Abstractions/IConnection.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@ public interface IConnection
1212
{
1313
Task StartAsync();
1414
Task SendAsync(byte[] data, CancellationToken cancellationToken);
15+
Task StopAsync();
1516
Task DisposeAsync();
1617
Task AbortAsync(Exception ex);
1718

1819
IDisposable OnReceived(Func<byte[], object, Task> callback, object state);
1920

20-
Task Closed { get; }
21+
event Action<Exception> Closed;
2122

2223
IFeatureCollection Features { get; }
2324
}

0 commit comments

Comments
 (0)