Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Simplify SocketInput, remove lock, only use pooled blocks #525

Merged
merged 2 commits into from
Jan 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,7 @@ public override void SetLength(long value)

public override void Write(byte[] buffer, int offset, int count)
{
var inputBuffer = _socketInput.IncomingStart(count);

Buffer.BlockCopy(buffer, offset, inputBuffer.Data.Array, inputBuffer.Data.Offset, count);

_socketInput.IncomingComplete(count, error: null);
_socketInput.IncomingData(buffer, offset, count);
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token)
Expand All @@ -90,7 +86,7 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
protected override void Dispose(bool disposing)
{
// Close _socketInput with a fake zero-length write that will result in a zero-length read.
_socketInput.IncomingComplete(0, error: null);
_socketInput.IncomingData(null, 0, 0);
base.Dispose(disposing);
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ private static Libuv.uv_buf_t AllocCallback(UvStreamHandle handle, int suggested

private Libuv.uv_buf_t OnAlloc(UvStreamHandle handle, int suggestedSize)
{
var result = _rawSocketInput.IncomingStart(2048);
var result = _rawSocketInput.IncomingStart();

return handle.Libuv.buf_init(
result.DataPtr,
result.Data.Count);
result.Pin() + result.End,
result.Data.Offset + result.Data.Count - result.End);
}

private static void ReadCallback(UvStreamHandle handle, int status, object state)
Expand Down
190 changes: 83 additions & 107 deletions src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;

namespace Microsoft.AspNet.Server.Kestrel.Http
Expand All @@ -25,7 +24,6 @@ public class SocketInput : ICriticalNotifyCompletion
private MemoryPoolBlock2 _head;
private MemoryPoolBlock2 _tail;
private MemoryPoolBlock2 _pinned;
private readonly object _sync = new Object();

public SocketInput(MemoryPool2 memory, IThreadPool threadPool)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love it when locks can be removed 👍 Are you 100% positive this won't introduce any race conditions?

{
Expand All @@ -34,99 +32,98 @@ public SocketInput(MemoryPool2 memory, IThreadPool threadPool)
_awaitableState = _awaitableIsNotCompleted;
}

public ArraySegment<byte> Buffer { get; set; }

public bool RemoteIntakeFin { get; set; }

public bool IsCompleted
public bool IsCompleted => (_awaitableState == _awaitableIsCompleted);

public MemoryPoolBlock2 IncomingStart()
{
get
const int minimumSize = 2048;

if (_tail != null && minimumSize <= _tail.Data.Offset + _tail.Data.Count - _tail.End)
{
return Equals(_awaitableState, _awaitableIsCompleted);
_pinned = _tail;
}
else
{
_pinned = _memory.Lease();
}
}

public void Skip(int count)
{
Buffer = new ArraySegment<byte>(Buffer.Array, Buffer.Offset + count, Buffer.Count - count);
}

public ArraySegment<byte> Take(int count)
{
var taken = new ArraySegment<byte>(Buffer.Array, Buffer.Offset, count);
Skip(count);
return taken;
return _pinned;
}

public IncomingBuffer IncomingStart(int minimumSize)
public void IncomingData(byte[] buffer, int offset, int count)
{
lock (_sync)
if (count > 0)
{
if (_tail != null && minimumSize <= _tail.Data.Offset + _tail.Data.Count - _tail.End)
if (_tail == null)
{
_pinned = _tail;
var data = new ArraySegment<byte>(_pinned.Data.Array, _pinned.End, _pinned.Data.Offset + _pinned.Data.Count - _pinned.End);
var dataPtr = _pinned.Pin() + _pinned.End;
return new IncomingBuffer
{
Data = data,
DataPtr = dataPtr,
};
_tail = _memory.Lease();
}
}

_pinned = _memory.Lease(minimumSize);
return new IncomingBuffer
var iterator = new MemoryPoolIterator2(_tail, _tail.End);
iterator.CopyFrom(buffer, offset, count);

if (_head == null)
{
_head = _tail;
}

_tail = iterator.Block;
}
else
{
Data = _pinned.Data,
DataPtr = _pinned.Pin() + _pinned.End
};
RemoteIntakeFin = true;
}

Complete();
}

public void IncomingComplete(int count, Exception error)
{
Action awaitableState;

lock (_sync)
// Unpin may called without an earlier Pin
if (_pinned != null)
{
// Unpin may called without an earlier Pin
if (_pinned != null)

_pinned.End += count;

if (_head == null)
{
_pinned.Unpin();

_pinned.End += count;
if (_head == null)
{
_head = _tail = _pinned;
}
else if (_tail == _pinned)
{
// NO-OP: this was a read into unoccupied tail-space
}
else
{
_tail.Next = _pinned;
_tail = _pinned;
}
_head = _tail = _pinned;
}
_pinned = null;

if (count == 0)
else if (_tail == _pinned)
{
RemoteIntakeFin = true;
// NO-OP: this was a read into unoccupied tail-space
}
if (error != null)
else
{
_awaitableError = error;
_tail.Next = _pinned;
_tail = _pinned;
}

awaitableState = Interlocked.Exchange(
ref _awaitableState,
_awaitableIsCompleted);
_pinned = null;
}

_manualResetEvent.Set();
if (count == 0)
{
RemoteIntakeFin = true;
}
if (error != null)
{
_awaitableError = error;
}

Complete();
}

