Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Reuse writes, initialize queues #363

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 92 additions & 18 deletions src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class SocketOutput : ISocketOutput
private const int _maxPendingWrites = 3;
private const int _maxBytesPreCompleted = 65536;
private const int _initialTaskQueues = 64;
private const int _maxPooledWriteContexts = 32;

private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state);
private static MemoryPoolIterator2 _defaultIterator;
Expand All @@ -39,6 +40,7 @@ public class SocketOutput : ISocketOutput

// This locks access to to all of the below fields
private readonly object _contextLock = new object();
private bool _isDisposed = false;

// The number of write operations that have been scheduled so far
// but have not completed.
Expand All @@ -49,6 +51,7 @@ public class SocketOutput : ISocketOutput
private WriteContext _nextWriteContext;
private readonly Queue<TaskCompletionSource<object>> _tasksPending;
private readonly Queue<TaskCompletionSource<object>> _tasksCompleted;
private readonly Queue<WriteContext> _writeContextPool;

public SocketOutput(
KestrelThread thread,
Expand All @@ -67,6 +70,7 @@ public SocketOutput(
_threadPool = threadPool;
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_tasksCompleted = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);

_head = memory.Lease();
_tail = _head;
Expand All @@ -93,7 +97,14 @@ public Task WriteAsync(
{
if (_nextWriteContext == null)
{
_nextWriteContext = new WriteContext(this);
if (_writeContextPool.Count > 0)
{
_nextWriteContext = _writeContextPool.Dequeue();
}
else
{
_nextWriteContext = new WriteContext(this);
}
}

if (socketShutdownSend)
Expand Down Expand Up @@ -275,9 +286,12 @@ private void WriteAllPending()
}

// This is called on the libuv event loop
private void OnWriteCompleted(int bytesWritten, int status, Exception error)
private void OnWriteCompleted(WriteContext writeContext)
{
_log.ConnectionWriteCallback(_connectionId, status);
var bytesWritten = writeContext.ByteCount;
var status = writeContext.WriteStatus;
var error = writeContext.WriteError;


if (error != null)
{
Expand All @@ -291,6 +305,7 @@ private void OnWriteCompleted(int bytesWritten, int status, Exception error)

lock (_contextLock)
{
PoolWriteContext(writeContext);
if (_nextWriteContext != null)
{
scheduleWrite = true;
Expand Down Expand Up @@ -333,11 +348,11 @@ private void OnWriteCompleted(int bytesWritten, int status, Exception error)
}
}

_log.ConnectionWriteCallback(_connectionId, status);

if (scheduleWrite)
{
// ScheduleWrite();
// on right thread, fairness issues?
WriteAllPending();
ScheduleWrite();
}

_tasksCompleted.Clear();
Expand Down Expand Up @@ -368,6 +383,32 @@ private void ReturnAllBlocks()
}
}

/// <summary>
/// Returns a completed <see cref="WriteContext"/> to the pool for reuse,
/// or disposes it when the pool is full or this output has been disposed.
/// Must be invoked while holding <c>_contextLock</c>.
/// </summary>
private void PoolWriteContext(WriteContext writeContext)
{
    // Once disposed, or when the pool is at capacity, release the context
    // (and its libuv write request) instead of retaining it.
    if (_isDisposed || _writeContextPool.Count >= _maxPooledWriteContexts)
    {
        writeContext.Dispose();
        return;
    }

    writeContext.Reset();
    _writeContextPool.Enqueue(writeContext);
}

/// <summary>
/// Marks this output as disposed and drains the write-context pool,
/// disposing each pooled context (which releases its libuv write request).
/// </summary>
private void Dispose()
{
    lock (_contextLock)
    {
        // Setting this under the lock prevents PoolWriteContext from
        // enqueueing new contexts after the pool has been drained.
        _isDisposed = true;

        while (_writeContextPool.Count != 0)
        {
            var context = _writeContextPool.Dequeue();
            context.Dispose();
        }
    }
}

void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate)
{
var task = WriteAsync(buffer, immediate);
Expand Down Expand Up @@ -409,14 +450,14 @@ private static void BytesBetween(MemoryPoolIterator2 start, MemoryPoolIterator2
buffers++;
}

private class WriteContext
private class WriteContext : IDisposable
{
private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state);

private MemoryPoolIterator2 _lockedStart;
private MemoryPoolIterator2 _lockedEnd;
private int _bufferCount;
private int _byteCount;
public int ByteCount;

public SocketOutput Self;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discovered and fixed a race here; I don't know whether it was introduced by these changes or also exists in the current build.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch.

If you just want to avoid the NullRefEx, you should do the following:

var newHead = _lockedEnd.Block;
newHead.Start = _lockedEnd.Index;

if (Self._head != null)
{
    Self._head = newHead;
}

With the current change, you could still get a NullRefEx if _head is set to null between the if comparison and the actual assignment to Self._head.Start. This makes the race a lot tighter than it was before, but it's still there.

To be safe, all modifications to _head should be made with the _returnLock. This fixes the race because _head is only set to null when the _returnLock is taken.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait a minute! I think this race must have been caused by a change in this PR

I was wondering how I didn't see this was a race when I originally wrote this code, and now I'm pretty sure it's because _head is only ever supposed to modified in the SocketOutput ctor and on the libuv thread. If that's the case, this race should be impossible.

@benaadams Do you have any idea how _head would be modified off of the libuv thread with this change? I'm not seeing it yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All I can think is the Write callback actually completes before the next lines are run? Was going to move the set above the Write but didn't feel comfortable enough with the code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benaadams You don't want to move the set above the write in case write throws. If that's the case, we still want to ensure that the blocks get returned to the pool eventually.

Even if the write callback gets executed inline, Self._head should never be null immediately after this line: Self._head = _lockedEnd.Block.

We already know _lockedEnd.Block can't be null because LockWrite would exit early leaving ByteCount at zero.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, ok it is because the Write callback is completing before the rest of DoWriteIfNeeded so OnWriteCompleted is called before _head is set to _lockedEnd.Block

callstack

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhhhh.... its the behaviour of MockLibuv that's causing this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed it; however gone back to the scheduled write; which means the next write needs to wait for a loop of libuv to complete; so is throttled, due to the way the queues work (switching per loop)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added throttle fix to #427

Expand All @@ -426,11 +467,15 @@ private class WriteContext
public int WriteStatus;
public Exception WriteError;

private UvWriteReq _writeReq;

public int ShutdownSendStatus;

public WriteContext(SocketOutput self)
{
Self = self;
_writeReq = new UvWriteReq(Self._log);
_writeReq.Init(Self._thread.Loop);
}

/// <summary>
Expand All @@ -440,27 +485,28 @@ public void DoWriteIfNeeded()
{
LockWrite();

if (_byteCount == 0 || Self._socket.IsClosed)
if (ByteCount == 0 || Self._socket.IsClosed)
{
DoShutdownIfNeeded();
return;
}

var writeReq = new UvWriteReq(Self._log);
writeReq.Init(Self._thread.Loop);
// Sample values locally in case write completes inline
// to allow block to be Reset and still complete this function
var lockedEndBlock = _lockedEnd.Block;
var lockedEndIndex = _lockedEnd.Index;

writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
_writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
{
_writeReq.Dispose();
var _this = (WriteContext)state;
_this.ScheduleReturnFullyWrittenBlocks();
_this.WriteStatus = status;
_this.WriteError = error;
_this.DoShutdownIfNeeded();
}, this);

Self._head = _lockedEnd.Block;
Self._head.Start = _lockedEnd.Index;
Self._head = lockedEndBlock;
Self._head.Start = lockedEndIndex;
}

/// <summary>
Expand Down Expand Up @@ -493,21 +539,28 @@ public void DoShutdownIfNeeded()
/// </summary>
public void DoDisconnectIfNeeded()
{
if (SocketDisconnect == false || Self._socket.IsClosed)
if (SocketDisconnect == false)
{
Complete();
return;
}
else if (Self._socket.IsClosed)
{
Self.Dispose();
Complete();
return;
}

Self._socket.Dispose();
Self.ReturnAllBlocks();
Self.Dispose();
Self._log.ConnectionStop(Self._connectionId);
Complete();
}

public void Complete()
{
Self.OnWriteCompleted(_byteCount, WriteStatus, WriteError);
Self.OnWriteCompleted(this);
}

private void ScheduleReturnFullyWrittenBlocks()
Expand Down Expand Up @@ -556,8 +609,29 @@ private void LockWrite()

_lockedStart = new MemoryPoolIterator2(head, head.Start);
_lockedEnd = new MemoryPoolIterator2(tail, tail.End);

BytesBetween(_lockedStart, _lockedEnd, out ByteCount, out _bufferCount);
}

