Skip to content

HTTP/3: Http3Stream pooling (builds on top of QUIC stream pooling) #34222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public Http2StreamContext(
Http2PeerSettings serverPeerSettings,
Http2FrameWriter frameWriter,
InputFlowControl connectionInputFlowControl,
OutputFlowControl connectionOutputFlowControl) : base(connectionId, protocols, connectionContext: null!, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint, transport: null!)
OutputFlowControl connectionOutputFlowControl) : base(connectionId, protocols, connectionContext: null!, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint)
{
StreamId = streamId;
StreamLifetimeHandler = streamLifetimeHandler;
Expand Down
55 changes: 36 additions & 19 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -257,26 +257,10 @@ public async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> appl
Debug.Assert(streamDirectionFeature != null);
Debug.Assert(streamIdFeature != null);

var httpConnectionContext = new Http3StreamContext(
streamContext.ConnectionId,
protocols: default,
connectionContext: null!, // TODO connection context is null here. Should we set it to anything?
_context.ServiceContext,
streamContext.Features,
_context.MemoryPool,
streamContext.LocalEndPoint as IPEndPoint,
streamContext.RemoteEndPoint as IPEndPoint,
streamContext.Transport,
_streamLifetimeHandler,
streamContext,
_clientSettings,
_serverSettings);
httpConnectionContext.TimeoutControl = _context.TimeoutControl;

if (!streamDirectionFeature.CanWrite)
{
// Unidirectional stream
var stream = new Http3ControlStream<TContext>(application, httpConnectionContext);
var stream = new Http3ControlStream<TContext>(application, CreateHttpStreamContext(streamContext));
_streamLifetimeHandler.OnStreamCreated(stream);

ThreadPool.UnsafeQueueUserWorkItem(stream, preferLocal: false);
Expand All @@ -286,7 +270,19 @@ public async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> appl
// Request stream
UpdateHighestStreamId(streamIdFeature.StreamId);

var stream = new Http3Stream<TContext>(application, httpConnectionContext);
// Check whether there is an existing HTTP/3 stream on the transport stream.
// A stream will only be cached if the transport stream itself is reused.
var stream = streamContext.Features.Get<ICachedHttp3StreamFeature<TContext>>()?.CachedStream;
if (stream == null)
{
stream = new Http3Stream<TContext>(application, CreateHttpStreamContext(streamContext));
streamContext.Features.Set<ICachedHttp3StreamFeature<TContext>>(new DefaultCachedHttp3StreamFeature<TContext>(stream));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The feature collection doesn't make sense as a caching mechanism since the collection itself shouldn't preserve state between uses. You'd want the lower layer to provide an explicate caching feature.

Copy link
Member Author

@JamesNK JamesNK Jul 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a suggestion about how that might work? Would it be a new feature with a dictionary that gets preserved between requests like:

public interface IStateFeature
{
   IDictionary<string, object> State { get; }
}

Or a new explicit dictionary on transport context? Note that BaseConnectionContext already has Features and Items properties - https://github.com/dotnet/aspnetcore/blob/main/src/Servers/Connections.Abstractions/src/BaseConnectionContext.cs

I created an issue a while back about using features to preserve the state between requests - #6895. There is a lot of crossover between that and this, just at a different layer.

I don't want to turn this into a big thing so another option is to have a hack in .NET 6 where the stream is added to the transport context's Items with a specific key and that specific key isn't cleared between calls by the QUIC transport. We could then improve it in .NET 7.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The state feature and the dictionary on the transport context accomplish the same end, I don't have a preference between them.

I don't want to turn this into a big thing so another option is to have a hack in .NET 6 where the stream is added to the transport context's Items with a specific key and that specific key isn't cleared between calls by the QUIC transport. We could then improve it in .NET 7.

That is hackier than I want to go. Given our http/3 goals for 6.0 emphasize functionality over perf, I'd rather not cache if we can't come up with a clear mechanic for it.

}
else
{
stream.InitializeWithExistingContext(streamContext.Transport);
}

_streamLifetimeHandler.OnStreamCreated(stream);

KestrelEventSource.Log.RequestQueuedStart(stream, AspNetCore.Http.HttpProtocol.Http3);
Expand Down Expand Up @@ -371,6 +367,27 @@ public async Task ProcessRequestsAsync<TContext>(IHttpApplication<TContext> appl
}
}

private Http3StreamContext CreateHttpStreamContext(ConnectionContext streamContext)
{
var httpConnectionContext = new Http3StreamContext(
streamContext.ConnectionId,
protocols: default,
connectionContext: null!, // TODO connection context is null here. Should we set it to anything?
_context.ServiceContext,
streamContext.Features,
_context.MemoryPool,
streamContext.LocalEndPoint as IPEndPoint,
streamContext.RemoteEndPoint as IPEndPoint,
_streamLifetimeHandler,
streamContext,
_clientSettings,
_serverSettings);
httpConnectionContext.TimeoutControl = _context.TimeoutControl;
httpConnectionContext.Transport = streamContext.Transport;

return httpConnectionContext;
}

private void UpdateConnectionState()
{
if (_isClosed != 0)
Expand Down Expand Up @@ -443,12 +460,12 @@ private async ValueTask<Http3ControlStream> CreateNewUnidirectionalStreamAsync<T
_context.MemoryPool,
streamContext.LocalEndPoint as IPEndPoint,
streamContext.RemoteEndPoint as IPEndPoint,
streamContext.Transport,
_streamLifetimeHandler,
streamContext,
_clientSettings,
_serverSettings);
httpConnectionContext.TimeoutControl = _context.TimeoutControl;
httpConnectionContext.Transport = streamContext.Transport;

return new Http3ControlStream<TContext>(application, httpConnectionContext);
}
Expand Down
65 changes: 40 additions & 25 deletions src/Servers/Kestrel/Core/src/Internal/Http3/Http3Stream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH
private const PseudoHeaderFields _mandatoryRequestPseudoHeaderFields =
PseudoHeaderFields.Method | PseudoHeaderFields.Path | PseudoHeaderFields.Scheme;

