Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 34 additions & 12 deletions src/Servers/Kestrel/Core/src/Adapter/Internal/AdaptedPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.Extensions.Logging;

Expand All @@ -14,21 +15,24 @@ internal class AdaptedPipeline : IDuplexPipe
{
private readonly int _minAllocBufferSize;

private readonly IDuplexPipe _transport;
private Task _inputTask;
private Task _outputTask;

public AdaptedPipeline(IDuplexPipe transport,
Pipe inputPipe,
Pipe outputPipe,
IKestrelTrace log,
int minAllocBufferSize)
{
_transport = transport;
TransportStream = new RawStream(transport.Input, transport.Output, throwOnCancelled: true);
Input = inputPipe;
Output = outputPipe;
Log = log;
_minAllocBufferSize = minAllocBufferSize;
}

public RawStream TransportStream { get; }

public Pipe Input { get; }

public Pipe Output { get; }
Expand All @@ -39,13 +43,31 @@ public AdaptedPipeline(IDuplexPipe transport,

PipeWriter IDuplexPipe.Output => Output.Writer;

public async Task RunAsync(Stream stream)
public void RunAsync(Stream stream)
{
_inputTask = ReadInputAsync(stream);
_outputTask = WriteOutputAsync(stream);
}

public async Task CompleteAsync()
{
var inputTask = ReadInputAsync(stream);
var outputTask = WriteOutputAsync(stream);
Output.Writer.Complete();
Input.Reader.Complete();

if (_outputTask == null)
{
return;
}

// Wait for the output task to complete, this ensures that we've copied
// the application data to the underlying stream
await _outputTask;

// Cancel the underlying stream so that the input task yields
TransportStream.CancelPendingRead();

await inputTask;
await outputTask;
// The input task should yield now that we've cancelled it
await _inputTask;
}

private async Task WriteOutputAsync(Stream stream)
Expand Down Expand Up @@ -97,7 +119,6 @@ private async Task WriteOutputAsync(Stream stream)
finally
{
Output.Reader.Complete();
_transport.Output.Complete();
}
}

Expand All @@ -115,7 +136,6 @@ private async Task ReadInputAsync(Stream stream)

while (true)
{

var outputBuffer = Input.Writer.GetMemory(_minAllocBufferSize);
var bytesRead = await stream.ReadAsync(outputBuffer);
Input.Writer.Advance(bytesRead);
Expand All @@ -134,6 +154,11 @@ private async Task ReadInputAsync(Stream stream)
}
}
}
catch (OperationCanceledException ex)
{
// Propagate the exception if it's ConnectionAbortedException
error = ex as ConnectionAbortedException;
}
catch (Exception ex)
{
// Don't rethrow the exception. It should be handled by the Pipeline consumer.
Expand All @@ -142,9 +167,6 @@ private async Task ReadInputAsync(Stream stream)
finally
{
Input.Writer.Complete(error);
// The application could have ended the input pipe so complete
// the transport pipe as well
_transport.Input.Complete();
}
}
}
Expand Down
30 changes: 23 additions & 7 deletions src/Servers/Kestrel/Core/src/Adapter/Internal/RawStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,20 @@ internal sealed class RawStream : Stream
{
private readonly PipeReader _input;
private readonly PipeWriter _output;
private readonly bool _throwOnCancelled;
private volatile bool _cancelCalled;

public RawStream(PipeReader input, PipeWriter output)
public RawStream(PipeReader input, PipeWriter output, bool throwOnCancelled = false)
{
_input = input;
_output = output;
_throwOnCancelled = throwOnCancelled;
}

public void CancelPendingRead()
{
_cancelCalled = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does CancelPendingRead need to be synchronized?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not for our usage.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't even think _cancelCalled needs to be volatile since CancelPendingRead acquires and releases the Pipe's _sync lock right afterwards.

_input.CancelPendingRead();
}

public override bool CanRead => true;
Expand Down Expand Up @@ -61,17 +70,17 @@ public override int Read(byte[] buffer, int offset, int count)
{
// ValueTask uses .GetAwaiter().GetResult() if necessary
// https://github.com/dotnet/corefx/blob/f9da3b4af08214764a51b2331f3595ffaf162abe/src/System.Threading.Tasks.Extensions/src/System/Threading/Tasks/ValueTask.cs#L156
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count)).Result;
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), default).Result;
}

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken = default)
{
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count)).AsTask();
return ReadAsyncInternal(new Memory<byte>(buffer, offset, count), cancellationToken).AsTask();
}