private void Complete()
{
var awaitableState = Interlocked.Exchange(
ref _awaitableState,
_awaitableIsCompleted);

_manualResetEvent.Set();

if (awaitableState != _awaitableIsCompleted &&
awaitableState != _awaitableIsNotCompleted)
{
Expand All @@ -136,10 +133,7 @@ public void IncomingComplete(int count, Exception error)

public MemoryPoolIterator2 ConsumingStart()
{
lock (_sync)
{
return new MemoryPoolIterator2(_head);
}
return new MemoryPoolIterator2(_head);
}

public void ConsumingComplete(
Expand All @@ -148,51 +142,39 @@ public void ConsumingComplete(
{
MemoryPoolBlock2 returnStart = null;
MemoryPoolBlock2 returnEnd = null;
lock (_sync)
if (!consumed.IsDefault)
{
if (!consumed.IsDefault)
{
returnStart = _head;
returnEnd = consumed.Block;
_head = consumed.Block;
_head.Start = consumed.Index;
}
if (!examined.IsDefault &&
examined.IsEnd &&
RemoteIntakeFin == false &&
_awaitableError == null)
{
_manualResetEvent.Reset();
returnStart = _head;
returnEnd = consumed.Block;
_head = consumed.Block;
_head.Start = consumed.Index;
}
if (!examined.IsDefault &&
examined.IsEnd &&
RemoteIntakeFin == false &&
_awaitableError == null)
{
_manualResetEvent.Reset();

var awaitableState = Interlocked.CompareExchange(
ref _awaitableState,
_awaitableIsNotCompleted,
_awaitableIsCompleted);
}
var awaitableState = Interlocked.CompareExchange(
ref _awaitableState,
_awaitableIsNotCompleted,
_awaitableIsCompleted);
}

while (returnStart != returnEnd)
{
var returnBlock = returnStart;
returnStart = returnStart.Next;
returnBlock.Pool?.Return(returnBlock);
returnBlock.Pool.Return(returnBlock);
}
}

public void AbortAwaiting()
{
_awaitableError = new ObjectDisposedException(nameof(SocketInput), "The request was aborted");

var awaitableState = Interlocked.Exchange(
ref _awaitableState,
_awaitableIsCompleted);

_manualResetEvent.Set();

if (awaitableState != _awaitableIsCompleted &&
awaitableState != _awaitableIsNotCompleted)
{
_threadPool.Run(awaitableState);
}
Complete();
}

public SocketInput GetAwaiter()
Expand Down Expand Up @@ -247,11 +229,5 @@ public void GetResult()
throw new IOException(error.Message, error);
}
}

public struct IncomingBuffer
{
public ArraySegment<byte> Data;
public IntPtr DataPtr;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;

namespace Microsoft.AspNet.Server.Kestrel.Http
{
public static class SocketInputExtensions
{
public static ValueTask<int> ReadAsync(this SocketInput input, byte[] buffer, int offset, int count)
{
while (true)
while (input.IsCompleted)
{
if (!input.IsCompleted)
{
return input.ReadAsyncAwaited(buffer, offset, count);
}

var begin = input.ConsumingStart();

int actual;
var end = begin.CopyTo(buffer, offset, count, out actual);
input.ConsumingComplete(end, end);
Expand All @@ -32,6 +26,8 @@ public static ValueTask<int> ReadAsync(this SocketInput input, byte[] buffer, in
return 0;
}
}

return input.ReadAsyncAwaited(buffer, offset, count);
}

private static async Task<int> ReadAsyncAwaited(this SocketInput input, byte[] buffer, int offset, int count)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ public static class TaskUtilities
#else
public static Task CompletedTask = Task.FromResult<object>(null);
#endif
public static Task<int> ZeroTask = Task.FromResult(0);
}
}
4 changes: 1 addition & 3 deletions test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ public void EmptyHeaderValuesCanBeParsed(string rawHeaders, int numHeaders)
var headerCollection = new FrameRequestHeaders();

var headerArray = Encoding.ASCII.GetBytes(rawHeaders);
var inputBuffer = socketInput.IncomingStart(headerArray.Length);
Buffer.BlockCopy(headerArray, 0, inputBuffer.Data.Array, inputBuffer.Data.Offset, headerArray.Length);
socketInput.IncomingComplete(headerArray.Length, null);
socketInput.IncomingData(headerArray, 0, headerArray.Length);

var success = Frame.TakeMessageHeaders(socketInput, headerCollection);

Expand Down
7 changes: 2 additions & 5 deletions test/Microsoft.AspNet.Server.KestrelTests/TestInput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ public TestInput()

public void Add(string text, bool fin = false)
{
var encoding = System.Text.Encoding.ASCII;
var count = encoding.GetByteCount(text);
var buffer = FrameContext.SocketInput.IncomingStart(text.Length);
count = encoding.GetBytes(text, 0, text.Length, buffer.Data.Array, buffer.Data.Offset);
FrameContext.SocketInput.IncomingComplete(count, null);
var data = System.Text.Encoding.ASCII.GetBytes(text);
FrameContext.SocketInput.IncomingData(data, 0, data.Length);
if (fin)
{
FrameContext.SocketInput.RemoteIntakeFin = true;
Expand Down