Skip to content

Commit 07cbf7f

Browse files
pakrymdavidfowl
authored andcommitted
Use pipelines for SocketOutput
- Changed socket output to be based on pipelines - Changed connection filter glue to be based on pipelines - Codegen that used `MemoryPoolIterator` for output now uses `WritableBuffer` - Made `UvWriteReq` async/await friendly with `LibuvAwaitable<T>` - Deleted MemoryPool and friends
1 parent 2ef3804 commit 07cbf7f

35 files changed

+721
-2811
lines changed

KestrelHttpServer.sln

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Microsoft Visual Studio Solution File, Format Version 12.00
22
# Visual Studio 15
3-
VisualStudioVersion = 15.0.26228.0
3+
VisualStudioVersion = 15.0.26228.4
44
MinimumVisualStudioVersion = 10.0.40219.1
55
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{7972A5D6-3385-4127-9277-428506DD44FF}"
66
ProjectSection(SolutionItems) = preProject

build/dependencies.props

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
<Project>
22
<PropertyGroup>
33
<AspNetCoreVersion>1.2.0-*</AspNetCoreVersion>
4-
<CoreFxLabsPipelinesVersion>0.1.0-*</CoreFxLabsPipelinesVersion>
5-
<CoreFxLabsVersion>0.1.0-*</CoreFxLabsVersion>
4+
<CoreFxLabsPipelinesVersion>0.1.0-e170313-1</CoreFxLabsPipelinesVersion>
5+
<CoreFxLabsVersion>0.1.0-e170313-1</CoreFxLabsVersion>
66
<CoreFxVersion>4.3.0</CoreFxVersion>
77
<LibUvVersion>1.9.1</LibUvVersion>
88
<JsonNetVersion>9.0.1</JsonNetVersion>

src/Microsoft.AspNetCore.Server.Kestrel/Adapter/Internal/AdaptedPipeline.cs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
using System.IO.Pipelines;
77
using System.Threading.Tasks;
88
using Microsoft.AspNetCore.Server.Kestrel.Internal.Http;
9-
using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure;
10-
using MemoryPool = Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure.MemoryPool;
119

1210
namespace Microsoft.AspNetCore.Server.Kestrel.Adapter.Internal
1311
{
@@ -16,30 +14,50 @@ public class AdaptedPipeline : IDisposable
1614
private const int MinAllocBufferSize = 2048;
1715

1816
private readonly Stream _filteredStream;
17+
private readonly StreamSocketOutput _output;
1918

2019
public AdaptedPipeline(
21-
string connectionId,
2220
Stream filteredStream,
23-
IPipe pipe,
24-
MemoryPool memory,
25-
IKestrelTrace logger)
21+
IPipe inputPipe,
22+
IPipe outputPipe)
2623
{
27-
Input = pipe;
28-
Output = new StreamSocketOutput(connectionId, filteredStream, memory, logger);
24+
Input = inputPipe;
25+
_output = new StreamSocketOutput(filteredStream, outputPipe);
2926

3027
_filteredStream = filteredStream;
3128
}
3229

3330
public IPipe Input { get; }
3431

35-
public ISocketOutput Output { get; }
32+
public ISocketOutput Output => _output;
3633

3734
public void Dispose()
3835
{
3936
Input.Writer.Complete();
4037
}
4138

