Skip to content
This repository was archived by the owner on Dec 18, 2018. It is now read-only.

Partitioned memory pools #362

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/Microsoft.AspNet.Server.Kestrel/Http/ListenerContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@ public class ListenerContext : ServiceContext
{
public ListenerContext()
{
Memory2 = new MemoryPool2();
}

public ListenerContext(ServiceContext serviceContext)
: base(serviceContext)
{
Memory2 = new MemoryPool2();
}

public ListenerContext(ListenerContext listenerContext)
Expand All @@ -25,7 +23,6 @@ public ListenerContext(ListenerContext listenerContext)
ServerAddress = listenerContext.ServerAddress;
Thread = listenerContext.Thread;
Application = listenerContext.Application;
Memory2 = listenerContext.Memory2;
Log = listenerContext.Log;
}

Expand All @@ -35,6 +32,6 @@ public ListenerContext(ListenerContext listenerContext)

public RequestDelegate Application { get; set; }

public MemoryPool2 Memory2 { get; set; }
public MemoryPool2 Memory2 => Thread.Memory2;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public KestrelThread(KestrelEngine engine)
_loop = new UvLoopHandle(_log);
_post = new UvAsyncHandle(_log);
_thread = new Thread(ThreadStart);
Memory2 = new MemoryPool2();
QueueCloseHandle = PostCloseHandle;
}

Expand All @@ -50,6 +51,8 @@ public KestrelThread(KestrelEngine engine)

public Action<Action<IntPtr>, IntPtr> QueueCloseHandle { get; internal set; }

public MemoryPool2 Memory2 { get; }

public Task StartAsync()
{
var tcs = new TaskCompletionSource<int>();
Expand Down
83 changes: 69 additions & 14 deletions src/Microsoft.AspNet.Server.Kestrel/Infrastructure/MemoryPool2.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Threading;

namespace Microsoft.AspNet.Server.Kestrel.Infrastructure
{
Expand Down Expand Up @@ -36,11 +37,21 @@ public class MemoryPool2 : IDisposable
/// </summary>
private const int _slabLength = _blockStride * _blockCount;

/// <summary>
/// Max allocation block size for pooled blocks,
/// larger values can be leased but they will be disposed after use rather than returned to the pool.
/// </summary>
public const int MaxPooledBlockLength = _blockLength;

// Processor count is a sys call https://github.com/dotnet/coreclr/blob/0e0ff9d17ab586f3cc7224dd33d8781cd77f3ca8/src/mscorlib/src/System/Environment.cs#L548
// Nor does it look like its constant https://github.com/dotnet/corefx/blob/master/src/System.Threading.Tasks.Parallel/src/System/Threading/PlatformHelper.cs#L23
public static int _partitionCount = Environment.ProcessorCount;

/// <summary>
/// Thread-safe collection of blocks which are currently in the pool. A slab will pre-allocate all of the block tracking objects
/// and add them to this collection. When memory is requested it is taken from here first, and when it is returned it is re-added.
/// </summary>
private readonly ConcurrentQueue<MemoryPoolBlock2> _blocks = new ConcurrentQueue<MemoryPoolBlock2>();
private readonly ConcurrentQueue<MemoryPoolBlock2>[] _blocks;

/// <summary>
/// Thread-safe collection of slabs which have been allocated by this pool. As long as a slab is in this collection and slab.IsActive,
Expand All @@ -53,42 +64,70 @@ public class MemoryPool2 : IDisposable
/// </summary>
private bool _disposedValue = false; // To detect redundant calls

public MemoryPool2()
{
_blocks = new ConcurrentQueue<MemoryPoolBlock2>[_partitionCount];
for (var i = 0; i < _blocks.Length; i++)
{
_blocks[i] = new ConcurrentQueue<MemoryPoolBlock2>();
}
}

public void PopulatePools()
{
for (var i = 0; i < _blocks.Length; i++)
{
// Allocate the inital set
Return(AllocateSlab(i));
}
}

/// <summary>
/// Called to take a block from the pool.
/// </summary>
/// <param name="minimumSize">The block returned must be at least this size. It may be larger than this minimum size, and if so,
/// the caller may write to the block's entire size rather than being limited to the minumumSize requested.</param>
/// <returns>The block that is reserved for the called. It must be passed to Return when it is no longer being used.</returns>
public MemoryPoolBlock2 Lease(int minimumSize)
public MemoryPoolBlock2 Lease(int minimumSize = MaxPooledBlockLength)
{
if (minimumSize > _blockLength)
{
// The requested minimumSize is actually larger then the usable memory of a single block.
// Because this is the degenerate case, a one-time-use byte[] array and tracking object are allocated.
// When this block tracking object is returned it is not added to the pool - instead it will be
// allowed to be garbage collected normally.
return MemoryPoolBlock2.Create(
new ArraySegment<byte>(new byte[minimumSize]),
dataPtr: IntPtr.Zero,
pool: null,
slab: null);
return MemoryPoolBlock2.Create(new ArraySegment<byte>(new byte[minimumSize]));
}

MemoryPoolBlock2 block;
if (_blocks.TryDequeue(out block))

var preferedPartition = Thread.CurrentThread.ManagedThreadId % _partitionCount;

if (_blocks[preferedPartition].TryDequeue(out block))
{
// block successfully taken from the stack - return it
return block;
}

// no block, steal block from another parition
for (var i = 1; i < _partitionCount; i++)
{
if (_blocks[(preferedPartition + i) % _partitionCount].TryDequeue(out block))
{
// block successfully taken from the stack - return it
return block;
}
}

// no blocks available - grow the pool
return AllocateSlab();
return AllocateSlab(preferedPartition);
}

/// <summary>
/// Internal method called when a block is requested and the pool is empty. It allocates one additional slab, creates all of the
/// block tracking objects, and adds them all to the pool.
/// </summary>
private MemoryPoolBlock2 AllocateSlab()
private MemoryPoolBlock2 AllocateSlab(int partition)
{
var slab = MemoryPoolSlab2.Create(_slabLength);
_slabs.Push(slab);
Expand All @@ -107,7 +146,8 @@ private MemoryPoolBlock2 AllocateSlab()
new ArraySegment<byte>(slab.Array, offset, _blockLength),
basePtr,
this,
slab);
slab,
partition);
Return(block);
}

