Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

[For Testing] Go faster stripes MkII - Collated changes #309

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class FilteredStreamAdapter

public FilteredStreamAdapter(
Stream filteredStream,
MemoryPool2 memory,
MemoryPool memory,
IKestrelTrace logger)
{
SocketInput = new SocketInput(memory);
Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.AspNet.Server.Kestrel/Filter/LibuvStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public override void Write(byte[] buffer, int offset, int count)
_output.Write(segment);
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token)
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
var segment = new ArraySegment<byte>(buffer, offset, count);
return _output.WriteAsync(segment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public override void Write(byte[] buffer, int offset, int count)
_socketInput.IncomingComplete(count, error: null);
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token)
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
Write(buffer, offset, count);
return _emptyTask;
Expand Down
14 changes: 7 additions & 7 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
{
public class Connection : ConnectionContext, IConnectionControl
{
private static readonly Action<UvStreamHandle, int, object> _readCallback = ReadCallback;
private static readonly Func<UvStreamHandle, int, object, Libuv.uv_buf_t> _allocCallback = AllocCallback;
private static readonly Action<UvStreamHandle, int, object> _readCallback = (handle, status, state) => ReadCallback(handle, status, state);
private static readonly Func<UvStreamHandle, int, object, Libuv.uv_buf_t> _allocCallback = (handle, suggestedSize, state) => AllocCallback(handle, suggestedSize, state);

private static long _lastConnectionId;

Expand All @@ -39,8 +39,8 @@ public Connection(ListenerContext context, UvStreamHandle socket) : base(context

_connectionId = Interlocked.Increment(ref _lastConnectionId);

_rawSocketInput = new SocketInput(Memory2);
_rawSocketOutput = new SocketOutput(Thread, _socket, _connectionId, Log);
_rawSocketInput = new SocketInput(Memory);
_rawSocketOutput = new SocketOutput(Memory, Thread, _socket, _connectionId, Log);
}

public void Start()
Expand Down Expand Up @@ -83,12 +83,12 @@ public void Start()
if (task.IsFaulted)
{
connection.Log.LogError("ConnectionFilter.OnConnection", task.Exception);
ConnectionControl.End(ProduceEndType.SocketDisconnect);
connection.ConnectionControl.End(ProduceEndType.SocketDisconnect);
}
else if (task.IsCanceled)
{
connection.Log.LogError("ConnectionFilter.OnConnection Canceled");
ConnectionControl.End(ProduceEndType.SocketDisconnect);
connection.ConnectionControl.End(ProduceEndType.SocketDisconnect);
}
else
{
Expand All @@ -100,7 +100,7 @@ public void Start()

private void ApplyConnectionFilter()
{
var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory2, Log);
var filteredStreamAdapter = new FilteredStreamAdapter(_filterContext.Connection, Memory, Log);

SocketInput = filteredStreamAdapter.SocketInput;
SocketOutput = filteredStreamAdapter.SocketOutput;
Expand Down
148 changes: 102 additions & 46 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,23 @@ namespace Microsoft.AspNet.Server.Kestrel.Http
public partial class Frame : FrameContext, IFrameControl
{
private static readonly Encoding _ascii = Encoding.ASCII;
private static readonly ArraySegment<byte> _endChunkBytes = CreateAsciiByteArraySegment("\r\n");
private static readonly ArraySegment<byte> _endLineBytes = CreateAsciiByteArraySegment("\r\n");
private static readonly ArraySegment<byte> _endChunkBytes = _endLineBytes;
private static readonly ArraySegment<byte> _headerDelimiterBytes = CreateAsciiByteArraySegment(": ");
private static readonly ArraySegment<byte> _spaceBytes = CreateAsciiByteArraySegment(" ");
private static readonly ArraySegment<byte> _endChunkedResponseBytes = CreateAsciiByteArraySegment("0\r\n\r\n");
private static readonly ArraySegment<byte> _continueBytes = CreateAsciiByteArraySegment("HTTP/1.1 100 Continue\r\n\r\n");
private static readonly ArraySegment<byte> _contentLengthZeroBytes = CreateAsciiByteArraySegment("Content-Length: 0\r\n");
private static readonly ArraySegment<byte> _transferEncodingChunkedBytes = CreateAsciiByteArraySegment("Transfer-Encoding: chunked\r\n");
private static readonly ArraySegment<byte> _connectionCloseBytes = CreateAsciiByteArraySegment("Connection: close\r\n\r\n");
private static readonly ArraySegment<byte> _connectionKeepAliveBytes = CreateAsciiByteArraySegment("Connection: keep-alive\r\n\r\n");
private static readonly ArraySegment<byte> _emptyData = new ArraySegment<byte>(new byte[0]);
private static readonly byte[] _hex = Encoding.ASCII.GetBytes("0123456789abcdef");

private readonly object _onStartingSync = new Object();
private readonly object _onCompletedSync = new Object();
private readonly object _onStartingSync = new object();
private readonly object _onCompletedSync = new object();
private readonly FrameRequestHeaders _requestHeaders = new FrameRequestHeaders();
private readonly byte[] _nullBuffer = new byte[4096];
private readonly byte[] _scratchBuffer = new byte[4096];
private readonly FrameResponseHeaders _responseHeaders = new FrameResponseHeaders();

private List<KeyValuePair<Func<object, Task>, object>> _onStarting;
Expand Down Expand Up @@ -189,7 +196,7 @@ public async Task RequestProcessingAsync()
}
}

while (!terminated && !_requestProcessingStopping && !TakeMessageHeaders(SocketInput, _requestHeaders))
while (!terminated && !_requestProcessingStopping && !TakeMessageHeaders(SocketInput, _requestHeaders, _scratchBuffer))
{
terminated = SocketInput.RemoteIntakeFin;
if (!terminated)
Expand Down Expand Up @@ -217,6 +224,7 @@ public async Task RequestProcessingAsync()
}
finally
{
var FlushTask = RequestBody.ReadAsync(_scratchBuffer, 0, _scratchBuffer.Length);
// Trigger OnStarting if it hasn't been called yet and the app hasn't
// already failed. If an OnStarting callback throws we can go through
// our normal error handling in ProduceEnd.
Expand All @@ -232,9 +240,10 @@ public async Task RequestProcessingAsync()

await ProduceEnd();

while (await RequestBody.ReadAsync(_nullBuffer, 0, _nullBuffer.Length) != 0)
while (await FlushTask != 0)
{
// Finish reading the request body in case the app did not.
FlushTask = RequestBody.ReadAsync(_scratchBuffer, 0, _scratchBuffer.Length);
}
}

Expand Down Expand Up @@ -471,19 +480,14 @@ public async Task ProduceStartAndFireOnStarting(bool immediate = true)
await ProduceStart(immediate, appCompleted: false);
}

private async Task ProduceStart(bool immediate, bool appCompleted)
private Task ProduceStart(bool immediate, bool appCompleted)
{
if (_responseStarted) return;
if (_responseStarted) return TaskUtilities.CompletedTask;
_responseStarted = true;

var status = ReasonPhrases.ToStatus(StatusCode, ReasonPhrase);

var responseHeader = CreateResponseHeader(status, appCompleted);

using (responseHeader.Item2)
{
await SocketOutput.WriteAsync(responseHeader.Item1, immediate: immediate);
}
return CreateResponseHeader(status, appCompleted, immediate);
}

private async Task ProduceEnd()
Expand Down Expand Up @@ -521,16 +525,54 @@ private async Task ProduceEnd()
}
}