public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
{
return ReadAsyncInternal(destination);
return ReadAsyncInternal(destination, cancellationToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we're flowing the cancellationToken through RawStream, do we have a regression test?

}

public override void Write(byte[] buffer, int offset, int count)
Expand Down Expand Up @@ -105,14 +114,21 @@ public override Task FlushAsync(CancellationToken cancellationToken)
return WriteAsync(null, 0, 0, cancellationToken);
}

private async ValueTask<int> ReadAsyncInternal(Memory<byte> destination)
private async ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
{
while (true)
{
var result = await _input.ReadAsync();
var result = await _input.ReadAsync(cancellationToken);
var readableBuffer = result.Buffer;
try
{
if (_throwOnCancelled && result.IsCanceled && _cancelCalled)
{
// Reset the bool
_cancelCalled = false;
throw new OperationCanceledException();
}

if (!readableBuffer.IsEmpty)
{
// buffer.Count is int
Expand Down
2 changes: 0 additions & 2 deletions src/Servers/Kestrel/Core/src/Internal/Http/Http1Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public Http1Connection(HttpConnectionContext context)

protected override void OnRequestProcessingEnded()
{
Input.Complete();

TimeoutControl.StartDrainTimeout(MinResponseDataRate, ServerOptions.Limits.MaxResponseBufferSize);

// Prevent RequestAborted from firing. Free up unneeded feature references.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public Http1OutputProducer(
_memoryPool = memoryPool;
}

// For tests
internal PipeWriter PipeWriter => _pipeWriter;

public Task WriteDataAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken = default)
{
if (cancellationToken.IsCancellationRequested)
Expand Down Expand Up @@ -402,7 +405,6 @@ private void CompletePipe()
_log.ConnectionDisconnect(_connectionId);
_pipeWriterCompleted = true;
_completed = true;
_pipeWriter.Complete();
}
}

Expand Down
32 changes: 12 additions & 20 deletions src/Servers/Kestrel/Core/src/Internal/HttpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> http
try
{
AdaptedPipeline adaptedPipeline = null;
var adaptedPipelineTask = Task.CompletedTask;

// _adaptedTransport must be set prior to wiring up callbacks
// to allow the connection to be aborted prior to protocol selection.
Expand Down Expand Up @@ -120,8 +119,8 @@ public async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> http
if (adaptedPipeline != null)
{
// Stream can be null here and run async will close the connection in that case
var stream = await ApplyConnectionAdaptersAsync();
adaptedPipelineTask = adaptedPipeline.RunAsync(stream);
var stream = await ApplyConnectionAdaptersAsync(adaptedPipeline.TransportStream);
adaptedPipeline.RunAsync(stream);
}

IRequestProcessor requestProcessor = null;
Expand Down Expand Up @@ -160,20 +159,19 @@ public async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> http
}
}

_context.Transport.Input.OnWriterCompleted(
(_, state) => ((HttpConnection)state).OnInputOrOutputCompleted(),
this);
var closedRegistration = _context.ConnectionContext.ConnectionClosed.Register(state => ((HttpConnection)state).OnInputOrOutputCompleted(), this);

_context.Transport.Output.OnReaderCompleted(
(_, state) => ((HttpConnection)state).OnInputOrOutputCompleted(),
this);

if (requestProcessor != null)
// We don't care about callbacks once all requests are processed
using (closedRegistration)
{
await requestProcessor.ProcessRequestsAsync(httpApplication);
if (requestProcessor != null)
{
await requestProcessor.ProcessRequestsAsync(httpApplication);
}
}

await adaptedPipelineTask;
// Complete the pipeline after the method runs
await (adaptedPipeline?.CompleteAsync() ?? Task.CompletedTask);
}
}
catch (Exception ex)
Expand Down Expand Up @@ -277,10 +275,9 @@ private void Abort(ConnectionAbortedException ex)
}
}