Expand All @@ -116,7 +156,8 @@ private MemoryPoolBlock2 AllocateSlab()
new ArraySegment<byte>(slab.Array, offset, _blockLength),
basePtr,
this,
slab);
slab,
partition);

return newBlock;
}
Expand All @@ -131,8 +172,22 @@ private MemoryPoolBlock2 AllocateSlab()
/// <param name="block">The block to return. It must have been acquired by calling Lease on the same memory pool instance.</param>
public void Return(MemoryPoolBlock2 block)
{
block.Reset();
_blocks.Enqueue(block);
var owningPool = block.Pool;
if (owningPool == null)
{
// not pooled block, throw away
return;
}
if (owningPool != this)
{
throw new InvalidOperationException("Returning " + nameof(MemoryPoolBlock2) + " to incorrect pool.");
}
if (owningPool == this)
{
block.Reset();
// return to owning parition
_blocks[block.Partition].Enqueue(block);
}
}

protected virtual void Dispose(bool disposing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ protected MemoryPoolBlock2()
/// </summary>
public int End { get; set; }

/// <summary>
/// The Partition represents the memory pool partition the block belongs to.
/// </summary>
public int Partition { get; private set; }

/// <summary>
/// Reference to the next block of data when the overall "active" bytes spans multiple blocks. At the point when the block is
/// leased Next is guaranteed to be null. Start, End, and Next are used together in order to create a linked-list of discontiguous
Expand All @@ -74,6 +79,7 @@ protected MemoryPoolBlock2()
/// </summary>
public MemoryPoolBlock2 Next { get; set; }

#if DEBUG
~MemoryPoolBlock2()
{
Debug.Assert(!_pinHandle.IsAllocated, "Ad-hoc memory block wasn't unpinned");
Expand All @@ -89,13 +95,17 @@ protected MemoryPoolBlock2()
{
Pool.Return(new MemoryPoolBlock2
{
_dataArrayPtr = _dataArrayPtr,
Data = Data,
_dataArrayPtr = _dataArrayPtr,
Pool = Pool,
Slab = Slab,
Start = Start,
End = End,
Partition = Partition
});
}
}
#endif

/// <summary>
/// Called to ensure that a block is pinned, and return the pointer to native memory just after
Expand Down Expand Up @@ -129,11 +139,17 @@ public void Unpin()
}
}