private readonly Http3FrameWriter _frameWriter;
private readonly Http3OutputProducer _http3Output;
private Http3FrameWriter _frameWriter = default!;
private Http3OutputProducer _http3Output = default!;
private Http3StreamContext _context = default!;
private IProtocolErrorCodeFeature _errorCodeFeature = default!;
private IStreamIdFeature _streamIdFeature = default!;
private int _isClosed;
private readonly Http3StreamContext _context;
private readonly IProtocolErrorCodeFeature _errorCodeFeature;
private readonly IStreamIdFeature _streamIdFeature;
private readonly Http3RawFrame _incomingFrame = new Http3RawFrame();
protected RequestHeaderParsingState _requestHeaderParsingState;
private PseudoHeaderFields _parsedPseudoHeaderFields;
Expand All @@ -57,11 +57,26 @@ internal abstract partial class Http3Stream : HttpProtocol, IHttp3Stream, IHttpH
private bool IsAborted => (_completionState & StreamCompletionFlags.Aborted) == StreamCompletionFlags.Aborted;
internal bool RstStreamReceived => (_completionState & StreamCompletionFlags.RstStreamReceived) == StreamCompletionFlags.RstStreamReceived;

public Pipe RequestBodyPipe { get; }
public Pipe RequestBodyPipe { get; private set; } = default!;

public Http3Stream(Http3StreamContext context)
public long? InputRemaining { get; internal set; }

public QPackDecoder QPackDecoder { get; private set; } = default!;

public PipeReader Input => _context.Transport.Input;

public ISystemClock SystemClock => _context.ServiceContext.SystemClock;
public KestrelServerLimits Limits => _context.ServiceContext.ServerOptions.Limits;
public long StreamId => _streamIdFeature.StreamId;

public long HeaderTimeoutTicks { get; set; }
public bool ReceivedHeader => _appCompleted != null; // TCS is assigned once headers are received

public bool IsRequestStream => true;

