From d723f9da210e97ead29cf4fae403ec0a8b4ad452 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Wed, 8 Jul 2015 11:50:11 -0700 Subject: [PATCH 1/6] Reduce calls to uv_write by calling it with multiple buffers when possible --- .../Http/SocketOutput.cs | 174 +++++++++++++----- 1 file changed, 130 insertions(+), 44 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 4c3e4c8cc..b611b3cf0 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -3,15 +3,27 @@ using Microsoft.AspNet.Server.Kestrel.Networking; using System; +using System.Collections.Generic; using System.Threading; namespace Microsoft.AspNet.Server.Kestrel.Http { public class SocketOutput : ISocketOutput { + private const int _maxPendingWrites = 3; + private readonly KestrelThread _thread; private readonly UvStreamHandle _socket; + private WriteContext _nextWriteContext; + + // The number of write operations that have been scheduled so far + // but have not completed. + private int _writesSending = 0; + + // This locks all access to _nextWriteContext and _writesSending + private readonly object _lockObj = new object(); + public SocketOutput(KestrelThread thread, UvStreamHandle socket) { _thread = thread; @@ -26,71 +38,145 @@ public void Write(ArraySegment buffer, Action callback, buffer = new ArraySegment(copy); KestrelTrace.Log.ConnectionWrite(0, buffer.Count); - var req = new ThisWriteReq(); - req.Init(_thread.Loop); - req.Contextualize(this, _socket, buffer, callback, state); - req.Write(); + + var context = new WriteOperation + { + Buffer = buffer, + Callback = callback, + State = state + }; + + lock (_lockObj) + { + if (_nextWriteContext == null) + { + _nextWriteContext = new WriteContext(this); + } + + _nextWriteContext.Operations.Add(context); + + if (_writesSending < _maxPendingWrites) + { + ScheduleWrite(); + _writesSending++; + } + } + } + + private void ScheduleWrite() + { + _thread.Post(obj => + { + var self = (SocketOutput)obj; + self.WriteAllPending(); + }, this); } - public class ThisWriteReq : UvWriteReq + // This is called on the libuv event loop + private void WriteAllPending() { - SocketOutput _self; - ArraySegment _buffer; - UvStreamHandle _socket; - Action _callback; - object _state; - Exception _callbackError; - - internal void Contextualize( - SocketOutput socketOutput, - UvStreamHandle socket, - ArraySegment buffer, - Action callback, - object state) + WriteContext writingContext; + + lock (_lockObj) + { + if (_nextWriteContext != null) + { + writingContext = _nextWriteContext; + _nextWriteContext = null; + } + else + { + _writesSending--; + return; + } + } + + try { - _self = socketOutput; - _socket = socket; - _buffer = buffer; - _callback = callback; - _state = state; + var buffers = new ArraySegment[writingContext.Operations.Count]; + + var i = 0; + foreach (var writeOp in writingContext.Operations) + { + buffers[i] = writeOp.Buffer; + i++; + } + + writingContext.WriteReq.Write(_socket, new ArraySegment>(buffers), (r, status, error, state) => + { + var writtenContext = (WriteContext)state; + writtenContext.Self.OnWriteCompleted(writtenContext.Operations, r, status, error); + }, writingContext); + } + catch + { + lock (_lockObj) + { + // Lock instead of using Interlocked.Decrement so _writesSending + // doesn't change in the middle of executing other synchronized code. + _writesSending--; + } + + throw; } + } - public void Write() + // This is called on the libuv event loop + private void OnWriteCompleted(List completedWrites, UvWriteReq req, int status, Exception error) + { + lock (_lockObj) { - _self._thread.Post(obj => + if (_nextWriteContext != null) + { + ScheduleWrite(); + } + else { - var req = (ThisWriteReq)obj; - req.Write( - req._socket, - new ArraySegment>( - new[] { req._buffer }), - (r, status, error, state) => ((ThisWriteReq)state).OnWrite(status, error), - req); - }, this); + _writesSending--; + } } - private void OnWrite(int status, Exception error) + req.Dispose(); + + foreach (var writeOp in completedWrites) { KestrelTrace.Log.ConnectionWriteCallback(0, status); //NOTE: pool this? - Dispose(); - // Get off the event loop before calling user code! - _callbackError = error; + writeOp.Error = error; ThreadPool.QueueUserWorkItem(obj => { - var req = (ThisWriteReq)obj; - req._callback(req._callbackError, req._state); - }, this); - } + var op = (WriteOperation)obj; + op.Callback(op.Error, op.State); + }, writeOp); + } } - - public bool Flush(Action drained) + private class WriteOperation { - return false; + public ArraySegment Buffer; + public Exception Error; + public Action Callback; + public object State; } + private class WriteContext + { + public WriteContext(SocketOutput self) + { + Self = self; + + WriteReq = new UvWriteReq(); + WriteReq.Init(self._thread.Loop); + + Operations = new List(); + } + + public SocketOutput Self; + + public UvWriteReq WriteReq; + public List Operations; + } } } From 5b06a763672a20577ef775334473ae97f1d6e540 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Wed, 8 Jul 2015 12:42:32 -0700 Subject: [PATCH 2/6] Add sample that can produce large responses --- KestrelHttpServer.sln | 9 ++++- .../LargeResponseApp/LargeResponseApp.xproj | 19 ++++++++++ .../Microsoft.AspNet.Hosting.ini | 3 ++ samples/LargeResponseApp/Startup.cs | 38 +++++++++++++++++++ samples/LargeResponseApp/project.json | 16 ++++++++ 5 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 samples/LargeResponseApp/LargeResponseApp.xproj create mode 100644 samples/LargeResponseApp/Microsoft.AspNet.Hosting.ini create mode 100644 samples/LargeResponseApp/Startup.cs create mode 100644 samples/LargeResponseApp/project.json diff --git a/KestrelHttpServer.sln b/KestrelHttpServer.sln index a9437d065..d2ebe3ed3 100644 --- a/KestrelHttpServer.sln +++ b/KestrelHttpServer.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 14 -VisualStudioVersion = 14.0.22111.0 +VisualStudioVersion = 14.0.22823.1 MinimumVisualStudioVersion = 10.0.40219.1 Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Microsoft.AspNet.Server.Kestrel", "src\Microsoft.AspNet.Server.Kestrel\Microsoft.AspNet.Server.Kestrel.xproj", "{F510611A-3BEE-4B88-A613-5F4A74ED82A1}" EndProject @@ -25,6 +25,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{D3273454-E EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{8A3D00B8-1CCF-4BE6-A060-11104CE2D9CE}" EndProject +Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "LargeResponseApp", "samples\LargeResponseApp\LargeResponseApp.xproj", "{B35D4D31-E74C-4646-8A11-7A7A40F0021E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -47,6 +49,10 @@ Global {30B7617E-58EF-4382-B3EA-5B2E718CF1A6}.Debug|Any CPU.Build.0 = Debug|Any CPU {30B7617E-58EF-4382-B3EA-5B2E718CF1A6}.Release|Any CPU.ActiveCfg = Release|Any CPU {30B7617E-58EF-4382-B3EA-5B2E718CF1A6}.Release|Any CPU.Build.0 = Release|Any CPU + {B35D4D31-E74C-4646-8A11-7A7A40F0021E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B35D4D31-E74C-4646-8A11-7A7A40F0021E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B35D4D31-E74C-4646-8A11-7A7A40F0021E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B35D4D31-E74C-4646-8A11-7A7A40F0021E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -56,5 +62,6 @@ Global {37F3BFB2-6454-49E5-9D7F-581BF755CCFE} = {D3273454-EA07-41D2-BF0B-FCC3675C2483} {2C3CB3DC-EEBF-4F52-9E1C-4F2F972E76C3} = {8A3D00B8-1CCF-4BE6-A060-11104CE2D9CE} {30B7617E-58EF-4382-B3EA-5B2E718CF1A6} = {2D5D5227-4DBD-499A-96B1-76A36B03B750} + {B35D4D31-E74C-4646-8A11-7A7A40F0021E} = {8A3D00B8-1CCF-4BE6-A060-11104CE2D9CE} EndGlobalSection EndGlobal diff --git a/samples/LargeResponseApp/LargeResponseApp.xproj b/samples/LargeResponseApp/LargeResponseApp.xproj new file mode 100644 index 000000000..48abc5f22 --- /dev/null +++ b/samples/LargeResponseApp/LargeResponseApp.xproj @@ -0,0 +1,19 @@ + + + + 14.0 + $(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion) + + + + b35d4d31-e74c-4646-8a11-7a7a40f0021e + LargeResponseApp + ..\..\artifacts\obj\$(MSBuildProjectName) + ..\..\artifacts\bin\$(MSBuildProjectName)\ + + + 2.0 + 42216 + + + \ No newline at end of file diff --git a/samples/LargeResponseApp/Microsoft.AspNet.Hosting.ini b/samples/LargeResponseApp/Microsoft.AspNet.Hosting.ini new file mode 100644 index 000000000..3fc5452c9 --- /dev/null +++ b/samples/LargeResponseApp/Microsoft.AspNet.Hosting.ini @@ -0,0 +1,3 @@ + +Server = Kestrel +Server.Urls = http://localhost:5001/ diff --git a/samples/LargeResponseApp/Startup.cs b/samples/LargeResponseApp/Startup.cs new file mode 100644 index 000000000..407b1fdd4 --- /dev/null +++ b/samples/LargeResponseApp/Startup.cs @@ -0,0 +1,38 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using Microsoft.AspNet.Builder; +using System.Text; +using System.Threading.Tasks; + +namespace LargeResponseApp +{ + public class Startup + { + private const int _chunkSize = 4096; + private const int _defaultNumChunks = 16; + private static byte[] _chunk = Encoding.UTF8.GetBytes(new string('a', _chunkSize)); + private static Task _emptyTask = Task.FromResult(null); + + public void Configure(IApplicationBuilder app) + { + app.Run(async (context) => + { + int numChunks; + var path = context.Request.Path; + if (!path.HasValue || !int.TryParse(path.Value.Substring(1), out numChunks)) + { + numChunks = _defaultNumChunks; + } + + context.Response.ContentLength = _chunkSize * numChunks; + context.Response.ContentType = "text/plain"; + + for (int i = 0; i < numChunks; i++) + { + await context.Response.Body.WriteAsync(_chunk, 0, _chunkSize); + } + }); + } + } +} diff --git a/samples/LargeResponseApp/project.json b/samples/LargeResponseApp/project.json new file mode 100644 index 000000000..f533e7ed4 --- /dev/null +++ b/samples/LargeResponseApp/project.json @@ -0,0 +1,16 @@ +{ + "version": "1.0.0-*", + "dependencies": { + "Kestrel": "1.0.0-*" + }, + + "frameworks": { + "dnx451": { }, + "dnxcore50": { } + }, + + "commands": { + "run": "Kestrel", + "web": "Microsoft.AspNet.Hosting" + } +} From 74fa82bca73e381bd251f40750e8b6637a7f6ccd Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Mon, 13 Jul 2015 10:34:19 -0700 Subject: [PATCH 3/6] Complete WriteAsync Tasks early when there are less than 64KB buffered --- .../Http/SocketOutput.cs | 99 +++++++++++++------ 1 file changed, 69 insertions(+), 30 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index b611b3cf0..5deeb8367 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -11,23 +11,28 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public class SocketOutput : ISocketOutput { private const int _maxPendingWrites = 3; + private const int _maxBytesBufferedBeforeThrottling = 65536 / 8; private readonly KestrelThread _thread; private readonly UvStreamHandle _socket; - private WriteContext _nextWriteContext; + // This locks all access to to all the below + private readonly object _lockObj = new object(); // The number of write operations that have been scheduled so far // but have not completed. - private int _writesSending = 0; + private int _writesPending = 0; - // This locks all access to _nextWriteContext and _writesSending - private readonly object _lockObj = new object(); + private int _numBytesBuffered = 0; + private Exception _lastWriteError; + private WriteContext _nextWriteContext; + private readonly Queue _callbacksPending; public SocketOutput(KestrelThread thread, UvStreamHandle socket) { _thread = thread; _socket = socket; + _callbacksPending = new Queue(); } public void Write(ArraySegment buffer, Action callback, object state) @@ -39,11 +44,16 @@ public void Write(ArraySegment buffer, Action callback, KestrelTrace.Log.ConnectionWrite(0, buffer.Count); - var context = new WriteOperation + var writeOp = new WriteOperation + { + Buffer = buffer + }; + + var callbackContext = new CallbackContext { - Buffer = buffer, Callback = callback, - State = state + State = state, + BytesToWrite = buffer.Count }; lock (_lockObj) @@ -53,12 +63,26 @@ public void Write(ArraySegment buffer, Action callback, _nextWriteContext = new WriteContext(this); } - _nextWriteContext.Operations.Add(context); + _nextWriteContext.Operations.Enqueue(writeOp); + _numBytesBuffered += buffer.Count; + + // Complete the write task immediately if all previous write tasks have been completed, + // the buffers haven't grown too large, and the last write to the socket succeeded. + if (_lastWriteError == null && + _callbacksPending.Count == 0 && + _numBytesBuffered < _maxBytesBufferedBeforeThrottling) + { + TriggerCallback(callbackContext); + } + else + { + _callbacksPending.Enqueue(callbackContext); + } - if (_writesSending < _maxPendingWrites) + if (_writesPending < _maxPendingWrites) { ScheduleWrite(); - _writesSending++; + _writesPending++; } } } @@ -86,7 +110,7 @@ private void WriteAllPending() } else { - _writesSending--; + _writesPending--; return; } } @@ -114,7 +138,7 @@ private void WriteAllPending() { // Lock instead of using Interlocked.Decrement so _writesSending // doesn't change in the middle of executing other synchronized code. - _writesSending--; + _writesPending--; } throw; @@ -122,43 +146,58 @@ private void WriteAllPending() } // This is called on the libuv event loop - private void OnWriteCompleted(List completedWrites, UvWriteReq req, int status, Exception error) + private void OnWriteCompleted(Queue completedWrites, UvWriteReq req, int status, Exception error) { lock (_lockObj) { + _lastWriteError = error; + if (_nextWriteContext != null) { ScheduleWrite(); } else { - _writesSending--; + _writesPending--; + } + + foreach (var writeOp in completedWrites) + { + _numBytesBuffered -= writeOp.Buffer.Count; + } + + var bytesLeftToBuffer = _maxBytesBufferedBeforeThrottling - _numBytesBuffered; + while (_callbacksPending.Count > 0 && _callbacksPending.Peek().BytesToWrite < bytesLeftToBuffer) + { + var context = _callbacksPending.Dequeue(); + TriggerCallback(context); } } req.Dispose(); + } - foreach (var writeOp in completedWrites) + private void TriggerCallback(CallbackContext context) + { + context.Error = _lastWriteError; + ThreadPool.QueueUserWorkItem(obj => { - KestrelTrace.Log.ConnectionWriteCallback(0, status); - //NOTE: pool this? - - // Get off the event loop before calling user code! - writeOp.Error = error; - ThreadPool.QueueUserWorkItem(obj => - { - var op = (WriteOperation)obj; - op.Callback(op.Error, op.State); - }, writeOp); - } + var c = (CallbackContext)obj; + c.Callback(c.Error, c.State); + }, context); } - private class WriteOperation + private class CallbackContext { - public ArraySegment Buffer; public Exception Error; public Action Callback; public object State; + public int BytesToWrite; + } + + private class WriteOperation + { + public ArraySegment Buffer; } private class WriteContext @@ -170,13 +209,13 @@ public WriteContext(SocketOutput self) WriteReq = new UvWriteReq(); WriteReq.Init(self._thread.Loop); - Operations = new List(); + Operations = new Queue(); } public SocketOutput Self; public UvWriteReq WriteReq; - public List Operations; + public Queue Operations; } } } From c345849707a69821b563307992700f4db0d004b2 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Wed, 15 Jul 2015 20:03:43 -0700 Subject: [PATCH 4/6] Don't use QueueUserWorkItem to trigger write callbacks immediately - In this case we are off the event loop, so we can invoke the callback directly. - Increase _maxBytesBufferedBeforeThrottling --- .../Http/SocketOutput.cs | 84 ++++++++----------- 1 file changed, 37 insertions(+), 47 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 5deeb8367..7ee91339e 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -11,12 +11,12 @@ namespace Microsoft.AspNet.Server.Kestrel.Http public class SocketOutput : ISocketOutput { private const int _maxPendingWrites = 3; - private const int _maxBytesBufferedBeforeThrottling = 65536 / 8; + private const int _maxBytesBufferedBeforeThrottling = 65536; private readonly KestrelThread _thread; private readonly UvStreamHandle _socket; - // This locks all access to to all the below + // This locks access to to all of the below fields private readonly object _lockObj = new object(); // The number of write operations that have been scheduled so far @@ -44,17 +44,7 @@ public void Write(ArraySegment buffer, Action callback, KestrelTrace.Log.ConnectionWrite(0, buffer.Count); - var writeOp = new WriteOperation - { - Buffer = buffer - }; - - var callbackContext = new CallbackContext - { - Callback = callback, - State = state, - BytesToWrite = buffer.Count - }; + bool triggerCallbackNow = false; lock (_lockObj) { @@ -63,20 +53,22 @@ public void Write(ArraySegment buffer, Action callback, _nextWriteContext = new WriteContext(this); } - _nextWriteContext.Operations.Enqueue(writeOp); + _nextWriteContext.Buffers.Enqueue(buffer); _numBytesBuffered += buffer.Count; // Complete the write task immediately if all previous write tasks have been completed, // the buffers haven't grown too large, and the last write to the socket succeeded. - if (_lastWriteError == null && - _callbacksPending.Count == 0 && - _numBytesBuffered < _maxBytesBufferedBeforeThrottling) + triggerCallbackNow = _lastWriteError == null && + _callbacksPending.Count == 0 && + _numBytesBuffered <= _maxBytesBufferedBeforeThrottling; + if (!triggerCallbackNow) { - TriggerCallback(callbackContext); - } - else - { - _callbacksPending.Enqueue(callbackContext); + _callbacksPending.Enqueue(new CallbackContext + { + Callback = callback, + State = state, + BytesToWrite = buffer.Count + }); } if (_writesPending < _maxPendingWrites) @@ -85,6 +77,11 @@ public void Write(ArraySegment buffer, Action callback, _writesPending++; } } + + if (triggerCallbackNow) + { + callback(null, state); + } } private void ScheduleWrite() @@ -117,19 +114,21 @@ private void WriteAllPending() try { - var buffers = new ArraySegment[writingContext.Operations.Count]; + var buffers = new ArraySegment[writingContext.Buffers.Count]; var i = 0; - foreach (var writeOp in writingContext.Operations) + foreach (var buffer in writingContext.Buffers) { - buffers[i] = writeOp.Buffer; - i++; + buffers[i++] = buffer; } - writingContext.WriteReq.Write(_socket, new ArraySegment>(buffers), (r, status, error, state) => + var writeReq = new UvWriteReq(); + writeReq.Init(_thread.Loop); + + writeReq.Write(_socket, new ArraySegment>(buffers), (r, status, error, state) => { var writtenContext = (WriteContext)state; - writtenContext.Self.OnWriteCompleted(writtenContext.Operations, r, status, error); + writtenContext.Self.OnWriteCompleted(writtenContext.Buffers, r, status, error); }, writingContext); } catch @@ -146,8 +145,10 @@ private void WriteAllPending() } // This is called on the libuv event loop - private void OnWriteCompleted(Queue completedWrites, UvWriteReq req, int status, Exception error) + private void OnWriteCompleted(Queue> writtenBuffers, UvWriteReq req, int status, Exception error) { + KestrelTrace.Log.ConnectionWriteCallback(0, status); + lock (_lockObj) { _lastWriteError = error; @@ -161,16 +162,16 @@ private void OnWriteCompleted(Queue completedWrites, UvWriteReq _writesPending--; } - foreach (var writeOp in completedWrites) + foreach (var writeBuffer in writtenBuffers) { - _numBytesBuffered -= writeOp.Buffer.Count; + _numBytesBuffered -= writeBuffer.Count; } var bytesLeftToBuffer = _maxBytesBufferedBeforeThrottling - _numBytesBuffered; - while (_callbacksPending.Count > 0 && _callbacksPending.Peek().BytesToWrite < bytesLeftToBuffer) + while (_callbacksPending.Count > 0 && + _callbacksPending.Peek().BytesToWrite <= bytesLeftToBuffer) { - var context = _callbacksPending.Dequeue(); - TriggerCallback(context); + TriggerCallback(_callbacksPending.Dequeue()); } } @@ -195,27 +196,16 @@ private class CallbackContext public int BytesToWrite; } - private class WriteOperation - { - public ArraySegment Buffer; - } - private class WriteContext { public WriteContext(SocketOutput self) { Self = self; - - WriteReq = new UvWriteReq(); - WriteReq.Init(self._thread.Loop); - - Operations = new Queue(); + Buffers = new Queue>(); } public SocketOutput Self; - - public UvWriteReq WriteReq; - public Queue Operations; + public Queue> Buffers; } } } From cce9d8f09cb1a03c46628d69800325da7d880613 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Fri, 24 Jul 2015 16:47:05 -0700 Subject: [PATCH 5/6] Make SocketOutput more testable - Added a MockLibUv class - Create a SocketOutputTests class --- .../KestrelEngine.cs | 20 ++- .../Networking/Libuv.cs | 124 ++++++++-------- .../Networking/UvMemory.cs | 2 +- .../Properties/AssemblyInfo.cs | 2 + .../SocketOutputTests.cs | 133 ++++++++++++++++++ .../TestHelpers/MockLibUv.cs | 84 +++++++++++ 6 files changed, 298 insertions(+), 67 deletions(-) create mode 100644 test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs create mode 100644 test/Microsoft.AspNet.Server.KestrelTests/TestHelpers/MockLibUv.cs diff --git a/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs b/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs index 45513365a..49d3ac0a0 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/KestrelEngine.cs @@ -14,11 +14,8 @@ namespace Microsoft.AspNet.Server.Kestrel public class KestrelEngine : IDisposable { public KestrelEngine(ILibraryManager libraryManager, IApplicationShutdown appShutdownService) + : this(appShutdownService) { - AppShutdown = appShutdownService; - Threads = new List(); - Listeners = new List(); - Memory = new MemoryPool(); Libuv = new Libuv(); var libraryPath = default(string); @@ -61,6 +58,21 @@ public KestrelEngine(ILibraryManager libraryManager, IApplicationShutdown appShu Libuv.Load(libraryPath); } + // For testing + internal KestrelEngine(Libuv uv, IApplicationShutdown appShutdownService) + : this(appShutdownService) + { + Libuv = uv; + } + + private KestrelEngine(IApplicationShutdown appShutdownService) + { + AppShutdown = appShutdownService; + Threads = new List(); + Listeners = new List(); + Memory = new MemoryPool(); + } + public Libuv Libuv { get; private set; } public IMemoryPool Memory { get; set; } public IApplicationShutdown AppShutdown { get; private set; } diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/Libuv.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/Libuv.cs index 86468bbbd..4ff90ecd5 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/Libuv.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/Libuv.cs @@ -84,16 +84,16 @@ public int Check(int statusCode, out Exception error) [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_loop_init(UvLoopHandle a0); - uv_loop_init _uv_loop_init = default(uv_loop_init); + protected delegate int uv_loop_init(UvLoopHandle a0); + protected uv_loop_init _uv_loop_init = default(uv_loop_init); public void loop_init(UvLoopHandle handle) { Check(_uv_loop_init(handle)); } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_loop_close(IntPtr a0); - uv_loop_close _uv_loop_close = default(uv_loop_close); + protected delegate int uv_loop_close(IntPtr a0); + protected uv_loop_close _uv_loop_close = default(uv_loop_close); public void loop_close(UvLoopHandle handle) { handle.Validate(closed: true); @@ -101,8 +101,8 @@ public void loop_close(UvLoopHandle handle) } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_run(UvLoopHandle handle, int mode); - uv_run _uv_run = default(uv_run); + protected delegate int uv_run(UvLoopHandle handle, int mode); + protected uv_run _uv_run = default(uv_run); public int run(UvLoopHandle handle, int mode) { handle.Validate(); @@ -110,8 +110,8 @@ public int run(UvLoopHandle handle, int mode) } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate void uv_stop(UvLoopHandle handle); - uv_stop _uv_stop = default(uv_stop); + protected delegate void uv_stop(UvLoopHandle handle); + protected uv_stop _uv_stop = default(uv_stop); public void stop(UvLoopHandle handle) { handle.Validate(); @@ -119,8 +119,8 @@ public void stop(UvLoopHandle handle) } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate void uv_ref(UvHandle handle); - uv_ref _uv_ref = default(uv_ref); + protected delegate void uv_ref(UvHandle handle); + protected uv_ref _uv_ref = default(uv_ref); public void @ref(UvHandle handle) { handle.Validate(); @@ -128,8 +128,8 @@ public void @ref(UvHandle handle) } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate void uv_unref(UvHandle handle); - uv_unref _uv_unref = default(uv_unref); + protected delegate void uv_unref(UvHandle handle); + protected uv_unref _uv_unref = default(uv_unref); public void unref(UvHandle handle) { handle.Validate(); @@ -140,8 +140,8 @@ public void unref(UvHandle handle) [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public delegate void uv_close_cb(IntPtr handle); [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate void uv_close(IntPtr handle, uv_close_cb close_cb); - uv_close _uv_close = default(uv_close); + protected delegate void uv_close(IntPtr handle, uv_close_cb close_cb); + protected uv_close _uv_close = default(uv_close); public void close(UvHandle handle, uv_close_cb close_cb) { handle.Validate(closed: true); @@ -155,8 +155,8 @@ public void close(IntPtr handle, uv_close_cb close_cb) [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public delegate void uv_async_cb(IntPtr handle); [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_async_init(UvLoopHandle loop, UvAsyncHandle handle, uv_async_cb cb); - uv_async_init _uv_async_init = default(uv_async_init); + protected delegate int uv_async_init(UvLoopHandle loop, UvAsyncHandle handle, uv_async_cb cb); + protected uv_async_init _uv_async_init = default(uv_async_init); public void async_init(UvLoopHandle loop, UvAsyncHandle handle, uv_async_cb cb) { loop.Validate(); @@ -165,16 +165,16 @@ public void async_init(UvLoopHandle loop, UvAsyncHandle handle, uv_async_cb cb) } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_async_send(UvAsyncHandle handle); - uv_async_send _uv_async_send = default(uv_async_send); + protected delegate int uv_async_send(UvAsyncHandle handle); + protected uv_async_send _uv_async_send = default(uv_async_send); public void async_send(UvAsyncHandle handle) { Check(_uv_async_send(handle)); } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_tcp_init(UvLoopHandle loop, UvTcpHandle handle); - uv_tcp_init _uv_tcp_init = default(uv_tcp_init); + protected delegate int uv_tcp_init(UvLoopHandle loop, UvTcpHandle handle); + protected uv_tcp_init _uv_tcp_init = default(uv_tcp_init); public void tcp_init(UvLoopHandle loop, UvTcpHandle handle) { loop.Validate(); @@ -183,8 +183,8 @@ public void tcp_init(UvLoopHandle loop, UvTcpHandle handle) } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_tcp_bind(UvTcpHandle handle, ref sockaddr addr, int flags); - uv_tcp_bind _uv_tcp_bind = default(uv_tcp_bind); + protected delegate int uv_tcp_bind(UvTcpHandle handle, ref sockaddr addr, int flags); + protected uv_tcp_bind _uv_tcp_bind = default(uv_tcp_bind); public void tcp_bind(UvTcpHandle handle, ref sockaddr addr, int flags) { handle.Validate(); @@ -192,8 +192,8 @@ public void tcp_bind(UvTcpHandle handle, ref sockaddr addr, int flags) } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_tcp_open(UvTcpHandle handle, IntPtr hSocket); - uv_tcp_open _uv_tcp_open = default(uv_tcp_open); + protected delegate int uv_tcp_open(UvTcpHandle handle, IntPtr hSocket); + protected uv_tcp_open _uv_tcp_open = default(uv_tcp_open); public void tcp_open(UvTcpHandle handle, IntPtr hSocket) { handle.Validate(); @@ -201,8 +201,8 @@ public void tcp_open(UvTcpHandle handle, IntPtr hSocket) } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_pipe_init(UvLoopHandle loop, UvPipeHandle handle, int ipc); - uv_pipe_init _uv_pipe_init = default(uv_pipe_init); + protected delegate int uv_pipe_init(UvLoopHandle loop, UvPipeHandle handle, int ipc); + protected uv_pipe_init _uv_pipe_init = default(uv_pipe_init); public void pipe_init(UvLoopHandle loop, UvPipeHandle handle, bool ipc) { loop.Validate(); @@ -211,8 +211,8 @@ public void pipe_init(UvLoopHandle loop, UvPipeHandle handle, bool ipc) } [UnmanagedFunctionPointer(CallingConvention.Cdecl, CharSet = CharSet.Ansi)] - delegate int uv_pipe_bind(UvPipeHandle loop, string name); - uv_pipe_bind _uv_pipe_bind = default(uv_pipe_bind); + protected delegate int uv_pipe_bind(UvPipeHandle loop, string name); + protected uv_pipe_bind _uv_pipe_bind = default(uv_pipe_bind); public void pipe_bind(UvPipeHandle handle, string name) { handle.Validate(); @@ -222,8 +222,8 @@ public void pipe_bind(UvPipeHandle handle, string name) [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public delegate void uv_connection_cb(IntPtr server, int status); [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_listen(UvStreamHandle handle, int backlog, uv_connection_cb cb); - uv_listen _uv_listen = default(uv_listen); + protected delegate int uv_listen(UvStreamHandle handle, int backlog, uv_connection_cb cb); + protected uv_listen _uv_listen = default(uv_listen); public void listen(UvStreamHandle handle, int backlog, uv_connection_cb cb) { handle.Validate(); @@ -231,8 +231,8 @@ public void listen(UvStreamHandle handle, int backlog, uv_connection_cb cb) } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_accept(UvStreamHandle server, UvStreamHandle client); - uv_accept _uv_accept = default(uv_accept); + protected delegate int uv_accept(UvStreamHandle server, UvStreamHandle client); + protected uv_accept _uv_accept = default(uv_accept); public void accept(UvStreamHandle server, UvStreamHandle client) { server.Validate(); @@ -243,8 +243,8 @@ public void accept(UvStreamHandle server, UvStreamHandle client) [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public delegate void uv_connect_cb(IntPtr req, int status); [UnmanagedFunctionPointer(CallingConvention.Cdecl, CharSet = CharSet.Ansi)] - unsafe delegate int uv_pipe_connect(UvConnectRequest req, UvPipeHandle handle, string name, uv_connect_cb cb); - uv_pipe_connect _uv_pipe_connect = default(uv_pipe_connect); + unsafe protected delegate int uv_pipe_connect(UvConnectRequest req, UvPipeHandle handle, string name, uv_connect_cb cb); + protected uv_pipe_connect _uv_pipe_connect = default(uv_pipe_connect); unsafe public void pipe_connect(UvConnectRequest req, UvPipeHandle handle, string name, uv_connect_cb cb) { req.Validate(); @@ -257,8 +257,8 @@ unsafe public void pipe_connect(UvConnectRequest req, UvPipeHandle handle, strin [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public delegate void uv_read_cb(IntPtr server, int nread, ref uv_buf_t buf); [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_read_start(UvStreamHandle handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb); - uv_read_start _uv_read_start = default(uv_read_start); + protected delegate int uv_read_start(UvStreamHandle handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb); + protected uv_read_start _uv_read_start = default(uv_read_start); public void read_start(UvStreamHandle handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { handle.Validate(); @@ -266,8 +266,8 @@ public void read_start(UvStreamHandle handle, uv_alloc_cb alloc_cb, uv_read_cb r } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_read_stop(UvStreamHandle handle); - uv_read_stop _uv_read_stop = default(uv_read_stop); + protected delegate int uv_read_stop(UvStreamHandle handle); + protected uv_read_stop _uv_read_stop = default(uv_read_stop); public void read_stop(UvStreamHandle handle) { handle.Validate(); @@ -275,8 +275,8 @@ public void read_stop(UvStreamHandle handle) } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_try_write(UvStreamHandle handle, Libuv.uv_buf_t[] bufs, int nbufs); - uv_try_write _uv_try_write = default(uv_try_write); + protected delegate int uv_try_write(UvStreamHandle handle, Libuv.uv_buf_t[] bufs, int nbufs); + protected uv_try_write _uv_try_write = default(uv_try_write); public int try_write(UvStreamHandle handle, Libuv.uv_buf_t[] bufs, int nbufs) { handle.Validate(); @@ -286,8 +286,8 @@ public int try_write(UvStreamHandle handle, Libuv.uv_buf_t[] bufs, int nbufs) [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public delegate void uv_write_cb(IntPtr req, int status); [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - unsafe delegate int uv_write(UvRequest req, UvStreamHandle handle, Libuv.uv_buf_t* bufs, int nbufs, uv_write_cb cb); - uv_write _uv_write = default(uv_write); + unsafe protected delegate int uv_write(UvRequest req, UvStreamHandle handle, Libuv.uv_buf_t* bufs, int nbufs, uv_write_cb cb); + protected uv_write _uv_write = default(uv_write); unsafe public void write(UvRequest req, UvStreamHandle handle, Libuv.uv_buf_t* bufs, int nbufs, uv_write_cb cb) { req.Validate(); @@ -296,8 +296,8 @@ unsafe public void write(UvRequest req, UvStreamHandle handle, Libuv.uv_buf_t* b } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - unsafe delegate int uv_write2(UvRequest req, UvStreamHandle handle, Libuv.uv_buf_t* bufs, int nbufs, UvStreamHandle sendHandle, uv_write_cb cb); - uv_write2 _uv_write2 = default(uv_write2); + unsafe protected delegate int uv_write2(UvRequest req, UvStreamHandle handle, Libuv.uv_buf_t* bufs, int nbufs, UvStreamHandle sendHandle, uv_write_cb cb); + protected uv_write2 _uv_write2 = default(uv_write2); unsafe public void write2(UvRequest req, UvStreamHandle handle, Libuv.uv_buf_t* bufs, int nbufs, UvStreamHandle sendHandle, uv_write_cb cb) { req.Validate(); @@ -308,8 +308,8 @@ unsafe public void write2(UvRequest req, UvStreamHandle handle, Libuv.uv_buf_t* [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public delegate void uv_shutdown_cb(IntPtr req, int status); [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_shutdown(UvShutdownReq req, UvStreamHandle handle, uv_shutdown_cb cb); - uv_shutdown _uv_shutdown = default(uv_shutdown); + protected delegate int uv_shutdown(UvShutdownReq req, UvStreamHandle handle, uv_shutdown_cb cb); + protected uv_shutdown _uv_shutdown = default(uv_shutdown); public void shutdown(UvShutdownReq req, UvStreamHandle handle, uv_shutdown_cb cb) { req.Validate(); @@ -318,8 +318,8 @@ public void shutdown(UvShutdownReq req, UvStreamHandle handle, uv_shutdown_cb cb } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate IntPtr uv_err_name(int err); - uv_err_name _uv_err_name = default(uv_err_name); + protected delegate IntPtr uv_err_name(int err); + protected uv_err_name _uv_err_name = default(uv_err_name); public unsafe String err_name(int err) { IntPtr ptr = _uv_err_name(err); @@ -327,8 +327,8 @@ public unsafe String err_name(int err) } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate IntPtr uv_strerror(int err); - uv_strerror _uv_strerror = default(uv_strerror); + protected delegate IntPtr uv_strerror(int err); + protected uv_strerror _uv_strerror = default(uv_strerror); public unsafe String strerror(int err) { IntPtr ptr = _uv_strerror(err); @@ -336,42 +336,42 @@ public unsafe String strerror(int err) } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_loop_size(); - uv_loop_size _uv_loop_size = default(uv_loop_size); + protected delegate int uv_loop_size(); + protected uv_loop_size _uv_loop_size = default(uv_loop_size); public int loop_size() { return _uv_loop_size(); } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_handle_size(HandleType handleType); - uv_handle_size _uv_handle_size = default(uv_handle_size); + protected delegate int uv_handle_size(HandleType handleType); + protected uv_handle_size _uv_handle_size = default(uv_handle_size); public int handle_size(HandleType handleType) { return _uv_handle_size(handleType); } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_req_size(RequestType reqType); - uv_req_size _uv_req_size = default(uv_req_size); + protected delegate int uv_req_size(RequestType reqType); + protected uv_req_size _uv_req_size = default(uv_req_size); public int req_size(RequestType reqType) { return _uv_req_size(reqType); } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_ip4_addr(string ip, int port, out sockaddr addr); + protected delegate int uv_ip4_addr(string ip, int port, out sockaddr addr); - uv_ip4_addr _uv_ip4_addr = default(uv_ip4_addr); + protected uv_ip4_addr _uv_ip4_addr = default(uv_ip4_addr); public int ip4_addr(string ip, int port, out sockaddr addr, out Exception error) { return Check(_uv_ip4_addr(ip, port, out addr), out error); } [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - delegate int uv_ip6_addr(string ip, int port, out sockaddr addr); + protected delegate int uv_ip6_addr(string ip, int port, out sockaddr addr); - uv_ip6_addr _uv_ip6_addr = default(uv_ip6_addr); + protected uv_ip6_addr _uv_ip6_addr = default(uv_ip6_addr); public int ip6_addr(string ip, int port, out sockaddr addr, out Exception error) { return Check(_uv_ip6_addr(ip, port, out addr), out error); @@ -380,8 +380,8 @@ public int ip6_addr(string ip, int port, out sockaddr addr, out Exception error) [UnmanagedFunctionPointer(CallingConvention.Cdecl)] public delegate void uv_walk_cb(IntPtr handle, IntPtr arg); [UnmanagedFunctionPointer(CallingConvention.Cdecl)] - unsafe delegate int uv_walk(UvLoopHandle loop, uv_walk_cb walk_cb, IntPtr arg); - uv_walk _uv_walk = default(uv_walk); + unsafe protected delegate int uv_walk(UvLoopHandle loop, uv_walk_cb walk_cb, IntPtr arg); + protected uv_walk _uv_walk = default(uv_walk); unsafe public void walk(UvLoopHandle loop, uv_walk_cb walk_cb, IntPtr arg) { loop.Validate(); diff --git a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvMemory.cs b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvMemory.cs index 37e75ef53..f9d0e11ed 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Networking/UvMemory.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Networking/UvMemory.cs @@ -14,7 +14,7 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking public abstract class UvMemory : SafeHandle { protected Libuv _uv; - private int _threadId; + protected int _threadId; public UvMemory() : base(IntPtr.Zero, true) { diff --git a/src/Microsoft.AspNet.Server.Kestrel/Properties/AssemblyInfo.cs b/src/Microsoft.AspNet.Server.Kestrel/Properties/AssemblyInfo.cs index 025a94598..573f5c363 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Properties/AssemblyInfo.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Properties/AssemblyInfo.cs @@ -2,5 +2,7 @@ // Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. using System.Reflection; +using System.Runtime.CompilerServices; +[assembly: InternalsVisibleTo("Microsoft.AspNet.Server.KestrelTests")] [assembly: AssemblyMetadata("Serviceable", "True")] \ No newline at end of file diff --git a/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs new file mode 100644 index 000000000..bb95a9650 --- /dev/null +++ b/test/Microsoft.AspNet.Server.KestrelTests/SocketOutputTests.cs @@ -0,0 +1,133 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Threading; +using Microsoft.AspNet.Server.Kestrel; +using Microsoft.AspNet.Server.Kestrel.Http; +using Microsoft.AspNet.Server.Kestrel.Networking; +using Microsoft.AspNet.Server.KestrelTests.TestHelpers; +using Xunit; + +namespace Microsoft.AspNet.Server.KestrelTests +{ + public class SocketOutputTests + { + [Fact] + public void CanWrite1MB() + { + // This test was added because when initially implementing write-behind buffering in + // SocketOutput, the write callback would never be invoked for writes larger than + // _maxBytesPreCompleted even after the write actually completed. + + // Arrange + var mockLibuv = new MockLibuv + { + OnWrite = (socket, buffers, triggerCompleted) => + { + triggerCompleted(0); + return 0; + } + }; + + using (var kestrelEngine = new KestrelEngine(mockLibuv, new ShutdownNotImplemented())) + { + kestrelEngine.Start(count: 1); + + var kestrelThread = kestrelEngine.Threads[0]; + var socket = new MockSocket(kestrelThread.Loop.ThreadId); + var socketOutput = new SocketOutput(kestrelThread, socket); + + // I doubt _maxBytesPreCompleted will ever be over a MB. If it is, we should change this test. + var bufferSize = 1048576; + var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); + var completedWh = new ManualResetEventSlim(); + Action onCompleted = (ex, state) => + { + Assert.Null(ex); + Assert.Null(state); + completedWh.Set(); + }; + + // Act + socketOutput.Write(buffer, onCompleted, state: null); + + // Assert + Assert.True(completedWh.Wait(100)); + } + } + + [Fact] + public void WritesDontCompleteImmediatelyWhenTooManyBytesAreAlreadyPreCompleted() + { + // This should match _maxBytesPreCompleted in SocketOutput + var maxBytesPreCompleted = 65536; + var completeQueue = new Queue>(); + + // Arrange + var mockLibuv = new MockLibuv + { + OnWrite = (socket, buffers, triggerCompleted) => + { + completeQueue.Enqueue(triggerCompleted); + return 0; + } + }; + + using (var kestrelEngine = new KestrelEngine(mockLibuv, new ShutdownNotImplemented())) + { + kestrelEngine.Start(count: 1); + + var kestrelThread = kestrelEngine.Threads[0]; + var socket = new MockSocket(kestrelThread.Loop.ThreadId); + var socketOutput = new SocketOutput(kestrelThread, socket); + + var bufferSize = maxBytesPreCompleted; + var buffer = new ArraySegment(new byte[bufferSize], 0, bufferSize); + var completedWh = new ManualResetEventSlim(); + Action onCompleted = (ex, state) => + { + Assert.Null(ex); + Assert.Null(state); + completedWh.Set(); + }; + + // Act + socketOutput.Write(buffer, onCompleted, state: null); + // Assert + // The first write should pre-complete since it is <= _maxBytesPreCompleted. + Assert.True(completedWh.Wait(100)); + // Arrange + completedWh.Reset(); + // Act + socketOutput.Write(buffer, onCompleted, state: null); + // Assert + // Too many bytes are already pre-completed for the second write to pre-complete. + Assert.False(completedWh.Wait(100)); + // Act + completeQueue.Dequeue()(0); + // Assert + // Finishing the first write should allow the second write to pre-complete. + Assert.True(completedWh.Wait(100)); + } + } + + private class MockSocket : UvStreamHandle + { + public MockSocket(int threadId) + { + // Set the handle to something other than IntPtr.Zero + // so handle.Validate doesn't fail in Libuv.write + handle = (IntPtr)1; + _threadId = threadId; + } + + protected override bool ReleaseHandle() + { + // No-op + return true; + } + } + } +} diff --git a/test/Microsoft.AspNet.Server.KestrelTests/TestHelpers/MockLibUv.cs b/test/Microsoft.AspNet.Server.KestrelTests/TestHelpers/MockLibUv.cs new file mode 100644 index 000000000..a23cd076c --- /dev/null +++ b/test/Microsoft.AspNet.Server.KestrelTests/TestHelpers/MockLibUv.cs @@ -0,0 +1,84 @@ +using System; +using System.Threading; +using Microsoft.AspNet.Server.Kestrel.Networking; + +namespace Microsoft.AspNet.Server.KestrelTests.TestHelpers +{ + public class MockLibuv : Libuv + { + private UvAsyncHandle _postHandle; + private uv_async_cb _onPost; + + private bool _stopLoop; + private readonly ManualResetEventSlim _loopWh = new ManualResetEventSlim(); + + private Func>, Action, int> _onWrite; + + unsafe public MockLibuv() + { + _uv_write = UvWrite; + + _uv_async_send = postHandle => + { + _loopWh.Set(); + + return 0; + }; + + _uv_async_init = (loop, postHandle, callback) => + { + _postHandle = postHandle; + _onPost = callback; + + return 0; + }; + + _uv_run = (loopHandle, mode) => + { + while (!_stopLoop) + { + _loopWh.Wait(); + _loopWh.Reset(); + _onPost(_postHandle.InternalGetHandle()); + } + + _postHandle.Dispose(); + loopHandle.Dispose(); + return 0; + }; + + _uv_stop = handle => + { + _stopLoop = true; + _loopWh.Set(); + }; + + _uv_req_size = reqType => IntPtr.Size; + _uv_loop_size = () => IntPtr.Size; + _uv_handle_size = handleType => IntPtr.Size; + _uv_loop_init = loop => 0; + _uv_tcp_init = (loopHandle, tcpHandle) => 0; + _uv_close = (handle, callback) => callback(handle); + _uv_loop_close = handle => 0; + _uv_unref = handle => { }; + _uv_walk = (loop, callback, ignore) => 0; + } + + public Func>, Action, int> OnWrite + { + get + { + return _onWrite; + } + set + { + _onWrite = value; + } + } + + unsafe private int UvWrite(UvRequest req, UvStreamHandle handle, uv_buf_t* bufs, int nbufs, uv_write_cb cb) + { + return _onWrite(handle, new ArraySegment>(), status => cb(req.InternalGetHandle(), status)); + } + } +} From 47d7f73bdce175e6a47acbd96c6ec04b91c078b4 Mon Sep 17 00:00:00 2001 From: Stephen Halter Date: Mon, 27 Jul 2015 19:40:27 -0700 Subject: [PATCH 6/6] Fix SocketOutput so that it can now complete large writes - Complete in this context means that the callback gets invoked. - Previously, calls to write would never complete if the buffer contained more than 64 KB (_maxBytesPreCompleted). - This is tested by SocketOutputTests.CanWrite1MB. --- .../Http/SocketOutput.cs | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs index 7ee91339e..45253dfe9 100644 --- a/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs +++ b/src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs @@ -5,13 +5,14 @@ using System; using System.Collections.Generic; using System.Threading; +using System.Diagnostics; namespace Microsoft.AspNet.Server.Kestrel.Http { public class SocketOutput : ISocketOutput { private const int _maxPendingWrites = 3; - private const int _maxBytesBufferedBeforeThrottling = 65536; + private const int _maxBytesPreCompleted = 65536; private readonly KestrelThread _thread; private readonly UvStreamHandle _socket; @@ -23,7 +24,7 @@ public class SocketOutput : ISocketOutput // but have not completed. private int _writesPending = 0; - private int _numBytesBuffered = 0; + private int _numBytesPreCompleted = 0; private Exception _lastWriteError; private WriteContext _nextWriteContext; private readonly Queue _callbacksPending; @@ -54,14 +55,17 @@ public void Write(ArraySegment buffer, Action callback, } _nextWriteContext.Buffers.Enqueue(buffer); - _numBytesBuffered += buffer.Count; // Complete the write task immediately if all previous write tasks have been completed, // the buffers haven't grown too large, and the last write to the socket succeeded. triggerCallbackNow = _lastWriteError == null && _callbacksPending.Count == 0 && - _numBytesBuffered <= _maxBytesBufferedBeforeThrottling; - if (!triggerCallbackNow) + _numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted; + if (triggerCallbackNow) + { + _numBytesPreCompleted += buffer.Count; + } + else { _callbacksPending.Enqueue(new CallbackContext { @@ -78,6 +82,7 @@ public void Write(ArraySegment buffer, Action callback, } } + // Make sure we call user code outside of the lock. if (triggerCallbackNow) { callback(null, state); @@ -164,15 +169,28 @@ private void OnWriteCompleted(Queue> writtenBuffers, UvWriteR foreach (var writeBuffer in writtenBuffers) { - _numBytesBuffered -= writeBuffer.Count; + // _numBytesPreCompleted can temporarily go negative in the event there are + // completed writes that we haven't triggered callbacks for yet. + _numBytesPreCompleted -= writeBuffer.Count; } - var bytesLeftToBuffer = _maxBytesBufferedBeforeThrottling - _numBytesBuffered; + + // bytesLeftToBuffer can be greater than _maxBytesPreCompleted + // This allows large writes to complete once they've actually finished. + var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted; while (_callbacksPending.Count > 0 && _callbacksPending.Peek().BytesToWrite <= bytesLeftToBuffer) { - TriggerCallback(_callbacksPending.Dequeue()); + var callbackContext = _callbacksPending.Dequeue(); + + _numBytesPreCompleted += callbackContext.BytesToWrite; + + TriggerCallback(callbackContext); } + + // Now that the while loop has completed the following invariants should hold true: + Trace.Assert(_numBytesPreCompleted >= 0); + Trace.Assert(_numBytesPreCompleted <= _maxBytesPreCompleted); } req.Dispose();