private Tuple<ArraySegment<byte>, IDisposable> CreateResponseHeader(
ArraySegment<byte> ShortAsciiToBytes(string input)
{

var scratch = _scratchBuffer;
var len = input.Length;

var i = 0;
for (; i < scratch.Length; i++)
{
if (i >= len)
{
break;
}
scratch[i] = (byte)input[i];
}
var buffer = new ArraySegment<byte>(scratch, 0, i);
return buffer;
}
bool LongAsciiToBytes(string input, int offset, out int newOffset, out ArraySegment<byte> buffer)
{
var scratch = _scratchBuffer;
var len = input.Length;

newOffset = offset;
var i = 0;
for (; i < scratch.Length; i++)
{
if (newOffset >= len)
{
break;
}
scratch[i] = (byte)input[newOffset];
newOffset++;
}

buffer = new ArraySegment<byte>(scratch, 0, i);
return newOffset < len;
}

private Task CreateResponseHeader(
string status,
bool appCompleted)
bool appCompleted,
bool immediate)
{
var writer = new MemoryPoolTextWriter(Memory);
writer.Write(HttpVersion);
writer.Write(' ');
writer.Write(status);
writer.Write('\r');
writer.Write('\n');
SocketOutput.Write(ShortAsciiToBytes(HttpVersion), immediate: false);
SocketOutput.Write(_spaceBytes, immediate: false);
SocketOutput.Write(ShortAsciiToBytes(status), immediate: false);
SocketOutput.Write(_endLineBytes, immediate: false);

var hasConnection = false;
var hasTransferEncoding = false;
Expand All @@ -555,21 +597,33 @@ private Tuple<ArraySegment<byte>, IDisposable> CreateResponseHeader(
hasContentLength = true;
}

ArraySegment<byte> buffer;
int inputOffset;
foreach (var value in header.Value)
{
writer.Write(header.Key);
writer.Write(':');
writer.Write(' ');
writer.Write(value);
writer.Write('\r');
writer.Write('\n');
inputOffset = 0;
while (LongAsciiToBytes(header.Key, inputOffset, out inputOffset, out buffer))
{
SocketOutput.Write(buffer, immediate: false);
}
SocketOutput.Write(buffer, immediate: false);

SocketOutput.Write(_headerDelimiterBytes, immediate: false);

inputOffset = 0;
while (LongAsciiToBytes(value, inputOffset, out inputOffset, out buffer))
{
SocketOutput.Write(buffer, immediate: false);
}
SocketOutput.Write(buffer, immediate: false);

SocketOutput.Write(_endLineBytes, immediate: false);

if (isConnection && value.IndexOf("close", StringComparison.OrdinalIgnoreCase) != -1)
{
_keepAlive = false;
}
}

}

