diff --git a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs index 51b17f1331f4..6de99384cbc8 100644 --- a/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs +++ b/src/Servers/Kestrel/Transport.Quic/src/Internal/QuicStreamContext.cs @@ -187,7 +187,30 @@ private async Task DoReceive() input.Advance(bytesReceived); - var flushTask = input.FlushAsync(); + ValueTask flushTask; + + if (_stream.ReadsCompleted) + { + // If the data returned from ReadAsync is the final chunk on the stream then + // flush data and end pipe together with CompleteAsync. + // + // Getting data and complete together is important for HTTP/3 when parsing headers. + // It is important that it knows that there is no body after the headers. + var completeTask = input.CompleteAsync(ResolveCompleteReceiveException(error)); + if (completeTask.IsCompletedSuccessfully) + { + // Fast path. CompleteAsync completed immediately. + flushTask = ValueTask.FromResult(new FlushResult(isCanceled: false, isCompleted: true)); + } + else + { + flushTask = AwaitCompleteTaskAsync(completeTask); + } + } + else + { + flushTask = input.FlushAsync(); + } var paused = !flushTask.IsCompleted; @@ -240,12 +263,23 @@ private async Task DoReceive() finally { // If Shutdown() has already bee called, assume that was the reason ProcessReceives() exited. - Input.Complete(_shutdownReadReason ?? _shutdownReason ?? error); + Input.Complete(ResolveCompleteReceiveException(error)); FireStreamClosed(); await _waitForConnectionClosedTcs.Task; } + + async static ValueTask AwaitCompleteTaskAsync(ValueTask completeTask) + { + await completeTask; + return new FlushResult(isCanceled: false, isCompleted: true); + } + } + + private Exception? ResolveCompleteReceiveException(Exception? error) + { + return _shutdownReadReason ?? _shutdownReason ?? error; } private void FireStreamClosed() diff --git a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs index 6e8dfced9488..4fcdac16e124 100644 --- a/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs +++ b/src/Servers/Kestrel/Transport.Quic/test/QuicStreamContextTests.cs @@ -255,6 +255,38 @@ public async Task ClientToServerUnidirectionalStream_ClientAbort_ServerReceivesA await closedTcs.Task.DefaultTimeout(); } + [ConditionalFact] + [MsQuicSupported] + public async Task ClientToServerUnidirectionalStream_CompleteWrites_PipeProvidesDataAndCompleteTogether() + { + // Arrange + await using var connectionListener = await QuicTestHelpers.CreateConnectionListenerFactory(LoggerFactory); + + var options = QuicTestHelpers.CreateClientConnectionOptions(connectionListener.EndPoint); + using var quicConnection = new QuicConnection(QuicImplementationProviders.MsQuic, options); + await quicConnection.ConnectAsync().DefaultTimeout(); + + await using var serverConnection = await connectionListener.AcceptAndAddFeatureAsync().DefaultTimeout(); + + // Act + await using var clientStream = quicConnection.OpenUnidirectionalStream(); + await clientStream.WriteAsync(TestData).DefaultTimeout(); + + await using var serverStream = await serverConnection.AcceptAsync().DefaultTimeout(); + var readResult = await serverStream.Transport.Input.ReadAtLeastAsync(TestData.Length).DefaultTimeout(); + serverStream.Transport.Input.AdvanceTo(readResult.Buffer.End); + + var readResultTask = serverStream.Transport.Input.ReadAsync(); + + await clientStream.WriteAsync(TestData, endStream: true).DefaultTimeout(); + + // Assert + var completeReadResult = await readResultTask.DefaultTimeout(); + + Assert.Equal(TestData, completeReadResult.Buffer.ToArray()); + Assert.True(completeReadResult.IsCompleted); + } + [ConditionalFact] [MsQuicSupported] public async Task ServerToClientUnidirectionalStream_ServerWritesDataAndCompletes_GracefullyClosed()