-
Notifications
You must be signed in to change notification settings - Fork 447
Making HttpConnection restartable (C#) #1147
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,16 +34,16 @@ public class HubConnection | |
| private HubProtocolReaderWriter _protocolReaderWriter; | ||
|
|
||
| private readonly object _pendingCallsLock = new object(); | ||
| private readonly CancellationTokenSource _connectionActive = new CancellationTokenSource(); | ||
| private readonly Dictionary<string, InvocationRequest> _pendingCalls = new Dictionary<string, InvocationRequest>(); | ||
| private readonly ConcurrentDictionary<string, List<InvocationHandler>> _handlers = new ConcurrentDictionary<string, List<InvocationHandler>>(); | ||
| private CancellationTokenSource _connectionActive; | ||
|
|
||
| private int _nextId = 0; | ||
| private volatile bool _startCalled; | ||
| private Timer _timeoutTimer; | ||
| private bool _needKeepAlive; | ||
|
|
||
| public Task Closed { get; } | ||
| public event Action<Exception> Closed; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be a wrapper around
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No. This way the only event that is hooked to HttpConnection is the one from hubConnection and the user subscribes to the object they are using. |
||
|
|
||
| /// <summary> | ||
| /// Gets or sets the server timeout interval for the connection. Changes to this value | ||
|
|
@@ -69,11 +69,7 @@ public HubConnection(IConnection connection, IHubProtocol protocol, ILoggerFacto | |
| _loggerFactory = loggerFactory ?? NullLoggerFactory.Instance; | ||
| _logger = _loggerFactory.CreateLogger<HubConnection>(); | ||
| _connection.OnReceived((data, state) => ((HubConnection)state).OnDataReceivedAsync(data), this); | ||
| Closed = _connection.Closed.ContinueWith(task => | ||
| { | ||
| Shutdown(task.Exception); | ||
| return task; | ||
| }).Unwrap(); | ||
| _connection.Closed += e => Shutdown(e); | ||
|
|
||
| // Create the timer for timeout, but disabled by default (we enable it when started). | ||
| _timeoutTimer = new Timer(state => ((HubConnection)state).TimeoutElapsed(), this, Timeout.Infinite, Timeout.Infinite); | ||
|
|
@@ -122,12 +118,14 @@ private async Task StartAsyncCore() | |
| transferModeFeature.TransferMode = requestedTransferMode; | ||
| await _connection.StartAsync(); | ||
| _needKeepAlive = _connection.Features.Get<IConnectionInherentKeepAliveFeature>() == null; | ||
|
|
||
| var actualTransferMode = transferModeFeature.TransferMode; | ||
|
|
||
| _protocolReaderWriter = new HubProtocolReaderWriter(_protocol, GetDataEncoder(requestedTransferMode, actualTransferMode)); | ||
|
|
||
| _logger.HubProtocol(_protocol.Name); | ||
|
|
||
| _connectionActive = new CancellationTokenSource(); | ||
| using (var memoryStream = new MemoryStream()) | ||
| { | ||
| NegotiationProtocol.WriteMessage(new NegotiationMessage(_protocol.Name), memoryStream); | ||
|
|
@@ -151,13 +149,16 @@ private IDataEncoder GetDataEncoder(TransferMode requestedTransferMode, Transfer | |
| return new PassThroughEncoder(); | ||
| } | ||
|
|
||
| public async Task StopAsync() => await StopAsyncCore().ForceAsync(); | ||
|
|
||
| private Task StopAsyncCore() => _connection.StopAsync(); | ||
|
|
||
| public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync(); | ||
|
|
||
| private async Task DisposeAsyncCore() | ||
| { | ||
| _timeoutTimer.Dispose(); | ||
| await _connection.DisposeAsync(); | ||
| await Closed; | ||
| } | ||
|
|
||
| // TODO: Client return values/tasks? | ||
|
|
@@ -370,12 +371,12 @@ private async Task OnDataReceivedAsync(byte[] data) | |
| } | ||
| } | ||
|
|
||
| private void Shutdown(Exception ex = null) | ||
| private void Shutdown(Exception exception = null) | ||
| { | ||
| _logger.ShutdownConnection(); | ||
| if (ex != null) | ||
| if (exception != null) | ||
| { | ||
| _logger.ShutdownWithError(ex); | ||
| _logger.ShutdownWithError(exception); | ||
| } | ||
|
|
||
| lock (_pendingCallsLock) | ||
|
|
@@ -388,14 +389,23 @@ private void Shutdown(Exception ex = null) | |
| foreach (var outstandingCall in _pendingCalls.Values) | ||
| { | ||
| _logger.RemoveInvocation(outstandingCall.InvocationId); | ||
| if (ex != null) | ||
| if (exception != null) | ||
| { | ||
| outstandingCall.Fail(ex); | ||
| outstandingCall.Fail(exception); | ||
| } | ||
| outstandingCall.Dispose(); | ||
| } | ||
| _pendingCalls.Clear(); | ||
| } | ||
|
|
||
| try | ||
| { | ||
| Closed?.Invoke(exception); | ||
| } | ||
| catch (Exception ex) | ||
| { | ||
| _logger.ErrorDuringClosedEvent(ex); | ||
| } | ||
| } | ||
|
|
||
| private async Task DispatchInvocationAsync(InvocationMessage invocation, CancellationToken cancellationToken) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,12 +12,13 @@ public interface IConnection | |
| { | ||
| Task StartAsync(); | ||
| Task SendAsync(byte[] data, CancellationToken cancellationToken); | ||
| Task StopAsync(); | ||
| Task DisposeAsync(); | ||
| Task AbortAsync(Exception ex); | ||
|
|
||
| IDisposable OnReceived(Func<byte[], object, Task> callback, object state); | ||
|
|
||
| Task Closed { get; } | ||
| event Action<Exception> Closed; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Used to be
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure we want it (and what we had was not implemented correctly). I think we need to try which pattern works better for restarting the connection from this event. |
||
|
|
||
| IFeatureCollection Features { get; } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't you need to return here? Otherwise this method will never return... or am I missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return value was changed from
Task<HubConnection>toTaskThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes indeed, but that still means you need a return statement right? Just without the connection...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, unless you're using async await
https://docs.microsoft.com/en-us/dotnet/csharp/language-reference/keywords/await
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I understand it, the await keyword will just wait for the execution of StartAsync() to finish before continuing with the method. It will not return from the method. As this is inside a while(true) loop, I think it will never return.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What @fretje means is that this is an infinite loop:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@acjh Thanks! Yes you guys are right, this is broken. Would either of you like to make a PR to fix it?