Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Change the IConnection contract to be an IDuplexPipe #1446

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions samples/ClientSample/RawSample.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Http;
using System.Text;
Expand Down Expand Up @@ -37,13 +38,17 @@ public static async Task<int> ExecuteAsync(string baseUrl)

Console.WriteLine($"Connecting to {baseUrl}...");
var connection = new HttpConnection(new Uri(baseUrl), loggerFactory);
Task reading = null;

try
{
var closeTcs = new TaskCompletionSource<object>();
connection.Closed += e => closeTcs.SetResult(null);
connection.OnReceived(data => Console.Out.WriteLineAsync($"{Encoding.UTF8.GetString(data)}"));

await connection.StartAsync();

reading = ReadAsync(connection);

Console.WriteLine($"Connected to {baseUrl}");
var cts = new CancellationTokenSource();
Console.CancelKeyPress += async (sender, a) =>
Expand All @@ -61,7 +66,7 @@ public static async Task<int> ExecuteAsync(string baseUrl)
break;
}

await connection.SendAsync(Encoding.UTF8.GetBytes(line), cts.Token);
await connection.Output.WriteAsync(Encoding.UTF8.GetBytes(line));
}
}
catch (AggregateException aex) when (aex.InnerExceptions.All(e => e is OperationCanceledException))
Expand All @@ -73,8 +78,40 @@ public static async Task<int> ExecuteAsync(string baseUrl)
finally
{
await connection.DisposeAsync();

if (reading != null)
{
await reading;
}
}
return 0;
}

private static async Task ReadAsync(HttpConnection connection)
{
while (true)
{
var result = await connection.Input.ReadAsync();
var buffer = result.Buffer;

try
{
if (!buffer.IsEmpty)
{
Console.WriteLine(Encoding.UTF8.GetString(buffer.ToArray()));
}
else if (result.IsCompleted)
{
break;
}
}
finally
{
connection.Input.AdvanceTo(buffer.End);
}
}

connection.Input.Complete();
}
}
}
2 changes: 1 addition & 1 deletion samples/SocketsSample/Properties/launchSettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:59847/",
"applicationUrl": "http://localhost:57059/",
"sslPort": 0
}
},
Expand Down
133 changes: 95 additions & 38 deletions src/Microsoft.AspNetCore.SignalR.Client.Core/HubConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Channels;
Expand Down Expand Up @@ -38,6 +39,8 @@ public class HubConnection
private readonly ConcurrentDictionary<string, List<InvocationHandler>> _handlers = new ConcurrentDictionary<string, List<InvocationHandler>>();
private CancellationTokenSource _connectionActive;

private Task _readingTask;

private int _nextId = 0;
private volatile bool _startCalled;
private Timer _timeoutTimer;
Expand Down Expand Up @@ -68,9 +71,7 @@ public HubConnection(IConnection connection, IHubProtocol protocol, ILoggerFacto
_protocol = protocol;
_loggerFactory = loggerFactory ?? NullLoggerFactory.Instance;
_logger = _loggerFactory.CreateLogger<HubConnection>();
_connection.OnReceived((data, state) => ((HubConnection)state).OnDataReceivedAsync(data), this);
_connection.Closed += e => Shutdown(e);

// Create the timer for timeout, but disabled by default (we enable it when started).
_timeoutTimer = new Timer(state => ((HubConnection)state).TimeoutElapsed(), this, Timeout.Infinite, Timeout.Infinite);
}
Expand Down Expand Up @@ -103,7 +104,7 @@ private void ResetTimeoutTimer()
// we don't need the timer anyway.
try
{
_timeoutTimer.Change(ServerTimeout, Timeout.InfiniteTimeSpan);
_timeoutTimer.Change(Debugger.IsAttached ? Timeout.InfiniteTimeSpan : ServerTimeout, Timeout.InfiniteTimeSpan);
}
catch (ObjectDisposedException)
{
Expand Down Expand Up @@ -140,9 +141,13 @@ private async Task StartAsyncCore()
using (var memoryStream = new MemoryStream())
{
NegotiationProtocol.WriteMessage(new NegotiationMessage(_protocol.Name), memoryStream);
await _connection.SendAsync(memoryStream.ToArray(), _connectionActive.Token);

// TODO: Pass the token when that's available
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File a bug

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await _connection.Output.WriteAsync(memoryStream.ToArray());
}

_readingTask = StartReading();

ResetTimeoutTimer();
}

