Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.
32 changes: 32 additions & 0 deletions src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
{
public class HubConnection
{
public static readonly TimeSpan DefaultServerTimeout = TimeSpan.FromSeconds(30); // Server ping rate is 15 sec, this is 2 times that.

private readonly ILoggerFactory _loggerFactory;
private readonly ILogger _logger;
private readonly IConnection _connection;
Expand All @@ -38,9 +40,17 @@ public class HubConnection

private int _nextId = 0;
private volatile bool _startCalled;
private Timer _timeoutTimer;
private bool _needKeepAlive;

public Task Closed { get; }

/// <summary>
/// Gets or sets the server timeout interval for the connection. Changes to this value
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Changes to this value are ignored after StartAsync is called"
False, ResetTimeoutTimer is called in the OnDataReceivedAsync callback

/// will not be applied until the Keep Alive timer is next reset.
/// </summary>
public TimeSpan ServerTimeout { get; set; } = DefaultServerTimeout;

public HubConnection(IConnection connection, IHubProtocol protocol, ILoggerFactory loggerFactory)
{
if (connection == null)
Expand All @@ -64,6 +74,9 @@ public HubConnection(IConnection connection, IHubProtocol protocol, ILoggerFacto
Shutdown(task.Exception);
return task;
}).Unwrap();

// Create the timer for timeout, but disabled by default (we enable it when started).
_timeoutTimer = new Timer(state => ((HubConnection)state).TimeoutElapsed(), this, Timeout.Infinite, Timeout.Infinite);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just be aware that we'll need to set the timer to disabled in StopAsync when Pawel's PR goes in

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep

}

public async Task StartAsync()
Expand All @@ -78,6 +91,20 @@ public async Task StartAsync()
}
}

private void TimeoutElapsed()
{
_connection.AbortAsync(new TimeoutException($"Server timeout ({ServerTimeout.TotalMilliseconds:0.00}ms) elapsed without receiving a message from the server."));
}

private void ResetTimeoutTimer()
{
if (_needKeepAlive)
{
_logger.ResettingKeepAliveTimer();
_timeoutTimer.Change(ServerTimeout, Timeout.InfiniteTimeSpan);
}
}

private async Task StartAsyncCore()
{
var transferModeFeature = _connection.Features.Get<ITransferModeFeature>();
Expand All @@ -94,6 +121,7 @@ private async Task StartAsyncCore()

transferModeFeature.TransferMode = requestedTransferMode;
await _connection.StartAsync();
_needKeepAlive = _connection.Features.Get<IConnectionInherentKeepAliveFeature>() == null;
var actualTransferMode = transferModeFeature.TransferMode;

_protocolReaderWriter = new HubProtocolReaderWriter(_protocol, GetDataEncoder(requestedTransferMode, actualTransferMode));
Expand All @@ -105,6 +133,8 @@ private async Task StartAsyncCore()
NegotiationProtocol.WriteMessage(new NegotiationMessage(_protocol.Name), memoryStream);
await _connection.SendAsync(memoryStream.ToArray(), _connectionActive.Token);
}

ResetTimeoutTimer();
}