BytesBetween(_lockedStart, _lockedEnd, out _byteCount, out _bufferCount);
/// <summary>
/// Restores this context to its freshly-constructed state so it can be
/// returned to the pool and reused for a subsequent write.
/// </summary>
public void Reset()
{
    // Drop the captured block range so a pooled context does not pin
    // memory-pool blocks between writes.
    _lockedStart = default(MemoryPoolIterator2);
    _lockedEnd = default(MemoryPoolIterator2);

    // Zero the accounting and completion-status fields.
    _bufferCount = 0;
    ByteCount = 0;
    WriteStatus = 0;
    ShutdownSendStatus = 0;
    WriteError = null;

    // Clear the per-write control flags.
    SocketShutdownSend = false;
    SocketDisconnect = false;
}

// Releases the libuv write request owned by this context; called when the
// context is evicted from (or refused by) the pool.
public void Dispose() => _writeReq.Dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,21 @@ namespace Microsoft.AspNet.Server.Kestrel
/// </summary>
public class KestrelThread
{
// maximum times the work queues swapped and are processed in a single pass
// as completing a task may immediately produce write data to put on the network
// otherwise it needs to wait till the next pass of the libuv loop
private const int _maxLoops = 8;

private static Action<object, object> _threadCallbackAdapter = (callback, state) => ((Action<KestrelThread>)callback).Invoke((KestrelThread)state);
private KestrelEngine _engine;
private readonly IApplicationLifetime _appLifetime;
private Thread _thread;
private UvLoopHandle _loop;
private UvAsyncHandle _post;
private Queue<Work> _workAdding = new Queue<Work>();
private Queue<Work> _workRunning = new Queue<Work>();
private Queue<CloseHandle> _closeHandleAdding = new Queue<CloseHandle>();
private Queue<CloseHandle> _closeHandleRunning = new Queue<CloseHandle>();
private Queue<Work> _workAdding = new Queue<Work>(1024);
private Queue<Work> _workRunning = new Queue<Work>(1024);
private Queue<CloseHandle> _closeHandleAdding = new Queue<CloseHandle>(256);
private Queue<CloseHandle> _closeHandleRunning = new Queue<CloseHandle>(256);
private object _workSync = new Object();
private bool _stopImmediate = false;
private bool _initCompleted = false;
Expand Down Expand Up @@ -249,11 +254,17 @@ private void ThreadStart(object parameter)

private void OnPost()
{
DoPostWork();
DoPostCloseHandle();
var loopsRemaining = _maxLoops;
bool wasWork;
do
{
wasWork = DoPostWork();
wasWork = DoPostCloseHandle() || wasWork;
loopsRemaining--;
} while (wasWork && loopsRemaining > 0);
}

private void DoPostWork()
private bool DoPostWork()
{
Queue<Work> queue;
lock (_workSync)
Expand All @@ -262,6 +273,9 @@ private void DoPostWork()
_workAdding = _workRunning;
_workRunning = queue;
}

bool wasWork = queue.Count > 0;

while (queue.Count != 0)
{
var work = queue.Dequeue();
Expand All @@ -286,8 +300,10 @@ private void DoPostWork()
}
}
}