Expand All @@ -162,14 +167,24 @@ private IDataEncoder GetDataEncoder(TransferMode requestedTransferMode, Transfer

public async Task StopAsync() => await StopAsyncCore().ForceAsync();

private Task StopAsyncCore() => _connection.StopAsync();
private async Task StopAsyncCore()
{
await _connection.StopAsync();

if (_readingTask != null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you have to capture _readingTask before calling _connection.StopAsync otherwise someone could start a new connection in the closed callback and set _readingTask for the new connection

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, this part of the code is racy. I took a look last night and since the receive loop moved out of the place that had all of the locks and synchronization it's a bit messy.

{
await _readingTask;
}
}

public async Task DisposeAsync() => await DisposeAsyncCore().ForceAsync();

private async Task DisposeAsyncCore()
{
await _connection.DisposeAsync();

await StopAsync();

// Dispose the timer AFTER shutting down the connection.
_timeoutTimer.Dispose();
}
Expand Down Expand Up @@ -298,7 +313,11 @@ private async Task SendHubMessage(HubInvocationMessage hubMessage, InvocationReq
var payload = _protocolReaderWriter.WriteMessage(hubMessage);
_logger.SendInvocation(hubMessage.InvocationId);

await _connection.SendAsync(payload, irq.CancellationToken);
// TODO: Pass irq.CancellationToken when that's available
irq.CancellationToken.ThrowIfCancellationRequested();

await _connection.Output.WriteAsync(payload);

_logger.SendInvocationCompleted(hubMessage.InvocationId);
}
catch (Exception ex)
Expand Down Expand Up @@ -331,7 +350,10 @@ private async Task SendAsyncCore(string methodName, object[] args, CancellationT
var payload = _protocolReaderWriter.WriteMessage(invocationMessage);
_logger.SendInvocation(invocationMessage.InvocationId);

await _connection.SendAsync(payload, cancellationToken);
// TODO: Pass the cancellationToken when that's available
cancellationToken.ThrowIfCancellationRequested();

await _connection.Output.WriteAsync(payload);
_logger.SendInvocationCompleted(invocationMessage.InvocationId);
}
catch (Exception ex)
Expand All @@ -341,47 +363,82 @@ private async Task SendAsyncCore(string methodName, object[] args, CancellationT
}
}

private async Task OnDataReceivedAsync(byte[] data)
private async Task StartReading()
{
ResetTimeoutTimer();
if (_protocolReaderWriter.ReadMessages(data, _binder, out var messages))
try
{
foreach (var message in messages)
while (true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you loop on !_connectionActive.IsCancellationRequested? while(true) makes me nervous, even when I feel confident it will exit :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add a couple cancellation checks in the below code as well, maybe after every message

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah. I’m going to do a pass removing all of the tokens. I hate the fact that it throws an exception for expected behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the pipelines loops look like this FWIW

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hate the fact that it throws an exception for expected behavior.

Only if you use ThrowIfCancellationRequested

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is the contract for cancellation tokens. The operation is cancelled via an exception. Pipelines has an alternative model that I plan to change to once all the code is moved.

{
InvocationRequest irq;
switch (message)
var result = await _connection.Input.ReadAsync();
var buffer = result.Buffer;
var consumed = buffer.End;
var examined = buffer.End;

try
{
case InvocationMessage invocation:
_logger.ReceivedInvocation(invocation.InvocationId, invocation.Target,
invocation.ArgumentBindingException != null ? null : invocation.Arguments);
await DispatchInvocationAsync(invocation, _connectionActive.Token);
break;
case CompletionMessage completion:
if (!TryRemoveInvocation(completion.InvocationId, out irq))
{
_logger.DropCompletionMessage(completion.InvocationId);
return;
}
DispatchInvocationCompletion(completion, irq);
irq.Dispose();
break;
case StreamItemMessage streamItem:
// Complete the invocation with an error, we don't support streaming (yet)
if (!TryGetInvocation(streamItem.InvocationId, out irq))
if (!buffer.IsEmpty)
{
ResetTimeoutTimer();

if (_protocolReaderWriter.ReadMessages(buffer, _binder, out var messages, out consumed, out examined))
{
_logger.DropStreamMessage(streamItem.InvocationId);
return;
foreach (var message in messages)
{
InvocationRequest irq;
switch (message)
{
case InvocationMessage invocation:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we get some new lines in here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is literally copied from what was here before. Where do you want the newlines and does our other code look like that?

_logger.ReceivedInvocation(invocation.InvocationId, invocation.Target,
invocation.ArgumentBindingException != null ? null : invocation.Arguments);
await DispatchInvocationAsync(invocation, _connectionActive.Token);
break;
case CompletionMessage completion:
if (!TryRemoveInvocation(completion.InvocationId, out irq))
{
_logger.DropCompletionMessage(completion.InvocationId);
return;
}
DispatchInvocationCompletion(completion, irq);
irq.Dispose();
break;
case StreamItemMessage streamItem:
// Complete the invocation with an error, we don't support streaming (yet)
if (!TryGetInvocation(streamItem.InvocationId, out irq))
{
_logger.DropStreamMessage(streamItem.InvocationId);
return;
}
DispatchInvocationStreamItemAsync(streamItem, irq);
break;
case PingMessage _:
// Nothing to do on receipt of a ping.
break;
default:
throw new InvalidOperationException($"Unexpected message type: {message.GetType().FullName}");
}
}
}
DispatchInvocationStreamItemAsync(streamItem, irq);
break;
case PingMessage _:
// Nothing to do on receipt of a ping.

}
else if (result.IsCompleted)
{
break;
default:
throw new InvalidOperationException($"Unexpected message type: {message.GetType().FullName}");
}
}
finally
{
_connection.Input.AdvanceTo(consumed, examined);
}
}
}
catch (Exception ex)
{
_connection.Input.Complete(ex);
}
finally
{
_connection.Input.Complete();
}
}

private void Shutdown(Exception exception = null)
Expand Down
6 changes: 2 additions & 4 deletions src/Microsoft.AspNetCore.Sockets.Abstractions/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http.Features;

namespace Microsoft.AspNetCore.Sockets.Client
{
public interface IConnection
public interface IConnection : IDuplexPipe
{
Task StartAsync();
Task SendAsync(byte[] data, CancellationToken cancellationToken);
Task StopAsync();
Task DisposeAsync();
Task AbortAsync(Exception ex);

IDisposable OnReceived(Func<byte[], object, Task> callback, object state);

event Action<Exception> Closed;

IFeatureCollection Features { get; }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Collections;
using System.IO.Pipelines;
using System.Threading;

namespace Microsoft.AspNetCore.Sockets.Client
{
public partial class HttpConnection
{
private class HttpConnectionPipeReader : PipeReader
{
private readonly HttpConnection _connection;

public HttpConnectionPipeReader(HttpConnection connection)
{
_connection = connection;
}

public override void AdvanceTo(SequencePosition consumed)
{
_connection._transportChannel.Input.AdvanceTo(consumed);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will NRE if the connection hasn't been started. We should put in a check that provides a better error message.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t want to put checks in all of the methods here since some of performance critical. I’ll see what makes sense and update.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

catch(NullReferenceException) :trollface:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll just add the check for now and we can fix it when we do moar performance.

}

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
_connection._transportChannel.Input.AdvanceTo(consumed, examined);
}

public override void CancelPendingRead()
{
_connection._transportChannel.Input.CancelPendingRead();
}

public override void Complete(Exception exception = null)
{
_connection._transportChannel.Input.Complete(exception);
}

public override void OnWriterCompleted(Action<Exception, object> callback, object state)
{
_connection._transportChannel.Input.OnWriterCompleted(callback, state);
}

public override ValueAwaiter<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
return _connection._transportChannel.Input.ReadAsync(cancellationToken);
}

public override bool TryRead(out ReadResult result)
{
return _connection._transportChannel.Input.TryRead(out result);
}
}
}
}
Loading