Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Change the IConnection contract to be an IDuplexPipe #1446

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions samples/ClientSample/RawSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Http;
using System.Text;
Expand Down Expand Up @@ -37,13 +38,17 @@ public static async Task<int> ExecuteAsync(string baseUrl)

Console.WriteLine($"Connecting to {baseUrl}...");
var connection = new HttpConnection(new Uri(baseUrl), loggerFactory);
Task reading = null;

try
{
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e => closeTcs.SetResult(null);
connection.OnReceived(data => Console.Out.WriteLineAsync($"{Encoding.UTF8.GetString(data)}"));

await connection.StartAsync();

reading = ReadAsync(connection);

Console.WriteLine($"Connected to {baseUrl}");
var cts = new CancellationTokenSource();
Console.CancelKeyPress += async (sender, a) =>
Expand All @@ -61,7 +66,7 @@ public static async Task<int> ExecuteAsync(string baseUrl)
break;
}

await connection.SendAsync(Encoding.UTF8.GetBytes(line), cts.Token);
await connection.Output.WriteAsync(Encoding.UTF8.GetBytes(line));
}
}
catch (AggregateException aex) when (aex.InnerExceptions.All(e => e is OperationCanceledException))
Expand All @@ -73,8 +78,40 @@ public static async Task<int> ExecuteAsync(string baseUrl)
finally
{
await connection.DisposeAsync();

if (reading != null)
{
await reading;
}
}
return 0;
}

private static async Task ReadAsync(HttpConnection connection)
{
while (true)
{
var result = await connection.Input.ReadAsync();
var buffer = result.Buffer;

try
{
if (!buffer.IsEmpty)
{
Console.WriteLine(Encoding.UTF8.GetString(buffer.ToArray()));
}
else if (result.IsCompleted)
{
break;
}
}
finally
{
connection.Input.AdvanceTo(buffer.End);
}
}

connection.Input.Complete();
}
}
}
2 changes: 1 addition & 1 deletion samples/SocketsSample/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:59847/",
"applicationUrl": "http://localhost:57059/",
"sslPort": 0
}
},
Expand Down
141 changes: 99 additions & 42 deletions src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
Expand Down Expand Up @@ -38,6 +39,8 @@ public class HubConnection
private readonly ConcurrentDictionary<string, List<InvocationHandler>> _handlers = new ConcurrentDictionary<string, List<InvocationHandler>>();
private CancellationTokenSource _connectionActive;

private Task _readingTask;

private int _nextId = 0;
private volatile bool _startCalled;
private Timer _timeoutTimer;
Expand Down Expand Up @@ -68,9 +71,7 @@ public HubConnection(IConnection connection, IHubProtocol protocol, ILoggerFacto
_protocol = protocol;
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
_logger = _loggerFactory.CreateLogger<HubConnection>();
_connection.OnReceived((data, state) => ((HubConnection)state).OnDataReceivedAsync(data), this);
_connection.Closed += e => Shutdown(e);

// Create the timer for timeout, but disabled by default (we enable it when started).
_timeoutTimer = new Timer(state => ((HubConnection)state).TimeoutElapsed(), this, Timeout.Infinite, Timeout.Infinite);
}
Expand Down Expand Up @@ -103,7 +104,7 @@ private void ResetTimeoutTimer()
// we don't need the timer anyway.
try
{
_timeoutTimer.Change(ServerTimeout, Timeout.InfiniteTimeSpan);
_timeoutTimer.Change(Debugger.IsAttached ? Timeout.InfiniteTimeSpan : ServerTimeout, Timeout.InfiniteTimeSpan);
}
catch (ObjectDisposedException)
{
Expand Down Expand Up @@ -140,9 +141,13 @@ private async Task StartAsyncCore()
using (var memoryStream = new MemoryStream())
{
NegotiationProtocol.WriteMessage(new NegotiationMessage(_protocol.Name), memoryStream);
await _connection.SendAsync(memoryStream.ToArray(), _connectionActive.Token);

// TODO: Pass the token when that's available
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File a bug

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await _connection.Output.WriteAsync(memoryStream.ToArray());
}

_readingTask = StartReading();

ResetTimeoutTimer();
}

