Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Only use pooled buffers #402

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ public override void SetLength(long value)

public override void Write(byte[] buffer, int offset, int count)
{
var inputBuffer = _socketInput.IncomingStart(count);

Buffer.BlockCopy(buffer, offset, inputBuffer.Data.Array, inputBuffer.Data.Offset, count);
_socketInput.IncomingStart(buffer, offset, count);

_socketInput.IncomingComplete(count, error: null);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Microsoft.AspNet.Server.Kestrel/Http/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ private static Libuv.uv_buf_t AllocCallback(UvStreamHandle handle, int suggested

private Libuv.uv_buf_t OnAlloc(UvStreamHandle handle, int suggestedSize)
{
var result = _rawSocketInput.IncomingStart(2048);
var result = _rawSocketInput.IncomingRawStart();

return handle.Libuv.buf_init(
result.DataPtr,
Expand Down
5 changes: 1 addition & 4 deletions src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ public class ListenerContext : ServiceContext
{
public ListenerContext()
{
Memory2 = new MemoryPool2();
}

public ListenerContext(ServiceContext serviceContext)
: base(serviceContext)
{
Memory2 = new MemoryPool2();
}

public ListenerContext(ListenerContext listenerContext)
Expand All @@ -25,7 +23,6 @@ public ListenerContext(ListenerContext listenerContext)
ServerAddress = listenerContext.ServerAddress;
Thread = listenerContext.Thread;
Application = listenerContext.Application;
Memory2 = listenerContext.Memory2;
Log = listenerContext.Log;
}

Expand All @@ -35,6 +32,6 @@ public ListenerContext(ListenerContext listenerContext)

public RequestDelegate Application { get; set; }

public MemoryPool2 Memory2 { get; set; }
public MemoryPool2 Memory2 => Thread.Memory2;
}
}
50 changes: 45 additions & 5 deletions src/Microsoft.AspNet.Server.Kestrel/Http/SocketInput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,52 @@ public ArraySegment<byte> Take(int count)
return taken;
}

public IncomingBuffer IncomingStart(int minimumSize)
public void IncomingStart(byte[] buffer, int offset, int count)
{
var remaining = count;
lock (_sync)
{
if (_tail != null && minimumSize <= _tail.Data.Offset + _tail.Data.Count - _tail.End)
if (_tail == null)
{
_tail = _memory.Lease();
}
if (_head == null)
{
_head = _tail;
}

var blockRemaining = _tail.Data.Offset + _tail.Data.Count - _tail.End;
while (remaining > 0)
{
var copyAmount = blockRemaining >= remaining ? remaining : blockRemaining;
if (buffer != null)
{
System.Buffer.BlockCopy(buffer, offset, _tail.Data.Array, _tail.End, copyAmount);
}
remaining -= copyAmount;
blockRemaining -= copyAmount;
_tail.End += copyAmount;

if (blockRemaining == 0)
{
var block = _memory.Lease();
_tail.Next = block;
_tail = block;
blockRemaining = block.Data.Count;
}
else
{
offset += copyAmount;
}
}
}
}

public IncomingBuffer IncomingRawStart()
{
lock (_sync)
{
if (_tail != null && 2048 <= _tail.Data.Offset + _tail.Data.Count - _tail.End)
{
_pinned = _tail;
var data = new ArraySegment<byte>(_pinned.Data.Array, _pinned.End, _pinned.Data.Offset + _pinned.Data.Count - _pinned.End);
Expand All @@ -73,14 +114,15 @@ public IncomingBuffer IncomingStart(int minimumSize)
}
}

_pinned = _memory.Lease(minimumSize);
_pinned = _memory.Lease();
return new IncomingBuffer
{
Data = _pinned.Data,
DataPtr = _pinned.Pin()
};
}


public void IncomingComplete(int count, Exception error)
{
Action awaitableState;
Expand All @@ -90,8 +132,6 @@ public void IncomingComplete(int count, Exception error)
// Unpin may called without an earlier Pin
if (_pinned != null)
{
_pinned.Unpin();

_pinned.End += count;
if (_head == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class KestrelThread
public KestrelThread(KestrelEngine engine)
{
_engine = engine;
Memory2 = new MemoryPool2();
_appLifetime = engine.AppLifetime;
_log = engine.Log;
_loop = new UvLoopHandle(_log);
Expand All @@ -46,6 +47,9 @@ public KestrelThread(KestrelEngine engine)
}

public UvLoopHandle Loop { get { return _loop; } }

public MemoryPool2 Memory2 { get; }

public ExceptionDispatchInfo FatalError { get { return _closeError; } }

public Action<Action<IntPtr>, IntPtr> QueueCloseHandle { get; internal set; }
Expand All @@ -61,6 +65,7 @@ public void Stop(TimeSpan timeout)
{
if (!_initCompleted)
{
Memory2.Dispose();
return;
}

Expand Down Expand Up @@ -94,6 +99,8 @@ public void Stop(TimeSpan timeout)
}
}

Memory2.Dispose();

if (_closeError != null)
{
_closeError.Throw();
Expand Down
69 changes: 30 additions & 39 deletions src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;

namespace Microsoft.AspNet.Server.Kestrel.Infrastructure
{
Expand Down Expand Up @@ -48,32 +49,19 @@ public class MemoryPool2 : IDisposable
/// </summary>
private readonly ConcurrentStack<MemoryPoolSlab2> _slabs = new ConcurrentStack<MemoryPoolSlab2>();

/// <summary>
/// This is part of implementing the IDisposable pattern.
/// </summary>
private bool _disposedValue = false; // To detect redundant calls
public MemoryPool2()
{

}

/// <summary>
/// Called to take a block from the pool.
/// </summary>
/// <param name="minimumSize">The block returned must be at least this size. It may be larger than this minimum size, and if so,
/// the caller may write to the block's entire size rather than being limited to the minumumSize requested.</param>
/// <returns>The block that is reserved for the called. It must be passed to Return when it is no longer being used.</returns>
public MemoryPoolBlock2 Lease(int minimumSize)
public MemoryPoolBlock2 Lease()
{
if (minimumSize > _blockLength)
{
// The requested minimumSize is actually larger then the usable memory of a single block.
// Because this is the degenerate case, a one-time-use byte[] array and tracking object are allocated.
// When this block tracking object is returned it is not added to the pool - instead it will be
// allowed to be garbage collected normally.
return MemoryPoolBlock2.Create(
new ArraySegment<byte>(new byte[minimumSize]),
dataPtr: IntPtr.Zero,
pool: null,
slab: null);
}

MemoryPoolBlock2 block;
if (_blocks.TryDequeue(out block))
{
Expand Down Expand Up @@ -131,45 +119,48 @@ private MemoryPoolBlock2 AllocateSlab()
/// <param name="block">The block to return. It must have been acquired by calling Lease on the same memory pool instance.</param>
public void Return(MemoryPoolBlock2 block)
{
if (block.Pool == null)
{
return;
}

Debug.Assert(block.Pool == this, "Block returned to wrong pool!");

block.Reset();
_blocks.Enqueue(block);
}

protected virtual void Dispose(bool disposing)
{
if (!_disposedValue)
MemoryPoolSlab2 slab;
while (_slabs.TryPop(out slab))
{
if (disposing)
{
MemoryPoolSlab2 slab;
while (_slabs.TryPop(out slab))
{
// dispose managed state (managed objects).
slab.Dispose();
}
}

// N/A: free unmanaged resources (unmanaged objects) and override a finalizer below.

// N/A: set large fields to null.
// dispose managed state (managed objects).
slab.Dispose();
}

_disposedValue = true;
MemoryPoolBlock2 block;
while (_blocks.TryDequeue(out block))
{
// Deactivate finalizers
block.Dispose();
}
}

// N/A: override a finalizer only if Dispose(bool disposing) above has code to free unmanaged resources.
// ~MemoryPool2() {
// // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
// Dispose(false);
// }
~MemoryPool2()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(false);
}

// This code added to correctly implement the disposable pattern.
public void Dispose()
{
// Do not change this code. Put cleanup code in Dispose(bool disposing) above.
Dispose(true);
// N/A: uncomment the following line if the finalizer is overridden above.
// GC.SuppressFinalize(this);

GC.SuppressFinalize(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@ protected MemoryPoolBlock2()
/// </summary>
public MemoryPoolBlock2 Next { get; set; }

#if DEBUG
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should have a Debug.Assert for when Slab != null to ensure that we are always returning the block (in our tests at least).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also probably shouldn't add blocks that aren't pooled to the pool in Return

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed non-pooled blocks instead (from Lease)

// See http://www.philosophicalgeek.com/2014/09/29/digging-into-net-object-allocation-fundamentals/
// for the cost of including a finalizer
~MemoryPoolBlock2()
{
Debug.Assert(!_pinHandle.IsAllocated, "Ad-hoc memory block wasn't unpinned");
// Debug.Assert(Slab == null || !Slab.IsActive, "Block being garbage collected instead of returned to pool");
Debug.Assert(Slab == null || !Slab.IsActive, "Block being garbage collected instead of returned to pool");

if (_pinHandle.IsAllocated)
{
Expand All @@ -96,37 +99,23 @@ protected MemoryPoolBlock2()
});
}
}
internal void Dispose()
{
GC.SuppressFinalize(this);
}
#endif

/// <summary>
/// Called to ensure that a block is pinned, and return the pointer to native memory just after
/// the range of "active" bytes. This is where arriving data is read into.
/// </summary>
/// <returns></returns>
public IntPtr Pin()
/// <summary>
/// Called to ensure that a block is pinned, and return the pointer to native memory just after
/// the range of "active" bytes. This is where arriving data is read into.
/// </summary>
/// <returns></returns>
public IntPtr Pin()
{
Debug.Assert(!_pinHandle.IsAllocated);

if (_dataArrayPtr != IntPtr.Zero)
{
// this is a slab managed block - use the native address of the slab which is always locked
return _dataArrayPtr + End;
}
else
{
// this is one-time-use memory - lock the managed memory until Unpin is called
_pinHandle = GCHandle.Alloc(Data.Array, GCHandleType.Pinned);
return _pinHandle.AddrOfPinnedObject() + End;
}
}

public void Unpin()
{
if (_dataArrayPtr == IntPtr.Zero)
{
// this is one-time-use memory - unlock the managed memory
Debug.Assert(_pinHandle.IsAllocated);
_pinHandle.Free();
}
// this is a slab managed block - use the native address of the slab which is always locked
return _dataArrayPtr + End;
}

public static MemoryPoolBlock2 Create(
Expand Down
29 changes: 16 additions & 13 deletions test/Microsoft.AspNet.Server.KestrelTests/FrameTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void ChunkedPrefixMustBeHexCrLfWithoutLeadingZeros(int dataCount, string

Assert.Equal(Encoding.ASCII.GetBytes(expected), beginChunkBytes.ToArray());
}

[Theory]
[InlineData("Cookie: \r\n\r\n", 1)]
[InlineData("Cookie:\r\n\r\n", 1)]
Expand All @@ -48,22 +48,25 @@ public void ChunkedPrefixMustBeHexCrLfWithoutLeadingZeros(int dataCount, string
[InlineData("Connection:\r\n \r\nCookie \r\n", 1)]
public void EmptyHeaderValuesCanBeParsed(string rawHeaders, int numHeaders)
{
var socketInput = new SocketInput(new MemoryPool2());
var headerCollection = new FrameRequestHeaders();
using (var memory = new MemoryPool2())
{
var socketInput = new SocketInput(memory);
var headerCollection = new FrameRequestHeaders();

var headerArray = Encoding.ASCII.GetBytes(rawHeaders);
socketInput.IncomingStart(headerArray, 0, headerArray.Length);

var headerArray = Encoding.ASCII.GetBytes(rawHeaders);
var inputBuffer = socketInput.IncomingStart(headerArray.Length);
Buffer.BlockCopy(headerArray, 0, inputBuffer.Data.Array, inputBuffer.Data.Offset, headerArray.Length);
socketInput.IncomingComplete(headerArray.Length, null);
socketInput.IncomingComplete(headerArray.Length, null);

var success = Frame.TakeMessageHeaders(socketInput, headerCollection);
var success = Frame.TakeMessageHeaders(socketInput, headerCollection);

Assert.True(success);
Assert.Equal(numHeaders, headerCollection.Count());
Assert.True(success);
Assert.Equal(numHeaders, headerCollection.Count());

// Assert TakeMessageHeaders consumed all the input
var scan = socketInput.ConsumingStart();
Assert.True(scan.IsEnd);
// Assert TakeMessageHeaders consumed all the input
var scan = socketInput.ConsumingStart();
Assert.True(scan.IsEnd);
}
}
}
}
Loading