Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Commit 018d79f

Browse files
author
Pawel Kadluczka
committed
Making HttpConnection restartable (C#)
~~(a.k.a. Opening Pandora box)~~
1 parent 4394b57 commit 018d79f

File tree

12 files changed

+548
-106
lines changed

12 files changed

+548
-106
lines changed

samples/ClientSample/HubSample.cs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,22 @@ public static async Task<int> ExecuteAsync(string baseUrl)
3030
baseUrl = string.IsNullOrEmpty(baseUrl) ? "http://localhost:5000/default" : baseUrl;
3131

3232
Console.WriteLine("Connecting to {0}", baseUrl);
33-
HubConnection connection = await ConnectAsync(baseUrl);
34-
Console.WriteLine("Connected to {0}", baseUrl);
33+
var connection = new HubConnectionBuilder()
34+
.WithUrl(baseUrl)
35+
.WithConsoleLogger(LogLevel.Trace)
36+
.Build();
3537

3638
try
3739
{
40+
var closeTcs = new TaskCompletionSource<object>();
41+
connection.Closed += e => closeTcs.SetResult(null);
42+
// Set up handler
43+
connection.On<string>("Send", Console.WriteLine);
44+
45+
await ConnectAsync(connection);
46+
47+
Console.WriteLine("Connected to {0}", baseUrl);
48+
3849
var sendCts = new CancellationTokenSource();
3950

4051
Console.CancelKeyPress += async (sender, a) =>
@@ -45,13 +56,10 @@ public static async Task<int> ExecuteAsync(string baseUrl)
4556
await connection.DisposeAsync();
4657
};
4758

48-
// Set up handler
49-
connection.On<string>("Send", Console.WriteLine);
50-
51-
while (!connection.Closed.IsCompleted)
59+
while (!closeTcs.Task.IsCompleted)
5260
{
53-
var completedTask = await Task.WhenAny(Task.Run(() => Console.ReadLine()), connection.Closed);
54-
if (completedTask == connection.Closed)
61+
var completedTask = await Task.WhenAny(Task.Run(() => Console.ReadLine()), closeTcs.Task);
62+
if (completedTask == closeTcs.Task)
5563
{
5664
break;
5765
}
@@ -79,19 +87,15 @@ public static async Task<int> ExecuteAsync(string baseUrl)
7987
return 0;
8088
}
8189

82-
private static async Task<HubConnection> ConnectAsync(string baseUrl)
90+
private static async Task ConnectAsync(HubConnection connection)
8391
{
8492
// Keep trying to until we can start
8593
while (true)
8694
{
87-
var connection = new HubConnectionBuilder()
88-
.WithUrl(baseUrl)
89-
.WithConsoleLogger(LogLevel.Trace)
90-
.Build();
95+
9196
try
9297
{
9398
await connection.StartAsync();
94-
return connection;
9599
}
96100
catch (Exception)
97101
{

samples/ClientSample/RawSample.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ public static async Task<int> ExecuteAsync(string baseUrl)
3939
var connection = new HttpConnection(new Uri(baseUrl), loggerFactory);
4040
try
4141
{
42+
var closeTcs = new TaskCompletionSource<object>();
43+
connection.Closed += e => closeTcs.SetResult(null);
4244
connection.OnReceived(data => Console.Out.WriteLineAsync($"{Encoding.UTF8.GetString(data)}"));
43-
4445
await connection.StartAsync();
4546

4647
Console.WriteLine($"Connected to {baseUrl}");
@@ -51,7 +52,7 @@ public static async Task<int> ExecuteAsync(string baseUrl)
5152
await connection.DisposeAsync();
5253
};
5354

54-
while (!connection.Closed.IsCompleted)
55+
while (!closeTcs.Task.IsCompleted)
5556
{
5657
var line = await Task.Run(() => Console.ReadLine(), cts.Token);
5758

samples/JwtClientSample/Program.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ private async Task RunConnection(TransportType transportType)
3434
.WithJwtBearer(() => _tokens[userId])
3535
.Build();
3636

37+
var closedTcs = new TaskCompletionSource<object>();
38+
hubConnection.Closed += e => closedTcs.SetResult(null);
39+
3740
hubConnection.On<string, string>("Message", (sender, message) => Console.WriteLine($"[{userId}] {sender}: {message}"));
3841
await hubConnection.StartAsync();
3942
Console.WriteLine($"[{userId}] Connection Started");
@@ -43,7 +46,7 @@ private async Task RunConnection(TransportType transportType)
4346

4447
try
4548
{
46-
while (!hubConnection.Closed.IsCompleted)
49+
while (!closedTcs.Task.IsCompleted)
4750
{
4851
await Task.Delay(1000);
4952
ticks++;

src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,14 @@ public class HubConnection
3232
private HubProtocolReaderWriter _protocolReaderWriter;
3333

3434
private readonly object _pendingCallsLock = new object();
35-
private readonly CancellationTokenSource _connectionActive = new CancellationTokenSource();
3635
private readonly Dictionary<string, InvocationRequest> _pendingCalls = new Dictionary<string, InvocationRequest>();
3736
private readonly ConcurrentDictionary<string, List<InvocationHandler>> _handlers = new ConcurrentDictionary<string, List<InvocationHandler>>();
37+
private CancellationTokenSource _connectionActive;
3838

3939
private int _nextId = 0;
4040
private volatile bool _startCalled;
4141

42-
public Task Closed { get; }
42+
public event Action<Exception> Closed;
4343

4444
public HubConnection(IConnection connection, IHubProtocol protocol, ILoggerFactory loggerFactory)
4545
{
@@ -59,11 +59,7 @@ public HubConnection(IConnection connection, IHubProtocol protocol, ILoggerFacto
5959
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
6060
_logger = _loggerFactory.CreateLogger<HubConnection>();
6161
_connection.OnReceived((data, state) => ((HubConnection)state).OnDataReceivedAsync(data), this);
62-
Closed = _connection.Closed.ContinueWith(task =>
63-
{
64-
Shutdown(task.Exception);
65-
return task;
66-
}).Unwrap();
62+
_connection.Closed += e => Shutdown(e);
6763
}
6864

6965
public async Task StartAsync()
@@ -94,12 +90,14 @@ private async Task StartAsyncCore()
9490

9591
transferModeFeature.TransferMode = requestedTransferMode;
9692
await _connection.StartAsync();
93+
9794
var actualTransferMode = transferModeFeature.TransferMode;
9895

9996
_protocolReaderWriter = new HubProtocolReaderWriter(_protocol, GetDataEncoder(requestedTransferMode, actualTransferMode));
10097

10198
_logger.HubProtocol(_protocol.Name);
10299

100+
_connectionActive = new CancellationTokenSource();
103101
using (var memoryStream = new MemoryStream())
104102
{
105103
NegotiationProtocol.WriteMessage(new NegotiationMessage(_protocol.Name), memoryStream);
@@ -121,12 +119,15 @@ private IDataEncoder GetDataEncoder(TransferMode requestedTransferMode, Transfer
121119
return new PassThroughEncoder();
122120
}
123121

122+
public async Task StopAsync() => await StopAsyncCore().ForceAsync();
123+
124+
private Task StopAsyncCore() => _connection.StopAsync();
125+
124126
public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();
125127

126128
private async Task DisposeAsyncCore()
127129
{
128130
await _connection.DisposeAsync();
129-
await Closed;
130131
}
131132

132133
// TODO: Client return values/tasks?
@@ -364,6 +365,8 @@ private void Shutdown(Exception ex = null)
364365
}
365366
_pendingCalls.Clear();
366367
}
368+
369+
Closed?.Invoke(ex);
367370
}
368371

369372
private async Task DispatchInvocationAsync(InvocationMessage invocation, CancellationToken cancellationToken)

src/Microsoft.AspNetCore.Sockets.Abstractions/IConnection.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ public interface IConnection
1212
{
1313
Task StartAsync();
1414
Task SendAsync(byte[] data, CancellationToken cancellationToken);
15+
Task StopAsync();
1516
Task DisposeAsync();
1617

1718
IDisposable OnReceived(Func<byte[], object, Task> callback, object state);
1819

19-
Task Closed { get; }
20+
event Action<Exception> Closed;
2021

2122
IFeatureCollection Features { get; }
2223
}

0 commit comments

Comments
 (0)