Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Making HttpConnection restartable (C#) #1147

Merged
merged 3 commits into from
Dec 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion SignalR.sln
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{DA69F624-539
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{83B2C3EB-A3D8-4E6F-9A3C-A380B005EF31}"
ProjectSection(SolutionItems) = preProject
build\dependencies.props = build\dependencies.props
Directory.Build.props = Directory.Build.props
Directory.Build.targets = Directory.Build.targets
build\Key.snk = build\Key.snk
Expand Down Expand Up @@ -85,7 +86,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JwtSample", "samples\JwtSam
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JwtClientSample", "samples\JwtClientSample\JwtClientSample.csproj", "{1A953296-E869-4DE2-A693-FD5FCDE27057}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.AspNetCore.SignalR.Tests.Utils", "test\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj", "{0A0A6135-EA24-4307-95C2-CE1B7E164A5E}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.AspNetCore.SignalR.Tests.Utils", "test\Microsoft.AspNetCore.SignalR.Tests.Utils\Microsoft.AspNetCore.SignalR.Tests.Utils.csproj", "{0A0A6135-EA24-4307-95C2-CE1B7E164A5E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down
28 changes: 14 additions & 14 deletions client-ts/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 18 additions & 14 deletions samples/ClientSample/HubSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,22 @@ public static async Task<int> ExecuteAsync(string baseUrl)
baseUrl = string.IsNullOrEmpty(baseUrl) ? "http://localhost:5000/default" : baseUrl;

Console.WriteLine("Connecting to {0}", baseUrl);
HubConnection connection = await ConnectAsync(baseUrl);
Console.WriteLine("Connected to {0}", baseUrl);
var connection = new HubConnectionBuilder()
.WithUrl(baseUrl)
.WithConsoleLogger(LogLevel.Trace)
.Build();

try
{
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e => closeTcs.SetResult(null);
// Set up handler
connection.On<string>("Send", Console.WriteLine);

await ConnectAsync(connection);

Console.WriteLine("Connected to {0}", baseUrl);

var sendCts = new CancellationTokenSource();

Console.CancelKeyPress += async (sender, a) =>
Expand All @@ -45,13 +56,10 @@ public static async Task<int> ExecuteAsync(string baseUrl)
await connection.DisposeAsync();
};

// Set up handler
connection.On<string>("Send", Console.WriteLine);

while (!connection.Closed.IsCompleted)
while (!closeTcs.Task.IsCompleted)
{
var completedTask = await Task.WhenAny(Task.Run(() => Console.ReadLine()), connection.Closed);
if (completedTask == connection.Closed)
var completedTask = await Task.WhenAny(Task.Run(() => Console.ReadLine()), closeTcs.Task);
if (completedTask == closeTcs.Task)
{
break;
}
Expand Down Expand Up @@ -79,19 +87,15 @@ public static async Task<int> ExecuteAsync(string baseUrl)
return 0;
}

private static async Task<HubConnection> ConnectAsync(string baseUrl)
private static async Task ConnectAsync(HubConnection connection)
{
// Keep trying to until we can start
while (true)
{
var connection = new HubConnectionBuilder()
.WithUrl(baseUrl)
.WithConsoleLogger(LogLevel.Trace)
.Build();

try
{
await connection.StartAsync();
return connection;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need to return here? Otherwise this method will never return... or am I missing something?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return value was changed from Task<HubConnection> to Task

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes indeed, but that still means you need a return statement right? Just without the connection...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand it, the await keyword will just wait for the execution of StartAsync() to finish before continuing with the method. It will not return from the method. As this is inside a while(true) loop, I think it will never return.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What @fretje means is that this is an infinite loop:

while (true)
{
    try
    {
        await connection.StartAsync();
    }
    catch (Exception)
    {
    }
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@acjh Thanks! Yes you guys are right, this is broken. Would either of you like to make a PR to fix it?

catch (Exception)
{
Expand Down
5 changes: 3 additions & 2 deletions samples/ClientSample/RawSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public static async Task<int> ExecuteAsync(string baseUrl)
var connection = new HttpConnection(new Uri(baseUrl), loggerFactory);
try
{
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e => closeTcs.SetResult(null);
connection.OnReceived(data => Console.Out.WriteLineAsync($"{Encoding.UTF8.GetString(data)}"));

await connection.StartAsync();

Console.WriteLine($"Connected to {baseUrl}");
Expand All @@ -51,7 +52,7 @@ public static async Task<int> ExecuteAsync(string baseUrl)
await connection.DisposeAsync();
};

while (!connection.Closed.IsCompleted)
while (!closeTcs.Task.IsCompleted)
{
var line = await Task.Run(() => Console.ReadLine(), cts.Token);

Expand Down
5 changes: 4 additions & 1 deletion samples/JwtClientSample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ private async Task RunConnection(TransportType transportType)
.WithJwtBearer(() => _tokens[userId])
.Build();

var closedTcs = new TaskCompletionSource<object>();
hubConnection.Closed += e => closedTcs.SetResult(null);

hubConnection.On<string, string>("Message", (sender, message) => Console.WriteLine($"[{userId}] {sender}: {message}"));
await hubConnection.StartAsync();
Console.WriteLine($"[{userId}] Connection Started");
Expand All @@ -43,7 +46,7 @@ private async Task RunConnection(TransportType transportType)

try
{
while (!hubConnection.Closed.IsCompleted)
while (!closedTcs.Task.IsCompleted)
{
await Task.Delay(1000);
ticks++;
Expand Down
36 changes: 23 additions & 13 deletions src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ public class HubConnection
private HubProtocolReaderWriter _protocolReaderWriter;

private readonly object _pendingCallsLock = new object();
private readonly CancellationTokenSource _connectionActive = new CancellationTokenSource();
private readonly Dictionary<string, InvocationRequest> _pendingCalls = new Dictionary<string, InvocationRequest>();
private readonly ConcurrentDictionary<string, List<InvocationHandler>> _handlers = new ConcurrentDictionary<string, List<InvocationHandler>>();
private CancellationTokenSource _connectionActive;

private int _nextId = 0;
private volatile bool _startCalled;
private Timer _timeoutTimer;
private bool _needKeepAlive;

public Task Closed { get; }
public event Action<Exception> Closed;

/// <summary>
/// Gets or sets the server timeout interval for the connection. Changes to this value
Expand All @@ -69,11 +69,7 @@ public HubConnection(IConnection connection, IHubProtocol protocol, ILoggerFacto
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
_logger = _loggerFactory.CreateLogger<HubConnection>();
_connection.OnReceived((data, state) => ((HubConnection)state).OnDataReceivedAsync(data), this);
Closed = _connection.Closed.ContinueWith(task =>
{
Shutdown(task.Exception);
return task;
}).Unwrap();
_connection.Closed += e => Shutdown(e);

// Create the timer for timeout, but disabled by default (we enable it when started).
_timeoutTimer = new Timer(state => ((HubConnection)state).TimeoutElapsed(), this, Timeout.Infinite, Timeout.Infinite);
Expand Down Expand Up @@ -122,12 +118,14 @@ private async Task StartAsyncCore()
transferModeFeature.TransferMode = requestedTransferMode;
await _connection.StartAsync();
_needKeepAlive = _connection.Features.Get<IConnectionInherentKeepAliveFeature>() == null;

var actualTransferMode = transferModeFeature.TransferMode;

_protocolReaderWriter = new HubProtocolReaderWriter(_protocol, GetDataEncoder(requestedTransferMode, actualTransferMode));

_logger.HubProtocol(_protocol.Name);

_connectionActive = new CancellationTokenSource();
using (var memoryStream = new MemoryStream())
{
NegotiationProtocol.WriteMessage(new NegotiationMessage(_protocol.Name), memoryStream);
Expand All @@ -151,13 +149,16 @@ private IDataEncoder GetDataEncoder(TransferMode requestedTransferMode, Transfer
return new PassThroughEncoder();
}

public async Task StopAsync() => await StopAsyncCore().ForceAsync();

private Task StopAsyncCore() => _connection.StopAsync();

public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();

private async Task DisposeAsyncCore()
{
_timeoutTimer.Dispose();
await _connection.DisposeAsync();
await Closed;
}

// TODO: Client return values/tasks?
Expand Down Expand Up @@ -370,12 +371,12 @@ private async Task OnDataReceivedAsync(byte[] data)
}
}

private void Shutdown(Exception ex = null)
private void Shutdown(Exception exception = null)
{
_logger.ShutdownConnection();
if (ex != null)
if (exception != null)
{
_logger.ShutdownWithError(ex);
_logger.ShutdownWithError(exception);
}

lock (_pendingCallsLock)
Expand All @@ -388,14 +389,23 @@ private void Shutdown(Exception ex = null)
foreach (var outstandingCall in _pendingCalls.Values)
{
_logger.RemoveInvocation(outstandingCall.InvocationId);
if (ex != null)
if (exception != null)
{
outstandingCall.Fail(ex);
outstandingCall.Fail(exception);
}
outstandingCall.Dispose();
}
_pendingCalls.Clear();
}

try
{
Closed?.Invoke(exception);
}
catch (Exception ex)
{
_logger.ErrorDuringClosedEvent(ex);
}
}

private async Task DispatchInvocationAsync(InvocationMessage invocation, CancellationToken cancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ internal static class SignalRClientLoggerExtensions
private static readonly Action<ILogger, Exception> _resettingKeepAliveTimer =
LoggerMessage.Define(LogLevel.Trace, new EventId(25, nameof(ResettingKeepAliveTimer)), "Resetting keep-alive timer, received a message from the server.");

private static readonly Action<ILogger, Exception> _errorDuringClosedEvent =
LoggerMessage.Define(LogLevel.Error, new EventId(26, nameof(ErrorDuringClosedEvent)), "An exception was thrown in the handler for the Closed event.");

// Category: Streaming and NonStreaming
private static readonly Action<ILogger, string, Exception> _invocationCreated =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(0, nameof(InvocationCreated)), "Invocation {invocationId} created.");
Expand Down Expand Up @@ -292,5 +295,10 @@ public static void ResettingKeepAliveTimer(this ILogger logger)
{
_resettingKeepAliveTimer(logger, null);
}

public static void ErrorDuringClosedEvent(this ILogger logger, Exception exception)
{
_errorDuringClosedEvent(logger, exception);
}
}
}
3 changes: 2 additions & 1 deletion src/Microsoft.AspNetCore.Sockets.Abstractions/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ public interface IConnection
{
Task StartAsync();
Task SendAsync(byte[] data, CancellationToken cancellationToken);
Task StopAsync();
Task DisposeAsync();
Task AbortAsync(Exception ex);

IDisposable OnReceived(Func<byte[], object, Task> callback, object state);

Task Closed { get; }
event Action<Exception> Closed;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used to be Func<Exception, Task>, any reason we don't go back to that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure we want it (and what we had was not implemented correctly). I think we need to try which pattern works better for restarting the connection from this event.


IFeatureCollection Features { get; }
}
Expand Down
Loading