42-
public async Task ReadInputAsync()
39+
public async Task StartAsync()
40+
{
41+
var inputTask = ReadInputAsync();
42+
var outputTask = _output.WriteOutputAsync();
43+
44+
var result = await Task.WhenAny(inputTask, outputTask);
45+
46+
if (result == inputTask)
47+
{
48+
// Close output
49+
_output.Dispose();
50+
}
51+
else
52+
{
53+
// Close input
54+
Input.Writer.Complete();
55+
}
56+
57+
await Task.WhenAll(inputTask, outputTask);
58+
}
59+
60+
private async Task ReadInputAsync()
4361
{
4462
int bytesRead;
4563

src/Microsoft.AspNetCore.Server.Kestrel/Adapter/Internal/StreamSocketOutput.cs

Lines changed: 76 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3,124 +3,130 @@
33

44
using System;
55
using System.IO;
6-
using System.Text;
6+
using System.IO.Pipelines;
77
using System.Threading;
88
using System.Threading.Tasks;
99
using Microsoft.AspNetCore.Server.Kestrel.Internal.Http;
10-
using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure;
1110

1211
namespace Microsoft.AspNetCore.Server.Kestrel.Adapter.Internal
1312
{
1413
public class StreamSocketOutput : ISocketOutput
1514
{
16-
private static readonly byte[] _endChunkBytes = Encoding.ASCII.GetBytes("\r\n");
17-
private static readonly byte[] _nullBuffer = new byte[0];
15+
private static readonly ArraySegment<byte> _nullBuffer = new ArraySegment<byte>(new byte[0]);
1816

19-
private readonly string _connectionId;
2017
private readonly Stream _outputStream;
21-
private readonly MemoryPool _memory;
22-
private readonly IKestrelTrace _logger;
23-
private MemoryPoolBlock _producingBlock;
18+
private readonly IPipe _pipe;
19+
private object _sync = new object();
20+
private bool _completed;
2421

25-
private bool _canWrite = true;
26-
27-
public StreamSocketOutput(string connectionId, Stream outputStream, MemoryPool memory, IKestrelTrace logger)
22+
public StreamSocketOutput(Stream outputStream, IPipe pipe)
2823
{
29-
_connectionId = connectionId;
3024
_outputStream = outputStream;
31-
_memory = memory;
32-
_logger = logger;
25+
_pipe = pipe;
3326
}
3427

3528
public void Write(ArraySegment<byte> buffer, bool chunk)
3629
{
37-
if (chunk && buffer.Array != null)
38-
{
39-
var beginChunkBytes = ChunkWriter.BeginChunkBytes(buffer.Count);
40-
_outputStream.Write(beginChunkBytes.Array, beginChunkBytes.Offset, beginChunkBytes.Count);
41-
}
30+
WriteAsync(buffer, chunk, default(CancellationToken)).GetAwaiter().GetResult();
31+
}
4232

43-
_outputStream.Write(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count);
33+
public async Task WriteAsync(ArraySegment<byte> buffer, bool chunk, CancellationToken cancellationToken)
34+
{
35+
var flushAwaiter = default(WritableBufferAwaitable);
4436

45-
if (chunk && buffer.Array != null)
37+
lock (_sync)
4638
{
47-
_outputStream.Write(_endChunkBytes, 0, _endChunkBytes.Length);
39+
if (_completed)
40+
{
41+
return;
42+
}
43+
44+
var writableBuffer = _pipe.Writer.Alloc();
45+
46+
if (buffer.Count > 0)
47+
{
48+
if (chunk)
49+
{
50+
ChunkWriter.WriteBeginChunkBytes(ref writableBuffer, buffer.Count);
51+
writableBuffer.Write(buffer);
52+
ChunkWriter.WriteEndChunkBytes(ref writableBuffer);
53+
}
54+
else
55+
{
56+
writableBuffer.Write(buffer);
57+
}
58+
}
59+
60+
flushAwaiter = writableBuffer.FlushAsync();
4861
}
62+
63+
await flushAwaiter;
4964
}
5065

51-
public Task WriteAsync(ArraySegment<byte> buffer, bool chunk, CancellationToken cancellationToken)
66+
public void Dispose()
5267
{
53-
if (chunk && buffer.Array != null)
68+
lock (_sync)
5469
{
55-
return WriteAsyncChunked(buffer, cancellationToken);
70+
_completed = true;
5671
}
5772

58-
return _outputStream.WriteAsync(buffer.Array ?? _nullBuffer, buffer.Offset, buffer.Count, cancellationToken);
73+
_pipe.Writer.Complete();
5974
}
6075

61-
private async Task WriteAsyncChunked(ArraySegment<byte> buffer, CancellationToken cancellationToken)
76+
public void Flush()
6277
{
63-
var beginChunkBytes = ChunkWriter.BeginChunkBytes(buffer.Count);
78+
FlushAsync(CancellationToken.None).GetAwaiter().GetResult();
79+
}
6480

65-
await _outputStream.WriteAsync(beginChunkBytes.Array, beginChunkBytes.Offset, beginChunkBytes.Count, cancellationToken);
66-
await _outputStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
67-
await _outputStream.WriteAsync(_endChunkBytes, 0, _endChunkBytes.Length, cancellationToken);
81+
public Task FlushAsync(CancellationToken cancellationToken)
82+
{
83+
return WriteAsync(default(ArraySegment<byte>), chunk: false, cancellationToken: cancellationToken);
6884
}
6985

70-
public MemoryPoolIterator ProducingStart()
86+
public WritableBuffer Alloc()
7187
{
72-
_producingBlock = _memory.Lease();
73-
return new MemoryPoolIterator(_producingBlock);
88+
return _pipe.Writer.Alloc();
7489
}
7590

76-
public void ProducingComplete(MemoryPoolIterator end)
91+
public async Task WriteOutputAsync()
7792
{
78-
var block = _producingBlock;
79-
while (block != end.Block)
93+
try
8094
{
81-
// If we don't handle an exception from _outputStream.Write() here, we'll leak memory blocks.
82-
if (_canWrite)
95+
while (true)
8396
{
97+
var readResult = await _pipe.Reader.ReadAsync();
98+
var buffer = readResult.Buffer;
99+
84100
try
85101
{
86-
_outputStream.Write(block.Data.Array, block.Data.Offset, block.Data.Count);
102+
if (buffer.IsEmpty && readResult.IsCompleted)
103+
{
104+
break;
105+
}
106+
107+
if (buffer.IsEmpty)
108+
{
109+
await _outputStream.FlushAsync();
110+
}
111+
112+
foreach (var memory in buffer)
113+
{
114+
var array = memory.GetArray();
115+
await _outputStream.WriteAsync(array.Array, array.Offset, array.Count);
116+
}
87117
}
88-
catch (Exception ex)
118+
finally
89119
{
90-
_canWrite = false;
91-
_logger.ConnectionError(_connectionId, ex);
120+
_pipe.Reader.Advance(readResult.Buffer.End);
92121
}
93-
}
94122

95-
var returnBlock = block;
96-
block = block.Next;
97-
returnBlock.Pool.Return(returnBlock);
123+
// REVIEW: Should we flush here?
124+
}
98125
}
99-
100-
if (_canWrite)
126+
finally
101127
{
102-
try
103-
{
104-
_outputStream.Write(end.Block.Array, end.Block.Data.Offset, end.Index - end.Block.Data.Offset);
105-
}
106-
catch (Exception ex)
107-
{
108-
_canWrite = false;
109-
_logger.ConnectionError(_connectionId, ex);
110-
}
128+
_pipe.Reader.Complete();
111129
}
112-
113-
end.Block.Pool.Return(end.Block);
114-
}
115-
116-
public void Flush()
117-
{
118-
_outputStream.Flush();
119-
}
120-
121-
public Task FlushAsync(CancellationToken cancellationToken)
122-
{
123-
return _outputStream.FlushAsync(cancellationToken);
124130
}
125131
}
126132
}

