Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Commit a833006

Browse files
authored
Implement #1157 by adding client timeout for C# client (#1165)
1 parent 15c3bca commit a833006

File tree

17 files changed

+151
-51
lines changed

17 files changed

+151
-51
lines changed

src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ namespace Microsoft.AspNetCore.SignalR.Client
2424
{
2525
public class HubConnection
2626
{
27+
public static readonly TimeSpan DefaultServerTimeout = TimeSpan.FromSeconds(30); // Server ping rate is 15 sec, this is 2 times that.
28+
2729
private readonly ILoggerFactory _loggerFactory;
2830
private readonly ILogger _logger;
2931
private readonly IConnection _connection;
@@ -38,9 +40,17 @@ public class HubConnection
3840

3941
private int _nextId = 0;
4042
private volatile bool _startCalled;
43+
private Timer _timeoutTimer;
44+
private bool _needKeepAlive;
4145

4246
public Task Closed { get; }
4347

48+
/// <summary>
49+
/// Gets or sets the server timeout interval for the connection. Changes to this value
50+
/// will not be applied until the Keep Alive timer is next reset.
51+
/// </summary>
52+
public TimeSpan ServerTimeout { get; set; } = DefaultServerTimeout;
53+
4454
public HubConnection(IConnection connection, IHubProtocol protocol, ILoggerFactory loggerFactory)
4555
{
4656
if (connection == null)
@@ -64,6 +74,9 @@ public HubConnection(IConnection connection, IHubProtocol protocol, ILoggerFacto
6474
Shutdown(task.Exception);
6575
return task;
6676
}).Unwrap();
77+
78+
// Create the timer for timeout, but disabled by default (we enable it when started).
79+
_timeoutTimer = new Timer(state => ((HubConnection)state).TimeoutElapsed(), this, Timeout.Infinite, Timeout.Infinite);
6780
}
6881

6982
public async Task StartAsync()
@@ -78,6 +91,20 @@ public async Task StartAsync()
7891
}
7992
}
8093

94+
private void TimeoutElapsed()
95+
{
96+
_connection.AbortAsync(new TimeoutException($"Server timeout ({ServerTimeout.TotalMilliseconds:0.00}ms) elapsed without receiving a message from the server."));
97+
}
98+
99+
private void ResetTimeoutTimer()
100+
{
101+
if (_needKeepAlive)
102+
{
103+
_logger.ResettingKeepAliveTimer();
104+
_timeoutTimer.Change(ServerTimeout, Timeout.InfiniteTimeSpan);
105+
}
106+
}
107+
81108
private async Task StartAsyncCore()
82109
{
83110
var transferModeFeature = _connection.Features.Get<ITransferModeFeature>();
@@ -94,6 +121,7 @@ private async Task StartAsyncCore()
94121

95122
transferModeFeature.TransferMode = requestedTransferMode;
96123
await _connection.StartAsync();
124+
_needKeepAlive = _connection.Features.Get<IConnectionInherentKeepAliveFeature>() == null;
97125
var actualTransferMode = transferModeFeature.TransferMode;
98126

99127
_protocolReaderWriter = new HubProtocolReaderWriter(_protocol, GetDataEncoder(requestedTransferMode, actualTransferMode));
@@ -105,6 +133,8 @@ private async Task StartAsyncCore()
105133
NegotiationProtocol.WriteMessage(new NegotiationMessage(_protocol.Name), memoryStream);
106134
await _connection.SendAsync(memoryStream.ToArray(), _connectionActive.Token);
107135
}
136+
137+
ResetTimeoutTimer();
108138
}
109139