Expand All @@ -162,14 +167,24 @@ private IDataEncoder GetDataEncoder(TransferMode requestedTransferMode, Transfer

public async Task StopAsync() => await StopAsyncCore().ForceAsync();

private Task StopAsyncCore() => _connection.StopAsync();
private async Task StopAsyncCore()
{
await _connection.StopAsync();

if (_readingTask != null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you have to capture _readingTask before calling _connection.StopAsync otherwise someone could start a new connection in the closed callback and set _readingTask for the new connection

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this part of the code is racy. I took a look last night and since the receive loop moved out of the place that had all of the locks and synchronization it's a bit messy.

{
await _readingTask;
}
}

public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();

private async Task DisposeAsyncCore()
{
await _connection.DisposeAsync();

await StopAsync();

// Dispose the timer AFTER shutting down the connection.
_timeoutTimer.Dispose();
}
Expand Down Expand Up @@ -296,10 +311,14 @@ private async Task SendHubMessage(HubInvocationMessage hubMessage, InvocationReq
try
{
var payload = _protocolReaderWriter.WriteMessage(hubMessage);
_logger.SendInvocation(hubMessage.InvocationId);
_logger.SendInvocation(hubMessage.InvocationId, hubMessage);

// TODO: Pass irq.CancellationToken when that's available
irq.CancellationToken.ThrowIfCancellationRequested();

await _connection.Output.WriteAsync(payload);

await _connection.SendAsync(payload, irq.CancellationToken);
_logger.SendInvocationCompleted(hubMessage.InvocationId);
_logger.SendInvocationCompleted(hubMessage.InvocationId, hubMessage);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -329,10 +348,13 @@ private async Task SendAsyncCore(string methodName, object[] args, CancellationT
_logger.PreparingNonBlockingInvocation(methodName, args.Length);

var payload = _protocolReaderWriter.WriteMessage(invocationMessage);
_logger.SendInvocation(invocationMessage.InvocationId);
_logger.SendInvocation(invocationMessage.InvocationId, invocationMessage);

await _connection.SendAsync(payload, cancellationToken);
_logger.SendInvocationCompleted(invocationMessage.InvocationId);
// TODO: Pass the cancellationToken when that's available
cancellationToken.ThrowIfCancellationRequested();

await _connection.Output.WriteAsync(payload);
_logger.SendInvocationCompleted(invocationMessage.InvocationId, invocationMessage);
}
catch (Exception ex)
{
Expand All @@ -341,47 +363,82 @@ private async Task SendAsyncCore(string methodName, object[] args, CancellationT
}
}

private async Task OnDataReceivedAsync(byte[] data)
private async Task StartReading()
{
ResetTimeoutTimer();
if (_protocolReaderWriter.ReadMessages(data, _binder, out var messages))
try
{
foreach (var message in messages)
while (true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you loop on !_connectionActive.IsCancellationRequested? while(true) makes me nervous, even when I feel confident it will exit :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a couple cancellation checks in the below code as well, maybe after every message

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah. I’m going to do a pass removing all of the tokens. I hate the fact that it throws an exception for expected behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the pipelines loops look like this FWIW

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hate the fact that it throws an exception for expected behavior.

Only if you use ThrowIfCancellationRequested

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is the contract for cancellation tokens. The operation is cancelled via an exception. Pipelines has an alternative model that I plan to change to once all the code is moved.

{
InvocationRequest irq;
switch (message)
var result = await _connection.Input.ReadAsync();
var buffer = result.Buffer;
var consumed = buffer.End;
var examined = buffer.End;

try
{
case InvocationMessage invocation:
_logger.ReceivedInvocation(invocation.InvocationId, invocation.Target,
invocation.ArgumentBindingException != null ? null : invocation.Arguments);
await DispatchInvocationAsync(invocation, _connectionActive.Token);
break;
case CompletionMessage completion:
if (!TryRemoveInvocation(completion.InvocationId, out irq))
{
_logger.DropCompletionMessage(completion.InvocationId);
return;
}
DispatchInvocationCompletion(completion, irq);
irq.Dispose();
break;
case StreamItemMessage streamItem:
// Complete the invocation with an error, we don't support streaming (yet)
if (!TryGetInvocation(streamItem.InvocationId, out irq))
if (!buffer.IsEmpty)
{
ResetTimeoutTimer();

if (_protocolReaderWriter.ReadMessages(buffer, _binder, out var messages, out consumed, out examined))
{
_logger.DropStreamMessage(streamItem.InvocationId);
return;
foreach (var message in messages)
{
InvocationRequest irq;
switch (message)
{
case InvocationMessage invocation:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we get some new lines in here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is literally copied from what was here before. Where do you want the newlines and does our other code look like that?

_logger.ReceivedInvocation(invocation.InvocationId, invocation.Target,
invocation.ArgumentBindingException != null ? null : invocation.Arguments);
await DispatchInvocationAsync(invocation, _connectionActive.Token);
break;
case CompletionMessage completion:
if (!TryRemoveInvocation(completion.InvocationId, out irq))
{
_logger.DropCompletionMessage(completion.InvocationId);
return;
}
DispatchInvocationCompletion(completion, irq);
irq.Dispose();
break;
case StreamItemMessage streamItem:
// Complete the invocation with an error, we don't support streaming (yet)
if (!TryGetInvocation(streamItem.InvocationId, out irq))
{
_logger.DropStreamMessage(streamItem.InvocationId);
return;
}
DispatchInvocationStreamItemAsync(streamItem, irq);
break;
case PingMessage _:
// Nothing to do on receipt of a ping.
break;
default:
throw new InvalidOperationException($"Unexpected message type: {message.GetType().FullName}");
}
}
}
DispatchInvocationStreamItemAsync(streamItem, irq);
break;
case PingMessage _:
// Nothing to do on receipt of a ping.

}
else if (result.IsCompleted)
{
break;
default:
throw new InvalidOperationException($"Unexpected message type: {message.GetType().FullName}");
}
}
finally
{
_connection.Input.AdvanceTo(consumed, examined);
}
}
}
catch (Exception ex)
{
_connection.Input.Complete(ex);
}
finally
{
_connection.Input.Complete();
}
}

private void Shutdown(Exception exception = null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Linq;
using Microsoft.AspNetCore.SignalR.Internal.Protocol;
using Microsoft.Extensions.Logging;

namespace Microsoft.AspNetCore.SignalR.Client.Internal
Expand All @@ -22,11 +23,11 @@ internal static class SignalRClientLoggerExtensions
private static readonly Action<ILogger, string, string, string, string, Exception> _issueInvocation =
LoggerMessage.Define<string, string, string, string>(LogLevel.Trace, new EventId(4, nameof(IssueInvocation)), "Issuing Invocation '{invocationId}': {returnType} {methodName}({args}).");

private static readonly Action<ILogger, string, Exception> _sendInvocation =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(5, nameof(SendInvocation)), "Sending Invocation '{invocationId}'.");
private static readonly Action<ILogger, string, HubMessage, Exception> _sendInvocation =
LoggerMessage.Define<string, HubMessage>(LogLevel.Debug, new EventId(5, nameof(SendInvocation)), "Sending Invocation '{invocationId}'. {payload}");

private static readonly Action<ILogger, string, Exception> _sendInvocationCompleted =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(6, nameof(SendInvocationCompleted)), "Sending Invocation '{invocationId}' completed.");
private static readonly Action<ILogger, string, HubMessage, Exception> _sendInvocationCompleted =
LoggerMessage.Define<string, HubMessage>(LogLevel.Debug, new EventId(6, nameof(SendInvocationCompleted)), "Sending Invocation '{invocationId}' completed. {payload}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still super unsure about putting message payloads in logs. They potentially have sensitive information in them.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea the current logging isn’t great but that’s a good point

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do it in the JS client too. I'm filing an issue for that because I think this it's generally bad. Let's not add it here though. We can add in attribute about the message (type, probably even method name) but not the argument values.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


private static readonly Action<ILogger, string, Exception> _sendInvocationFailed =
LoggerMessage.Define<string>(LogLevel.Error, new EventId(7, nameof(SendInvocationFailed)), "Sending Invocation '{invocationId}' failed.");
Expand Down Expand Up @@ -147,14 +148,14 @@ public static void IssueInvocation(this ILogger logger, string invocationId, str
}
}

public static void SendInvocation(this ILogger logger, string invocationId)
public static void SendInvocation(this ILogger logger, string invocationId, HubMessage message)
{
_sendInvocation(logger, invocationId, null);
_sendInvocation(logger, invocationId, message, null);
}

public static void SendInvocationCompleted(this ILogger logger, string invocationId)
public static void SendInvocationCompleted(this ILogger logger, string invocationId, HubMessage message)
{
_sendInvocationCompleted(logger, invocationId, null);
_sendInvocationCompleted(logger, invocationId, message, null);
}

public static void SendInvocationFailed(this ILogger logger, string invocationId, Exception exception)
Expand Down
6 changes: 2 additions & 4 deletions src/Microsoft.AspNetCore.Sockets.Abstractions/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features;

namespace Microsoft.AspNetCore.Sockets.Client
{
public interface IConnection
public interface IConnection : IDuplexPipe
{
Task StartAsync();
Task SendAsync(byte[] data, CancellationToken cancellationToken);
Task StopAsync();
Task DisposeAsync();
Task AbortAsync(Exception ex);

IDisposable OnReceived(Func<byte[], object, Task> callback, object state);

event Action<Exception> Closed;

IFeatureCollection Features { get; }
Expand Down
Loading