src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/ChunkWriter.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5+
using System.IO.Pipelines;
56
using System.Text;
67
using Microsoft.AspNetCore.Server.Kestrel.Internal.Infrastructure;
78

@@ -47,16 +48,16 @@ public static ArraySegment<byte> BeginChunkBytes(int dataCount)
4748
return new ArraySegment<byte>(bytes, offset, 10 - offset);
4849
}
4950

50-
public static int WriteBeginChunkBytes(ref MemoryPoolIterator start, int dataCount)
51+
public static int WriteBeginChunkBytes(ref WritableBuffer start, int dataCount)
5152
{
5253
var chunkSegment = BeginChunkBytes(dataCount);
53-
start.CopyFrom(chunkSegment);
54+
start.Write(chunkSegment);
5455
return chunkSegment.Count;
5556
}
5657

57-
public static void WriteEndChunkBytes(ref MemoryPoolIterator start)
58+
public static void WriteEndChunkBytes(ref WritableBuffer start)
5859
{
59-
start.CopyFrom(_endChunkBytes);
60+
start.Write(_endChunkBytes);
6061
}
6162
}
6263
}

src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Connection.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ public Connection(ListenerContext context, UvStreamHandle socket) : base(context
5858

5959
ConnectionId = GenerateConnectionId(Interlocked.Increment(ref _lastConnectionId));
6060

61-
Input = Thread.PipelineFactory.Create(ListenerContext.LibuvPipeOptions);
62-
Output = new SocketOutput(Thread, _socket, this, ConnectionId, Log, ThreadPool);
61+
Input = Thread.PipelineFactory.Create(ListenerContext.LibuvInputPipeOptions);
62+
var outputPipe = Thread.PipelineFactory.Create(ListenerContext.LibuvOutputPipeOptions);
63+
Output = new SocketOutput(outputPipe, Thread, _socket, this, ConnectionId, Log);
6364

6465
var tcpHandle = _socket as UvTcpHandle;
6566
if (tcpHandle != null)
@@ -197,19 +198,17 @@ private async Task ApplyConnectionAdaptersAsync()
197198
{
198199
_filteredStream = adapterContext.ConnectionStream;
199200
_adaptedPipeline = new AdaptedPipeline(
200-
ConnectionId,
201201
adapterContext.ConnectionStream,
202202
Thread.PipelineFactory.Create(ListenerContext.AdaptedPipeOptions),
203-
Thread.Memory,
204-
Log);
203+
Thread.PipelineFactory.Create(ListenerContext.AdaptedPipeOptions));
205204

206205
_frame.Input = _adaptedPipeline.Input;
207206
_frame.Output = _adaptedPipeline.Output;
208207

209208
// Don't attempt to read input if connection has already closed.
210209
// This can happen if a client opens a connection and immediately closes it.
211210
_readInputTask = _socketClosedTcs.Task.Status == TaskStatus.WaitingForActivation
212-
? _adaptedPipeline.ReadInputAsync()
211+
? _adaptedPipeline.StartAsync()
213212
: TaskCache.CompletedTask;
214213
}
215214