public static MemoryPoolBlock2 Create(ArraySegment<byte> data)
{
return Create(data, IntPtr.Zero, null, null, -1);
}

public static MemoryPoolBlock2 Create(
ArraySegment<byte> data,
IntPtr dataPtr,
MemoryPool2 pool,
MemoryPoolSlab2 slab)
MemoryPoolSlab2 slab,
int partition)
{
return new MemoryPoolBlock2
{
Expand All @@ -143,6 +159,7 @@ public static MemoryPoolBlock2 Create(
Slab = slab,
Start = data.Offset,
End = data.Offset,
Partition = partition
};
}

Expand Down
19 changes: 19 additions & 0 deletions src/Microsoft.AspNet.Server.Kestrel/KestrelServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Collections.Generic;
using System.Runtime;
using Microsoft.AspNet.Hosting;
using Microsoft.AspNet.Hosting.Server;
using Microsoft.AspNet.Http;
Expand Down Expand Up @@ -106,6 +107,24 @@ public void Start(RequestDelegate requestDelegate)
}

engine.Start(threadCount);

for (var i = 0; i < engine.Threads.Count; i++)
{
engine.Threads[i].Memory2.PopulatePools();
}

// Move all MemoryPoolBlock2 into Gen2
GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
GC.WaitForPendingFinalizers();

GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);
GC.WaitForPendingFinalizers();

GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true);

var atLeastOneListener = false;

foreach (var address in information.Addresses)
Expand Down
14 changes: 7 additions & 7 deletions test/Microsoft.AspNet.Server.KestrelTests/AsciiDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ private void FullByteRangeSupported()
{
var byteRange = Enumerable.Range(0, 255).Select(x => (byte)x).ToArray();

var mem = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange), IntPtr.Zero, null, null);
var mem = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange));
mem.End = byteRange.Length;

var begin = mem.GetIterator();
Expand Down Expand Up @@ -44,10 +44,10 @@ private void MultiBlockProducesCorrectResults()
.Concat(byteRange)
.ToArray();

var mem0 = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange), IntPtr.Zero, null, null);
var mem1 = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange), IntPtr.Zero, null, null);
var mem2 = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange), IntPtr.Zero, null, null);
var mem3 = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange), IntPtr.Zero, null, null);
var mem0 = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange));
var mem1 = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange));
var mem2 = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange));
var mem3 = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange));
mem0.End = byteRange.Length;
mem1.End = byteRange.Length;
mem2.End = byteRange.Length;
Expand Down Expand Up @@ -79,8 +79,8 @@ private void HeapAllocationProducesCorrectResults()
var byteRange = Enumerable.Range(0, 16384 + 64).Select(x => (byte)x).ToArray();
var expectedByteRange = byteRange.Concat(byteRange).ToArray();

var mem0 = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange), IntPtr.Zero, null, null);
var mem1 = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange), IntPtr.Zero, null, null);
var mem0 = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange));
var mem1 = MemoryPoolBlock2.Create(new ArraySegment<byte>(byteRange));
mem0.End = byteRange.Length;
mem1.End = byteRange.Length;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void DecodeWithBoundary(string raw, int rawLength, string expect, int exp
private MemoryPoolIterator2 BuildSample(string data)
{
var store = data.Select(c => (byte)c).ToArray();
var mem = MemoryPoolBlock2.Create(new ArraySegment<byte>(store), IntPtr.Zero, null, null);
var mem = MemoryPoolBlock2.Create(new ArraySegment<byte>(store));
mem.End = store.Length;

return mem.GetIterator();
Expand Down