public void Initialize(Http3StreamContext context)
{
Initialize(context);
base.Initialize(context);

InputRemaining = null;

Expand All @@ -70,6 +85,15 @@ public Http3Stream(Http3StreamContext context)
_errorCodeFeature = _context.ConnectionFeatures.Get<IProtocolErrorCodeFeature>()!;
_streamIdFeature = _context.ConnectionFeatures.Get<IStreamIdFeature>()!;

_appCompleted = null;
_isClosed = 0;
_requestHeaderParsingState = default;
_parsedPseudoHeaderFields = default;
_totalParsedHeaderSize = 0;
_isMethodConnect = false;
_completionState = default;
HeaderTimeoutTicks = 0;

_frameWriter = new Http3FrameWriter(
context.Transport.Output,
context.StreamContext,
Expand All @@ -82,9 +106,6 @@ public Http3Stream(Http3StreamContext context)
context.ClientPeerSettings,
this);

// ResponseHeaders aren't set, kind of ugly that we need to reset.
Reset();

_http3Output = new Http3OutputProducer(
_frameWriter,
context.MemoryPool,
Expand All @@ -95,20 +116,11 @@ public Http3Stream(Http3StreamContext context)
QPackDecoder = new QPackDecoder(_context.ServiceContext.ServerOptions.Limits.Http3.MaxRequestHeaderFieldSize);
}

public long? InputRemaining { get; internal set; }

public QPackDecoder QPackDecoder { get; }

public PipeReader Input => _context.Transport.Input;

public ISystemClock SystemClock => _context.ServiceContext.SystemClock;
public KestrelServerLimits Limits => _context.ServiceContext.ServerOptions.Limits;
public long StreamId => _streamIdFeature.StreamId;

public long HeaderTimeoutTicks { get; set; }
public bool ReceivedHeader => _appCompleted != null; // TCS is assigned once headers are received

public bool IsRequestStream => true;
public void InitializeWithExistingContext(IDuplexPipe transport)
{
_context.Transport = transport;
Initialize(_context);
}

public void Abort(ConnectionAbortedException abortReason, Http3ErrorCode errorCode)
{
Expand Down Expand Up @@ -624,6 +636,9 @@ private Task ProcessDataFrameAsync(in ReadOnlySequence<byte> payload)

protected override void OnReset()
{
_keepAlive = true;
_connectionAborted = false;

// Reset Http3 Features
_currentIHttpMinRequestBodyDataRateFeature = this;
_currentIHttpResponseTrailersFeature = this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ class Http3Stream<TContext> : Http3Stream, IHostContextContainer<TContext> where
{
private readonly IHttpApplication<TContext> _application;

public Http3Stream(IHttpApplication<TContext> application, Http3StreamContext context) : base(context)
public Http3Stream(IHttpApplication<TContext> application, Http3StreamContext context)
{
Initialize(context);
_application = application;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http3
{
internal interface ICachedHttp3StreamFeature<TContext> where TContext : notnull
{
Http3Stream<TContext> CachedStream { get; }
}

internal class DefaultCachedHttp3StreamFeature<TContext> : ICachedHttp3StreamFeature<TContext> where TContext : notnull
{
public Http3Stream<TContext> CachedStream { get; }

public DefaultCachedHttp3StreamFeature(Http3Stream<TContext> cachedStream)
{
CachedStream = cachedStream;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ public Http3StreamContext(
MemoryPool<byte> memoryPool,
IPEndPoint? localEndPoint,
IPEndPoint? remoteEndPoint,
IDuplexPipe transport,
IHttp3StreamLifetimeHandler streamLifetimeHandler,
ConnectionContext streamContext,
Http3PeerSettings clientPeerSettings,
Http3PeerSettings serverPeerSettings) : base(connectionId, protocols, connectionContext, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint, transport)
Http3PeerSettings serverPeerSettings) : base(connectionId, protocols, connectionContext, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint)
{
StreamLifetimeHandler = streamLifetimeHandler;
StreamContext = streamContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ public HttpConnectionContext(
IFeatureCollection connectionFeatures,
MemoryPool<byte> memoryPool,
IPEndPoint? localEndPoint,
IPEndPoint? remoteEndPoint,
IDuplexPipe transport) : base(connectionId, protocols, connectionContext, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint)
IPEndPoint? remoteEndPoint) : base(connectionId, protocols, connectionContext, serviceContext, connectionFeatures, memoryPool, localEndPoint, remoteEndPoint)
{
Transport = transport;
}

public IDuplexPipe Transport { get; }
public IDuplexPipe Transport { get; set; } = default!;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public Task OnConnectionAsync(ConnectionContext connectionContext)
connectionContext.Features,
memoryPoolFeature?.MemoryPool ?? System.Buffers.MemoryPool<byte>.Shared,
connectionContext.LocalEndPoint as IPEndPoint,
connectionContext.RemoteEndPoint as IPEndPoint,
connectionContext.Transport);
connectionContext.RemoteEndPoint as IPEndPoint);
httpConnectionContext.Transport = connectionContext.Transport;

var connection = new HttpConnection(httpConnectionContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ public void Http3StreamFeatureCollectionDoesIncludeIHttpMinRequestBodyDataRateFe

private class TestHttp3Stream : Http3Stream
{
public TestHttp3Stream(Http3StreamContext context) : base(context)
{
}

public override void Execute()
{
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Quic.Internal
{
internal sealed class CompletionPipeReader : PipeReader
{
private readonly PipeReader _inner;

public bool IsComplete { get; private set; }
public Exception? CompleteException { get; private set; }

public CompletionPipeReader(PipeReader inner)
{
_inner = inner;
}

public override void AdvanceTo(SequencePosition consumed)
{
_inner.AdvanceTo(consumed);
}

public override void AdvanceTo(SequencePosition consumed, SequencePosition examined)
{
_inner.AdvanceTo(consumed, examined);
}

public override ValueTask CompleteAsync(Exception? exception = null)
{
IsComplete = true;
CompleteException = exception;
return _inner.CompleteAsync(exception);
}

public override void Complete(Exception? exception = null)
{
IsComplete = true;
CompleteException = exception;
_inner.Complete(exception);
}

public override void CancelPendingRead()
{
_inner.CancelPendingRead();
}

public override ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
return _inner.ReadAsync(cancellationToken);
}

public override bool TryRead(out ReadResult result)
{
return _inner.TryRead(out result);
}
}
}
Loading