Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Write behind response buffering #131

Merged
merged 6 commits into from
Jul 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion KestrelHttpServer.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 14
VisualStudioVersion = 14.0.22111.0
VisualStudioVersion = 14.0.22823.1
MinimumVisualStudioVersion = 10.0.40219.1
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "Microsoft.AspNet.Server.Kestrel", "src\Microsoft.AspNet.Server.Kestrel\Microsoft.AspNet.Server.Kestrel.xproj", "{F510611A-3BEE-4B88-A613-5F4A74ED82A1}"
EndProject
Expand All @@ -25,6 +25,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{D3273454-E
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{8A3D00B8-1CCF-4BE6-A060-11104CE2D9CE}"
EndProject
Project("{8BB2217D-0F2D-49D1-97BC-3654ED321F3B}") = "LargeResponseApp", "samples\LargeResponseApp\LargeResponseApp.xproj", "{B35D4D31-E74C-4646-8A11-7A7A40F0021E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -47,6 +49,10 @@ Global
{30B7617E-58EF-4382-B3EA-5B2E718CF1A6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{30B7617E-58EF-4382-B3EA-5B2E718CF1A6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{30B7617E-58EF-4382-B3EA-5B2E718CF1A6}.Release|Any CPU.Build.0 = Release|Any CPU
{B35D4D31-E74C-4646-8A11-7A7A40F0021E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B35D4D31-E74C-4646-8A11-7A7A40F0021E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B35D4D31-E74C-4646-8A11-7A7A40F0021E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B35D4D31-E74C-4646-8A11-7A7A40F0021E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -56,5 +62,6 @@ Global
{37F3BFB2-6454-49E5-9D7F-581BF755CCFE} = {D3273454-EA07-41D2-BF0B-FCC3675C2483}
{2C3CB3DC-EEBF-4F52-9E1C-4F2F972E76C3} = {8A3D00B8-1CCF-4BE6-A060-11104CE2D9CE}
{30B7617E-58EF-4382-B3EA-5B2E718CF1A6} = {2D5D5227-4DBD-499A-96B1-76A36B03B750}
{B35D4D31-E74C-4646-8A11-7A7A40F0021E} = {8A3D00B8-1CCF-4BE6-A060-11104CE2D9CE}
EndGlobalSection
EndGlobal
19 changes: 19 additions & 0 deletions samples/LargeResponseApp/LargeResponseApp.xproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<VisualStudioVersion Condition="'$(VisualStudioVersion)' == ''">14.0</VisualStudioVersion>
<VSToolsPath Condition="'$(VSToolsPath)' == ''">$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)</VSToolsPath>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DNX\Microsoft.DNX.Props" Condition="'$(VSToolsPath)' != ''" />
<PropertyGroup Label="Globals">
<ProjectGuid>b35d4d31-e74c-4646-8a11-7a7a40f0021e</ProjectGuid>
<RootNamespace>LargeResponseApp</RootNamespace>
<BaseIntermediateOutputPath Condition="'$(BaseIntermediateOutputPath)'=='' ">..\..\artifacts\obj\$(MSBuildProjectName)</BaseIntermediateOutputPath>
<OutputPath Condition="'$(OutputPath)'=='' ">..\..\artifacts\bin\$(MSBuildProjectName)\</OutputPath>
</PropertyGroup>
<PropertyGroup>
<SchemaVersion>2.0</SchemaVersion>
<DevelopmentServerPort>42216</DevelopmentServerPort>
</PropertyGroup>
<Import Project="$(VSToolsPath)\DNX\Microsoft.DNX.targets" Condition="'$(VSToolsPath)' != ''" />
</Project>
3 changes: 3 additions & 0 deletions samples/LargeResponseApp/Microsoft.AspNet.Hosting.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

Server = Kestrel
Server.Urls = http://localhost:5001/
38 changes: 38 additions & 0 deletions samples/LargeResponseApp/Startup.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using Microsoft.AspNet.Builder;
using System.Text;
using System.Threading.Tasks;

namespace LargeResponseApp
{
public class Startup
{
private const int _chunkSize = 4096;
private const int _defaultNumChunks = 16;
private static byte[] _chunk = Encoding.UTF8.GetBytes(new string('a', _chunkSize));
private static Task _emptyTask = Task.FromResult<object>(null);

public void Configure(IApplicationBuilder app)
{
app.Run(async (context) =>
{
int numChunks;
var path = context.Request.Path;
if (!path.HasValue || !int.TryParse(path.Value.Substring(1), out numChunks))
{
numChunks = _defaultNumChunks;
}

context.Response.ContentLength = _chunkSize * numChunks;
context.Response.ContentType = "text/plain";

for (int i = 0; i < numChunks; i++)
{
await context.Response.Body.WriteAsync(_chunk, 0, _chunkSize);
}
});
}
}
}
16 changes: 16 additions & 0 deletions samples/LargeResponseApp/project.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"version": "1.0.0-*",
"dependencies": {
"Kestrel": "1.0.0-*"
},

"frameworks": {
"dnx451": { },
"dnxcore50": { }
},

"commands": {
"run": "Kestrel",
"web": "Microsoft.AspNet.Hosting"
}
}
225 changes: 179 additions & 46 deletions src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,37 @@

using Microsoft.AspNet.Server.Kestrel.Networking;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;

namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class SocketOutput : ISocketOutput
{
private const int _maxPendingWrites = 3;
private const int _maxBytesPreCompleted = 65536;

private readonly KestrelThread _thread;
private readonly UvStreamHandle _socket;

// This locks access to to all of the below fields
private readonly object _lockObj = new object();

// The number of write operations that have been scheduled so far
// but have not completed.
private int _writesPending = 0;

private int _numBytesPreCompleted = 0;
private Exception _lastWriteError;
private WriteContext _nextWriteContext;
private readonly Queue<CallbackContext> _callbacksPending;

public SocketOutput(KestrelThread thread, UvStreamHandle socket)
{
_thread = thread;
_socket = socket;
_callbacksPending = new Queue<CallbackContext>();
}

public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback, object state)
Expand All @@ -26,71 +44,186 @@ public void Write(ArraySegment<byte> buffer, Action<Exception, object> callback,
buffer = new ArraySegment<byte>(copy);

KestrelTrace.Log.ConnectionWrite(0, buffer.Count);
var req = new ThisWriteReq();
req.Init(_thread.Loop);
req.Contextualize(this, _socket, buffer, callback, state);
req.Write();

bool triggerCallbackNow = false;

lock (_lockObj)
{
if (_nextWriteContext == null)
{
_nextWriteContext = new WriteContext(this);
}

_nextWriteContext.Buffers.Enqueue(buffer);

// Complete the write task immediately if all previous write tasks have been completed,
// the buffers haven't grown too large, and the last write to the socket succeeded.
triggerCallbackNow = _lastWriteError == null &&
_callbacksPending.Count == 0 &&
_numBytesPreCompleted + buffer.Count <= _maxBytesPreCompleted;
if (triggerCallbackNow)
{
_numBytesPreCompleted += buffer.Count;
}
else
{
_callbacksPending.Enqueue(new CallbackContext
{
Callback = callback,
State = state,
BytesToWrite = buffer.Count
});
}

if (_writesPending < _maxPendingWrites)
{
ScheduleWrite();
_writesPending++;
}
}

// Make sure we call user code outside of the lock.
if (triggerCallbackNow)
{
callback(null, state);
}
}

public class ThisWriteReq : UvWriteReq
private void ScheduleWrite()
{
SocketOutput _self;
ArraySegment<byte> _buffer;
UvStreamHandle _socket;
Action<Exception, object> _callback;
object _state;
Exception _callbackError;

internal void Contextualize(
SocketOutput socketOutput,
UvStreamHandle socket,
ArraySegment<byte> buffer,
Action<Exception, object> callback,
object state)
_thread.Post(obj =>
{
_self = socketOutput;
_socket = socket;
_buffer = buffer;
_callback = callback;
_state = state;
var self = (SocketOutput)obj;
self.WriteAllPending();
}, this);
}

// This is called on the libuv event loop
private void WriteAllPending()
{
WriteContext writingContext;

lock (_lockObj)
{
if (_nextWriteContext != null)
{
writingContext = _nextWriteContext;
_nextWriteContext = null;
}
else
{
_writesPending--;
return;
}
}

public void Write()
try
{
var buffers = new ArraySegment<byte>[writingContext.Buffers.Count];

var i = 0;
foreach (var buffer in writingContext.Buffers)
{
buffers[i++] = buffer;
}

var writeReq = new UvWriteReq();
writeReq.Init(_thread.Loop);

writeReq.Write(_socket, new ArraySegment<ArraySegment<byte>>(buffers), (r, status, error, state) =>
{
var writtenContext = (WriteContext)state;
writtenContext.Self.OnWriteCompleted(writtenContext.Buffers, r, status, error);
}, writingContext);
}
catch
{
_self._thread.Post(obj =>
lock (_lockObj)
{
var req = (ThisWriteReq)obj;
req.Write(
req._socket,
new ArraySegment<ArraySegment<byte>>(
new[] { req._buffer }),
(r, status, error, state) => ((ThisWriteReq)state).OnWrite(status, error),
req);
}, this);
// Lock instead of using Interlocked.Decrement so _writesSending
// doesn't change in the middle of executing other synchronized code.
_writesPending--;
}

throw;
}
}

private void OnWrite(int status, Exception error)
// This is called on the libuv event loop
private void OnWriteCompleted(Queue<ArraySegment<byte>> writtenBuffers, UvWriteReq req, int status, Exception error)
{
KestrelTrace.Log.ConnectionWriteCallback(0, status);

lock (_lockObj)
{
KestrelTrace.Log.ConnectionWriteCallback(0, status);
//NOTE: pool this?
_lastWriteError = error;

if (_nextWriteContext != null)
{
ScheduleWrite();
}
else
{
_writesPending--;
}

foreach (var writeBuffer in writtenBuffers)
{
// _numBytesPreCompleted can temporarily go negative in the event there are
// completed writes that we haven't triggered callbacks for yet.
_numBytesPreCompleted -= writeBuffer.Count;
}

Dispose();

// Get off the event loop before calling user code!
_callbackError = error;
ThreadPool.QueueUserWorkItem(obj =>
// bytesLeftToBuffer can be greater than _maxBytesPreCompleted
// This allows large writes to complete once they've actually finished.
var bytesLeftToBuffer = _maxBytesPreCompleted - _numBytesPreCompleted;
while (_callbacksPending.Count > 0 &&
_callbacksPending.Peek().BytesToWrite <= bytesLeftToBuffer)
{
var req = (ThisWriteReq)obj;
req._callback(req._callbackError, req._state);
}, this);
}
var callbackContext = _callbacksPending.Dequeue();

_numBytesPreCompleted += callbackContext.BytesToWrite;

TriggerCallback(callbackContext);
}

// Now that the while loop has completed the following invariants should hold true:
Trace.Assert(_numBytesPreCompleted >= 0);
Trace.Assert(_numBytesPreCompleted <= _maxBytesPreCompleted);
}

req.Dispose();
}

private void TriggerCallback(CallbackContext context)
{
context.Error = _lastWriteError;
ThreadPool.QueueUserWorkItem(obj =>
{
var c = (CallbackContext)obj;
c.Callback(c.Error, c.State);
}, context);
}

public bool Flush(Action drained)
private class CallbackContext
{
return false;
public Exception Error;
public Action<Exception, object> Callback;
public object State;
public int BytesToWrite;
}

private class WriteContext
{
public WriteContext(SocketOutput self)
{
Self = self;
Buffers = new Queue<ArraySegment<byte>>();
}

public SocketOutput Self;
public Queue<ArraySegment<byte>> Buffers;
}
}
}
Loading