return wasWork;
}
private void DoPostCloseHandle()
private bool DoPostCloseHandle()
{
Queue<CloseHandle> queue;
lock (_workSync)
Expand All @@ -296,6 +312,9 @@ private void DoPostCloseHandle()
_closeHandleAdding = _closeHandleRunning;
_closeHandleRunning = queue;
}

bool wasWork = queue.Count > 0;

while (queue.Count != 0)
{
var closeHandle = queue.Dequeue();
Expand All @@ -309,6 +328,8 @@ private void DoPostCloseHandle()
throw;
}
}

return wasWork;
}

private struct Work
Expand Down
4 changes: 2 additions & 2 deletions src/Microsoft.AspNet.Server.Kestrel/Networking/UvWriteReq.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ namespace Microsoft.AspNet.Server.Kestrel.Networking
/// </summary>
public class UvWriteReq : UvRequest
{
private readonly static Libuv.uv_write_cb _uv_write_cb = UvWriteCb;
private readonly static Libuv.uv_write_cb _uv_write_cb = (IntPtr ptr, int status) => UvWriteCb(ptr, status);

private IntPtr _bufs;

private Action<UvWriteReq, int, Exception, object> _callback;
private object _state;
private const int BUFFER_COUNT = 4;

private List<GCHandle> _pins = new List<GCHandle>();
private List<GCHandle> _pins = new List<GCHandle>(BUFFER_COUNT + 1);

public UvWriteReq(IKestrelTrace logger) : base(logger)
{
Expand Down