-
Notifications
You must be signed in to change notification settings - Fork 448
Change the IConnection contract to be an IDuplexPipe #1446
Conversation
- Decorate the PipeReader and PipeWriter so the reference is always valid. - Make sure StopAsync waits for the Read loop to drain.
{ | ||
await registration.InvokeAsync(new byte[0]); | ||
} | ||
var connection = new Mock<IConnection>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/cc @pakrym this is what it looks like to mock a pipe 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can implement completed IPipeAwaiter very easy, it would make this mock much cleaner.
Maybe we would end up with a complete mock pipe in a bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -31,24 +32,24 @@ public class HttpConnection : IConnection | |||
private readonly object _stateChangeLock = new object(); | |||
|
|||
private volatile IDuplexPipe _transportChannel; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we rename away from channel?
@@ -0,0 +1,55 @@ | |||
using System; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Header
@@ -0,0 +1,80 @@ | |||
using System; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Header
InvocationRequest irq; | ||
switch (message) | ||
{ | ||
case InvocationMessage invocation: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can we get some new lines in here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is literally copied from what was here before. Where do you want the newlines and does our other code look like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still need to go a little deeper into connection lifecycle/races etc. but I have some initial comments.
@@ -140,9 +141,13 @@ private void ResetTimeoutTimer() | |||
using (var memoryStream = new MemoryStream()) | |||
{ | |||
NegotiationProtocol.WriteMessage(new NegotiationMessage(_protocol.Name), memoryStream); | |||
await _connection.SendAsync(memoryStream.ToArray(), _connectionActive.Token); | |||
|
|||
// TODO: Pass the token when that's available |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File a bug
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{ | ||
foreach (var message in messages) | ||
while (true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you loop on !_connectionActive.IsCancellationRequested
? while(true)
makes me nervous, even when I feel confident it will exit :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add a couple cancellation checks in the below code as well, maybe after every message
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nah. I’m going to do a pass removing all of the tokens. I hate the fact that it throws an exception for expected behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the pipelines loops look like this FWIW
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hate the fact that it throws an exception for expected behavior.
Only if you use ThrowIfCancellationRequested
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is the contract for cancellation tokens. The operation is cancelled via an exception. Pipelines has an alternative model that I plan to change to once all the code is moved.
private static readonly Action<ILogger, string, Exception> _sendInvocationCompleted = | ||
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(6, nameof(SendInvocationCompleted)), "Sending Invocation '{invocationId}' completed."); | ||
private static readonly Action<ILogger, string, HubMessage, Exception> _sendInvocationCompleted = | ||
LoggerMessage.Define<string, HubMessage>(LogLevel.Debug, new EventId(6, nameof(SendInvocationCompleted)), "Sending Invocation '{invocationId}' completed. {payload}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still super unsure about putting message payloads in logs. They potentially have sensitive information in them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea the current logging isn’t great but that’s a good point
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do it in the JS client too. I'm filing an issue for that because I think this it's generally bad. Let's not add it here though. We can add in attribute about the message (type, probably even method name) but not the argument values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
public override void AdvanceTo(SequencePosition consumed) | ||
{ | ||
_connection._transportChannel.Input.AdvanceTo(consumed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will NRE if the connection hasn't been started. We should put in a check that provides a better error message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don’t want to put checks in all of the methods here since some of performance critical. I’ll see what makes sense and update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
catch(NullReferenceException)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll just add the check for now and we can fix it when we do moar performance.
{ | ||
await registration.InvokeAsync(new byte[0]); | ||
} | ||
var connection = new Mock<IConnection>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if maybe |
Agree @anurse but not as part of this change. It also means your paying for negotiation each time. But maybe that’s ok. |
|
||
void IDisposable.Dispose() | ||
{ | ||
// We want to hide this since it noops, it's temporary until we remove IDisposable from IDuplexPipe |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
File this somewhere so it doesn't get overlooked later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Assert.Equal("Cannot send messages when the connection is not in the Connected state.", exception.Message); | ||
}); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Remove new line
Only for WebSockets, for the other transports you have to pay that cost on reconnect anyway. In theory we could try to detect that WebSockets worked and just do a bare WebSocket connection on reconnect, but that would make transport fallback on reconnect tricky. I think the cost is minimal and acceptable. |
{ | ||
await _connection.StopAsync(); | ||
|
||
if (_readingTask != null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you have to capture _readingTask
before calling _connection.StopAsync
otherwise someone could start a new connection in the closed callback and set _readingTask
for the new connection
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea, this part of the code is racy. I took a look last night and since the receive loop moved out of the place that had all of the locks and synchronization it's a bit messy.
I'll be back with something else |
This is a design change that we considered in the past but moved away from while we were discussiong messaging vs streams. We're firmly in the streams camp now so I think we should make this change (it also cleans and a bunch of things and introduces problems 😄 ).
Quirks:
No errors if the you write to the connection before it starts or after it is disposed. This is possible but requires wrapping the PipeReader and PipeWriter returned.(solved)The current implementation gives you a null pipe if you you don't call Start first, this can be fixed by making a wrapping PipeWriter and PipeReader.(solved)