Skip to content

Commit 6de357e

Browse files
authored
Revert the output pipe in the DuplexStreamPipeAdapter (#11601)
* Revert back to copying data to pipes * Replace the output pipe only * Don't complete the connection pipe in Http2FrameWriter - This leads to trunated data in some cases. Instead just yield the middleware so we can be sure no more user code is running (Http1OutputProducer does this as well). There are still cases where a misbeaving application that doesn't properly await writes gets cut off but that will be fixed in the SteamPipeWriter itself. - Updated tests
1 parent 585b575 commit 6de357e

File tree

5 files changed

+114
-14
lines changed

5 files changed

+114
-14
lines changed

src/Servers/Kestrel/Core/src/Internal/Http2/Http2FrameWriter.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ public void Complete()
8989

9090
_completed = true;
9191
_connectionOutputFlowControl.Abort();
92-
_outputWriter.Complete();
9392
}
9493
}
9594

src/Servers/Kestrel/Core/src/Middleware/HttpsConnectionMiddleware.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,10 @@ private async Task InnerOnConnectionAsync(ConnectionContext context)
103103

104104
if (_options.ClientCertificateMode == ClientCertificateMode.NoCertificate)
105105
{
106-
sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions);
106+
sslDuplexPipe = new SslDuplexPipe(context.Transport, inputPipeOptions, outputPipeOptions)
107+
{
108+
Log = _logger
109+
};
107110
certificateRequired = false;
108111
}
109112
else
@@ -140,7 +143,10 @@ private async Task InnerOnConnectionAsync(ConnectionContext context)
140143
}
141144

142145
return true;
143-
}));
146+
}))
147+
{
148+
Log = _logger
149+
};
144150

145151
certificateRequired = true;
146152
}

src/Servers/Kestrel/Core/src/Middleware/Internal/DuplexPipeStreamAdapter.cs

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.IO;
66
using System.IO.Pipelines;
77
using System.Threading.Tasks;
8+
using Microsoft.Extensions.Logging;
89

910
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
1011
{
@@ -14,36 +15,114 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
1415
/// <typeparam name="TStream"></typeparam>
1516
internal class DuplexPipeStreamAdapter<TStream> : DuplexPipeStream, IDuplexPipe where TStream : Stream
1617
{
18+
private readonly Pipe _output;
19+
private Task _outputTask;
20+
1721
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, Func<Stream, TStream> createStream) :
1822
this(duplexPipe, new StreamPipeReaderOptions(leaveOpen: true), new StreamPipeWriterOptions(leaveOpen: true), createStream)
1923
{
2024
}
2125

22-
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TStream> createStream) : base(duplexPipe.Input, duplexPipe.Output)
26+
public DuplexPipeStreamAdapter(IDuplexPipe duplexPipe, StreamPipeReaderOptions readerOptions, StreamPipeWriterOptions writerOptions, Func<Stream, TStream> createStream) :
27+
base(duplexPipe.Input, duplexPipe.Output)
2328
{
2429
Stream = createStream(this);
30+
31+
var outputOptions = new PipeOptions(pool: writerOptions.Pool,
32+
readerScheduler: PipeScheduler.Inline,
33+
writerScheduler: PipeScheduler.Inline,
34+
pauseWriterThreshold: 1,
35+
resumeWriterThreshold: 1,
36+
minimumSegmentSize: writerOptions.MinimumBufferSize,
37+
useSynchronizationContext: false);
38+
2539
Input = PipeReader.Create(Stream, readerOptions);
26-
Output = PipeWriter.Create(Stream, writerOptions);
40+
41+
// We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions
42+
// about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once
43+
// those patterns are fixed.
44+
_output = new Pipe(outputOptions);
2745
}
2846

47+
public ILogger Log { get; set; }
48+
2949
public TStream Stream { get; }
3050

3151
public PipeReader Input { get; }
3252

33-
public PipeWriter Output { get; }
53+
public PipeWriter Output
54+
{
55+
get
56+
{
57+
if (_outputTask == null)
58+
{
59+
_outputTask = WriteOutputAsync();
60+
}
3461

35-
protected override void Dispose(bool disposing)
62+
return _output.Writer;
63+
}
64+
}
65+
66+
public override async ValueTask DisposeAsync()
3667
{
3768
Input.Complete();
38-
Output.Complete();
39-
base.Dispose(disposing);
69+
_output.Writer.Complete();
70+
71+
if (_outputTask != null)
72+
{
73+
// Wait for the output task to complete, this ensures that we've copied
74+
// the application data to the underlying stream
75+
await _outputTask;
76+
}
4077
}
4178

42-
public override ValueTask DisposeAsync()
79+
private async Task WriteOutputAsync()
4380
{
44-
Input.Complete();
45-
Output.Complete();
46-
return base.DisposeAsync();
81+
try
82+
{
83+
while (true)
84+
{
85+
var result = await _output.Reader.ReadAsync();
86+
var buffer = result.Buffer;
87+
88+
try
89+
{
90+
if (buffer.IsEmpty)
91+
{
92+
if (result.IsCompleted)
93+
{
94+
break;
95+
}
96+
97+
await Stream.FlushAsync();
98+
}
99+
else if (buffer.IsSingleSegment)
100+
{
101+
await Stream.WriteAsync(buffer.First);
102+
}
103+
else
104+
{
105+
foreach (var memory in buffer)
106+
{
107+
await Stream.WriteAsync(memory);
108+
}
109+
}
110+
}
111+
finally
112+
{
113+
_output.Reader.AdvanceTo(buffer.End);
114+
}
115+
}
116+
}
117+
catch (Exception ex)
118+
{
119+
Log?.LogCritical(0, ex, $"{GetType().Name}.{nameof(WriteOutputAsync)}");
120+
}
121+
finally
122+
{
123+
_output.Reader.Complete();
124+
}
47125
}
48126
}
49127
}
128+

src/Servers/Kestrel/Core/src/Middleware/LoggingConnectionMiddleware.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ private class LoggingDuplexPipe : DuplexPipeStreamAdapter<LoggingStream>
4444
public LoggingDuplexPipe(IDuplexPipe transport, ILogger logger) :
4545
base(transport, stream => new LoggingStream(stream, logger))
4646
{
47+
Log = logger;
4748
}
4849
}
4950
}

src/Servers/Kestrel/test/InMemory.FunctionalTests/Http2/Http2TestBase.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,22 @@ protected async Task InitializeConnectionAsync(RequestDelegate application, int
467467
CreateConnection();
468468
}
469469

470-
_connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(application));
470+
var connectionTask = _connection.ProcessRequestsAsync(new DummyApplication(application));
471+
472+
async Task CompletePipeOnTaskCompletion()
473+
{
474+
try
475+
{
476+
await connectionTask;
477+
}
478+
finally
479+
{
480+
_pair.Transport.Input.Complete();
481+
_pair.Transport.Output.Complete();
482+
}
483+
}
484+
485+
_connectionTask = CompletePipeOnTaskCompletion();
471486

472487
await SendPreambleAsync().ConfigureAwait(false);
473488
await SendSettingsAsync();

0 commit comments

Comments
 (0)