110140
private IDataEncoder GetDataEncoder(TransferMode requestedTransferMode, TransferMode actualTransferMode)
@@ -125,6 +155,7 @@ private IDataEncoder GetDataEncoder(TransferMode requestedTransferMode, Transfer
125155

126156
private async Task DisposeAsyncCore()
127157
{
158+
_timeoutTimer.Dispose();
128159
await _connection.DisposeAsync();
129160
await Closed;
130161
}
@@ -298,6 +329,7 @@ private async Task SendAsyncCore(string methodName, object[] args, CancellationT
298329

299330
private async Task OnDataReceivedAsync(byte[] data)
300331
{
332+
ResetTimeoutTimer();
301333
if (_protocolReaderWriter.ReadMessages(data, _binder, out var messages))
302334
{
303335
foreach (var message in messages)

src/Microsoft.AspNetCore.SignalR.Client.Core/Internal/SignalRClientLoggerExtensions.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ internal static class SignalRClientLoggerExtensions
8585
private static readonly Action<ILogger, string, string, string, int, Exception> _preparingStreamingInvocation =
8686
LoggerMessage.Define<string, string, string, int>(LogLevel.Trace, new EventId(24, nameof(PreparingStreamingInvocation)), "Preparing streaming invocation '{invocationId}' of '{target}', with return type '{returnType}' and {argumentCount} argument(s).");
8787

88+
private static readonly Action<ILogger, Exception> _resettingKeepAliveTimer =
89+
LoggerMessage.Define(LogLevel.Trace, new EventId(25, nameof(ResettingKeepAliveTimer)), "Resetting keep-alive timer, received a message from the server.");
90+
8891
// Category: Streaming and NonStreaming
8992
private static readonly Action<ILogger, string, Exception> _invocationCreated =
9093
LoggerMessage.Define<string>(LogLevel.Trace, new EventId(0, nameof(InvocationCreated)), "Invocation {invocationId} created.");
@@ -282,7 +285,12 @@ public static void StreamItemOnNonStreamInvocation(this ILogger logger, string i
282285

283286
public static void ErrorInvokingClientSideMethod(this ILogger logger, string methodName, Exception exception)
284287
{
285-
_errorInvokingClientSideMethod(logger, methodName, exception);
288+
_errorInvokingClientSideMethod(logger, methodName, exception);
289+
}
290+
291+
public static void ResettingKeepAliveTimer(this ILogger logger)
292+
{
293+
_resettingKeepAliveTimer(logger, null);
286294
}
287295
}
288296
}

src/Microsoft.AspNetCore.SignalR.Client.Core/Microsoft.AspNetCore.SignalR.Client.Core.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
<PropertyGroup>
44
<Description>Client for ASP.NET Core SignalR</Description>
55
<TargetFramework>netstandard2.0</TargetFramework>
6+
<RootNamespace>Microsoft.AspNetCore.SignalR.Client</RootNamespace>
67
</PropertyGroup>
78

89
<ItemGroup>

src/Microsoft.AspNetCore.SignalR.Common/Internal/Protocol/HubProtocolConstants.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
namespace Microsoft.AspNetCore.SignalR.Internal.Protocol
55
{
6-
internal static class HubProtocolConstants
6+
public static class HubProtocolConstants
77
{
88
public const int InvocationMessageType = 1;
99
public const int StreamItemMessageType = 2;

src/Microsoft.AspNetCore.Sockets.Abstractions/IConnection.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) .NET Foundation. All rights reserved.
1+
// Copyright (c) .NET Foundation. All rights reserved.
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
@@ -13,6 +13,7 @@ public interface IConnection
1313
Task StartAsync();
1414
Task SendAsync(byte[] data, CancellationToken cancellationToken);
1515
Task DisposeAsync();
16+
Task AbortAsync(Exception ex);
1617

1718
IDisposable OnReceived(Func<byte[], object, Task> callback, object state);
1819

src/Microsoft.AspNetCore.Sockets.Client.Http/HttpConnection.cs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public HttpConnection(Uri url, ITransportFactory transportFactory, ILoggerFactor
9595
_logger = _loggerFactory.CreateLogger<HttpConnection>();
9696
_httpOptions = httpOptions;
9797
_httpClient = _httpOptions?.HttpMessageHandler == null ? new HttpClient() : new HttpClient(_httpOptions?.HttpMessageHandler);
98-
_httpClient.Timeout = HttpClientTimeout;
98+
_httpClient.Timeout = HttpClientTimeout;
9999
_transportFactory = transportFactory ?? throw new ArgumentNullException(nameof(transportFactory));
100100
}
101101

@@ -303,7 +303,7 @@ private async Task StartTransport(Uri connectUrl)
303303
// Start the transport, giving it one end of the pipeline
304304
try
305305
{
306-
await _transport.StartAsync(connectUrl, applicationSide, requestedTransferMode: GetTransferMode(), connectionId: _connectionId);
306+
await _transport.StartAsync(connectUrl, applicationSide, GetTransferMode(), _connectionId, this);
307307

308308
// actual transfer mode can differ from the one that was requested so set it on the feature
309309
Debug.Assert(_transport.Mode.HasValue, "transfer mode not set after transport started");
@@ -435,11 +435,25 @@ private async Task SendAsyncCore(byte[] data, CancellationToken cancellationToke
435435
}
436436
}
437437

438+
public async Task AbortAsync(Exception ex) => await DisposeAsyncCore(ex ?? new InvalidOperationException("Connection aborted")).ForceAsync();
439+
438440
public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();
439441

440-
private async Task DisposeAsyncCore()
442+
private async Task DisposeAsyncCore(Exception ex = null)
441443
{
442-
_logger.StoppingClient(_connectionId);
444+
if (ex != null)
445+
{
446+
_logger.AbortingClient(_connectionId, ex);
447+
448+
// Immediately fault the close task. When the transport shuts down,
449+
// it will trigger the close task to be completed, so we want it to be
450+
// marked faulted before that happens
451+
_closedTcs.TrySetException(ex);
452+
}
453+
else
454+
{
455+
_logger.StoppingClient(_connectionId);
456+
}
443457

444458
if (Interlocked.Exchange(ref _connectionState, ConnectionState.Disconnected) == ConnectionState.Initial)
445459
{
@@ -472,6 +486,7 @@ private async Task DisposeAsyncCore()
472486
await _receiveLoopTask;
473487
}
474488

489+
// If we haven't already done so, trigger the Closed task.
475490
_closedTcs.TrySetResult(null);
476491
_httpClient?.Dispose();
477492
}

src/Microsoft.AspNetCore.Sockets.Client.Http/ITransport.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ namespace Microsoft.AspNetCore.Sockets.Client
99
{
1010
public interface ITransport
1111
{
12-
Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId);
12+
Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection);
1313
Task StopAsync();
1414
TransferMode? Mode { get; }
1515
}

src/Microsoft.AspNetCore.Sockets.Client.Http/Internal/SocketClientLoggerExtensions.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ internal static class SocketClientLoggerExtensions
153153
private static readonly Action<ILogger, DateTime, string, string, Exception> _exceptionThrownFromCallback =
154154
LoggerMessage.Define<DateTime, string, string>(LogLevel.Error, new EventId(19, nameof(ExceptionThrownFromCallback)), "{time}: Connection Id {connectionId}: An exception was thrown from the '{callback}' callback");
155155

156+
private static readonly Action<ILogger, DateTime, string, Exception> _abortingClient =
157+
LoggerMessage.Define<DateTime, string>(LogLevel.Error, new EventId(20, nameof(AbortingClient)), "{time}: Connection Id {connectionId}: Aborting client.");
158+
156159

157160
public static void StartTransport(this ILogger logger, string connectionId, TransferMode transferMode)
158161
{
@@ -506,6 +509,14 @@ public static void SendingMessage(this ILogger logger, string connectionId)
506509
}
507510
}
508511

512+
public static void AbortingClient(this ILogger logger, string connectionId, Exception ex)
513+
{
514+
if (logger.IsEnabled(LogLevel.Error))
515+
{
516+
_abortingClient(logger, DateTime.Now, connectionId, ex);
517+
}
518+
}
519+
509520
public static void StoppingClient(this ILogger logger, string connectionId)
510521
{
511522
if (logger.IsEnabled(LogLevel.Information))

src/Microsoft.AspNetCore.Sockets.Client.Http/LongPollingTransport.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5-
using System.Collections.Generic;
65
using System.Net;
76
using System.Net.Http;
87
using System.Threading;
9-
using System.Threading.Tasks;
108
using System.Threading.Channels;
9+
using System.Threading.Tasks;
1110
using Microsoft.AspNetCore.Sockets.Client.Http;
1211
using Microsoft.AspNetCore.Sockets.Client.Internal;
12+
using Microsoft.AspNetCore.Sockets.Features;
1313
using Microsoft.Extensions.Logging;
1414
using Microsoft.Extensions.Logging.Abstractions;
1515

@@ -42,13 +42,15 @@ public LongPollingTransport(HttpClient httpClient, HttpOptions httpOptions, ILog
4242
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<LongPollingTransport>();
4343
}
4444

45-
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId)
45+
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection)
4646
{
4747
if (requestedTransferMode != TransferMode.Binary && requestedTransferMode != TransferMode.Text)
4848
{
4949
throw new ArgumentException("Invalid transfer mode.", nameof(requestedTransferMode));
5050
}
5151

52+
connection.Features.Set<IConnectionInherentKeepAliveFeature>(new ConnectionInherentKeepAliveFeature(_httpClient.Timeout));
53+
5254
_application = application;
5355
Mode = requestedTransferMode;
5456
_connectionId = connectionId;

src/Microsoft.AspNetCore.Sockets.Client.Http/ServerSentEventsTransport.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public ServerSentEventsTransport(HttpClient httpClient, HttpOptions httpOptions,
4949
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<ServerSentEventsTransport>();
5050
}
5151

52-
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId)
52+
public Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection)
5353
{
5454
if (requestedTransferMode != TransferMode.Binary && requestedTransferMode != TransferMode.Text)
5555
{

src/Microsoft.AspNetCore.Sockets.Client.Http/WebSocketsTransport.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public WebSocketsTransport(HttpOptions httpOptions, ILoggerFactory loggerFactory
5555
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<WebSocketsTransport>();
5656
}
5757

58-
public async Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId)
58+
public async Task StartAsync(Uri url, Channel<byte[], SendMessage> application, TransferMode requestedTransferMode, string connectionId, IConnection connection)
5959
{
6060
if (url == null)
6161
{

test/Microsoft.AspNetCore.SignalR.Client.Tests/HttpConnectionTests.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public async Task CanStopStartingConnection()
155155
releaseDisposeTcs.SetResult(null);
156156
await disposeTask.OrTimeout();
157157

158-
transport.Verify(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()), Times.Never);
158+
transport.Verify(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()), Times.Never);
159159
}
160160