src/Microsoft.AspNetCore.Server.Kestrel/Internal/Http/Frame.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -891,7 +891,7 @@ private void CreateResponseHeader(
891891
var hasTransferEncoding = responseHeaders.HasTransferEncoding;
892892
var transferCoding = FrameHeaders.GetFinalTransferCoding(responseHeaders.HeaderTransferEncoding);
893893

894-
var end = Output.ProducingStart();
894+
var end = Output.Alloc();
895895

896896
if (_keepAlive && hasConnection)
897897
{
@@ -974,12 +974,12 @@ private void CreateResponseHeader(
974974
responseHeaders.SetRawDate(dateHeaderValues.String, dateHeaderValues.Bytes);
975975
}
976976

977-
end.CopyFrom(_bytesHttpVersion11);
978-
end.CopyFrom(statusBytes);
977+
end.Write(_bytesHttpVersion11);
978+
end.Write(statusBytes);
979979
responseHeaders.CopyTo(ref end);
980-
end.CopyFrom(_bytesEndHeaders, 0, _bytesEndHeaders.Length);
980+
end.Write(_bytesEndHeaders);
981981

982-
Output.ProducingComplete(end);
982+
end.Commit();
983983
}
984984

985985
public void ParseRequest(ReadableBuffer buffer, out ReadCursor consumed, out ReadCursor examined)

0 commit comments

Comments
 (0)