From 73e022b35a1b47310e4a24ee3a5125183b7434b2 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Sun, 4 Jul 2021 14:33:58 +1200 Subject: [PATCH 01/10] HTTP/3: Pool QuicStreamContext instances --- .../src/Internal/ISystemClock.cs | 23 ++ .../src/Internal/QuicConnectionContext.cs | 67 +++++- .../src/Internal/QuicStreamContext.cs | 207 ++++++++++++++++-- .../src/Internal/QuicStreamStack.cs | 144 ++++++++++++ .../src/QuicTransportOptions.cs | 4 +- .../test/QuicStreamContextTests.cs | 49 ++++- 6 files changed, 461 insertions(+), 33 deletions(-) create mode 100644 src/Servers/Kestrel/Transport.Quic/src/Internal/ISystemClock.cs create mode 100644 src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamStack.cs diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/ISystemClock.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/ISystemClock.cs new file mode 100644 index 000000000000..6b3904990391 --- /dev/null +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/ISystemClock.cs @@ -0,0 +1,23 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; + +namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal +{ + /// + /// Abstracts the system clock to facilitate testing. + /// + internal interface ISystemClock + { + /// + /// Retrieves the current system time in UTC. + /// + DateTimeOffset UtcNow { get; } + } + + internal class SystemClock : ISystemClock + { + public DateTimeOffset UtcNow => DateTimeOffset.UtcNow; + } +} diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs index 99b4d13a99f3..eaecf48dfe46 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Diagnostics; using System.Net.Quic; using System.Threading; using System.Threading.Tasks; @@ -14,6 +15,13 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal { internal class QuicConnectionContext : TransportMultiplexedConnection, IProtocolErrorCodeFeature { + // Internal for testing. + internal QuicStreamStack StreamPool; + + private bool _streamPoolHeartbeatInitialized; + private long _currentTicks; + private readonly object _poolLock = new object(); + private readonly QuicConnection _connection; private readonly QuicTransportContext _context; private readonly IQuicTrace _log; @@ -23,6 +31,10 @@ internal class QuicConnectionContext : TransportMultiplexedConnection, IProtocol public long Error { get; set; } + internal const int InitialStreamPoolSize = 5; + internal const int MaxStreamPoolSize = 100; + internal const long StreamPoolExpiryTicks = TimeSpan.TicksPerSecond * 5; + public QuicConnectionContext(QuicConnection connection, QuicTransportContext context) { _log = context.Log; @@ -31,6 +43,8 @@ public QuicConnectionContext(QuicConnection connection, QuicTransportContext con ConnectionClosed = _connectionClosedTokenSource.Token; Features.Set(new FakeTlsConnectionFeature()); Features.Set(this); + + StreamPool = new QuicStreamStack(InitialStreamPoolSize); } public override async ValueTask DisposeAsync() @@ -62,7 +76,19 @@ public override void Abort(ConnectionAbortedException abortReason) try { var stream = await _connection.AcceptStreamAsync(cancellationToken); - var context = new QuicStreamContext(stream, this, _context); + + QuicStreamContext? context; + + lock (_poolLock) + { + StreamPool.TryPop(out context); + } + if (context == null) + { + context = new QuicStreamContext(this, _context); + } + + context.Initialize(stream); context.Start(); _log.AcceptedStream(context); @@ -124,12 +150,49 @@ public override ValueTask ConnectAsync(IFeatureCollection? fe quicStream = _connection.OpenBidirectionalStream(); } - var context = new QuicStreamContext(quicStream, this, _context); + // TODO - pool connect streams? + QuicStreamContext? context = new QuicStreamContext(this, _context); + context.Initialize(quicStream); context.Start(); _log.ConnectedStream(context); return new ValueTask(context); } + + internal void ReturnStream(QuicStreamContext stream) + { + lock (_poolLock) + { + if (!_streamPoolHeartbeatInitialized) + { + // Heartbeat feature is added to connection features by Kestrel. + var heartbeatFeature = Features.Get(); + if (heartbeatFeature != null) + { + heartbeatFeature.OnHeartbeat(state => ((QuicConnectionContext)state).RemoveExpiredStreams(), this); + } + + var now = _context.Options.SystemClock.UtcNow.Ticks; + Volatile.Write(ref _currentTicks, now); + + _streamPoolHeartbeatInitialized = true; + } + + stream.PoolExpirationTicks = Volatile.Read(ref _currentTicks) + StreamPoolExpiryTicks; + StreamPool.Push(stream); + } + } + + private void RemoveExpiredStreams() + { + lock (_poolLock) + { + var now = _context.Options.SystemClock.UtcNow.Ticks; + Volatile.Write(ref _currentTicks, now); + + StreamPool.RemoveExpired(now); + } + } } } diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs index 076965e44a87..7d813a1445ce 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs @@ -3,6 +3,7 @@ using System; using System.Buffers; +using System.IO; using System.IO.Pipelines; using System.Net.Quic; using System.Threading; @@ -19,30 +20,32 @@ internal class QuicStreamContext : TransportConnection, IStreamDirectionFeature, // Internal for testing. internal Task _processingTask = Task.CompletedTask; - private readonly QuicStream _stream; + private QuicStream _stream = default!; private readonly QuicConnectionContext _connection; private readonly QuicTransportContext _context; + private readonly Pipe _inputPipe; + private readonly Pipe _outputPipe; private readonly IDuplexPipe _originalTransport; - private readonly CancellationTokenSource _streamClosedTokenSource = new CancellationTokenSource(); + private readonly IDuplexPipe _originalApplication; + private readonly CompletionPipeReader _transportPipeReader; + private readonly CompletionPipeWriter _transportPipeWriter; private readonly IQuicTrace _log; + private CancellationTokenSource _streamClosedTokenSource = default!; private string? _connectionId; private const int MinAllocBufferSize = 4096; private volatile Exception? _shutdownReason; private bool _streamClosed; private bool _aborted; - private readonly TaskCompletionSource _waitForConnectionClosedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + private TaskCompletionSource _waitForConnectionClosedTcs = default!; private readonly object _shutdownLock = new object(); - public QuicStreamContext(QuicStream stream, QuicConnectionContext connection, QuicTransportContext context) + public QuicStreamContext(QuicConnectionContext connection, QuicTransportContext context) { - _stream = stream; _connection = connection; _context = context; _log = context.Log; MemoryPool = connection.MemoryPool; - ConnectionClosed = _streamClosedTokenSource.Token; - var maxReadBufferSize = context.Options.MaxReadBufferSize ?? 0; var maxWriteBufferSize = context.Options.MaxWriteBufferSize ?? 0; @@ -50,30 +53,67 @@ public QuicStreamContext(QuicStream stream, QuicConnectionContext connection, Qu var inputOptions = new PipeOptions(MemoryPool, PipeScheduler.ThreadPool, PipeScheduler.Inline, maxReadBufferSize, maxReadBufferSize / 2, useSynchronizationContext: false); var outputOptions = new PipeOptions(MemoryPool, PipeScheduler.Inline, PipeScheduler.ThreadPool, maxWriteBufferSize, maxWriteBufferSize / 2, useSynchronizationContext: false); - var pair = DuplexPipe.CreateConnectionPair(inputOptions, outputOptions); + _inputPipe = new Pipe(inputOptions); + _outputPipe = new Pipe(outputOptions); + + _transportPipeReader = new CompletionPipeReader(_inputPipe.Reader); + _transportPipeWriter = new CompletionPipeWriter(_outputPipe.Writer); + + _originalApplication = new DuplexPipe(_outputPipe.Reader, _inputPipe.Writer); + _originalTransport = new DuplexPipe(_transportPipeReader, _transportPipeWriter); + } + + public override MemoryPool MemoryPool { get; } + private PipeWriter Input => Application.Output; + private PipeReader Output => Application.Input; + + public bool CanRead { get; private set; } + public bool CanWrite { get; private set; } + + public long StreamId => _stream.StreamId; + public bool CanReuse { get; private set; } + + public void Initialize(QuicStream stream) + { + _stream = stream; + + if (!(_streamClosedTokenSource?.TryReset() ?? false)) + { + _streamClosedTokenSource = new CancellationTokenSource(); + } + ConnectionClosed = _streamClosedTokenSource.Token; Features.Set(this); Features.Set(this); Features.Set(this); // TODO populate the ITlsConnectionFeature (requires client certs). Features.Set(new FakeTlsConnectionFeature()); - CanRead = stream.CanRead; - CanWrite = stream.CanWrite; + CanRead = _stream.CanRead; + CanWrite = _stream.CanWrite; + Error = 0; + PoolExpirationTicks = 0; + + Transport = _originalTransport; + Application = _originalApplication; + + _connectionId = null; + _shutdownReason = null; + _streamClosed = false; + _aborted = false; + // TODO - resetable TCS + _waitForConnectionClosedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + // Only reset pipes if the stream has been reused. + if (CanReuse) + { + _inputPipe.Reset(); + _outputPipe.Reset(); + } - Transport = _originalTransport = pair.Transport; - Application = pair.Application; + CanReuse = false; } - public override MemoryPool MemoryPool { get; } - public PipeWriter Input => Application.Output; - public PipeReader Output => Application.Input; - - public bool CanRead { get; } - public bool CanWrite { get; } - - public long StreamId => _stream.StreamId; - public override string ConnectionId { get => _connectionId ??= $"{_connection.ConnectionId}:{StreamId}"; @@ -82,6 +122,8 @@ public override string ConnectionId public long Error { get; set; } + public long PoolExpirationTicks { get; set; } + public void Start() { _processingTask = StartAsync(); @@ -109,6 +151,13 @@ private async Task StartAsync() // Now wait for both to complete await receiveTask; await sendTask; + + CanReuse = _transportPipeReader.IsComplete && _transportPipeReader.CompleteException == null + && _transportPipeWriter.IsComplete && _transportPipeWriter.CompleteException == null; + if (CanReuse) + { + _connection.ReturnStream(this); + } } catch (Exception ex) { @@ -341,9 +390,123 @@ public override async ValueTask DisposeAsync() await _processingTask; - _stream.Dispose(); + DisposeCore(); _streamClosedTokenSource.Dispose(); } + + public void DisposeCore() + { + _stream.Dispose(); + } + + private sealed class CompletionPipeWriter : PipeWriter + { + private readonly PipeWriter _inner; + + public bool IsComplete { get; private set; } + public Exception? CompleteException { get; private set; } + + public CompletionPipeWriter(PipeWriter inner) + { + _inner = inner; + } + + public override void Advance(int bytes) + { + _inner.Advance(bytes); + } + + public override void CancelPendingFlush() + { + _inner.CancelPendingFlush(); + } + + public override void Complete(Exception? exception = null) + { + IsComplete = true; + CompleteException = exception; + _inner.Complete(exception); + } + + public override ValueTask CompleteAsync(Exception? exception = null) + { + IsComplete = true; + CompleteException = exception; + return _inner.CompleteAsync(exception); + } + + public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + return _inner.WriteAsync(source, cancellationToken); + } + + public override ValueTask FlushAsync(CancellationToken cancellationToken = default) + { + return _inner.FlushAsync(cancellationToken); + } + + public override Memory GetMemory(int sizeHint = 0) + { + return _inner.GetMemory(sizeHint); + } + + public override Span GetSpan(int sizeHint = 0) + { + return _inner.GetSpan(sizeHint); + } + } + + private sealed class CompletionPipeReader : PipeReader + { + private readonly PipeReader _inner; + + public bool IsComplete { get; private set; } + public Exception? CompleteException { get; private set; } + + public CompletionPipeReader(PipeReader inner) + { + _inner = inner; + } + + public override void AdvanceTo(SequencePosition consumed) + { + _inner.AdvanceTo(consumed); + } + + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) + { + _inner.AdvanceTo(consumed, examined); + } + + public override ValueTask CompleteAsync(Exception? exception = null) + { + IsComplete = true; + CompleteException = exception; + return _inner.CompleteAsync(exception); + } + + public override void Complete(Exception? exception = null) + { + IsComplete = true; + CompleteException = exception; + _inner.Complete(exception); + } + + public override void CancelPendingRead() + { + _inner.CancelPendingRead(); + } + + public override ValueTask ReadAsync(CancellationToken cancellationToken = default) + { + return _inner.ReadAsync(cancellationToken); + } + + public override bool TryRead(out ReadResult result) + { + return _inner.TryRead(out result); + } + } } } diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamStack.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamStack.cs new file mode 100644 index 000000000000..917270346e5e --- /dev/null +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamStack.cs @@ -0,0 +1,144 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.CompilerServices; +using Microsoft.Extensions.Hosting; + +namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal +{ + // See https://github.com/dotnet/runtime/blob/master/src/libraries/System.IO.Pipelines/src/System/IO/Pipelines/BufferSegmentStack.cs + internal struct QuicStreamStack + { + // Internal for testing + internal QuicStreamAsValueType[] _array; + private int _size; + + public QuicStreamStack(int size) + { + _array = new QuicStreamAsValueType[size]; + _size = 0; + } + + public int Count => _size; + + public bool TryPop([NotNullWhen(true)] out QuicStreamContext? result) + { + int size = _size - 1; + QuicStreamAsValueType[] array = _array; + + if ((uint)size >= (uint)array.Length) + { + result = default; + return false; + } + + _size = size; + result = array[size]; + array[size] = default; + return true; + } + + public bool TryPeek([NotNullWhen(true)] out QuicStreamContext? result) + { + int size = _size - 1; + QuicStreamAsValueType[] array = _array; + + if ((uint)size >= (uint)array.Length) + { + result = default; + return false; + } + + result = array[size]; + return true; + } + + // Pushes an item to the top of the stack. + public void Push(QuicStreamContext item) + { + int size = _size; + QuicStreamAsValueType[] array = _array; + + if ((uint)size < (uint)array.Length) + { + array[size] = item; + _size = size + 1; + } + else + { + PushWithResize(item); + } + } + + // Non-inline from Stack.Push to improve its code quality as uncommon path + [MethodImpl(MethodImplOptions.NoInlining)] + private void PushWithResize(QuicStreamContext item) + { + Array.Resize(ref _array, 2 * _array.Length); + _array[_size] = item; + _size++; + } + + public void RemoveExpired(long now) + { + int size = _size; + QuicStreamAsValueType[] array = _array; + + var removeCount = CalculateRemoveCount(now, size, array); + if (removeCount == 0) + { + return; + } + + var newSize = size - removeCount; + + // Dispose removed streams + for (var i = 0; i < removeCount; i++) + { + QuicStreamContext stream = array[i]; + stream.DisposeCore(); + } + + // Move remaining streams + for (var i = 0; i < newSize; i++) + { + array[i] = array[i + removeCount]; + } + + // Clear unused array indexes + for (var i = newSize; i < size; i++) + { + array[i] = default; + } + + _size = newSize; + } + + private static int CalculateRemoveCount(long now, int size, QuicStreamAsValueType[] array) + { + for (var i = 0; i < size; i++) + { + QuicStreamContext stream = array[i]; + if (stream.PoolExpirationTicks >= now) + { + // Stream is still valid. All streams after this will have a later expiration. + // No reason to keep checking. Return count of streams to remove. + return i; + } + } + + // All will be removed. + return size; + } + + internal readonly struct QuicStreamAsValueType + { + private readonly QuicStreamContext _value; + private QuicStreamAsValueType(QuicStreamContext value) => _value = value; + public static implicit operator QuicStreamAsValueType(QuicStreamContext s) => new QuicStreamAsValueType(s); + public static implicit operator QuicStreamContext(QuicStreamAsValueType s) => s._value; + } + } +} diff --git a/src/Servers/Kestrel/Transport.Quic/src/QuicTransportOptions.cs b/src/Servers/Kestrel/Transport.Quic/src/QuicTransportOptions.cs index 86a3003e5486..bc19b3753fe3 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/QuicTransportOptions.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/QuicTransportOptions.cs @@ -4,6 +4,7 @@ using System; using System.Buffers; using System.Security.Cryptography.X509Certificates; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic { @@ -42,7 +43,6 @@ public class QuicTransportOptions /// public long? MaxWriteBufferSize { get; set; } = 64 * 1024; - internal Func> MemoryPoolFactory { get; set; } = System.Buffers.PinnedBlockMemoryPoolFactory.Create; - + internal ISystemClock SystemClock = new SystemClock(); } } diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs index caa8e22861ce..27842ec4d21a 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs @@ -31,22 +31,57 @@ public async Task BidirectionalStream_ServerReadsDataAndCompletes_GracefullyClos await using var connectionListener = await QuicTestHelpers.CreateConnectionListenerFactory(LoggerFactory); var options = QuicTestHelpers.CreateClientConnectionOptions(connectionListener.EndPoint); - using var quicConnection = new QuicConnection(QuicImplementationProviders.MsQuic, options); - await quicConnection.ConnectAsync().DefaultTimeout(); + using var clientConnection = new QuicConnection(QuicImplementationProviders.MsQuic, options); + await clientConnection.ConnectAsync().DefaultTimeout(); await using var serverConnection = await connectionListener.AcceptAsync().DefaultTimeout(); // Act - await using var clientStream = quicConnection.OpenBidirectionalStream(); - await clientStream.WriteAsync(TestData, endStream: true).DefaultTimeout(); + await CreateAndCompleteBidirectionalStreamGracefully(clientConnection, serverConnection); - await using var serverStream = await serverConnection.AcceptAsync().DefaultTimeout(); + Assert.Contains(LogMessages, m => m.Message.Contains("send loop completed gracefully")); + + var quicConnectionContext = Assert.IsType(serverConnection); + + Assert.Equal(1, quicConnectionContext.StreamPool.Count); + } + + [ConditionalFact] + [MsQuicSupported] + public async Task BidirectionalStream_MultipleStreamsOnConnection_ReusedFromPool() + { + // Arrange + await using var connectionListener = await QuicTestHelpers.CreateConnectionListenerFactory(LoggerFactory); + + var options = QuicTestHelpers.CreateClientConnectionOptions(connectionListener.EndPoint); + using var clientConnection = new QuicConnection(QuicImplementationProviders.MsQuic, options); + await clientConnection.ConnectAsync().DefaultTimeout(); + + await using var serverConnection = await connectionListener.AcceptAsync().DefaultTimeout(); + + // Act + var stream1 = await CreateAndCompleteBidirectionalStreamGracefully(clientConnection, serverConnection); + var stream2 = await CreateAndCompleteBidirectionalStreamGracefully(clientConnection, serverConnection); + + Assert.Same(stream1, stream2); + + var quicConnectionContext = Assert.IsType(serverConnection); + Assert.Equal(1, quicConnectionContext.StreamPool.Count); + } + + private static async Task CreateAndCompleteBidirectionalStreamGracefully(QuicConnection quicConnection, MultiplexedConnectionContext serverConnection) + { + var clientStream = quicConnection.OpenBidirectionalStream(); + await clientStream.WriteAsync(TestData, endStream: true).DefaultTimeout(); + var serverStream = await serverConnection.AcceptAsync().DefaultTimeout(); var readResult = await serverStream.Transport.Input.ReadAtLeastAsync(TestData.Length).DefaultTimeout(); serverStream.Transport.Input.AdvanceTo(readResult.Buffer.End); // Input should be completed. readResult = await serverStream.Transport.Input.ReadAsync(); - // Complete output. + + // Complete reading and writing. + await serverStream.Transport.Input.CompleteAsync(); await serverStream.Transport.Output.CompleteAsync(); // Assert @@ -59,7 +94,7 @@ public async Task BidirectionalStream_ServerReadsDataAndCompletes_GracefullyClos Assert.True(quicStreamContext.CanWrite); Assert.True(quicStreamContext.CanRead); - Assert.Contains(LogMessages, m => m.Message.Contains("send loop completed gracefully")); + return quicStreamContext; } [ConditionalFact] From 27af4ffed2b54ab161f58d9fc0a90fa616ea1e40 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Sun, 4 Jul 2021 15:53:22 +1200 Subject: [PATCH 02/10] Update --- .../src/Internal/QuicConnectionContext.cs | 12 ++++++--- .../src/Internal/QuicStreamContext.cs | 26 ++++++++++++------- .../test/QuicStreamContextTests.cs | 4 ++- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs index eaecf48dfe46..0d5ab1bc8151 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs @@ -77,12 +77,18 @@ public override void Abort(ConnectionAbortedException abortReason) { var stream = await _connection.AcceptStreamAsync(cancellationToken); - QuicStreamContext? context; + QuicStreamContext? context = null; - lock (_poolLock) + // Only use pool for bidirectional streams. Just a handful of unidirecitonal + // streams are created for a connection and they live for the lifetime of the connection. + if (stream.CanRead && stream.CanWrite) { - StreamPool.TryPop(out context); + lock (_poolLock) + { + StreamPool.TryPop(out context); + } } + if (context == null) { context = new QuicStreamContext(this, _context); diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs index 7d813a1445ce..1ea70ce16b09 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs @@ -3,6 +3,7 @@ using System; using System.Buffers; +using System.Diagnostics; using System.IO; using System.IO.Pipelines; using System.Net.Quic; @@ -75,6 +76,8 @@ public QuicStreamContext(QuicConnectionContext connection, QuicTransportContext public void Initialize(QuicStream stream) { + Debug.Assert(_stream == null); + _stream = stream; if (!(_streamClosedTokenSource?.TryReset() ?? false)) @@ -152,12 +155,9 @@ private async Task StartAsync() await receiveTask; await sendTask; - CanReuse = _transportPipeReader.IsComplete && _transportPipeReader.CompleteException == null + CanReuse = _stream.CanRead && _stream.CanWrite + && _transportPipeReader.IsComplete && _transportPipeReader.CompleteException == null && _transportPipeWriter.IsComplete && _transportPipeWriter.CompleteException == null; - if (CanReuse) - { - _connection.ReturnStream(this); - } } catch (Exception ex) { @@ -390,14 +390,22 @@ public override async ValueTask DisposeAsync() await _processingTask; - DisposeCore(); + _stream.Dispose(); + _stream = null!; - _streamClosedTokenSource.Dispose(); + if (CanReuse) + { + _connection.ReturnStream(this); + } + else + { + DisposeCore(); + } } - public void DisposeCore() + internal void DisposeCore() { - _stream.Dispose(); + _streamClosedTokenSource.Dispose(); } private sealed class CompletionPipeWriter : PipeWriter diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs index 27842ec4d21a..61c9d346c71b 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs @@ -94,6 +94,8 @@ private static async Task CreateAndCompleteBidirectionalStrea Assert.True(quicStreamContext.CanWrite); Assert.True(quicStreamContext.CanRead); + await quicStreamContext.DisposeAsync(); + return quicStreamContext; } @@ -162,7 +164,7 @@ public async Task ClientToServerUnidirectionalStream_ServerReadsData_GracefullyC serverStream.Transport.Input.AdvanceTo(readResult.Buffer.End); // Input should be completed. - readResult = await serverStream.Transport.Input.ReadAsync(); + readResult = await serverStream.Transport.Input.ReadAsync().DefaultTimeout(); // Assert Assert.True(readResult.IsCompleted); From 1d9a4154bdaa5d812dfa0b6a2f6009fb7b819446 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 6 Jul 2021 12:54:05 +1200 Subject: [PATCH 03/10] Tests --- .../src/Internal/QuicConnectionContext.cs | 19 +++-- .../test/QuicConnectionContextTests.cs | 82 +++++++++++++++++++ .../test/QuicStreamContextTests.cs | 37 +-------- .../Transport.Quic/test/QuicTestHelpers.cs | 41 +++++++++- 4 files changed, 137 insertions(+), 42 deletions(-) diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs index 0d5ab1bc8151..0585ee7390e6 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs @@ -19,7 +19,8 @@ internal class QuicConnectionContext : TransportMultiplexedConnection, IProtocol internal QuicStreamStack StreamPool; private bool _streamPoolHeartbeatInitialized; - private long _currentTicks; + // Ticks updated once per-second in heartbeat event. + private long _heartbeatTicks; private readonly object _poolLock = new object(); private readonly QuicConnection _connection; @@ -156,7 +157,8 @@ public override ValueTask ConnectAsync(IFeatureCollection? fe quicStream = _connection.OpenBidirectionalStream(); } - // TODO - pool connect streams? + // Only a handful of control streams are created by the server and they last for the + // lifetime of the connection. No value in pooling them. QuicStreamContext? context = new QuicStreamContext(this, _context); context.Initialize(quicStream); context.Start(); @@ -173,19 +175,23 @@ internal void ReturnStream(QuicStreamContext stream) if (!_streamPoolHeartbeatInitialized) { // Heartbeat feature is added to connection features by Kestrel. + // No event is on the context is raised between feature being added and serving + // connections so initialize heartbeat the first time a stream is added to + // the connection's stream pool. var heartbeatFeature = Features.Get(); if (heartbeatFeature != null) { - heartbeatFeature.OnHeartbeat(state => ((QuicConnectionContext)state).RemoveExpiredStreams(), this); + heartbeatFeature.OnHeartbeat(static state => ((QuicConnectionContext)state).RemoveExpiredStreams(), this); } + // Set ticks for the first time. Ticks are then updated in heartbeat. var now = _context.Options.SystemClock.UtcNow.Ticks; - Volatile.Write(ref _currentTicks, now); + Volatile.Write(ref _heartbeatTicks, now); _streamPoolHeartbeatInitialized = true; } - stream.PoolExpirationTicks = Volatile.Read(ref _currentTicks) + StreamPoolExpiryTicks; + stream.PoolExpirationTicks = Volatile.Read(ref _heartbeatTicks) + StreamPoolExpiryTicks; StreamPool.Push(stream); } } @@ -194,8 +200,9 @@ private void RemoveExpiredStreams() { lock (_poolLock) { + // Update ticks on heartbeat. A precise value isn't necessary. var now = _context.Options.SystemClock.UtcNow.Ticks; - Volatile.Write(ref _currentTicks, now); + Volatile.Write(ref _heartbeatTicks, now); StreamPool.RemoveExpired(now); } diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs index 75cf81898414..97dd532a547d 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs @@ -7,7 +7,9 @@ using System.Net.Quic; using System.Text; using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections.Features; using Microsoft.AspNetCore.Server.Kestrel.FunctionalTests; +using Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal; using Microsoft.AspNetCore.Testing; using Xunit; @@ -163,5 +165,85 @@ public async Task AcceptAsync_ServerStartsAndStopsUnidirectionStream_ClientAccep readCount = await clientStream.ReadAsync(buffer).DefaultTimeout(); Assert.Equal(0, readCount); } + + [ConditionalFact] + [MsQuicSupported] + public async Task StreamPool_Heartbeat_ExpiredStreamRemoved() + { + // Arrange + var now = new DateTimeOffset(2021, 7, 6, 12, 0, 0, TimeSpan.Zero); + var testSystemClock = new TestSystemClock { UtcNow = now }; + await using var connectionListener = await QuicTestHelpers.CreateConnectionListenerFactory(LoggerFactory, testSystemClock); + + var options = QuicTestHelpers.CreateClientConnectionOptions(connectionListener.EndPoint); + using var clientConnection = new QuicConnection(QuicImplementationProviders.MsQuic, options); + await clientConnection.ConnectAsync().DefaultTimeout(); + + await using var serverConnection = await connectionListener.AcceptAsync().DefaultTimeout(); + + var testHeartbeatFeature = new TestHeartbeatFeature(); + serverConnection.Features.Set(testHeartbeatFeature); + + // Act & Assert + var quicConnectionContext = Assert.IsType(serverConnection); + Assert.Equal(0, quicConnectionContext.StreamPool.Count); + + var stream1 = await QuicTestHelpers.CreateAndCompleteBidirectionalStreamGracefully(clientConnection, serverConnection); + + Assert.Equal(1, quicConnectionContext.StreamPool.Count); + QuicStreamContext pooledStream = quicConnectionContext.StreamPool._array[0]; + Assert.Same(stream1, pooledStream); + Assert.Equal(now.Ticks + QuicConnectionContext.StreamPoolExpiryTicks, pooledStream.PoolExpirationTicks); + + now = now.AddMilliseconds(100); + testSystemClock.UtcNow = now; + testHeartbeatFeature.RaiseHeartbeat(); + // Not removed. + Assert.Equal(1, quicConnectionContext.StreamPool.Count); + + var stream2 = await QuicTestHelpers.CreateAndCompleteBidirectionalStreamGracefully(clientConnection, serverConnection); + + Assert.Equal(1, quicConnectionContext.StreamPool.Count); + pooledStream = quicConnectionContext.StreamPool._array[0]; + Assert.Same(stream1, pooledStream); + Assert.Equal(now.Ticks + QuicConnectionContext.StreamPoolExpiryTicks, pooledStream.PoolExpirationTicks); + + Assert.Same(stream1, stream2); + + now = now.AddTicks(QuicConnectionContext.StreamPoolExpiryTicks); + testSystemClock.UtcNow = now; + testHeartbeatFeature.RaiseHeartbeat(); + // Not removed. + Assert.Equal(1, quicConnectionContext.StreamPool.Count); + + now = now.AddTicks(1); + testSystemClock.UtcNow = now; + testHeartbeatFeature.RaiseHeartbeat(); + // Removed. + Assert.Equal(0, quicConnectionContext.StreamPool.Count); + } + + private class TestSystemClock : ISystemClock + { + public DateTimeOffset UtcNow { get; set; } + } + + private class TestHeartbeatFeature : IConnectionHeartbeatFeature + { + private readonly List<(Action Action, object State)> _actions = new List<(Action, object)>(); + + public void OnHeartbeat(Action action, object state) + { + _actions.Add((action, state)); + } + + public void RaiseHeartbeat() + { + foreach (var a in _actions) + { + a.Action(a.State); + } + } + } } } diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs index 61c9d346c71b..5d0dc0f83aa3 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs @@ -37,7 +37,7 @@ public async Task BidirectionalStream_ServerReadsDataAndCompletes_GracefullyClos await using var serverConnection = await connectionListener.AcceptAsync().DefaultTimeout(); // Act - await CreateAndCompleteBidirectionalStreamGracefully(clientConnection, serverConnection); + await QuicTestHelpers.CreateAndCompleteBidirectionalStreamGracefully(clientConnection, serverConnection); Assert.Contains(LogMessages, m => m.Message.Contains("send loop completed gracefully")); @@ -60,8 +60,8 @@ public async Task BidirectionalStream_MultipleStreamsOnConnection_ReusedFromPool await using var serverConnection = await connectionListener.AcceptAsync().DefaultTimeout(); // Act - var stream1 = await CreateAndCompleteBidirectionalStreamGracefully(clientConnection, serverConnection); - var stream2 = await CreateAndCompleteBidirectionalStreamGracefully(clientConnection, serverConnection); + var stream1 = await QuicTestHelpers.CreateAndCompleteBidirectionalStreamGracefully(clientConnection, serverConnection); + var stream2 = await QuicTestHelpers.CreateAndCompleteBidirectionalStreamGracefully(clientConnection, serverConnection); Assert.Same(stream1, stream2); @@ -69,36 +69,6 @@ public async Task BidirectionalStream_MultipleStreamsOnConnection_ReusedFromPool Assert.Equal(1, quicConnectionContext.StreamPool.Count); } - private static async Task CreateAndCompleteBidirectionalStreamGracefully(QuicConnection quicConnection, MultiplexedConnectionContext serverConnection) - { - var clientStream = quicConnection.OpenBidirectionalStream(); - await clientStream.WriteAsync(TestData, endStream: true).DefaultTimeout(); - var serverStream = await serverConnection.AcceptAsync().DefaultTimeout(); - var readResult = await serverStream.Transport.Input.ReadAtLeastAsync(TestData.Length).DefaultTimeout(); - serverStream.Transport.Input.AdvanceTo(readResult.Buffer.End); - - // Input should be completed. - readResult = await serverStream.Transport.Input.ReadAsync(); - - // Complete reading and writing. - await serverStream.Transport.Input.CompleteAsync(); - await serverStream.Transport.Output.CompleteAsync(); - - // Assert - Assert.True(readResult.IsCompleted); - - var quicStreamContext = Assert.IsType(serverStream); - - // Both send and receive loops have exited. - await quicStreamContext._processingTask.DefaultTimeout(); - Assert.True(quicStreamContext.CanWrite); - Assert.True(quicStreamContext.CanRead); - - await quicStreamContext.DisposeAsync(); - - return quicStreamContext; - } - [ConditionalFact] [MsQuicSupported] public async Task BidirectionalStream_ClientAbortWrite_ServerReceivesAbort() @@ -304,6 +274,7 @@ public async Task ServerToClientUnidirectionalStream_ServerAborts_ClientGetsAbor serverStream.Abort(new ConnectionAbortedException("Test message")); // TODO - client isn't getting abort? + // https://github.com/dotnet/runtime/issues/55056 readCount = await clientStream.ReadAsync(buffer).DefaultTimeout(); // Assert diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs index d36b69ae19d0..025bd3bda120 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs @@ -7,8 +7,10 @@ using System.Net.Quic; using System.Net.Security; using System.Security.Cryptography.X509Certificates; +using System.Text; using System.Threading; using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Http.Features; using Microsoft.AspNetCore.Server.Kestrel.Https; using Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal; @@ -16,25 +18,28 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; +using Xunit; namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Tests { internal static class QuicTestHelpers { private const string Alpn = "h3-29"; + private static readonly byte[] TestData = Encoding.UTF8.GetBytes("Hello world"); - public static QuicTransportFactory CreateTransportFactory(ILoggerFactory loggerFactory = null) + public static QuicTransportFactory CreateTransportFactory(ILoggerFactory loggerFactory = null, ISystemClock systemClock = null) { var quicTransportOptions = new QuicTransportOptions(); quicTransportOptions.Alpn = Alpn; quicTransportOptions.IdleTimeout = TimeSpan.FromMinutes(1); + quicTransportOptions.SystemClock = systemClock; return new QuicTransportFactory(loggerFactory ?? NullLoggerFactory.Instance, Options.Create(quicTransportOptions)); } - public static async Task CreateConnectionListenerFactory(ILoggerFactory loggerFactory = null) + public static async Task CreateConnectionListenerFactory(ILoggerFactory loggerFactory = null, ISystemClock systemClock = null) { - var transportFactory = CreateTransportFactory(loggerFactory); + var transportFactory = CreateTransportFactory(loggerFactory, systemClock); // Use ephemeral port 0. OS will assign unused port. var endpoint = new IPEndPoint(IPAddress.Loopback, 0); @@ -79,5 +84,35 @@ public static QuicClientConnectionOptions CreateClientConnectionOptions(EndPoint } }; } + + public static async Task CreateAndCompleteBidirectionalStreamGracefully(QuicConnection quicConnection, MultiplexedConnectionContext serverConnection) + { + var clientStream = quicConnection.OpenBidirectionalStream(); + await clientStream.WriteAsync(TestData, endStream: true).DefaultTimeout(); + var serverStream = await serverConnection.AcceptAsync().DefaultTimeout(); + var readResult = await serverStream.Transport.Input.ReadAtLeastAsync(TestData.Length).DefaultTimeout(); + serverStream.Transport.Input.AdvanceTo(readResult.Buffer.End); + + // Input should be completed. + readResult = await serverStream.Transport.Input.ReadAsync(); + + // Complete reading and writing. + await serverStream.Transport.Input.CompleteAsync(); + await serverStream.Transport.Output.CompleteAsync(); + + // Assert + Assert.True(readResult.IsCompleted); + + var quicStreamContext = Assert.IsType(serverStream); + + // Both send and receive loops have exited. + await quicStreamContext._processingTask.DefaultTimeout(); + Assert.True(quicStreamContext.CanWrite); + Assert.True(quicStreamContext.CanRead); + + await quicStreamContext.DisposeAsync(); + + return quicStreamContext; + } } } From f92fe8669e04ceeaa1c736aeed5a7beb48fe5ee3 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Tue, 6 Jul 2021 13:00:02 +1200 Subject: [PATCH 04/10] Fix --- src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs index 025bd3bda120..ec3c08f3d0f4 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs @@ -32,7 +32,10 @@ public static QuicTransportFactory CreateTransportFactory(ILoggerFactory loggerF var quicTransportOptions = new QuicTransportOptions(); quicTransportOptions.Alpn = Alpn; quicTransportOptions.IdleTimeout = TimeSpan.FromMinutes(1); - quicTransportOptions.SystemClock = systemClock; + if (systemClock != null) + { + quicTransportOptions.SystemClock = systemClock; + } return new QuicTransportFactory(loggerFactory ?? NullLoggerFactory.Instance, Options.Create(quicTransportOptions)); } From 75ef80d4621c631f7da20633ab2ff99ddacf47b9 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Wed, 7 Jul 2021 16:16:23 +1200 Subject: [PATCH 05/10] Limit pool size and unit test --- .../src/Internal/QuicConnectionContext.cs | 12 ++- .../src/Internal/QuicConnectionListener.cs | 2 + .../src/Internal/QuicStreamContext.cs | 10 +-- .../test/QuicConnectionContextTests.cs | 89 +++++++++++++++++++ .../Transport.Quic/test/QuicTestHelpers.cs | 10 +-- 5 files changed, 110 insertions(+), 13 deletions(-) diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs index 0585ee7390e6..ad5174892e9b 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs @@ -168,7 +168,7 @@ public override ValueTask ConnectAsync(IFeatureCollection? fe return new ValueTask(context); } - internal void ReturnStream(QuicStreamContext stream) + internal bool TryReturnStream(QuicStreamContext stream) { lock (_poolLock) { @@ -191,9 +191,15 @@ internal void ReturnStream(QuicStreamContext stream) _streamPoolHeartbeatInitialized = true; } - stream.PoolExpirationTicks = Volatile.Read(ref _heartbeatTicks) + StreamPoolExpiryTicks; - StreamPool.Push(stream); + if (stream.CanReuse && StreamPool.Count < MaxStreamPoolSize) + { + stream.PoolExpirationTicks = Volatile.Read(ref _heartbeatTicks) + StreamPoolExpiryTicks; + StreamPool.Push(stream); + return true; + } } + + return false; } private void RemoveExpiredStreams() diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionListener.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionListener.cs index b90cad6ad69e..d1afa552617c 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionListener.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionListener.cs @@ -41,6 +41,8 @@ public QuicConnectionListener(QuicTransportOptions options, IQuicTrace log, EndP quicListenerOptions.ServerAuthenticationOptions = sslServerAuthenticationOptions; quicListenerOptions.ListenEndPoint = endpoint as IPEndPoint; quicListenerOptions.IdleTimeout = options.IdleTimeout; + quicListenerOptions.MaxBidirectionalStreams = options.MaxBidirectionalStreamCount; + quicListenerOptions.MaxUnidirectionalStreams = options.MaxUnidirectionalStreamCount; _listener = new QuicListener(QuicImplementationProviders.MsQuic, quicListenerOptions); diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs index 1ea70ce16b09..169b06e1f798 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs @@ -393,12 +393,12 @@ public override async ValueTask DisposeAsync() _stream.Dispose(); _stream = null!; - if (CanReuse) - { - _connection.ReturnStream(this); - } - else + if (!_connection.TryReturnStream(this)) { + // Dispose when one of: + // - Stream is not bidirection + // - Stream didn't complete gracefully + // - Pool is full DisposeCore(); } } diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs index 97dd532a547d..0b822ef147e2 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicConnectionContextTests.cs @@ -7,6 +7,7 @@ using System.Net.Quic; using System.Text; using System.Threading.Tasks; +using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Connections.Features; using Microsoft.AspNetCore.Server.Kestrel.FunctionalTests; using Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal; @@ -223,6 +224,94 @@ public async Task StreamPool_Heartbeat_ExpiredStreamRemoved() Assert.Equal(0, quicConnectionContext.StreamPool.Count); } + [ConditionalFact] + [MsQuicSupported] + public async Task StreamPool_ManyConcurrentStreams_StreamPoolFull() + { + // Arrange + await using var connectionListener = await QuicTestHelpers.CreateConnectionListenerFactory(LoggerFactory); + + var options = QuicTestHelpers.CreateClientConnectionOptions(connectionListener.EndPoint); + using var clientConnection = new QuicConnection(QuicImplementationProviders.MsQuic, options); + await clientConnection.ConnectAsync().DefaultTimeout(); + + await using var serverConnection = await connectionListener.AcceptAsync().DefaultTimeout(); + + var testHeartbeatFeature = new TestHeartbeatFeature(); + serverConnection.Features.Set(testHeartbeatFeature); + + // Act + var quicConnectionContext = Assert.IsType(serverConnection); + Assert.Equal(0, quicConnectionContext.StreamPool.Count); + + var pauseCompleteTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var allConnectionsOnServerTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var streamTasks = new List(); + var requestState = new RequestState(clientConnection, serverConnection, allConnectionsOnServerTcs, pauseCompleteTcs.Task); + + const int StreamsSent = 101; + for (var i = 0; i < StreamsSent; i++) + { + // TODO: Race condition in QUIC library. + // Delay between sending streams to avoid + // https://github.com/dotnet/runtime/issues/55249 + await Task.Delay(50); + streamTasks.Add(SendStream(requestState)); + } + + await allConnectionsOnServerTcs.Task.DefaultTimeout(); + pauseCompleteTcs.SetResult(); + + await Task.WhenAll(streamTasks).DefaultTimeout(); + + // Assert + // Up to 100 streams are pooled. + Assert.Equal(100, quicConnectionContext.StreamPool.Count); + + static async Task SendStream(RequestState requestState) + { + var clientStream = requestState.QuicConnection.OpenBidirectionalStream(); + await clientStream.WriteAsync(TestData, endStream: true).DefaultTimeout(); + var serverStream = await requestState.ServerConnection.AcceptAsync().DefaultTimeout(); + var readResult = await serverStream.Transport.Input.ReadAtLeastAsync(TestData.Length).DefaultTimeout(); + serverStream.Transport.Input.AdvanceTo(readResult.Buffer.End); + + // Input should be completed. + readResult = await serverStream.Transport.Input.ReadAsync(); + Assert.True(readResult.IsCompleted); + + lock (requestState) + { + requestState.ActiveConcurrentConnections++; + if (requestState.ActiveConcurrentConnections == StreamsSent) + { + requestState.AllConnectionsOnServerTcs.SetResult(); + } + } + + await requestState.PauseCompleteTask; + + // Complete reading and writing. + await serverStream.Transport.Input.CompleteAsync(); + await serverStream.Transport.Output.CompleteAsync(); + + var quicStreamContext = Assert.IsType(serverStream); + + // Both send and receive loops have exited. + await quicStreamContext._processingTask.DefaultTimeout(); + await quicStreamContext.DisposeAsync(); + } + } + + private record RequestState( + QuicConnection QuicConnection, + MultiplexedConnectionContext ServerConnection, + TaskCompletionSource AllConnectionsOnServerTcs, + Task PauseCompleteTask) + { + public int ActiveConcurrentConnections { get; set; } + }; + private class TestSystemClock : ISystemClock { public DateTimeOffset UtcNow { get; set; } diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs index ec3c08f3d0f4..29b1ca043aaf 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicTestHelpers.cs @@ -32,6 +32,8 @@ public static QuicTransportFactory CreateTransportFactory(ILoggerFactory loggerF var quicTransportOptions = new QuicTransportOptions(); quicTransportOptions.Alpn = Alpn; quicTransportOptions.IdleTimeout = TimeSpan.FromMinutes(1); + quicTransportOptions.MaxBidirectionalStreamCount = 200; + quicTransportOptions.MaxUnidirectionalStreamCount = 200; if (systemClock != null) { quicTransportOptions.SystemClock = systemClock; @@ -74,8 +76,8 @@ public static QuicClientConnectionOptions CreateClientConnectionOptions(EndPoint { return new QuicClientConnectionOptions { - MaxBidirectionalStreams = 10, - MaxUnidirectionalStreams = 20, + MaxBidirectionalStreams = 200, + MaxUnidirectionalStreams = 200, RemoteEndPoint = remoteEndPoint, ClientAuthenticationOptions = new SslClientAuthenticationOptions { @@ -98,14 +100,12 @@ public static async Task CreateAndCompleteBidirectionalStream // Input should be completed. readResult = await serverStream.Transport.Input.ReadAsync(); + Assert.True(readResult.IsCompleted); // Complete reading and writing. await serverStream.Transport.Input.CompleteAsync(); await serverStream.Transport.Output.CompleteAsync(); - // Assert - Assert.True(readResult.IsCompleted); - var quicStreamContext = Assert.IsType(serverStream); // Both send and receive loops have exited. From 4fbe9bec27ecd78266d25bd111221a4f75f54684 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Thu, 8 Jul 2021 14:28:43 +1200 Subject: [PATCH 06/10] Extract test stream context from test stream types --- .../src/Internal/Http3/Http3Connection.cs | 52 ++++++---- .../Core/src/Internal/Http3/Http3Stream.cs | 5 + .../Http3/ICachedHttp3StreamFeature.cs | 20 ++++ .../Http3/Http3ConnectionTests.cs | 4 +- .../Http3/Http3TestBase.cs | 94 ++++++++++--------- 5 files changed, 110 insertions(+), 65 deletions(-) create mode 100644 src/Servers/Kestrel/Core/src/Internal/Http3/ICachedHttp3StreamFeature.cs diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs index ac282e81011d..89b53af3ff91 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs @@ -257,26 +257,10 @@ public async Task ProcessRequestsAsync(IHttpApplication appl Debug.Assert(streamDirectionFeature != null); Debug.Assert(streamIdFeature != null); - var httpConnectionContext = new Http3StreamContext( - streamContext.ConnectionId, - protocols: default, - connectionContext: null!, // TODO connection context is null here. Should we set it to anything? - _context.ServiceContext, - streamContext.Features, - _context.MemoryPool, - streamContext.LocalEndPoint as IPEndPoint, - streamContext.RemoteEndPoint as IPEndPoint, - streamContext.Transport, - _streamLifetimeHandler, - streamContext, - _clientSettings, - _serverSettings); - httpConnectionContext.TimeoutControl = _context.TimeoutControl; - if (!streamDirectionFeature.CanWrite) { // Unidirectional stream - var stream = new Http3ControlStream(application, httpConnectionContext); + var stream = new Http3ControlStream(application, CreateHttpStreamContext(streamContext)); _streamLifetimeHandler.OnStreamCreated(stream); ThreadPool.UnsafeQueueUserWorkItem(stream, preferLocal: false); @@ -286,7 +270,19 @@ public async Task ProcessRequestsAsync(IHttpApplication appl // Request stream UpdateHighestStreamId(streamIdFeature.StreamId); - var stream = new Http3Stream(application, httpConnectionContext); + // Check whether there is an existing HTTP/3 stream on the transport stream. + // A stream will only be cached if the transport stream itself is reused. + var stream = streamContext.Features.Get>()?.CachedStream; + //if (stream == null) + { + stream = new Http3Stream(application, CreateHttpStreamContext(streamContext)); + streamContext.Features.Set>(new DefaultCachedHttp3StreamFeature(stream)); + } + //else + //{ + // stream.InitializeWithExistingContext(); + //} + _streamLifetimeHandler.OnStreamCreated(stream); KestrelEventSource.Log.RequestQueuedStart(stream, AspNetCore.Http.HttpProtocol.Http3); @@ -371,6 +367,26 @@ public async Task ProcessRequestsAsync(IHttpApplication appl } } + private Http3StreamContext CreateHttpStreamContext(ConnectionContext streamContext) + { + var httpConnectionContext = new Http3StreamContext( + streamContext.ConnectionId, + protocols: default, + connectionContext: null!, // TODO connection context is null here. Should we set it to anything? + _context.ServiceContext, + streamContext.Features, + _context.MemoryPool, + streamContext.LocalEndPoint as IPEndPoint, + streamContext.RemoteEndPoint as IPEndPoint, + streamContext.Transport, + _streamLifetimeHandler, + streamContext, + _clientSettings, + _serverSettings); + httpConnectionContext.TimeoutControl = _context.TimeoutControl; + return httpConnectionContext; + } + private void UpdateConnectionState() { if (_isClosed != 0) diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs index 0d504f727b2a..d05e0f13ffcc 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs @@ -110,6 +110,11 @@ public Http3Stream(Http3StreamContext context) public bool IsRequestStream => true; + public void InitializeWithExistingContext() + { + Initialize(_context); + } + public void Abort(ConnectionAbortedException abortReason, Http3ErrorCode errorCode) { var (oldState, newState) = ApplyCompletionFlag(StreamCompletionFlags.Aborted); diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/ICachedHttp3StreamFeature.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/ICachedHttp3StreamFeature.cs new file mode 100644 index 000000000000..1d8fb2430651 --- /dev/null +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/ICachedHttp3StreamFeature.cs @@ -0,0 +1,20 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http3 +{ + internal interface ICachedHttp3StreamFeature where TContext : notnull + { + Http3Stream CachedStream { get; } + } + + internal class DefaultCachedHttp3StreamFeature : ICachedHttp3StreamFeature where TContext : notnull + { + public Http3Stream CachedStream { get; } + + public DefaultCachedHttp3StreamFeature(Http3Stream cachedStream) + { + CachedStream = cachedStream; + } + } +} diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3ConnectionTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3ConnectionTests.cs index ae4223dc2f3c..49d90c3e60be 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3ConnectionTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3ConnectionTests.cs @@ -160,9 +160,9 @@ await WaitForConnectionErrorAsync( [Fact] public async Task ControlStream_ServerToClient_ErrorInitializing_ConnectionError() { - OnCreateServerControlStream = () => + OnCreateServerControlStream = testStreamContext => { - var controlStream = new Http3ControlStream(this, StreamInitiator.Server); + var controlStream = new Http3ControlStream(this, testStreamContext); // Make server connection error when trying to write to control stream. controlStream.StreamContext.Transport.Output.Complete(); diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3TestBase.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3TestBase.cs index cf388abededa..bc84d0defa00 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3TestBase.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3TestBase.cs @@ -62,7 +62,7 @@ public abstract class Http3TestBase : TestApplicationErrorLoggerLoggedTest, IDis protected readonly RequestDelegate _echoPath; protected readonly RequestDelegate _echoHost; - protected Func OnCreateServerControlStream; + internal Func OnCreateServerControlStream; private Http3ControlStream _inboundControlStream; private long _currentStreamId; @@ -161,7 +161,7 @@ public Http3TestBase() internal ChannelReader> ServerReceivedSettingsReader => _serverReceivedSettings.Reader; - public TestMultiplexedConnectionContext MultiplexedConnectionContext { get; set; } + internal TestMultiplexedConnectionContext MultiplexedConnectionContext { get; set; } public override void Initialize(TestContext context, MethodInfo methodInfo, object[] testMethodArguments, ITestOutputHelper testOutputHelper) { @@ -431,14 +431,17 @@ private static long GetOutputResponseBufferSize(ServiceContext serviceContext) return bufferSize ?? 0; } - public ValueTask CreateControlStream() + internal ValueTask CreateControlStream() { return CreateControlStream(id: 0); } - public async ValueTask CreateControlStream(int? id) + internal async ValueTask CreateControlStream(int? id) { - var stream = new Http3ControlStream(this, StreamInitiator.Client); + var streamId = GetStreamId(0x02); + + var testStreamContext = new TestStreamContext(canRead: true, canWrite: false, this, streamId); + var stream = new Http3ControlStream(this, testStreamContext); _runningStreams[stream.StreamId] = stream; MultiplexedConnectionContext.ToServerAcceptQueue.Writer.TryWrite(stream.StreamContext); @@ -451,36 +454,44 @@ public async ValueTask CreateControlStream(int? id) internal ValueTask CreateRequestStream() { - var stream = new Http3RequestStream(this, Connection); + var testStreamContext = new TestStreamContext(canRead: true, canWrite: true, this, GetStreamId(0x00)); + + var stream = new Http3RequestStream(this, Connection, testStreamContext); _runningStreams[stream.StreamId] = stream; MultiplexedConnectionContext.ToServerAcceptQueue.Writer.TryWrite(stream.StreamContext); return new ValueTask(stream); } - public ValueTask StartBidirectionalStreamAsync() - { - var stream = new Http3RequestStream(this, Connection); - // TODO put these somewhere to be read. - return new ValueTask(stream.StreamContext); - } - - public class Http3StreamBase : IProtocolErrorCodeFeature + internal class Http3StreamBase : IProtocolErrorCodeFeature { internal TaskCompletionSource _onStreamCreatedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); internal TaskCompletionSource _onStreamCompletedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); internal TaskCompletionSource _onHeaderReceivedTcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + internal ConnectionContext StreamContext { get; } + internal IProtocolErrorCodeFeature _protocolErrorCodeFeature; internal DuplexPipe.DuplexPipePair _pair; internal Http3TestBase _testBase; internal Http3Connection _connection; public long BytesReceived { get; private set; } - public long Error { get; set; } + public long Error + { + get => _protocolErrorCodeFeature.Error; + set => _protocolErrorCodeFeature.Error = value; + } public Task OnStreamCreatedTask => _onStreamCreatedTcs.Task; public Task OnStreamCompletedTask => _onStreamCompletedTcs.Task; public Task OnHeaderReceivedTask => _onHeaderReceivedTcs.Task; + public Http3StreamBase(TestStreamContext testStreamContext) + { + StreamContext = testStreamContext; + _protocolErrorCodeFeature = testStreamContext; + _pair = testStreamContext._pair; + } + protected Task SendAsync(ReadOnlySpan span) { var writableBuffer = _pair.Application.Output; @@ -587,8 +598,6 @@ internal class Http3RequestStream : Http3StreamBase, IHttpHeadersHandler private readonly TestStreamContext _testStreamContext; private readonly long _streamId; - internal ConnectionContext StreamContext { get; } - public bool CanRead => true; public bool CanWrite => true; @@ -600,17 +609,13 @@ internal class Http3RequestStream : Http3StreamBase, IHttpHeadersHandler private readonly QPackDecoder _qpackDecoder = new QPackDecoder(8192); protected readonly Dictionary _decodedHeaders = new Dictionary(StringComparer.OrdinalIgnoreCase); - public Http3RequestStream(Http3TestBase testBase, Http3Connection connection) + public Http3RequestStream(Http3TestBase testBase, Http3Connection connection, TestStreamContext testStreamContext) + : base(testStreamContext) { _testBase = testBase; _connection = connection; - var inputPipeOptions = GetInputPipeOptions(_testBase._serviceContext, _testBase._memoryPool, PipeScheduler.ThreadPool); - var outputPipeOptions = GetOutputPipeOptions(_testBase._serviceContext, _testBase._memoryPool, PipeScheduler.ThreadPool); - - _pair = DuplexPipe.CreateConnectionPair(inputPipeOptions, outputPipeOptions); - _streamId = testBase.GetStreamId(0x00); - _testStreamContext = new TestStreamContext(canRead: true, canWrite: true, _pair, this, _streamId); - StreamContext = _testStreamContext; + _streamId = testStreamContext.StreamId; + _testStreamContext = testStreamContext; } public async Task SendHeadersAsync(IEnumerable> headers, bool endStream = false) @@ -706,9 +711,8 @@ public enum StreamInitiator Server } - public class Http3ControlStream : Http3StreamBase + internal class Http3ControlStream : Http3StreamBase { - internal ConnectionContext StreamContext { get; } private readonly long _streamId; public bool CanRead => true; @@ -716,19 +720,11 @@ public class Http3ControlStream : Http3StreamBase public long StreamId => _streamId; - public Http3ControlStream(Http3TestBase testBase, StreamInitiator initiator) + public Http3ControlStream(Http3TestBase testBase, TestStreamContext testStreamContext) + : base(testStreamContext) { _testBase = testBase; - var inputPipeOptions = GetInputPipeOptions(_testBase._serviceContext, _testBase._memoryPool, PipeScheduler.ThreadPool); - var outputPipeOptions = GetOutputPipeOptions(_testBase._serviceContext, _testBase._memoryPool, PipeScheduler.ThreadPool); - _pair = DuplexPipe.CreateConnectionPair(inputPipeOptions, outputPipeOptions); - _streamId = testBase.GetStreamId(initiator == StreamInitiator.Client ? 0x02 : 0x03); - StreamContext = new TestStreamContext(canRead: true, canWrite: false, _pair, this, _streamId); - } - - public Http3ControlStream(ConnectionContext streamContext) - { - StreamContext = streamContext; + _streamId = testStreamContext.StreamId; } internal async Task> ExpectSettingsAsync() @@ -857,7 +853,7 @@ public async ValueTask TryReadStreamIdAsync() } } - public class TestMultiplexedConnectionContext : MultiplexedConnectionContext, IConnectionLifetimeNotificationFeature, IConnectionLifetimeFeature, IConnectionHeartbeatFeature, IProtocolErrorCodeFeature + internal class TestMultiplexedConnectionContext : MultiplexedConnectionContext, IConnectionLifetimeNotificationFeature, IConnectionLifetimeFeature, IConnectionHeartbeatFeature, IProtocolErrorCodeFeature { public readonly Channel ToServerAcceptQueue = Channel.CreateUnbounded(new UnboundedChannelOptions { @@ -926,7 +922,10 @@ public override async ValueTask AcceptAsync(CancellationToken public override ValueTask ConnectAsync(IFeatureCollection features = null, CancellationToken cancellationToken = default) { - var stream = _testBase.OnCreateServerControlStream?.Invoke() ?? new Http3ControlStream(_testBase, StreamInitiator.Server); + var streamId = _testBase.GetStreamId(0x03); + var testStreamContext = new TestStreamContext(canRead: true, canWrite: false, _testBase, streamId); + + var stream = _testBase.OnCreateServerControlStream?.Invoke(testStreamContext) ?? new Http3ControlStream(_testBase, testStreamContext); ToClientAcceptQueue.Writer.WriteAsync(stream); return new ValueTask(stream.StreamContext); } @@ -941,16 +940,19 @@ public void RequestClose() } } - private class TestStreamContext : ConnectionContext, IStreamDirectionFeature, IStreamIdFeature + internal class TestStreamContext : ConnectionContext, IStreamDirectionFeature, IStreamIdFeature, IProtocolErrorCodeFeature { - private readonly DuplexPipePair _pair; - public TestStreamContext(bool canRead, bool canWrite, DuplexPipePair pair, IProtocolErrorCodeFeature errorCodeFeature, long streamId) + internal readonly DuplexPipePair _pair; + public TestStreamContext(bool canRead, bool canWrite, Http3TestBase testBase, long streamId) { - _pair = pair; + var inputPipeOptions = GetInputPipeOptions(testBase._serviceContext, testBase._memoryPool, PipeScheduler.ThreadPool); + var outputPipeOptions = GetOutputPipeOptions(testBase._serviceContext, testBase._memoryPool, PipeScheduler.ThreadPool); + _pair = DuplexPipe.CreateConnectionPair(inputPipeOptions, outputPipeOptions); + Features = new FeatureCollection(); Features.Set(this); Features.Set(this); - Features.Set(errorCodeFeature); + Features.Set< IProtocolErrorCodeFeature>(this); CanRead = canRead; CanWrite = canWrite; @@ -983,6 +985,8 @@ public override IDuplexPipe Transport public bool CanWrite { get; } + public long Error { get; set; } + public override void Abort(ConnectionAbortedException abortReason) { _pair.Application.Output.Complete(abortReason); From 383876e20538080c233df4d47ed4f2868bf81100 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Thu, 8 Jul 2021 22:34:26 +1200 Subject: [PATCH 07/10] Reuse context in tests --- .../src/Internal/CompletionPipeReader.cs | 62 ++++++++++ .../src/Internal/CompletionPipeWriter.cs | 67 +++++++++++ .../src/Internal/QuicStreamContext.cs | 109 ------------------ .../Http3/Http3ConnectionTests.cs | 48 ++++++++ .../Http3/Http3StreamTests.cs | 4 +- .../Http3/Http3TestBase.cs | 75 +++++++++--- .../Http3/Internal/CompletionPipeReader.cs | 62 ++++++++++ .../Http3/Internal/CompletionPipeWriter.cs | 67 +++++++++++ 8 files changed, 367 insertions(+), 127 deletions(-) create mode 100644 src/Servers/Kestrel/Transport.Quic/src/Internal/CompletionPipeReader.cs create mode 100644 src/Servers/Kestrel/Transport.Quic/src/Internal/CompletionPipeWriter.cs create mode 100644 src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Internal/CompletionPipeReader.cs create mode 100644 src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Internal/CompletionPipeWriter.cs diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/CompletionPipeReader.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/CompletionPipeReader.cs new file mode 100644 index 000000000000..83ae40296f5f --- /dev/null +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/CompletionPipeReader.cs @@ -0,0 +1,62 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal +{ + internal sealed class CompletionPipeReader : PipeReader + { + private readonly PipeReader _inner; + + public bool IsComplete { get; private set; } + public Exception? CompleteException { get; private set; } + + public CompletionPipeReader(PipeReader inner) + { + _inner = inner; + } + + public override void AdvanceTo(SequencePosition consumed) + { + _inner.AdvanceTo(consumed); + } + + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) + { + _inner.AdvanceTo(consumed, examined); + } + + public override ValueTask CompleteAsync(Exception? exception = null) + { + IsComplete = true; + CompleteException = exception; + return _inner.CompleteAsync(exception); + } + + public override void Complete(Exception? exception = null) + { + IsComplete = true; + CompleteException = exception; + _inner.Complete(exception); + } + + public override void CancelPendingRead() + { + _inner.CancelPendingRead(); + } + + public override ValueTask ReadAsync(CancellationToken cancellationToken = default) + { + return _inner.ReadAsync(cancellationToken); + } + + public override bool TryRead(out ReadResult result) + { + return _inner.TryRead(out result); + } + } +} diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/CompletionPipeWriter.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/CompletionPipeWriter.cs new file mode 100644 index 000000000000..32a35b8ca64c --- /dev/null +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/CompletionPipeWriter.cs @@ -0,0 +1,67 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal +{ + internal sealed class CompletionPipeWriter : PipeWriter + { + private readonly PipeWriter _inner; + + public bool IsComplete { get; private set; } + public Exception? CompleteException { get; private set; } + + public CompletionPipeWriter(PipeWriter inner) + { + _inner = inner; + } + + public override void Advance(int bytes) + { + _inner.Advance(bytes); + } + + public override void CancelPendingFlush() + { + _inner.CancelPendingFlush(); + } + + public override void Complete(Exception? exception = null) + { + IsComplete = true; + CompleteException = exception; + _inner.Complete(exception); + } + + public override ValueTask CompleteAsync(Exception? exception = null) + { + IsComplete = true; + CompleteException = exception; + return _inner.CompleteAsync(exception); + } + + public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + return _inner.WriteAsync(source, cancellationToken); + } + + public override ValueTask FlushAsync(CancellationToken cancellationToken = default) + { + return _inner.FlushAsync(cancellationToken); + } + + public override Memory GetMemory(int sizeHint = 0) + { + return _inner.GetMemory(sizeHint); + } + + public override Span GetSpan(int sizeHint = 0) + { + return _inner.GetSpan(sizeHint); + } + } +} diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs index 169b06e1f798..4a0e8f631a36 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs @@ -407,114 +407,5 @@ internal void DisposeCore() { _streamClosedTokenSource.Dispose(); } - - private sealed class CompletionPipeWriter : PipeWriter - { - private readonly PipeWriter _inner; - - public bool IsComplete { get; private set; } - public Exception? CompleteException { get; private set; } - - public CompletionPipeWriter(PipeWriter inner) - { - _inner = inner; - } - - public override void Advance(int bytes) - { - _inner.Advance(bytes); - } - - public override void CancelPendingFlush() - { - _inner.CancelPendingFlush(); - } - - public override void Complete(Exception? exception = null) - { - IsComplete = true; - CompleteException = exception; - _inner.Complete(exception); - } - - public override ValueTask CompleteAsync(Exception? exception = null) - { - IsComplete = true; - CompleteException = exception; - return _inner.CompleteAsync(exception); - } - - public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) - { - return _inner.WriteAsync(source, cancellationToken); - } - - public override ValueTask FlushAsync(CancellationToken cancellationToken = default) - { - return _inner.FlushAsync(cancellationToken); - } - - public override Memory GetMemory(int sizeHint = 0) - { - return _inner.GetMemory(sizeHint); - } - - public override Span GetSpan(int sizeHint = 0) - { - return _inner.GetSpan(sizeHint); - } - } - - private sealed class CompletionPipeReader : PipeReader - { - private readonly PipeReader _inner; - - public bool IsComplete { get; private set; } - public Exception? CompleteException { get; private set; } - - public CompletionPipeReader(PipeReader inner) - { - _inner = inner; - } - - public override void AdvanceTo(SequencePosition consumed) - { - _inner.AdvanceTo(consumed); - } - - public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) - { - _inner.AdvanceTo(consumed, examined); - } - - public override ValueTask CompleteAsync(Exception? exception = null) - { - IsComplete = true; - CompleteException = exception; - return _inner.CompleteAsync(exception); - } - - public override void Complete(Exception? exception = null) - { - IsComplete = true; - CompleteException = exception; - _inner.Complete(exception); - } - - public override void CancelPendingRead() - { - _inner.CancelPendingRead(); - } - - public override ValueTask ReadAsync(CancellationToken cancellationToken = default) - { - return _inner.ReadAsync(cancellationToken); - } - - public override bool TryRead(out ReadResult result) - { - return _inner.TryRead(out result); - } - } } } diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3ConnectionTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3ConnectionTests.cs index 49d90c3e60be..f384874a61ac 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3ConnectionTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3ConnectionTests.cs @@ -204,5 +204,53 @@ await outboundcontrolStream.SendSettingsAsync(new List Assert.Equal(Internal.Http3.Http3SettingType.MaxFieldSectionSize, maxFieldSetting.Key); Assert.Equal(100, maxFieldSetting.Value); } + + [Fact] + public async Task StreamPool_MultipleStreamsInSequence_PooledStreamReused() + { + var headers = new[] + { + new KeyValuePair(HeaderNames.Method, "Custom"), + new KeyValuePair(HeaderNames.Path, "/"), + new KeyValuePair(HeaderNames.Scheme, "http"), + new KeyValuePair(HeaderNames.Authority, "localhost:80"), + }; + + await InitializeConnectionAsync(_echoApplication); + + var requestStream = await CreateRequestStream(); + var streamContext1 = requestStream.StreamContext; + + await requestStream.SendHeadersAsync(headers); + await requestStream.SendDataAsync(Encoding.ASCII.GetBytes("Hello world 1"), endStream: true); + + Assert.False(requestStream.Disposed); + + await requestStream.ExpectHeadersAsync(); + var responseData = await requestStream.ExpectDataAsync(); + Assert.Equal("Hello world 1", Encoding.ASCII.GetString(responseData.ToArray())); + + await requestStream.ExpectReceiveEndOfStream(); + + Assert.True(requestStream.Disposed); + + requestStream = await CreateRequestStream(); + var streamContext2 = requestStream.StreamContext; + + await requestStream.SendHeadersAsync(headers); + await requestStream.SendDataAsync(Encoding.ASCII.GetBytes("Hello world 2"), endStream: true); + + Assert.False(requestStream.Disposed); + + await requestStream.ExpectHeadersAsync(); + responseData = await requestStream.ExpectDataAsync(); + Assert.Equal("Hello world 2", Encoding.ASCII.GetString(responseData.ToArray())); + + await requestStream.ExpectReceiveEndOfStream(); + + Assert.True(requestStream.Disposed); + + Assert.Same(streamContext1, streamContext2); + } } } diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3StreamTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3StreamTests.cs index af7ec5dc7fb0..0f8f1c1cf880 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3StreamTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3StreamTests.cs @@ -2435,12 +2435,12 @@ await InitializeConnectionAsync(context => var outboundcontrolStream = await CreateControlStream(); await outboundcontrolStream.SendSettingsAsync(new List { - new Http3PeerSetting(Internal.Http3.Http3SettingType.MaxFieldSectionSize, 100) + new Http3PeerSetting(Core.Internal.Http3.Http3SettingType.MaxFieldSectionSize, 100) }); var maxFieldSetting = await ServerReceivedSettingsReader.ReadAsync().DefaultTimeout(); - Assert.Equal(Internal.Http3.Http3SettingType.MaxFieldSectionSize, maxFieldSetting.Key); + Assert.Equal(Core.Internal.Http3.Http3SettingType.MaxFieldSectionSize, maxFieldSetting.Key); Assert.Equal(100, maxFieldSetting.Value); var requestStream = await CreateRequestStream().DefaultTimeout(); diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3TestBase.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3TestBase.cs index bc84d0defa00..b89d6197d000 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3TestBase.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3TestBase.cs @@ -50,6 +50,7 @@ public abstract class Http3TestBase : TestApplicationErrorLoggerLoggedTest, IDis internal readonly Mock _mockTimeoutHandler = new Mock(); internal readonly Mock _mockTimeoutControl; internal readonly MemoryPool _memoryPool = PinnedBlockMemoryPoolFactory.Create(); + internal readonly ConcurrentQueue _streamContextPool = new ConcurrentQueue(); protected Task _connectionTask; protected readonly TaskCompletionSource _closedStateReached = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); @@ -438,9 +439,9 @@ internal ValueTask CreateControlStream() internal async ValueTask CreateControlStream(int? id) { - var streamId = GetStreamId(0x02); + var testStreamContext = new TestStreamContext(canRead: true, canWrite: false, this); + testStreamContext.Initialize(GetStreamId(0x02)); - var testStreamContext = new TestStreamContext(canRead: true, canWrite: false, this, streamId); var stream = new Http3ControlStream(this, testStreamContext); _runningStreams[stream.StreamId] = stream; @@ -454,7 +455,11 @@ internal async ValueTask CreateControlStream(int? id) internal ValueTask CreateRequestStream() { - var testStreamContext = new TestStreamContext(canRead: true, canWrite: true, this, GetStreamId(0x00)); + if (!_streamContextPool.TryDequeue(out var testStreamContext)) + { + testStreamContext = new TestStreamContext(canRead: true, canWrite: true, this); + } + testStreamContext.Initialize(GetStreamId(0x00)); var stream = new Http3RequestStream(this, Connection, testStreamContext); _runningStreams[stream.StreamId] = stream; @@ -922,8 +927,8 @@ public override async ValueTask AcceptAsync(CancellationToken public override ValueTask ConnectAsync(IFeatureCollection features = null, CancellationToken cancellationToken = default) { - var streamId = _testBase.GetStreamId(0x03); - var testStreamContext = new TestStreamContext(canRead: true, canWrite: false, _testBase, streamId); + var testStreamContext = new TestStreamContext(canRead: true, canWrite: false, _testBase); + testStreamContext.Initialize(_testBase.GetStreamId(0x03)); var stream = _testBase.OnCreateServerControlStream?.Invoke(testStreamContext) ?? new Http3ControlStream(_testBase, testStreamContext); ToClientAcceptQueue.Writer.WriteAsync(stream); @@ -942,28 +947,57 @@ public void RequestClose() internal class TestStreamContext : ConnectionContext, IStreamDirectionFeature, IStreamIdFeature, IProtocolErrorCodeFeature { - internal readonly DuplexPipePair _pair; - public TestStreamContext(bool canRead, bool canWrite, Http3TestBase testBase, long streamId) - { - var inputPipeOptions = GetInputPipeOptions(testBase._serviceContext, testBase._memoryPool, PipeScheduler.ThreadPool); - var outputPipeOptions = GetOutputPipeOptions(testBase._serviceContext, testBase._memoryPool, PipeScheduler.ThreadPool); - _pair = DuplexPipe.CreateConnectionPair(inputPipeOptions, outputPipeOptions); + private readonly Http3TestBase _testBase; + internal DuplexPipePair _pair; + private Pipe _inputPipe; + private Pipe _outputPipe; + private CompletionPipeReader _transportPipeReader; + private CompletionPipeWriter _transportPipeWriter; + + private bool _isAborted; + + public TestStreamContext(bool canRead, bool canWrite, Http3TestBase testBase) + { Features = new FeatureCollection(); + CanRead = canRead; + CanWrite = canWrite; + _testBase = testBase; + } + + public void Initialize(long streamId) + { + // Create new pipes when test stream context is reused rather than reseting them. + // This is required because the client tests read from these directly from these pipes. + // When a request is finished they'll check to see whether there is anymore content + // in the Application.Output pipe. If it has been reset then that code will error. + var inputOptions = GetInputPipeOptions(_testBase._serviceContext, _testBase._memoryPool, PipeScheduler.ThreadPool); + var outputOptions = GetOutputPipeOptions(_testBase._serviceContext, _testBase._memoryPool, PipeScheduler.ThreadPool); + + _inputPipe = new Pipe(inputOptions); + _outputPipe = new Pipe(outputOptions); + + _transportPipeReader = new CompletionPipeReader(_inputPipe.Reader); + _transportPipeWriter = new CompletionPipeWriter(_outputPipe.Writer); + + _pair = new DuplexPipePair( + new DuplexPipe(_transportPipeReader, _transportPipeWriter), + new DuplexPipe(_outputPipe.Reader, _inputPipe.Writer)); + Features.Set(this); Features.Set(this); - Features.Set< IProtocolErrorCodeFeature>(this); + Features.Set(this); - CanRead = canRead; - CanWrite = canWrite; StreamId = streamId; + + Disposed = false; } public bool Disposed { get; private set; } public override string ConnectionId { get; set; } - public long StreamId { get; } + public long StreamId { get; private set; } public override IFeatureCollection Features { get; } @@ -989,13 +1023,22 @@ public override IDuplexPipe Transport public override void Abort(ConnectionAbortedException abortReason) { + _isAborted = true; _pair.Application.Output.Complete(abortReason); } public override ValueTask DisposeAsync() { Disposed = true; - return base.DisposeAsync(); + + if (!_isAborted && + _transportPipeReader.IsComplete && _transportPipeReader.CompleteException == null && + _transportPipeWriter.IsComplete && _transportPipeWriter.CompleteException == null) + { + _testBase._streamContextPool.Enqueue(this); + } + + return ValueTask.CompletedTask; } } } diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Internal/CompletionPipeReader.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Internal/CompletionPipeReader.cs new file mode 100644 index 000000000000..506a76200d06 --- /dev/null +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Internal/CompletionPipeReader.cs @@ -0,0 +1,62 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests +{ + internal sealed class CompletionPipeReader : PipeReader + { + private readonly PipeReader _inner; + + public bool IsComplete { get; private set; } + public Exception CompleteException { get; private set; } + + public CompletionPipeReader(PipeReader inner) + { + _inner = inner; + } + + public override void AdvanceTo(SequencePosition consumed) + { + _inner.AdvanceTo(consumed); + } + + public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) + { + _inner.AdvanceTo(consumed, examined); + } + + public override ValueTask CompleteAsync(Exception exception = null) + { + IsComplete = true; + CompleteException = exception; + return _inner.CompleteAsync(exception); + } + + public override void Complete(Exception exception = null) + { + IsComplete = true; + CompleteException = exception; + _inner.Complete(exception); + } + + public override void CancelPendingRead() + { + _inner.CancelPendingRead(); + } + + public override ValueTask ReadAsync(CancellationToken cancellationToken = default) + { + return _inner.ReadAsync(cancellationToken); + } + + public override bool TryRead(out ReadResult result) + { + return _inner.TryRead(out result); + } + } +} diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Internal/CompletionPipeWriter.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Internal/CompletionPipeWriter.cs new file mode 100644 index 000000000000..5b8183b3ea23 --- /dev/null +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Internal/CompletionPipeWriter.cs @@ -0,0 +1,67 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNetCore.Server.Kestrel.Core.Tests +{ + internal sealed class CompletionPipeWriter : PipeWriter + { + private readonly PipeWriter _inner; + + public bool IsComplete { get; private set; } + public Exception CompleteException { get; private set; } + + public CompletionPipeWriter(PipeWriter inner) + { + _inner = inner; + } + + public override void Advance(int bytes) + { + _inner.Advance(bytes); + } + + public override void CancelPendingFlush() + { + _inner.CancelPendingFlush(); + } + + public override void Complete(Exception exception = null) + { + IsComplete = true; + CompleteException = exception; + _inner.Complete(exception); + } + + public override ValueTask CompleteAsync(Exception exception = null) + { + IsComplete = true; + CompleteException = exception; + return _inner.CompleteAsync(exception); + } + + public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) + { + return _inner.WriteAsync(source, cancellationToken); + } + + public override ValueTask FlushAsync(CancellationToken cancellationToken = default) + { + return _inner.FlushAsync(cancellationToken); + } + + public override Memory GetMemory(int sizeHint = 0) + { + return _inner.GetMemory(sizeHint); + } + + public override Span GetSpan(int sizeHint = 0) + { + return _inner.GetSpan(sizeHint); + } + } +} From 7f2da6ae95e02766de6ec8717ebf351a298410fd Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Fri, 9 Jul 2021 14:07:47 +1200 Subject: [PATCH 08/10] Reset Http3Stream --- .../src/Internal/Http2/Http2StreamContext.cs | 2 +- .../src/Internal/Http3/Http3Connection.cs | 15 +++-- .../Core/src/Internal/Http3/Http3Stream.cs | 64 +++++++++++-------- .../Core/src/Internal/Http3/Http3StreamOfT.cs | 3 +- .../Core/src/Internal/Http3StreamContext.cs | 3 +- .../src/Internal/HttpConnectionContext.cs | 6 +- .../Middleware/HttpConnectionMiddleware.cs | 4 +- ...Http3HttpProtocolFeatureCollectionTests.cs | 4 -- .../Kestrel/shared/test/TestContextFactory.cs | 6 +- .../Http3/Http3ConnectionTests.cs | 2 + 10 files changed, 58 insertions(+), 51 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2StreamContext.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2StreamContext.cs index 892a2567ab22..7964884bc361 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2StreamContext.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2StreamContext.cs @@ -26,7 +26,7 @@ public Http2StreamContext( Http2PeerSettings serverPeerSettings, Http2FrameWriter frameWriter, InputFlowControl connectionInputFlowControl, - OutputFlowControl connectionOutputFlowControl) : base(connectionId, protocols, connectionContext: null!, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint, transport: null!) + OutputFlowControl connectionOutputFlowControl) : base(connectionId, protocols, connectionContext: null!, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint) { StreamId = streamId; StreamLifetimeHandler = streamLifetimeHandler; diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs index 89b53af3ff91..881775a2930c 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs @@ -273,15 +273,15 @@ public async Task ProcessRequestsAsync(IHttpApplication appl // Check whether there is an existing HTTP/3 stream on the transport stream. // A stream will only be cached if the transport stream itself is reused. var stream = streamContext.Features.Get>()?.CachedStream; - //if (stream == null) + if (stream == null) { stream = new Http3Stream(application, CreateHttpStreamContext(streamContext)); streamContext.Features.Set>(new DefaultCachedHttp3StreamFeature(stream)); } - //else - //{ - // stream.InitializeWithExistingContext(); - //} + else + { + stream.InitializeWithExistingContext(streamContext.Transport); + } _streamLifetimeHandler.OnStreamCreated(stream); @@ -378,12 +378,13 @@ private Http3StreamContext CreateHttpStreamContext(ConnectionContext streamConte _context.MemoryPool, streamContext.LocalEndPoint as IPEndPoint, streamContext.RemoteEndPoint as IPEndPoint, - streamContext.Transport, _streamLifetimeHandler, streamContext, _clientSettings, _serverSettings); httpConnectionContext.TimeoutControl = _context.TimeoutControl; + httpConnectionContext.Transport = streamContext.Transport; + return httpConnectionContext; } @@ -459,12 +460,12 @@ private async ValueTask CreateNewUnidirectionalStreamAsync(application, httpConnectionContext); } diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs index d05e0f13ffcc..9e34c83207d8 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs @@ -36,12 +36,12 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH private const PseudoHeaderFields _mandatoryRequestPseudoHeaderFields = PseudoHeaderFields.Method | PseudoHeaderFields.Path | PseudoHeaderFields.Scheme; - private readonly Http3FrameWriter _frameWriter; - private readonly Http3OutputProducer _http3Output; + private Http3FrameWriter _frameWriter = default!; + private Http3OutputProducer _http3Output = default!; + private Http3StreamContext _context = default!; + private IProtocolErrorCodeFeature _errorCodeFeature = default!; + private IStreamIdFeature _streamIdFeature = default!; private int _isClosed; - private readonly Http3StreamContext _context; - private readonly IProtocolErrorCodeFeature _errorCodeFeature; - private readonly IStreamIdFeature _streamIdFeature; private readonly Http3RawFrame _incomingFrame = new Http3RawFrame(); protected RequestHeaderParsingState _requestHeaderParsingState; private PseudoHeaderFields _parsedPseudoHeaderFields; @@ -57,11 +57,26 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH private bool IsAborted => (_completionState & StreamCompletionFlags.Aborted) == StreamCompletionFlags.Aborted; internal bool RstStreamReceived => (_completionState & StreamCompletionFlags.RstStreamReceived) == StreamCompletionFlags.RstStreamReceived; - public Pipe RequestBodyPipe { get; } + public Pipe RequestBodyPipe { get; private set; } = default!; - public Http3Stream(Http3StreamContext context) + public long? InputRemaining { get; internal set; } + + public QPackDecoder QPackDecoder { get; private set; } = default!; + + public PipeReader Input => _context.Transport.Input; + + public ISystemClock SystemClock => _context.ServiceContext.SystemClock; + public KestrelServerLimits Limits => _context.ServiceContext.ServerOptions.Limits; + public long StreamId => _streamIdFeature.StreamId; + + public long HeaderTimeoutTicks { get; set; } + public bool ReceivedHeader => _appCompleted != null; // TCS is assigned once headers are received + + public bool IsRequestStream => true; + + public void Initialize(Http3StreamContext context) { - Initialize(context); + base.Initialize(context); InputRemaining = null; @@ -70,6 +85,15 @@ public Http3Stream(Http3StreamContext context) _errorCodeFeature = _context.ConnectionFeatures.Get()!; _streamIdFeature = _context.ConnectionFeatures.Get()!; + _appCompleted = null; + _isClosed = 0; + _requestHeaderParsingState = default; + _parsedPseudoHeaderFields = default; + _totalParsedHeaderSize = 0; + _isMethodConnect = false; + _completionState = default; + HeaderTimeoutTicks = 0; + _frameWriter = new Http3FrameWriter( context.Transport.Output, context.StreamContext, @@ -82,9 +106,6 @@ public Http3Stream(Http3StreamContext context) context.ClientPeerSettings, this); - // ResponseHeaders aren't set, kind of ugly that we need to reset. - Reset(); - _http3Output = new Http3OutputProducer( _frameWriter, context.MemoryPool, @@ -95,23 +116,9 @@ public Http3Stream(Http3StreamContext context) QPackDecoder = new QPackDecoder(_context.ServiceContext.ServerOptions.Limits.Http3.MaxRequestHeaderFieldSize); } - public long? InputRemaining { get; internal set; } - - public QPackDecoder QPackDecoder { get; } - - public PipeReader Input => _context.Transport.Input; - - public ISystemClock SystemClock => _context.ServiceContext.SystemClock; - public KestrelServerLimits Limits => _context.ServiceContext.ServerOptions.Limits; - public long StreamId => _streamIdFeature.StreamId; - - public long HeaderTimeoutTicks { get; set; } - public bool ReceivedHeader => _appCompleted != null; // TCS is assigned once headers are received - - public bool IsRequestStream => true; - - public void InitializeWithExistingContext() + public void InitializeWithExistingContext(IDuplexPipe transport) { + _context.Transport = transport; Initialize(_context); } @@ -629,6 +636,9 @@ private Task ProcessDataFrameAsync(in ReadOnlySequence payload) protected override void OnReset() { + _keepAlive = true; + _connectionAborted = false; + // Reset Http3 Features _currentIHttpMinRequestBodyDataRateFeature = this; _currentIHttpResponseTrailersFeature = this; diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3StreamOfT.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3StreamOfT.cs index 63a25e94f485..6d26a1703a84 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3StreamOfT.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3StreamOfT.cs @@ -11,8 +11,9 @@ class Http3Stream : Http3Stream, IHostContextContainer where { private readonly IHttpApplication _application; - public Http3Stream(IHttpApplication application, Http3StreamContext context) : base(context) + public Http3Stream(IHttpApplication application, Http3StreamContext context) { + Initialize(context); _application = application; } diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3StreamContext.cs b/src/Servers/Kestrel/Core/src/Internal/Http3StreamContext.cs index 7ac8a69e27fc..78bc19b24d99 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3StreamContext.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3StreamContext.cs @@ -21,11 +21,10 @@ public Http3StreamContext( MemoryPool memoryPool, IPEndPoint? localEndPoint, IPEndPoint? remoteEndPoint, - IDuplexPipe transport, IHttp3StreamLifetimeHandler streamLifetimeHandler, ConnectionContext streamContext, Http3PeerSettings clientPeerSettings, - Http3PeerSettings serverPeerSettings) : base(connectionId, protocols, connectionContext, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint, transport) + Http3PeerSettings serverPeerSettings) : base(connectionId, protocols, connectionContext, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint) { StreamLifetimeHandler = streamLifetimeHandler; StreamContext = streamContext; diff --git a/src/Servers/Kestrel/Core/src/Internal/HttpConnectionContext.cs b/src/Servers/Kestrel/Core/src/Internal/HttpConnectionContext.cs index 0070e08edecf..2536ca6198dc 100644 --- a/src/Servers/Kestrel/Core/src/Internal/HttpConnectionContext.cs +++ b/src/Servers/Kestrel/Core/src/Internal/HttpConnectionContext.cs @@ -19,12 +19,10 @@ public HttpConnectionContext( IFeatureCollection connectionFeatures, MemoryPool memoryPool, IPEndPoint? localEndPoint, - IPEndPoint? remoteEndPoint, - IDuplexPipe transport) : base(connectionId, protocols, connectionContext, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint) + IPEndPoint? remoteEndPoint) : base(connectionId, protocols, connectionContext, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint) { - Transport = transport; } - public IDuplexPipe Transport { get; } + public IDuplexPipe Transport { get; set; } = default!; } } diff --git a/src/Servers/Kestrel/Core/src/Middleware/HttpConnectionMiddleware.cs b/src/Servers/Kestrel/Core/src/Middleware/HttpConnectionMiddleware.cs index 5b49222eb487..033e72cf5608 100644 --- a/src/Servers/Kestrel/Core/src/Middleware/HttpConnectionMiddleware.cs +++ b/src/Servers/Kestrel/Core/src/Middleware/HttpConnectionMiddleware.cs @@ -34,8 +34,8 @@ public Task OnConnectionAsync(ConnectionContext connectionContext) connectionContext.Features, memoryPoolFeature?.MemoryPool ?? System.Buffers.MemoryPool.Shared, connectionContext.LocalEndPoint as IPEndPoint, - connectionContext.RemoteEndPoint as IPEndPoint, - connectionContext.Transport); + connectionContext.RemoteEndPoint as IPEndPoint); + httpConnectionContext.Transport = connectionContext.Transport; var connection = new HttpConnection(httpConnectionContext); diff --git a/src/Servers/Kestrel/Core/test/Http3HttpProtocolFeatureCollectionTests.cs b/src/Servers/Kestrel/Core/test/Http3HttpProtocolFeatureCollectionTests.cs index d1474418ca23..3b652bebaade 100644 --- a/src/Servers/Kestrel/Core/test/Http3HttpProtocolFeatureCollectionTests.cs +++ b/src/Servers/Kestrel/Core/test/Http3HttpProtocolFeatureCollectionTests.cs @@ -59,10 +59,6 @@ public void Http3StreamFeatureCollectionDoesIncludeIHttpMinRequestBodyDataRateFe private class TestHttp3Stream : Http3Stream { - public TestHttp3Stream(Http3StreamContext context) : base(context) - { - } - public override void Execute() { } diff --git a/src/Servers/Kestrel/shared/test/TestContextFactory.cs b/src/Servers/Kestrel/shared/test/TestContextFactory.cs index 232f1aed8416..d89ae10030bf 100644 --- a/src/Servers/Kestrel/shared/test/TestContextFactory.cs +++ b/src/Servers/Kestrel/shared/test/TestContextFactory.cs @@ -66,9 +66,9 @@ public static HttpConnectionContext CreateHttpConnectionContext( connectionFeatures, memoryPool ?? MemoryPool.Shared, localEndPoint, - remoteEndPoint, - transport); + remoteEndPoint); context.TimeoutControl = timeoutControl; + context.Transport = transport; return context; } @@ -185,13 +185,13 @@ public static Http3StreamContext CreateHttp3StreamContext( memoryPool: memoryPool ?? MemoryPool.Shared, localEndPoint: localEndPoint, remoteEndPoint: remoteEndPoint, - transport: transport, streamLifetimeHandler: streamLifetimeHandler, streamContext: null, clientPeerSettings: new Http3PeerSettings(), serverPeerSettings: null ); context.TimeoutControl = timeoutControl; + context.Transport = transport; return context; } diff --git a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3ConnectionTests.cs b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3ConnectionTests.cs index f384874a61ac..87082d797a84 100644 --- a/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3ConnectionTests.cs +++ b/src/Servers/Kestrel/test/InMemory.FunctionalTests/Http3/Http3ConnectionTests.cs @@ -232,6 +232,8 @@ public async Task StreamPool_MultipleStreamsInSequence_PooledStreamReused() await requestStream.ExpectReceiveEndOfStream(); + await requestStream.OnStreamCompletedTask.DefaultTimeout(); + Assert.True(requestStream.Disposed); requestStream = await CreateRequestStream(); From 9047c8964d3b0e4df42263b31db27861ce8d072f Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Sun, 11 Jul 2021 17:14:39 +1200 Subject: [PATCH 09/10] Update --- .../src/Internal/Http/Http1OutputProducer.cs | 3 +- .../src/Internal/Http2/Http2FrameWriter.cs | 3 +- .../src/Internal/Http2/Http2OutputProducer.cs | 3 +- .../src/Internal/Http3/Http3ControlStream.cs | 3 +- .../src/Internal/Http3/Http3FrameWriter.cs | 24 ++++++--- .../src/Internal/Http3/Http3OutputProducer.cs | 52 ++++++++++++------- .../Core/src/Internal/Http3/Http3Stream.cs | 49 ++++++++++------- .../PipeWriterHelpers/TimingPipeFlusher.cs | 9 ++-- .../Core/test/Http3FrameWriterTests.cs | 5 +- ...Http3HttpProtocolFeatureCollectionTests.cs | 4 +- 10 files changed, 100 insertions(+), 55 deletions(-) diff --git a/src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs b/src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs index 9394b587eb08..b0c983251d88 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs @@ -91,7 +91,8 @@ public Http1OutputProducer( _minResponseDataRateFeature = minResponseDataRateFeature; _outputAborter = outputAborter; - _flusher = new TimingPipeFlusher(_pipeWriter, timeoutControl, log); + _flusher = new TimingPipeFlusher(timeoutControl, log); + _flusher.Initialize(_pipeWriter); } public Task WriteDataAsync(ReadOnlySpan buffer, CancellationToken cancellationToken = default) diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs index 89d23dd5edf5..7c9a5577cebd 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs @@ -67,7 +67,8 @@ public Http2FrameWriter( _log = serviceContext.Log; _timeoutControl = timeoutControl; _minResponseDataRate = minResponseDataRate; - _flusher = new TimingPipeFlusher(_outputWriter, timeoutControl, serviceContext.Log); + _flusher = new TimingPipeFlusher(timeoutControl, serviceContext.Log); + _flusher.Initialize(_outputWriter); _outgoingFrame = new Http2Frame(); _headerEncodingBuffer = new byte[_maxFrameSize]; diff --git a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs index 01e2b3d69265..3cd1eac8a844 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs @@ -72,7 +72,8 @@ public Http2OutputProducer(Http2Stream stream, Http2StreamContext context, Strea // No need to pass in timeoutControl here, since no minDataRates are passed to the TimingPipeFlusher. // The minimum output data rate is enforced at the connection level by Http2FrameWriter. - _flusher = new TimingPipeFlusher(_pipeWriter, timeoutControl: null, _log); + _flusher = new TimingPipeFlusher(timeoutControl: null, _log); + _flusher.Initialize(_pipeWriter); _dataWriteProcessingTask = ProcessDataWrites(); } diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3ControlStream.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3ControlStream.cs index 20965652b0d8..0f7823107105 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3ControlStream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3ControlStream.cs @@ -48,16 +48,15 @@ public Http3ControlStream(Http3StreamContext context) _headerType = -1; _frameWriter = new Http3FrameWriter( - context.Transport.Output, context.StreamContext, context.TimeoutControl, httpLimits.MinResponseDataRate, - context.ConnectionId, context.MemoryPool, context.ServiceContext.Log, _streamIdFeature, context.ClientPeerSettings, this); + _frameWriter.Reset(context.Transport.Output, context.ConnectionId); } private void OnStreamClosed() diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3FrameWriter.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3FrameWriter.cs index a7f40e0fbe1e..2063457ba52c 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3FrameWriter.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3FrameWriter.cs @@ -28,11 +28,9 @@ internal class Http3FrameWriter private readonly object _writeLock = new object(); private readonly int _maxTotalHeaderSize; - private readonly PipeWriter _outputWriter; private readonly ConnectionContext _connectionContext; private readonly ITimeoutControl _timeoutControl; private readonly MinDataRate? _minResponseDataRate; - private readonly string _connectionId; private readonly MemoryPool _memoryPool; private readonly IKestrelTrace _log; private readonly IStreamIdFeature _streamIdFeature; @@ -40,6 +38,9 @@ internal class Http3FrameWriter private readonly Http3RawFrame _outgoingFrame; private readonly TimingPipeFlusher _flusher; + private PipeWriter _outputWriter = default!; + private string _connectionId = default!; + // HTTP/3 doesn't have a max frame size (peer can optionally specify a size). // Write headers to a buffer that can grow. Possible performance improvement // by writing directly to output writer (difficult as frame length is prefixed). @@ -51,19 +52,17 @@ internal class Http3FrameWriter private bool _completed; private bool _aborted; - public Http3FrameWriter(PipeWriter output, ConnectionContext connectionContext, ITimeoutControl timeoutControl, MinDataRate? minResponseDataRate, string connectionId, MemoryPool memoryPool, IKestrelTrace log, IStreamIdFeature streamIdFeature, Http3PeerSettings clientPeerSettings, IHttp3Stream http3Stream) + public Http3FrameWriter(ConnectionContext connectionContext, ITimeoutControl timeoutControl, MinDataRate? minResponseDataRate, MemoryPool memoryPool, IKestrelTrace log, IStreamIdFeature streamIdFeature, Http3PeerSettings clientPeerSettings, IHttp3Stream http3Stream) { - _outputWriter = output; _connectionContext = connectionContext; _timeoutControl = timeoutControl; _minResponseDataRate = minResponseDataRate; - _connectionId = connectionId; _memoryPool = memoryPool; _log = log; _streamIdFeature = streamIdFeature; _http3Stream = http3Stream; _outgoingFrame = new Http3RawFrame(); - _flusher = new TimingPipeFlusher(_outputWriter, timeoutControl, log); + _flusher = new TimingPipeFlusher(timeoutControl, log); _headerEncodingBuffer = new ArrayBufferWriter(HeaderBufferSize); // Note that max total header size value doesn't react to settings change during a stream. @@ -75,6 +74,19 @@ public Http3FrameWriter(PipeWriter output, ConnectionContext connectionContext, : (int)clientPeerSettings.MaxRequestHeaderFieldSectionSize; } + public void Reset(PipeWriter output, string connectionId) + { + _outputWriter = output; + _flusher.Initialize(output); + _connectionId = connectionId; + + _headersTotalSize = 0; + _headerEncodingBuffer.Clear(); + _unflushedBytes = 0; + _completed = false; + _aborted = false; + } + internal Task WriteSettingsAsync(List settings) { _outgoingFrame.PrepareSettings(); diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3OutputProducer.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3OutputProducer.cs index 3cf763cf3d5a..33a06b009a66 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3OutputProducer.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3OutputProducer.cs @@ -24,12 +24,13 @@ internal class Http3OutputProducer : IHttpOutputProducer, IHttpOutputAborter private readonly IKestrelTrace _log; private readonly MemoryPool _memoryPool; private readonly Http3Stream _stream; + private readonly Pipe _pipe; private readonly PipeWriter _pipeWriter; private readonly PipeReader _pipeReader; private readonly object _dataWriterLock = new object(); - private readonly ValueTask _dataWriteProcessingTask; + private ValueTask _dataWriteProcessingTask; private bool _startedWritingDataFrames; - private bool _completed; + private bool _streamCompleted; private bool _disposed; private bool _suffixSent; private IMemoryOwner? _fakeMemoryOwner; @@ -45,12 +46,27 @@ public Http3OutputProducer( _stream = stream; _log = log; - var pipe = CreateDataPipe(pool); + _pipe = CreateDataPipe(pool); - _pipeWriter = pipe.Writer; - _pipeReader = pipe.Reader; + _pipeWriter = _pipe.Writer; + _pipeReader = _pipe.Reader; + + _flusher = new TimingPipeFlusher(timeoutControl: null, log); + _flusher.Initialize(_pipeWriter); + _dataWriteProcessingTask = ProcessDataWrites().Preserve(); + } + + public void StreamReset() + { + // Data background task has finished. + Debug.Assert(_dataWriteProcessingTask.IsCompleted); + + _suffixSent = false; + _startedWritingDataFrames = false; + _streamCompleted = false; + + _pipe.Reset(); - _flusher = new TimingPipeFlusher(_pipeWriter, timeoutControl: null, log); _dataWriteProcessingTask = ProcessDataWrites().Preserve(); } @@ -91,7 +107,7 @@ public void Advance(int bytes) { ThrowIfSuffixSent(); - if (_completed) + if (_streamCompleted) { return; } @@ -106,7 +122,7 @@ public void CancelPendingFlush() { lock (_dataWriterLock) { - if (_completed) + if (_streamCompleted) { return; } @@ -141,7 +157,7 @@ public ValueTask FlushAsync(CancellationToken cancellationToken) { ThrowIfSuffixSent(); - if (_completed) + if (_streamCompleted) { return default; } @@ -167,7 +183,7 @@ public Memory GetMemory(int sizeHint = 0) { ThrowIfSuffixSent(); - if (_completed) + if (_streamCompleted) { return GetFakeMemory(sizeHint); } @@ -183,7 +199,7 @@ public Span GetSpan(int sizeHint = 0) { ThrowIfSuffixSent(); - if (_completed) + if (_streamCompleted) { return GetFakeMemory(sizeHint).Span; } @@ -225,12 +241,12 @@ public void Stop() { lock (_dataWriterLock) { - if (_completed) + if (_streamCompleted) { return; } - _completed = true; + _streamCompleted = true; _pipeWriter.Complete(new OperationCanceledException()); } @@ -259,7 +275,7 @@ public Task WriteDataAsync(ReadOnlySpan data, CancellationToken cancellati // This length check is important because we don't want to set _startedWritingDataFrames unless a data // frame will actually be written causing the headers to be flushed. - if (_completed || data.Length == 0) + if (_streamCompleted || data.Length == 0) { return Task.CompletedTask; } @@ -284,7 +300,7 @@ public ValueTask WriteDataToPipeAsync(ReadOnlySpan data, Canc // This length check is important because we don't want to set _startedWritingDataFrames unless a data // frame will actually be written causing the headers to be flushed. - if (_completed || data.Length == 0) + if (_streamCompleted || data.Length == 0) { return default; } @@ -300,7 +316,7 @@ public void WriteResponseHeaders(int statusCode, string? reasonPhrase, HttpRespo { lock (_dataWriterLock) { - if (_completed) + if (_streamCompleted) { return; } @@ -318,12 +334,12 @@ public ValueTask WriteStreamSuffixAsync() { lock (_dataWriterLock) { - if (_completed) + if (_streamCompleted) { return _dataWriteProcessingTask; } - _completed = true; + _streamCompleted = true; _suffixSent = true; _pipeWriter.Complete(); diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs index 9e34c83207d8..eb24826dc390 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs @@ -94,26 +94,35 @@ public void Initialize(Http3StreamContext context) _completionState = default; HeaderTimeoutTicks = 0; - _frameWriter = new Http3FrameWriter( - context.Transport.Output, - context.StreamContext, - context.TimeoutControl, - context.ServiceContext.ServerOptions.Limits.MinResponseDataRate, - context.ConnectionId, - context.MemoryPool, - context.ServiceContext.Log, - _streamIdFeature, - context.ClientPeerSettings, - this); - - _http3Output = new Http3OutputProducer( - _frameWriter, - context.MemoryPool, - this, - context.ServiceContext.Log); - RequestBodyPipe = CreateRequestBodyPipe(64 * 1024); // windowSize? - Output = _http3Output; - QPackDecoder = new QPackDecoder(_context.ServiceContext.ServerOptions.Limits.Http3.MaxRequestHeaderFieldSize); + if (_frameWriter == null) + { + _frameWriter = new Http3FrameWriter( + context.StreamContext, + context.TimeoutControl, + context.ServiceContext.ServerOptions.Limits.MinResponseDataRate, + context.MemoryPool, + context.ServiceContext.Log, + _streamIdFeature, + context.ClientPeerSettings, + this); + + _http3Output = new Http3OutputProducer( + _frameWriter, + context.MemoryPool, + this, + context.ServiceContext.Log); + Output = _http3Output; + RequestBodyPipe = CreateRequestBodyPipe(64 * 1024); // windowSize? + QPackDecoder = new QPackDecoder(_context.ServiceContext.ServerOptions.Limits.Http3.MaxRequestHeaderFieldSize); + } + else + { + _http3Output.StreamReset(); + RequestBodyPipe.Reset(); + QPackDecoder.Reset(); + } + + _frameWriter.Reset(context.Transport.Output, context.ConnectionId); } public void InitializeWithExistingContext(IDuplexPipe transport) diff --git a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/TimingPipeFlusher.cs b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/TimingPipeFlusher.cs index 686932a72e6a..45fca1cf7e21 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/TimingPipeFlusher.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Infrastructure/PipeWriterHelpers/TimingPipeFlusher.cs @@ -19,20 +19,23 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeW /// internal class TimingPipeFlusher { - private readonly PipeWriter _writer; + private PipeWriter _writer = default!; private readonly ITimeoutControl? _timeoutControl; private readonly IKestrelTrace _log; public TimingPipeFlusher( - PipeWriter writer, ITimeoutControl? timeoutControl, IKestrelTrace log) { - _writer = writer; _timeoutControl = timeoutControl; _log = log; } + public void Initialize(PipeWriter output) + { + _writer = output; + } + public ValueTask FlushAsync() { return FlushAsync(outputAborter: null, cancellationToken: default); diff --git a/src/Servers/Kestrel/Core/test/Http3FrameWriterTests.cs b/src/Servers/Kestrel/Core/test/Http3FrameWriterTests.cs index b3e39012de69..14ff1406ca21 100644 --- a/src/Servers/Kestrel/Core/test/Http3FrameWriterTests.cs +++ b/src/Servers/Kestrel/Core/test/Http3FrameWriterTests.cs @@ -88,7 +88,10 @@ public async Task WriteSettings_TwoSettingsWritten() private Http3FrameWriter CreateFrameWriter(Pipe pipe) { - return new Http3FrameWriter(pipe.Writer, null, null, null, null, _dirtyMemoryPool, null, Mock.Of(), new Http3PeerSettings(), null); + var frameWriter = new Http3FrameWriter(null, null, null, _dirtyMemoryPool, null, Mock.Of(), new Http3PeerSettings(), null); + frameWriter.Reset(pipe.Writer, null); + + return frameWriter; } } } diff --git a/src/Servers/Kestrel/Core/test/Http3HttpProtocolFeatureCollectionTests.cs b/src/Servers/Kestrel/Core/test/Http3HttpProtocolFeatureCollectionTests.cs index 3b652bebaade..9716dd61efbb 100644 --- a/src/Servers/Kestrel/Core/test/Http3HttpProtocolFeatureCollectionTests.cs +++ b/src/Servers/Kestrel/Core/test/Http3HttpProtocolFeatureCollectionTests.cs @@ -20,8 +20,8 @@ public Http3HttpProtocolFeatureCollectionTests() { var streamContext = TestContextFactory.CreateHttp3StreamContext(transport: DuplexPipe.CreateConnectionPair(new PipeOptions(), new PipeOptions()).Application); - var http3Stream = new TestHttp3Stream(streamContext); - http3Stream.Reset(); + var http3Stream = new TestHttp3Stream(); + http3Stream.Initialize(streamContext); _http3Collection = http3Stream; } From 6eed9e80e343573007105cfb84a7ae2057e04595 Mon Sep 17 00:00:00 2001 From: James Newton-King Date: Sun, 11 Jul 2021 19:07:14 +1200 Subject: [PATCH 10/10] Add PersistentState --- .../src/ConnectionContext.cs | 25 +++++++++++++++++++ .../src/PublicAPI.Unshipped.txt | 2 ++ .../src/Internal/Http3/Http3Connection.cs | 10 +++++--- .../src/Internal/QuicConnectionContext.cs | 5 ++++ .../Kestrel/shared/TransportConnection.cs | 5 ++++ 5 files changed, 44 insertions(+), 3 deletions(-) diff --git a/src/Servers/Connections.Abstractions/src/ConnectionContext.cs b/src/Servers/Connections.Abstractions/src/ConnectionContext.cs index 02b291c2c816..447a1370c1ab 100644 --- a/src/Servers/Connections.Abstractions/src/ConnectionContext.cs +++ b/src/Servers/Connections.Abstractions/src/ConnectionContext.cs @@ -2,6 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System; +using System.Collections.Generic; using System.IO.Pipelines; using Microsoft.AspNetCore.Connections.Features; @@ -12,6 +13,30 @@ namespace Microsoft.AspNetCore.Connections /// public abstract class ConnectionContext : BaseConnectionContext, IAsyncDisposable { + internal IDictionary? _persistentState; + + /// + /// Gets or sets a key/value collection that can be used to persist state between connections. + /// Whether a transport pools and reuses instances and allows state to + /// be persisted depends on the transport implementation. + /// + /// Because values added to persistent state can live in memory until a + /// is no longer pooled, use caution with this collection to avoid excessive memory use. + /// + /// + public virtual IDictionary PersistentState + { + get + { + // Lazily allocate connection metadata + return _persistentState ?? (_persistentState = new ConnectionItems()); + } + set + { + _persistentState = value; + } + } + /// /// Gets or sets the that can be used to read or write data on this connection. /// diff --git a/src/Servers/Connections.Abstractions/src/PublicAPI.Unshipped.txt b/src/Servers/Connections.Abstractions/src/PublicAPI.Unshipped.txt index 88c8c23149f2..20715d61ceab 100644 --- a/src/Servers/Connections.Abstractions/src/PublicAPI.Unshipped.txt +++ b/src/Servers/Connections.Abstractions/src/PublicAPI.Unshipped.txt @@ -25,3 +25,5 @@ Microsoft.AspNetCore.Connections.MultiplexedConnectionContext.MultiplexedConnect Microsoft.AspNetCore.Connections.MultiplexedConnectionDelegate abstract Microsoft.AspNetCore.Connections.MultiplexedConnectionContext.AcceptAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask abstract Microsoft.AspNetCore.Connections.MultiplexedConnectionContext.ConnectAsync(Microsoft.AspNetCore.Http.Features.IFeatureCollection? features = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +virtual Microsoft.AspNetCore.Connections.ConnectionContext.PersistentState.get -> System.Collections.Generic.IDictionary! +virtual Microsoft.AspNetCore.Connections.ConnectionContext.PersistentState.set -> void diff --git a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs index 881775a2930c..405724726be4 100644 --- a/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs +++ b/src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs @@ -23,6 +23,8 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http3 { internal class Http3Connection : IHttp3StreamLifetimeHandler, IRequestProcessor { + private static readonly object StreamPersistentStateKey = new object(); + // Internal for unit testing internal readonly Dictionary _streams = new Dictionary(); internal IHttp3StreamLifetimeHandler _streamLifetimeHandler; @@ -270,16 +272,18 @@ public async Task ProcessRequestsAsync(IHttpApplication appl // Request stream UpdateHighestStreamId(streamIdFeature.StreamId); + Http3Stream stream; + // Check whether there is an existing HTTP/3 stream on the transport stream. // A stream will only be cached if the transport stream itself is reused. - var stream = streamContext.Features.Get>()?.CachedStream; - if (stream == null) + if (!streamContext.PersistentState.TryGetValue(StreamPersistentStateKey, out var s)) { stream = new Http3Stream(application, CreateHttpStreamContext(streamContext)); - streamContext.Features.Set>(new DefaultCachedHttp3StreamFeature(stream)); + streamContext.PersistentState.Add(StreamPersistentStateKey, stream); } else { + stream = (Http3Stream)s!; stream.InitializeWithExistingContext(streamContext.Transport); } diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs index ad5174892e9b..3ce1ec468034 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicConnectionContext.cs @@ -94,6 +94,11 @@ public override void Abort(ConnectionAbortedException abortReason) { context = new QuicStreamContext(this, _context); } + else + { + context.ResetFeatureCollection(); + context.ResetItems(); + } context.Initialize(stream); context.Start(); diff --git a/src/Servers/Kestrel/shared/TransportConnection.cs b/src/Servers/Kestrel/shared/TransportConnection.cs index 217001b7c545..61d1b6ab8a67 100644 --- a/src/Servers/Kestrel/shared/TransportConnection.cs +++ b/src/Servers/Kestrel/shared/TransportConnection.cs @@ -54,6 +54,11 @@ public override string ConnectionId } } + internal void ResetItems() + { + _items?.Clear(); + } + public override CancellationToken ConnectionClosed { get; set; } // DO NOT remove this override to ConnectionContext.Abort. Doing so would cause