161161
[Fact]
@@ -263,8 +263,8 @@ public async Task ReceivedCallbackNotRaisedAfterConnectionIsDisposed()
263263

264264
var mockTransport = new Mock<ITransport>();
265265
Channel<byte[], SendMessage> channel = null;
266-
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
267-
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
266+
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
267+
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
268268
{
269269
channel = c;
270270
return Task.CompletedTask;
@@ -311,8 +311,8 @@ public async Task EventsAreNotRunningOnMainLoop()
311311

312312
var mockTransport = new Mock<ITransport>();
313313
Channel<byte[], SendMessage> channel = null;
314-
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
315-
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
314+
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
315+
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
316316
{
317317
channel = c;
318318
return Task.CompletedTask;
@@ -368,8 +368,8 @@ public async Task EventQueueTimeout()
368368

369369
var mockTransport = new Mock<ITransport>();
370370
Channel<byte[], SendMessage> channel = null;
371-
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
372-
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
371+
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
372+
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
373373
{
374374
channel = c;
375375
return Task.CompletedTask;
@@ -413,8 +413,8 @@ public async Task EventQueueTimeoutWithException()
413413

414414
var mockTransport = new Mock<ITransport>();
415415
Channel<byte[], SendMessage> channel = null;
416-
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
417-
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
416+
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
417+
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
418418
{
419419
channel = c;
420420
return Task.CompletedTask;
@@ -925,8 +925,8 @@ public async Task CanStartConnectionWithoutSettingTransferModeFeature()
925925

926926
var mockTransport = new Mock<ITransport>();
927927
Channel<byte[], SendMessage> channel = null;
928-
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>()))
929-
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string>((url, c, transferMode, connectionId) =>
928+
mockTransport.Setup(t => t.StartAsync(It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), It.IsAny<TransferMode>(), It.IsAny<string>(), It.IsAny<IConnection>()))
929+
.Returns<Uri, Channel<byte[], SendMessage>, TransferMode, string, IConnection>((url, c, transferMode, connectionId, _) =>
930930
{
931931
channel = c;
932932
return Task.CompletedTask;
@@ -947,7 +947,7 @@ public async Task CanStartConnectionWithoutSettingTransferModeFeature()
947947
await connection.DisposeAsync().OrTimeout();
948948

949949
mockTransport.Verify(t => t.StartAsync(
950-
It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), TransferMode.Text, It.IsAny<string>()), Times.Once);
950+
It.IsAny<Uri>(), It.IsAny<Channel<byte[], SendMessage>>(), TransferMode.Text, It.IsAny<string>(), It.IsAny<IConnection>()), Times.Once);
951951
Assert.NotNull(transferModeFeature);
952952
Assert.Equal(TransferMode.Binary, transferModeFeature.TransferMode);
953953
}

0 commit comments

Comments
 (0)