if (_keepAlive && !hasTransferEncoding && !hasContentLength)
Expand All @@ -582,15 +636,15 @@ private Tuple<ArraySegment<byte>, IDisposable> CreateResponseHeader(
{
// Since the app has completed and we are only now generating
// the headers we can safely set the Content-Length to 0.
writer.Write("Content-Length: 0\r\n");
SocketOutput.Write(_contentLengthZeroBytes, immediate: false);
}
}
else
{
if (HttpVersion == "HTTP/1.1")
{
_autoChunk = true;
writer.Write("Transfer-Encoding: chunked\r\n");
SocketOutput.Write(_transferEncodingChunkedBytes, immediate: false);
}
else
{
Expand All @@ -601,19 +655,17 @@ private Tuple<ArraySegment<byte>, IDisposable> CreateResponseHeader(

if (_keepAlive == false && hasConnection == false && HttpVersion == "HTTP/1.1")
{
writer.Write("Connection: close\r\n\r\n");
return SocketOutput.WriteAsync(_connectionCloseBytes, immediate: true);
}
else if (_keepAlive && hasConnection == false && HttpVersion == "HTTP/1.0")
{
writer.Write("Connection: keep-alive\r\n\r\n");
return SocketOutput.WriteAsync(_connectionKeepAliveBytes, immediate: true);
}
else
{
writer.Write('\r');
writer.Write('\n');
return SocketOutput.WriteAsync(_endLineBytes, immediate: true);
}
writer.Flush();
return new Tuple<ArraySegment<byte>, IDisposable>(writer.Buffer, writer);

}

private bool TakeStartLine(SocketInput input)
Expand All @@ -627,7 +679,7 @@ private bool TakeStartLine(SocketInput input)
{
return false;
}
var method = begin.GetString(scan);
var method = begin.GetAsciiString(ref scan);

scan.Take();
begin = scan;
Expand All @@ -651,7 +703,7 @@ private bool TakeStartLine(SocketInput input)
{
return false;
}
queryString = begin.GetString(scan);
queryString = begin.GetAsciiString(ref scan);
}

scan.Take();
Expand All @@ -660,21 +712,25 @@ private bool TakeStartLine(SocketInput input)
{
return false;
}
var httpVersion = begin.GetString(scan);
var httpVersion = begin.GetAsciiString(ref scan);

scan.Take();
if (scan.Take() != '\n')
{
return false;
}

string requestUrlPath;
if (needDecode)
{
pathEnd = UrlPathDecoder.Unescape(pathBegin, pathEnd);
requestUrlPath = pathBegin.GetUtf8String(ref pathEnd);
}

var requestUrlPath = pathBegin.GetString(pathEnd);

else
{
requestUrlPath = pathBegin.GetAsciiString(ref pathEnd);
}

consumed = scan;
Method = method;
RequestUri = requestUrlPath;
Expand All @@ -694,7 +750,7 @@ static string GetString(ArraySegment<byte> range, int startIndex, int endIndex)
return Encoding.UTF8.GetString(range.Array, range.Offset + startIndex, endIndex - startIndex);
}

public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders requestHeaders)
public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders requestHeaders, byte[] scratchBuffer)
{
var scan = input.ConsumingStart();
var consumed = scan;
Expand Down Expand Up @@ -784,8 +840,8 @@ public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders req
continue;
}

var name = beginName.GetArraySegment(endName);
var value = beginValue.GetString(endValue);
var name = beginName.GetArraySegment(scratchBuffer, ref endName);
var value = beginValue.GetAsciiString(ref endValue);
if (wrapping)
{
value = value.Replace("\r\n", " ");
Expand Down
Loading