Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 134 additions & 41 deletions quantum/impl/quantum_coroutine_pool_allocator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@
//NOTE: DO NOT INCLUDE DIRECTLY
#include <type_traits>
#include <algorithm>
#include <assert.h>
#include <cassert>
#include <cstring>
#include <new>

#if defined(_WIN32) && !defined(__CYGWIN__)
//TODO: Windows headers for memory mapping and page protection
#else
#include <sys/mman.h>
#endif

#if defined(BOOST_USE_VALGRIND)
#include <valgrind/valgrind.h>
Expand All @@ -31,34 +39,48 @@ namespace quantum {
//Constructs a pool of 'size' pre-allocated coroutine stack blocks.
//
//The per-block size (_stackSize) is traits::default_size() clamped to the
//range [traits::minimum_size(), traits::maximum_size()]. Every block records
//its own index in its Header and the free list is filled with 0..size-1.
//
//Throws std::runtime_error when 'size' is 0 and std::bad_alloc when any
//allocation fails.
//
//NOTE(review): this span appears to contain both the removed and the added
//lines of a diff hunk (duplicate member initializers, two 'size == 0' checks,
//two pre-allocation loops, unbalanced braces). It does not compile as shown;
//confirm against the real file before relying on any single line.
//NOTE(review): a plain 'new[]' reports failure by throwing std::bad_alloc, it
//never yields nullptr, so the '!_freeBlocks' / '!_blocks' branches look
//unreachable and '_freeBlocks' would leak if the second 'new[]' throws.
template <typename STACK_TRAITS>
CoroutinePoolAllocator<STACK_TRAITS>::CoroutinePoolAllocator(index_type size) :
_size(size),
_blocks(new Header*[size]),
_freeBlocks(new index_type[size]),
_freeBlockIndex(size-1),
_blocks(nullptr),
_freeBlocks(nullptr),
_freeBlockIndex(size-1), //point to the last element
_numHeapAllocatedBlocks(0),
_stackSize(std::min(std::max(traits::default_size(), traits::minimum_size()), traits::maximum_size()))
_stackSize(std::min(std::max(traits::default_size(),
traits::minimum_size()),
traits::maximum_size()))
{
if (!_blocks || !_freeBlocks) {
//an empty pool is rejected before anything is allocated
if (_size == 0)
{
throw std::runtime_error("Invalid coroutine allocator pool size");
}
_freeBlocks = new index_type[size];
if (!_freeBlocks)
{
throw std::bad_alloc();
}
if (_size == 0) {
throw std::runtime_error("Invalid coroutine allocator pool size");
_blocks = new uint8_t*[size];
if (!_blocks)
{
delete[] _freeBlocks;
throw std::bad_alloc();
}
//pre-allocate all the coroutine stack blocks
for (index_type i = 0; i < size; ++i) {
_blocks[i] = reinterpret_cast<Header*>(new char[_stackSize]);
if (!_blocks[i]) {
//pre-allocate all the coroutine stack blocks and protect the last stack page to
//track coroutine stack overflows.
for (size_t i = 0; i < size; ++i)
{
_blocks[i] = allocateCoroutine(ProtectMemPage::On);
if (!_blocks[i])
{
//release the i blocks allocated so far plus both bookkeeping arrays
deallocateBlocks(i);
throw std::bad_alloc();
}
_blocks[i]->_pos = i; //mark position
//set the block position
header(_blocks[i])->_pos = i;
}
//initialize the free block list
for (index_type i = 0; i < size; ++i) {
_freeBlocks[i] = i;
}
//NOTE(review): std::iota is declared in <numeric>, which is not among the
//includes visible in this file - confirm it is pulled in elsewhere.
std::iota(_freeBlocks, _freeBlocks + size, 0);
}

//Move constructor: forwards to an assignment operator of this class.
//
//NOTE(review): two signature lines are present (with and without 'noexcept');
//this looks like the removed and added lines of a diff shown together.
//NOTE(review): 'other' is a named rvalue reference and therefore an lvalue, so
//'*this = other' selects the copy-assignment operator, which appears to be
//deleted in the class declaration. 'std::move(other)' is presumably intended -
//confirm. There is also no member-initializer list, so the members hold
//indeterminate values when the assignment runs; verify that the move
//assignment does not read or free them.
template <typename STACK_TRAITS>
CoroutinePoolAllocator<STACK_TRAITS>::CoroutinePoolAllocator(CoroutinePoolAllocator<STACK_TRAITS>&& other)
CoroutinePoolAllocator<STACK_TRAITS>::CoroutinePoolAllocator(CoroutinePoolAllocator<STACK_TRAITS>&& other) noexcept
{
*this = other;
}
Expand All @@ -82,63 +104,120 @@ CoroutinePoolAllocator<STACK_TRAITS>& CoroutinePoolAllocator<STACK_TRAITS>::oper
//Destructor: releases the stack blocks owned by the pool.
//
//NOTE(review): the 'for'/'delete[]' pair and the 'deallocateBlocks(_size)'
//call look like the removed and the added lines of a diff shown together
//(the braces do not balance); confirm against the real file.
template <typename STACK_TRAITS>
CoroutinePoolAllocator<STACK_TRAITS>::~CoroutinePoolAllocator()
{
for (size_t i = 0; i < _size; ++i) {
delete[] (char*)_blocks[i];
deallocateBlocks(_size);
}

//Releases the first 'pos' coroutine stack blocks held in '_blocks' and then
//frees both bookkeeping arrays ('_blocks' and '_freeBlocks').
//
//'pos' is the number of leading entries of '_blocks' that hold a valid block;
//the value returned by deallocateCoroutine() is not inspected.
template <typename STACK_TRAITS>
void CoroutinePoolAllocator<STACK_TRAITS>::deallocateBlocks(size_t pos)
{
    uint8_t** const last = _blocks + pos;
    for (uint8_t** current = _blocks; current != last; ++current)
    {
        deallocateCoroutine(*current);
    }
    delete[] _blocks;
    delete[] _freeBlocks;
}

//Allocates one coroutine stack block of '_stackSize' bytes.
//
//'protect' selects whether the lowest page of the block is made inaccessible
//(ProtectMemPage::On) so that running off the end of the stack faults instead
//of silently corrupting adjacent memory. It is ignored on native Windows,
//where page protection is not implemented yet.
//
//Returns the start of the block, or nullptr when the block could not be
//allocated or protected. This function never throws; callers translate
//nullptr into std::bad_alloc after doing their own clean-up.
template <typename STACK_TRAITS>
uint8_t* CoroutinePoolAllocator<STACK_TRAITS>::allocateCoroutine(ProtectMemPage protect) const
{
#if defined(_WIN32) && !defined(__CYGWIN__)
    (void)protect; //TODO: page protection is not available on this platform
    //Use the non-throwing form: callers test for nullptr and release what they
    //have already allocated, which a thrown std::bad_alloc would skip.
    return new (std::nothrow) uint8_t[_stackSize];
#else
    //A stack only holds data, so request read/write access. Asking for
    //PROT_EXEC as well creates a writable+executable mapping, which hardened
    //kernels may refuse and which needlessly widens the attack surface.
    void* const region = mmap(nullptr,
                              _stackSize,
                              PROT_READ | PROT_WRITE,
                              MAP_ANONYMOUS | MAP_PRIVATE,
                              -1,  //invalid fd
                              0);  //no offset
    if (region == MAP_FAILED)
    {
        return nullptr;
    }
    //Add protection to the lowest page
    if ((protect == ProtectMemPage::On) &&
        (mprotect(region, traits::page_size(), PROT_NONE) != 0))
    {
        munmap(region, _stackSize); //free region
        return nullptr;
    }
    return static_cast<uint8_t*>(region);
#endif
}

//Releases a single coroutine stack block obtained from allocateCoroutine().
//
//'block' must be non-null (checked with assert). Returns 0 on success; on
//platforms using memory mapping the result of munmap() is returned as is.
template <typename STACK_TRAITS>
int CoroutinePoolAllocator<STACK_TRAITS>::deallocateCoroutine(uint8_t* block) const
{
    assert(block);
    int status = 0;
#if !defined(_WIN32) || defined(__CYGWIN__)
    status = munmap(block, _stackSize);
#else
    delete[] block;
#endif
    return status;
}

//Hands out a stack for a new coroutine.
//
//A free pre-allocated block is taken from the pool while holding the spinlock.
//When none is available a fresh, unmanaged block is allocated instead, its
//Header position is set to -1 and '_numHeapAllocatedBlocks' is incremented.
//The returned context reports '_stackSize - sizeof(Header)' usable bytes.
//
//Throws std::bad_alloc when the fallback allocation fails.
//
//NOTE(review): this span appears to contain both the removed and the added
//lines of a diff hunk ('block' and 'ctx' are each declared twice, 'ctx.sp' is
//assigned twice, braces do not balance); confirm against the real file.
template <typename STACK_TRAITS>
boost::context::stack_context CoroutinePoolAllocator<STACK_TRAITS>::allocate() {
boost::context::stack_context ctx;
Header* block = nullptr;
uint8_t* block = nullptr;
{
//the free-list index is only touched while the spinlock is held
SpinLock::Guard lock(_spinlock);
if (!isEmpty())
{
block = _blocks[_freeBlocks[_freeBlockIndex--]];
}
}
if (!block) {
// Use heap allocation
block = (Header*)new char[_stackSize];
if (!block) {
if (!block)
{
//Do not protect last memory page for performance reasons
block = allocateCoroutine(ProtectMemPage::Off);
if (!block)
{
throw std::bad_alloc();
}
block->_pos = -1; //mark position as non-managed
header(block)->_pos = -1; //mark position as non-managed
SpinLock::Guard lock(_spinlock);
++_numHeapAllocatedBlocks;
}
char* block_start = reinterpret_cast<char*>(block) + sizeof(Header);
//populate stack context
boost::context::stack_context ctx;
ctx.size = _stackSize - sizeof(Header);
ctx.sp = block_start + ctx.size;
#if defined(BOOST_USE_VALGRIND)
ctx.valgrind_stack_id = VALGRIND_STACK_REGISTER(ctx.sp, block_start);
#endif
ctx.sp = block + ctx.size;
#if defined(BOOST_USE_VALGRIND)
ctx.valgrind_stack_id = VALGRIND_STACK_REGISTER(ctx.sp, block);
#endif
return ctx;
}

//Takes back a stack previously returned by allocate().
//
//A context with a null 'sp' is ignored. A managed block has its index pushed
//back onto the free list under the spinlock; an unmanaged block decrements
//'_numHeapAllocatedBlocks' and is released.
//
//Throws std::runtime_error("Bad de-allocation") when deallocateCoroutine()
//reports a failure.
//
//NOTE(review): this span appears to contain both the removed and the added
//lines of a diff hunk (duplicated 'if' lines, two 'else' branches, unbalanced
//braces); confirm against the real file.
//NOTE(review): '_numHeapAllocatedBlocks' appears to be declared as size_t, in
//which case 'assert(_numHeapAllocatedBlocks >= 0)' can never fire and an
//underflow would go unnoticed - confirm the member type.
//NOTE(review): presumably this can run while a coroutine is being destroyed;
//verify that throwing from here is acceptable to the callers.
template <typename STACK_TRAITS>
void CoroutinePoolAllocator<STACK_TRAITS>::deallocate(const boost::context::stack_context& ctx) {
if (!ctx.sp) {
if (!ctx.sp)
{
return;
}
#if defined(BOOST_USE_VALGRIND)
VALGRIND_STACK_DEREGISTER(ctx.valgrind_stack_id);
#endif
int bi = blockIndex(ctx);
assert(bi >= -1 && bi < _size); //guard against coroutine stack overflow or corruption
if (isManaged(ctx)) {
if (isManaged(ctx))
{
//find index of the block
SpinLock::Guard lock(_spinlock);
_freeBlocks[++_freeBlockIndex] = bi;
}
else {
delete[] (char*)getHeader(ctx);
SpinLock::Guard lock(_spinlock);
--_numHeapAllocatedBlocks;
assert(_numHeapAllocatedBlocks >= 0);
else
{
//Unlink coroutine stack
{
//inner scope: the lock is dropped before the block is released below
SpinLock::Guard lock(_spinlock);
--_numHeapAllocatedBlocks;
assert(_numHeapAllocatedBlocks >= 0);
}
if (deallocateCoroutine(stackEnd(ctx)) != 0)
{
throw std::runtime_error("Bad de-allocation");
}
}
}

Expand Down Expand Up @@ -168,9 +247,23 @@ bool CoroutinePoolAllocator<STACK_TRAITS>::isEmpty() const

//Returns the Header of the block described by 'ctx'; 'ctx.sp' itself is
//reinterpreted as the Header address.
//
//NOTE(review): two function-name lines are present ('getHeader' and 'header');
//this looks like the removed and added lines of a diff shown together.
template <typename STACK_TRAITS>
typename CoroutinePoolAllocator<STACK_TRAITS>::Header*
CoroutinePoolAllocator<STACK_TRAITS>::getHeader(const boost::context::stack_context& ctx) const
CoroutinePoolAllocator<STACK_TRAITS>::header(const boost::context::stack_context& ctx) const
{
return reinterpret_cast<Header*>(ctx.sp);
}

//Returns the Header stored in the last sizeof(Header) bytes of a stack block.
//
//'block' is the start address of a block that is '_stackSize' bytes long.
template <typename STACK_TRAITS>
typename CoroutinePoolAllocator<STACK_TRAITS>::Header*
CoroutinePoolAllocator<STACK_TRAITS>::header(uint8_t* block) const
{
    uint8_t* const blockEnd = block + _stackSize;
    return reinterpret_cast<Header*>(blockEnd - sizeof(Header));
}

//Returns the address 'ctx.size' bytes below 'ctx.sp', i.e. the lowest address
//of the stack described by 'ctx'.
//
//NOTE(review): two 'return' statements are present; the first one yields a
//Header* and looks like a removed diff line shown next to its replacement.
//Confirm against the real file.
template <typename STACK_TRAITS>
uint8_t*
CoroutinePoolAllocator<STACK_TRAITS>::stackEnd(const boost::context::stack_context& ctx) const
{
return reinterpret_cast<Header*>(reinterpret_cast<char*>(ctx.sp) - ctx.size - sizeof(Header));
return static_cast<uint8_t*>(ctx.sp) - ctx.size;
}

template <typename STACK_TRAITS>
Expand All @@ -182,7 +275,7 @@ bool CoroutinePoolAllocator<STACK_TRAITS>::isManaged(const boost::context::stack
//Returns the position stored in the Header of the block described by 'ctx'.
//allocate() stores -1 there for blocks that are not part of the pool.
//
//NOTE(review): two 'return' statements are present ('getHeader' and 'header');
//this looks like the removed and added lines of a diff shown together.
template <typename STACK_TRAITS>
int CoroutinePoolAllocator<STACK_TRAITS>::blockIndex(const boost::context::stack_context& ctx) const
{
return getHeader(ctx)->_pos;
return header(ctx)->_pos;
}

}}
2 changes: 1 addition & 1 deletion quantum/impl/quantum_task_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ TaskQueue::~TaskQueue()
inline
void TaskQueue::pinToCore(int coreId)
{
#ifdef _WIN32
#if defined(_WIN32) && !defined(__CYGWIN__)
SetThreadAffinityMask(_thread->native_handle(), 1 << coreId);
#else
int cpuSetSize = sizeof(cpu_set_t);
Expand Down
18 changes: 12 additions & 6 deletions quantum/quantum_coroutine_pool_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ struct CoroutinePoolAllocator
typedef STACK_TRAITS traits;

//------------------------------- Methods ----------------------------------
CoroutinePoolAllocator(index_type size);
explicit CoroutinePoolAllocator(index_type size);
CoroutinePoolAllocator(const this_type&) = delete;
CoroutinePoolAllocator(this_type&&);
CoroutinePoolAllocator(this_type&&) noexcept;
CoroutinePoolAllocator& operator=(const this_type&) = delete;
CoroutinePoolAllocator& operator=(this_type&&);
virtual ~CoroutinePoolAllocator();
Expand All @@ -65,14 +65,19 @@ struct CoroutinePoolAllocator
struct Header {
int _pos;
};

enum class ProtectMemPage { On, Off };
int blockIndex(const boost::context::stack_context& ctx) const;
bool isManaged(const boost::context::stack_context& ctx) const;
Header* getHeader(const boost::context::stack_context& ctx) const;
Header* header(const boost::context::stack_context& ctx) const;
Header* header(uint8_t* block) const;
uint8_t* stackEnd(const boost::context::stack_context& ctx) const;
void deallocateBlocks(size_t pos);
uint8_t* allocateCoroutine(ProtectMemPage protect) const;
int deallocateCoroutine(uint8_t*) const;

//------------------------------- Members ----------------------------------
index_type _size;
Header** _blocks;
uint8_t** _blocks;
index_type* _freeBlocks;
ssize_t _freeBlockIndex;
size_t _numHeapAllocatedBlocks;
Expand All @@ -85,7 +90,8 @@ struct CoroutinePoolAllocatorProxy
{
typedef std::false_type default_constructor;

CoroutinePoolAllocatorProxy(uint16_t size) : _alloc(new CoroutinePoolAllocator<STACK_TRAITS>(size))
explicit CoroutinePoolAllocatorProxy(uint16_t size) :
_alloc(new CoroutinePoolAllocator<STACK_TRAITS>(size))
{
if (!_alloc) {
throw std::bad_alloc();
Expand Down
2 changes: 1 addition & 1 deletion quantum/quantum_dispatcher_core.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#include <thread>
#include <functional>
#include <algorithm>
#ifdef _WIN32
#if defined(_WIN32) && !defined(__CYGWIN__)
#include <winbase.h>
#else
#include <pthread.h>
Expand Down
2 changes: 1 addition & 1 deletion tests/quantum_sequencer_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ TEST_P(SequencerTest, BasicTaskOrder)
{
using namespace Bloomberg::quantum;

const int taskCount = 100;
const int taskCount = 2000;
const int sequenceKeyCount = 3;
SequencerTestData testData;
SequencerTestData::SequenceKeyMap sequenceKeys;
Expand Down