private IDataEncoder GetDataEncoder(TransferMode requestedTransferMode, TransferMode actualTransferMode)
Expand All @@ -125,6 +155,7 @@ private IDataEncoder GetDataEncoder(TransferMode requestedTransferMode, Transfer

private async Task DisposeAsyncCore()
{
_timeoutTimer.Dispose();
await _connection.DisposeAsync();
await Closed;
}
Expand Down Expand Up @@ -298,6 +329,7 @@ private async Task SendAsyncCore(string methodName, object[] args, CancellationT

private async Task OnDataReceivedAsync(byte[] data)
{
ResetTimeoutTimer();
if (_protocolReaderWriter.ReadMessages(data, _binder, out var messages))
{
foreach (var message in messages)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ internal static class SignalRClientLoggerExtensions
private static readonly Action<ILogger, string, string, string, int, Exception> _preparingStreamingInvocation =
LoggerMessage.Define<string, string, string, int>(LogLevel.Trace, new EventId(24, nameof(PreparingStreamingInvocation)), "Preparing streaming invocation '{invocationId}' of '{target}', with return type '{returnType}' and {argumentCount} argument(s).");

private static readonly Action<ILogger, Exception> _resettingKeepAliveTimer =
LoggerMessage.Define(LogLevel.Trace, new EventId(25, nameof(ResettingKeepAliveTimer)), "Resetting keep-alive timer, received a message from the server.");

// Category: Streaming and NonStreaming
private static readonly Action<ILogger, string, Exception> _invocationCreated =
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(0, nameof(InvocationCreated)), "Invocation {invocationId} created.");
Expand Down Expand Up @@ -282,7 +285,12 @@ public static void StreamItemOnNonStreamInvocation(this ILogger logger, string i

public static void ErrorInvokingClientSideMethod(this ILogger logger, string methodName, Exception exception)
{
_errorInvokingClientSideMethod(logger, methodName, exception);
_errorInvokingClientSideMethod(logger, methodName, exception);
}

public static void ResettingKeepAliveTimer(this ILogger logger)
{
_resettingKeepAliveTimer(logger, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<PropertyGroup>
<Description>Client for ASP.NET Core SignalR</Description>
<TargetFramework>netstandard2.0</TargetFramework>
<RootNamespace>Microsoft.AspNetCore.SignalR.Client</RootNamespace>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
{
internal static class HubProtocolConstants
public static class HubProtocolConstants
{
public const int InvocationMessageType = 1;
public const int StreamItemMessageType = 2;
Expand Down
3 changes: 2 additions & 1 deletion src/Microsoft.AspNetCore.Sockets.Abstractions/IConnection.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
Expand All @@ -13,6 +13,7 @@ public interface IConnection
Task StartAsync();
Task SendAsync(byte[] data, CancellationToken cancellationToken);
Task DisposeAsync();
Task AbortAsync(Exception ex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a sign we should unify the client and sever connection objects. This will turn into ConnectionContext

Copy link
Contributor Author

@analogrelay analogrelay Nov 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, but I don't want to block this PR on that right now since I don't need to unify them to implement this.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#1177 to track that


IDisposable OnReceived(Func<byte[], object, Task> callback, object state);

Expand Down
23 changes: 19 additions & 4 deletions src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public HttpConnection(Uri url, ITransportFactory transportFactory, ILoggerFactor
_logger = _loggerFactory.CreateLogger<HttpConnection>();
_httpOptions = httpOptions;
_httpClient = _httpOptions?.HttpMessageHandler == null ? new HttpClient() : new HttpClient(_httpOptions?.HttpMessageHandler);
_httpClient.Timeout = HttpClientTimeout;
_httpClient.Timeout = HttpClientTimeout;
_transportFactory = transportFactory ?? throw new ArgumentNullException(nameof(transportFactory));
}

Expand Down Expand Up @@ -303,7 +303,7 @@ private async Task StartTransport(Uri connectUrl)
// Start the transport, giving it one end of the pipeline
try
{
await _transport.StartAsync(connectUrl, applicationSide, requestedTransferMode: GetTransferMode(), connectionId: _connectionId);
await _transport.StartAsync(connectUrl, applicationSide, GetTransferMode(), _connectionId, this);

// actual transfer mode can differ from the one that was requested so set it on the feature
Debug.Assert(_transport.Mode.HasValue, "transfer mode not set after transport started");
Expand Down Expand Up @@ -435,11 +435,25 @@ private async Task SendAsyncCore(byte[] data, CancellationToken cancellationToke
}
}

public async Task AbortAsync(Exception ex) => await DisposeAsyncCore(ex ?? new InvalidOperationException("Connection aborted")).ForceAsync();

public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();

private async Task DisposeAsyncCore()
private async Task DisposeAsyncCore(Exception ex = null)
{
_logger.StoppingClient(_connectionId);
if (ex != null)
{
_logger.AbortingClient(_connectionId, ex);

// Immediately fault the close task. When the transport shuts down,
// it will trigger the close task to be completed, so we want it to be
// marked faulted before that happens
_closedTcs.TrySetException(ex);
}
else
{
_logger.StoppingClient(_connectionId);
}

if (Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected) == ConnectionState.Initial)
{
Expand Down Expand Up @@ -472,6 +486,7 @@ private async Task DisposeAsyncCore()
await _receiveLoopTask;
}

// If we haven't already done so, trigger the Closed task.
_closedTcs.TrySetResult(null);
_httpClient?.Dispose();
}
Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
{
public interface ITransport
{
Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId);
Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't very clean. We shouldn't need to pass the IConnection here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How else should transports add features? I actually think it makes sense, it's much like how it works on the server.

Task StopAsync();
TransferMode? Mode { get; }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ internal static class SocketClientLoggerExtensions
private static readonly Action<ILogger, DateTime, string, string, Exception> _exceptionThrownFromCallback =
LoggerMessage.Define<DateTime, string, string>(LogLevel.Error, new EventId(19, nameof(ExceptionThrownFromCallback)), "{time}: Connection Id {connectionId}: An exception was thrown from the '{callback}' callback");

private static readonly Action<ILogger, DateTime, string, Exception> _abortingClient =
LoggerMessage.Define<DateTime, string>(LogLevel.Error, new EventId(20, nameof(AbortingClient)), "{time}: Connection Id {connectionId}: Aborting client.");


public static void StartTransport(this ILogger logger, string connectionId, TransferMode transferMode)
{
Expand Down Expand Up @@ -506,6 +509,14 @@ public static void SendingMessage(this ILogger logger, string connectionId)
}
}

public static void AbortingClient(this ILogger logger, string connectionId, Exception ex)
{
if (logger.IsEnabled(LogLevel.Error))
{
_abortingClient(logger, DateTime.Now, connectionId, ex);
}
}

public static void StoppingClient(this ILogger logger, string connectionId)
{
if (logger.IsEnabled(LogLevel.Information))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Sockets.Client.Http;
using Microsoft.AspNetCore.Sockets.Client.Internal;
using Microsoft.AspNetCore.Sockets.Features;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

Expand Down Expand Up @@ -42,13 +42,15 @@ public LongPollingTransport(HttpClient httpClient, HttpOptions httpOptions, ILog
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<LongPollingTransport>();
}

public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId)
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection)
{
if (requestedTransferMode != TransferMode.Binary && requestedTransferMode != TransferMode.Text)
{
throw new ArgumentException("Invalid transfer mode.", nameof(requestedTransferMode));
}

connection.Features.Set<IConnectionInherentKeepAliveFeature>(new ConnectionInherentKeepAliveFeature(_httpClient.Timeout));

_application = application;
Mode = requestedTransferMode;
_connectionId = connectionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public ServerSentEventsTransport(HttpClient httpClient, HttpOptions httpOptions,
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<ServerSentEventsTransport>();
}

public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId)
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection)
{
if (requestedTransferMode != TransferMode.Binary && requestedTransferMode != TransferMode.Text)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public WebSocketsTransport(HttpOptions httpOptions, ILoggerFactory loggerFactory
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<WebSocketsTransport>();
}

public async Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId)
public async Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection)
{
if (url == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public async Task CanStopStartingConnection()
releaseDisposeTcs.SetResult(null);
await disposeTask.OrTimeout();

transport.Verify(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()), Times.Never);
transport.Verify(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()), Times.Never);
}

[Fact]
Expand Down Expand Up @@ -263,8 +263,8 @@ public async Task ReceivedCallbackNotRaisedAfterConnectionIsDisposed()

var mockTransport = new Mock<ITransport>();
Channel<byte[], SendMessage> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
{
channel = c;
return Task.CompletedTask;
Expand Down Expand Up @@ -311,8 +311,8 @@ public async Task EventsAreNotRunningOnMainLoop()

var mockTransport = new Mock<ITransport>();
Channel<byte[], SendMessage> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
{
channel = c;
return Task.CompletedTask;
Expand Down Expand Up @@ -368,8 +368,8 @@ public async Task EventQueueTimeout()

var mockTransport = new Mock<ITransport>();
Channel<byte[], SendMessage> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
{
channel = c;
return Task.CompletedTask;
Expand Down Expand Up @@ -413,8 +413,8 @@ public async Task EventQueueTimeoutWithException()

var mockTransport = new Mock<ITransport>();
Channel<byte[], SendMessage> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
{
channel = c;
return Task.CompletedTask;
Expand Down Expand Up @@ -925,8 +925,8 @@ public async Task CanStartConnectionWithoutSettingTransferModeFeature()

var mockTransport = new Mock<ITransport>();
Channel<byte[], SendMessage> channel = null;
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
{
channel = c;
return Task.CompletedTask;
Expand All @@ -947,7 +947,7 @@ public async Task CanStartConnectionWithoutSettingTransferModeFeature()
await connection.DisposeAsync().OrTimeout();

mockTransport.Verify(t => t.StartAsync(
It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), TransferMode.Text, It.IsAny<string>()), Times.Once);
It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), TransferMode.Text, It.IsAny<string>(), It.IsAny<IConnection>()), Times.Once);
Assert.NotNull(transferModeFeature);
Assert.Equal(TransferMode.Binary, transferModeFeature.TransferMode);
}
Expand Down
Loading