private async Task<Stream> ApplyConnectionAdaptersAsync()
private async Task<Stream> ApplyConnectionAdaptersAsync(RawStream stream)
{
var connectionAdapters = _context.ConnectionAdapters;
var stream = new RawStream(_context.Transport.Input, _context.Transport.Output);
var adapterContext = new ConnectionAdapterContext(_context.ConnectionContext, stream);
_adaptedConnections = new List<IAdaptedConnection>(connectionAdapters.Count);

Expand Down Expand Up @@ -367,12 +364,7 @@ private void Tick()

private void CloseUninitializedConnection(ConnectionAbortedException abortReason)
{
Debug.Assert(_adaptedTransport != null);

_context.ConnectionContext.Abort(abortReason);

_adaptedTransport.Input.Complete();
_adaptedTransport.Output.Complete();
}

public void OnTimeout(TimeoutReason reason)
Expand Down
7 changes: 4 additions & 3 deletions src/Servers/Kestrel/Core/test/OutputProducerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ public async Task WritesNoopAfterConnectionCloses()

await socketOutput.WriteDataAsync(new byte[] { 1, 2, 3, 4 }, default);

Assert.True(socketOutput.Pipe.Reader.TryRead(out var result));
Assert.True(result.IsCompleted);
Assert.True(result.Buffer.IsEmpty);
Assert.False(socketOutput.Pipe.Reader.TryRead(out var result));

socketOutput.Pipe.Writer.Complete();
socketOutput.Pipe.Reader.Complete();
}
}

Expand Down
41 changes: 27 additions & 14 deletions src/Servers/Kestrel/Transport.Libuv/src/Internal/LibuvConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ internal partial class LibuvConnection : TransportConnection

private MemoryHandle _bufferHandle;
private Task _processingTask;
private readonly TaskCompletionSource<object> _waitForConnectionClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
private bool _connectionClosed;

public LibuvConnection(UvStreamHandle socket,
ILibuvTrace log,
Expand Down Expand Up @@ -135,21 +137,10 @@ private async Task StartCore()
// We're done with the socket now
_socket.Dispose();

// Fire the connection closed token and wait for it to complete
var waitForConnectionClosedTcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
// Ensure this always fires
FireConnectionClosed();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're going to continue firing ConnectionClosed for libuv output errors, we should do the same for the socket transport.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean? Aren't we doing that? It always fires no?

Copy link
Member

@halter73 halter73 Jun 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm now thinking about making this more like the socket transport instead. The important thing is consistency.

Instead of calling FireConnectionClosed here, just don't dispose _socket here. Instead, close the uv_stream_handle only after both the output loop completed and

a) you've received a terminal read status or
b) the connection was aborted

In the case of b) you'd have to make sure to still call FireConnectionClosed when you close the socket ofc.


ThreadPool.UnsafeQueueUserWorkItem(state =>
{
(var connection, var tcs) = state;

connection.CancelConnectionClosedToken();

tcs.TrySetResult(null);
},
(this, waitForConnectionClosedTcs),
preferLocal: false);

await waitForConnectionClosedTcs.Task;
await _waitForConnectionClosedTcs.Task;
}
}
catch (Exception e)
Expand Down Expand Up @@ -241,11 +232,33 @@ private void OnRead(UvStreamHandle handle, int status)
error = LogAndWrapReadError(uvError);
}

FireConnectionClosed();

// Complete after aborting the connection
Input.Complete(error);
}
}

private void FireConnectionClosed()
{
// Guard against scheduling this multiple times
if (_connectionClosed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see you're taking advantage of the single-threaded nature of libuv's event loop 👍

{
return;
}

_connectionClosed = true;

ThreadPool.UnsafeQueueUserWorkItem(state =>
{
state.CancelConnectionClosedToken();

state._waitForConnectionClosedTcs.TrySetResult(null);
},
this,
preferLocal: false);
}

private async Task ApplyBackpressureAsync(ValueTask<FlushResult> flushTask)
{
Log.ConnectionPause(ConnectionId);
Expand Down
Loading