From f7bdc5a4e9ed9f2b2162301f9e3ff1ac86547ec0 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sat, 14 Nov 2015 22:37:28 +0000 Subject: [PATCH 1/6] Skip non-consumed rather than reading --- .../Http/Frame.cs | 2 +- .../Http/MessageBody.cs | 115 ++++++++++++++++++ .../Http/SocketInputExtensions.cs | 22 ++++ .../Infrastructure/MemoryPoolIterator2.cs | 32 +++++ 4 files changed, 170 insertions(+), 1 deletion(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs index c08d0eb68..dac00934d 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs @@ -232,7 +232,7 @@ public async Task RequestProcessingAsync() await ProduceEnd(); - while (await RequestBody.ReadAsync(_nullBuffer, 0, _nullBuffer.Length) != 0) + while (await MessageBody.SkipAsync() != 0) { // Finish reading the request body in case the app did not. } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs index e8e438512..393803e75 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs @@ -38,8 +38,26 @@ protected MessageBody(FrameContext context) return result; } + public Task SkipAsync(CancellationToken cancellationToken = default(CancellationToken)) + { + Task result = null; + var send100Continue = 0; + result = SkipImplementation(cancellationToken); + if (!result.IsCompleted) + { + send100Continue = Interlocked.Exchange(ref _send100Continue, 0); + } + if (send100Continue == 1) + { + _context.FrameControl.ProduceContinue(); + } + return result; + } + public abstract Task ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken); + public abstract Task SkipImplementation(CancellationToken cancellationToken); + public static MessageBody For( string httpVersion, IDictionary headers, @@ -110,6 +128,10 @@ public override Task ReadAsyncImplementation(ArraySegment buffer, Can { return _context.SocketInput.ReadAsync(buffer); } + public override Task SkipImplementation(CancellationToken cancellationToken) + { + return _context.SocketInput.SkipAsync(4096); + } } class ForContentLength : MessageBody @@ -146,6 +168,27 @@ public override async Task ReadAsyncImplementation(ArraySegment buffe return actual; } + + public override async Task SkipImplementation(CancellationToken cancellationToken) + { + var input = _context.SocketInput; + + var limit = Math.Min(4096, _inputLength); + if (limit == 0) + { + return 0; + } + + var actual = await _context.SocketInput.SkipAsync(limit); + _inputLength -= actual; + + if (actual == 0) + { + throw new InvalidDataException("Unexpected end of request content"); + } + + return actual; + } } @@ -236,6 +279,78 @@ public override async Task ReadAsyncImplementation(ArraySegment buffe return 0; } + public override async Task SkipImplementation(CancellationToken cancellationToken) + { + var input = _context.SocketInput; + + while (_mode != Mode.Complete) + { + while (_mode == Mode.ChunkPrefix) + { + var chunkSize = 0; + if (!TakeChunkedLine(input, ref chunkSize)) + { + await input; + } + else if (chunkSize == 0) + { + _mode = Mode.Complete; + } + else + { + _mode = Mode.ChunkData; + } + _inputLength = chunkSize; + } + while (_mode == Mode.ChunkData) + { + var limit = Math.Min(4096, _inputLength); + if (limit != 0) + { + await input; + } + + var begin = input.ConsumingStart(); + int actual; + var end = begin.Skip(limit, out actual); + _inputLength -= actual; + input.ConsumingComplete(end, end); + + if (_inputLength == 0) + { + _mode = Mode.ChunkSuffix; + } + if (actual != 0) + { + return actual; + } + } + while (_mode == Mode.ChunkSuffix) + { + var scan = input.ConsumingStart(); + var consumed = scan; + var ch1 = scan.Take(); + var ch2 = scan.Take(); + if (ch1 == -1 || ch2 == -1) + { + input.ConsumingComplete(consumed, scan); + await input; + } + else if (ch1 == '\r' && ch2 == '\n') + { + input.ConsumingComplete(scan, scan); + _mode = Mode.ChunkPrefix; + } + else + { + throw new NotImplementedException("INVALID REQUEST FORMAT"); + } + } + } + + return 0; + } + private static bool TakeChunkedLine(SocketInput baton, ref int chunkSizeOut) { var scan = baton.ConsumingStart(); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInputExtensions.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInputExtensions.cs index 9c5d69070..70a491565 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInputExtensions.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInputExtensions.cs @@ -29,5 +29,27 @@ public static async Task ReadAsync(this SocketInput input, ArraySegment SkipAsync(this SocketInput input, int limit) + { + while (true) + { + await input; + + var begin = input.ConsumingStart(); + int actual; + var end = begin.Skip(limit, out actual); + input.ConsumingComplete(end, end); + + if (actual != 0) + { + return actual; + } + if (input.RemoteIntakeFin) + { + return 0; + } + } + } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs index 7de5c7fb1..c14534a17 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs @@ -619,5 +619,37 @@ public MemoryPoolIterator2 CopyTo(byte[] array, int offset, int count, out int a } } } + public MemoryPoolIterator2 Skip(int limit, out int actual) + { + if (IsDefault) + { + actual = 0; + return this; + } + + var block = _block; + var index = _index; + var remaining = limit; + while (true) + { + var following = block.End - index; + if (remaining <= following) + { + actual = limit; + return new MemoryPoolIterator2(block, index + remaining); + } + else if (block.Next == null) + { + actual = limit - remaining + following; + return new MemoryPoolIterator2(block, index + following); + } + else + { + remaining -= following; + block = block.Next; + index = block.Start; + } + } + } } } From f48e6ba51af0e8a78ed9042b80e81064baa9a347 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sat, 14 Nov 2015 22:59:37 +0000 Subject: [PATCH 2/6] Add Async in method name --- .../Http/MessageBody.cs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs index 393803e75..545325641 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs @@ -42,7 +42,7 @@ protected MessageBody(FrameContext context) { Task result = null; var send100Continue = 0; - result = SkipImplementation(cancellationToken); + result = SkipAsyncImplementation(cancellationToken); if (!result.IsCompleted) { send100Continue = Interlocked.Exchange(ref _send100Continue, 0); @@ -56,7 +56,7 @@ protected MessageBody(FrameContext context) public abstract Task ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken); - public abstract Task SkipImplementation(CancellationToken cancellationToken); + public abstract Task SkipAsyncImplementation(CancellationToken cancellationToken); public static MessageBody For( string httpVersion, @@ -128,7 +128,7 @@ public override Task ReadAsyncImplementation(ArraySegment buffer, Can { return _context.SocketInput.ReadAsync(buffer); } - public override Task SkipImplementation(CancellationToken cancellationToken) + public override Task SkipAsyncImplementation(CancellationToken cancellationToken) { return _context.SocketInput.SkipAsync(4096); } @@ -169,7 +169,7 @@ public override async Task ReadAsyncImplementation(ArraySegment buffe return actual; } - public override async Task SkipImplementation(CancellationToken cancellationToken) + public override async Task SkipAsyncImplementation(CancellationToken cancellationToken) { var input = _context.SocketInput; @@ -279,7 +279,7 @@ public override async Task ReadAsyncImplementation(ArraySegment buffe return 0; } - public override async Task SkipImplementation(CancellationToken cancellationToken) + public override async Task SkipAsyncImplementation(CancellationToken cancellationToken) { var input = _context.SocketInput; From f089abd33727731434c4a2dab44fc5b4bc36391b Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 15 Nov 2015 21:33:31 +0000 Subject: [PATCH 3/6] Consume in single call --- .../Http/Frame.cs | 6 ++--- .../Http/MessageBody.cs | 26 ++++++++++--------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs index dac00934d..e8310aab0 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs @@ -232,10 +232,8 @@ public async Task RequestProcessingAsync() await ProduceEnd(); - while (await MessageBody.SkipAsync() != 0) - { - // Finish reading the request body in case the app did not. - } + // Finish reading the request body in case the app did not. + await MessageBody.Consume(); } terminated = !_keepAlive; diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs index 545325641..1604d0145 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs @@ -38,20 +38,22 @@ protected MessageBody(FrameContext context) return result; } - public Task SkipAsync(CancellationToken cancellationToken = default(CancellationToken)) + public async Task Consume(CancellationToken cancellationToken = default(CancellationToken)) { - Task result = null; - var send100Continue = 0; - result = SkipAsyncImplementation(cancellationToken); - if (!result.IsCompleted) - { - send100Continue = Interlocked.Exchange(ref _send100Continue, 0); - } - if (send100Continue == 1) + Task result; + do { - _context.FrameControl.ProduceContinue(); - } - return result; + var send100Continue = 0; + result = SkipAsyncImplementation(cancellationToken); + if (!result.IsCompleted) + { + send100Continue = Interlocked.Exchange(ref _send100Continue, 0); + } + if (send100Continue == 1) + { + _context.FrameControl.ProduceContinue(); + } + } while (await result != 0); } public abstract Task ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken); From 1589b54018b5a084c3a2ec067bbef49f1d3133fd Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 15 Nov 2015 21:56:20 +0000 Subject: [PATCH 4/6] Early bail, completed tasks+Interlocked --- .../Http/MessageBody.cs | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs index 1604d0145..7e75fcb1b 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs @@ -41,17 +41,27 @@ protected MessageBody(FrameContext context) public async Task Consume(CancellationToken cancellationToken = default(CancellationToken)) { Task result; + var firstLoop = true; do { - var send100Continue = 0; result = SkipAsyncImplementation(cancellationToken); if (!result.IsCompleted) { - send100Continue = Interlocked.Exchange(ref _send100Continue, 0); + if (firstLoop && Interlocked.Exchange(ref _send100Continue, 0) == 1) + { + firstLoop = false; + _context.FrameControl.ProduceContinue(); + } + } + else if (result.GetAwaiter().GetResult() == 0) + { + // Completed Task, end of stream + return; } - if (send100Continue == 1) + else { - _context.FrameControl.ProduceContinue(); + // Completed Task, get next Task rather than await + continue; } } while (await result != 0); } From ecc439555e8a89bca9d5431b7e4f4cecc4fd437f Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Sun, 15 Nov 2015 22:27:39 +0000 Subject: [PATCH 5/6] Reuse ReadAsync for Consume --- .../Filter/LibuvStream.cs | 2 +- .../Http/MessageBody.cs | 110 +----------------- .../Http/SocketInputExtensions.cs | 26 +---- .../Infrastructure/MemoryPoolIterator2.cs | 47 ++------ 4 files changed, 20 insertions(+), 165 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Filter/LibuvStream.cs b/src/Microsoft.AspNet.Server.Kestrel/Filter/LibuvStream.cs index 4f6319576..d61c0eafb 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Filter/LibuvStream.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Filter/LibuvStream.cs @@ -85,7 +85,7 @@ public override void Flush() private Task ReadAsync(ArraySegment buffer) { - return _input.ReadAsync(buffer); + return _input.ReadAsync(buffer.Array, buffer.Offset, buffer.Count); } } } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs index 7e75fcb1b..11a15e249 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs @@ -44,7 +44,7 @@ protected MessageBody(FrameContext context) var firstLoop = true; do { - result = SkipAsyncImplementation(cancellationToken); + result = ReadAsyncImplementation(default(ArraySegment), cancellationToken); if (!result.IsCompleted) { if (firstLoop && Interlocked.Exchange(ref _send100Continue, 0) == 1) @@ -68,8 +68,6 @@ protected MessageBody(FrameContext context) public abstract Task ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken); - public abstract Task SkipAsyncImplementation(CancellationToken cancellationToken); - public static MessageBody For( string httpVersion, IDictionary headers, @@ -138,11 +136,7 @@ public ForRemainingData(FrameContext context) public override Task ReadAsyncImplementation(ArraySegment buffer, CancellationToken cancellationToken) { - return _context.SocketInput.ReadAsync(buffer); - } - public override Task SkipAsyncImplementation(CancellationToken cancellationToken) - { - return _context.SocketInput.SkipAsync(4096); + return _context.SocketInput.ReadAsync(buffer.Array, buffer.Offset, buffer.Array == null ? 8192 : buffer.Count); } } @@ -163,35 +157,13 @@ public override async Task ReadAsyncImplementation(ArraySegment buffe { var input = _context.SocketInput; - var limit = Math.Min(buffer.Count, _inputLength); + var limit = buffer.Array == null ? _inputLength : Math.Min(buffer.Count, _inputLength); if (limit == 0) { return 0; } - var limitedBuffer = new ArraySegment(buffer.Array, buffer.Offset, limit); - var actual = await _context.SocketInput.ReadAsync(limitedBuffer); - _inputLength -= actual; - - if (actual == 0) - { - throw new InvalidDataException("Unexpected end of request content"); - } - - return actual; - } - - public override async Task SkipAsyncImplementation(CancellationToken cancellationToken) - { - var input = _context.SocketInput; - - var limit = Math.Min(4096, _inputLength); - if (limit == 0) - { - return 0; - } - - var actual = await _context.SocketInput.SkipAsync(limit); + var actual = await _context.SocketInput.ReadAsync(buffer.Array, buffer.Offset, limit); _inputLength -= actual; if (actual == 0) @@ -244,7 +216,7 @@ public override async Task ReadAsyncImplementation(ArraySegment buffe } while (_mode == Mode.ChunkData) { - var limit = Math.Min(buffer.Count, _inputLength); + var limit = buffer.Array == null ? _inputLength : Math.Min(buffer.Count, _inputLength); if (limit != 0) { await input; @@ -291,78 +263,6 @@ public override async Task ReadAsyncImplementation(ArraySegment buffe return 0; } - public override async Task SkipAsyncImplementation(CancellationToken cancellationToken) - { - var input = _context.SocketInput; - - while (_mode != Mode.Complete) - { - while (_mode == Mode.ChunkPrefix) - { - var chunkSize = 0; - if (!TakeChunkedLine(input, ref chunkSize)) - { - await input; - } - else if (chunkSize == 0) - { - _mode = Mode.Complete; - } - else - { - _mode = Mode.ChunkData; - } - _inputLength = chunkSize; - } - while (_mode == Mode.ChunkData) - { - var limit = Math.Min(4096, _inputLength); - if (limit != 0) - { - await input; - } - - var begin = input.ConsumingStart(); - int actual; - var end = begin.Skip(limit, out actual); - _inputLength -= actual; - input.ConsumingComplete(end, end); - - if (_inputLength == 0) - { - _mode = Mode.ChunkSuffix; - } - if (actual != 0) - { - return actual; - } - } - while (_mode == Mode.ChunkSuffix) - { - var scan = input.ConsumingStart(); - var consumed = scan; - var ch1 = scan.Take(); - var ch2 = scan.Take(); - if (ch1 == -1 || ch2 == -1) - { - input.ConsumingComplete(consumed, scan); - await input; - } - else if (ch1 == '\r' && ch2 == '\n') - { - input.ConsumingComplete(scan, scan); - _mode = Mode.ChunkPrefix; - } - else - { - throw new NotImplementedException("INVALID REQUEST FORMAT"); - } - } - } - - return 0; - } - private static bool TakeChunkedLine(SocketInput baton, ref int chunkSizeOut) { var scan = baton.ConsumingStart(); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInputExtensions.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInputExtensions.cs index 70a491565..c36d88155 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInputExtensions.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketInputExtensions.cs @@ -8,7 +8,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Http { public static class SocketInputExtensions { - public static async Task ReadAsync(this SocketInput input, ArraySegment buffer) + public static async Task ReadAsync(this SocketInput input, byte[] buffer, int offset, int count) { while (true) { @@ -16,29 +16,7 @@ public static async Task ReadAsync(this SocketInput input, ArraySegment SkipAsync(this SocketInput input, int limit) - { - while (true) - { - await input; - - var begin = input.ConsumingStart(); - int actual; - var end = begin.Skip(limit, out actual); + var end = begin.CopyTo(buffer, offset, count, out actual); input.ConsumingComplete(end, end); if (actual != 0) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs index c14534a17..ddb12bff7 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPoolIterator2.cs @@ -600,18 +600,27 @@ public MemoryPoolIterator2 CopyTo(byte[] array, int offset, int count, out int a if (remaining <= following) { actual = count; - Buffer.BlockCopy(block.Array, index, array, offset, remaining); + if (array != null) + { + Buffer.BlockCopy(block.Array, index, array, offset, remaining); + } return new MemoryPoolIterator2(block, index + remaining); } else if (block.Next == null) { actual = count - remaining + following; - Buffer.BlockCopy(block.Array, index, array, offset, following); + if (array != null) + { + Buffer.BlockCopy(block.Array, index, array, offset, following); + } return new MemoryPoolIterator2(block, index + following); } else { - Buffer.BlockCopy(block.Array, index, array, offset, following); + if (array != null) + { + Buffer.BlockCopy(block.Array, index, array, offset, following); + } offset += following; remaining -= following; block = block.Next; @@ -619,37 +628,5 @@ public MemoryPoolIterator2 CopyTo(byte[] array, int offset, int count, out int a } } } - public MemoryPoolIterator2 Skip(int limit, out int actual) - { - if (IsDefault) - { - actual = 0; - return this; - } - - var block = _block; - var index = _index; - var remaining = limit; - while (true) - { - var following = block.End - index; - if (remaining <= following) - { - actual = limit; - return new MemoryPoolIterator2(block, index + remaining); - } - else if (block.Next == null) - { - actual = limit - remaining + following; - return new MemoryPoolIterator2(block, index + following); - } - else - { - remaining -= following; - block = block.Next; - index = block.Start; - } - } - } } } From 7691a7cc2312056418756aaceb2a3b2c57c2f761 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Mon, 16 Nov 2015 06:43:49 +0000 Subject: [PATCH 6/6] Improved Send100 Check --- .../Http/MessageBody.cs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs index 11a15e249..8edd424a1 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/MessageBody.cs @@ -41,16 +41,19 @@ protected MessageBody(FrameContext context) public async Task Consume(CancellationToken cancellationToken = default(CancellationToken)) { Task result; - var firstLoop = true; + var send100checked = false; do { result = ReadAsyncImplementation(default(ArraySegment), cancellationToken); if (!result.IsCompleted) { - if (firstLoop && Interlocked.Exchange(ref _send100Continue, 0) == 1) + if (!send100checked) { - firstLoop = false; - _context.FrameControl.ProduceContinue(); + if (Interlocked.Exchange(ref _send100Continue, 0) == 1) + { + _context.FrameControl.ProduceContinue(); + } + send100checked = true; } } else if (result.GetAwaiter().GetResult() == 0)