Skip to content
273 changes: 202 additions & 71 deletions include/oneapi/tbb/detail/_task_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,64 @@ class task_handle;
class task_handle_task;
class task_dynamic_state;

struct successor_list_node {
successor_list_node* next_node = nullptr;
struct notify_list_node {
// First element is a pointer to a bypassed task
// Second element is true if bypass is allowed
using notify_result_type = std::pair<task_handle_task*, bool>;

notify_list_node* next_node = nullptr;

virtual notify_result_type notify_on_completion() = 0;
virtual notify_result_type notify_on_cancellation() = 0;
virtual ~notify_list_node() = default;
};

struct notify_successor_node : notify_list_node {
task_dynamic_state* successor_state = nullptr;
d1::small_object_allocator allocator;

successor_list_node(task_dynamic_state* state, d1::small_object_allocator& alloc)
: successor_state(state), allocator(alloc)
{}
notify_successor_node(task_dynamic_state* state, d1::small_object_allocator& alloc)
: successor_state(state), allocator(alloc) {}

void destroy() {
allocator.delete_object(this);
notify_result_type notify_common();

notify_result_type notify_on_completion() override {
return notify_common();
}

notify_result_type notify_on_cancellation() override {
return notify_common();
}
};

struct notify_waiter_node : notify_list_node {
d1::wait_context task_wait_context;
bool was_canceled;

notify_waiter_node() : task_wait_context(1), was_canceled(false) {}

notify_result_type notify_common() {
task_wait_context.release();

// Bypassing from list notification is not allowed if there are waiters in the list
return {nullptr, false};
}

virtual notify_result_type notify_on_completion() override {
return notify_common();
}

virtual notify_result_type notify_on_cancellation() override {
was_canceled = true;
return notify_common();
}
};

class task_dynamic_state {
public:
task_dynamic_state(task_handle_task* task, d1::small_object_allocator& alloc)
: m_task(task)
, m_successor_list_head(nullptr)
, m_notify_list_head(nullptr)
, m_new_completion_point(nullptr)
, m_num_dependencies(0)
, m_num_references(1) // reserves a task co-ownership for dynamic state
Expand Down Expand Up @@ -93,25 +132,35 @@ class task_dynamic_state {
}

task_handle_task* complete_and_try_get_successor();
task_handle_task* cancel_and_try_get_successor();

void add_successor(task_handle& successor);
void add_successor_node(successor_list_node* new_successor_node, successor_list_node* current_successor_list_head);
void add_successor_list(successor_list_node* successor_list);
bool wait_for_completion(d1::task_group_context&);
bool run_self_and_wait_for_completion(d1::task_group_context&);
void add_notify_node(notify_list_node* new_notify_node, notify_list_node* current_notify_list_head);
void add_notify_list(notify_list_node* notify_list);

using successor_list_state_flag = std::uintptr_t;
static constexpr successor_list_state_flag COMPLETED_FLAG = ~std::uintptr_t(0);
static constexpr successor_list_state_flag TRANSFERRED_FLAG = ~std::uintptr_t(1);
using notify_list_state_flag = std::uintptr_t;
static constexpr notify_list_state_flag COMPLETED_FLAG = ~std::uintptr_t(0);
static constexpr notify_list_state_flag TRANSFERRED_FLAG = ~std::uintptr_t(1);
static constexpr notify_list_state_flag CANCELED_FLAG = ~std::uintptr_t(2);

static bool represents_completed_task(notify_list_node* list_head) {
return list_head == reinterpret_cast<notify_list_node*>(COMPLETED_FLAG);
}

static bool represents_completed_task(successor_list_node* list_head) {
return list_head == reinterpret_cast<successor_list_node*>(COMPLETED_FLAG);
static bool represents_canceled_task(notify_list_node* list_head) {
return list_head == reinterpret_cast<notify_list_node*>(CANCELED_FLAG);
}

static bool represents_transferred_completion(successor_list_node* list_head) {
return list_head == reinterpret_cast<successor_list_node*>(TRANSFERRED_FLAG);
static bool represents_transferred_completion(notify_list_node* list_head) {
return list_head == reinterpret_cast<notify_list_node*>(TRANSFERRED_FLAG);
}

successor_list_node* fetch_successor_list(successor_list_state_flag new_list_state_flag) {
return m_successor_list_head.exchange(reinterpret_cast<successor_list_node*>(new_list_state_flag));
task_handle_task* try_get_successor(notify_list_state_flag);

notify_list_node* fetch_notify_list(notify_list_state_flag new_list_state_flag) {
return m_notify_list_head.exchange(reinterpret_cast<notify_list_node*>(new_list_state_flag));
}

void transfer_completion_to(task_dynamic_state* new_completion_point) {
Expand All @@ -120,20 +169,30 @@ class task_dynamic_state {
// to prevent it's early destruction
new_completion_point->reserve();
m_new_completion_point.store(new_completion_point, std::memory_order_relaxed);
successor_list_node* successor_list = fetch_successor_list(TRANSFERRED_FLAG);
new_completion_point->add_successor_list(successor_list);
notify_list_node* notify_list = fetch_notify_list(TRANSFERRED_FLAG);
new_completion_point->add_notify_list(notify_list);
}

task_handle_task* get_task() { return m_task; }

private:
task_handle_task* m_task;
std::atomic<successor_list_node*> m_successor_list_head;
std::atomic<notify_list_node*> m_notify_list_head;
std::atomic<task_dynamic_state*> m_new_completion_point;
std::atomic<std::size_t> m_num_dependencies;
std::atomic<std::size_t> m_num_references;
d1::small_object_allocator m_allocator;
};

inline std::pair<task_handle_task*, bool> notify_successor_node::notify_common() {
task_handle_task* successor_task = nullptr;
if (successor_state->release_dependency()) {
successor_task = successor_state->get_task();
allocator.delete_object(this);
}

return {successor_task, true};
}
#endif // __TBB_PREVIEW_TASK_GROUP_EXTENSIONS

class task_handle_task : public d1::task {
Expand Down Expand Up @@ -223,6 +282,16 @@ class task_handle_task : public d1::task {
return next_task;
}

task_handle_task* cancel_and_try_get_successor() {
task_handle_task* next_task = nullptr;

task_dynamic_state* current_state = m_dynamic_state.load(std::memory_order_relaxed);
if (current_state != nullptr) {
next_task = current_state->cancel_and_try_get_successor();
}
return next_task;
}

// Returns true if the released dependency was the last remaining one; false otherwise
bool release_dependency() {
task_dynamic_state* current_state = m_dynamic_state.load(std::memory_order_relaxed);
Expand Down Expand Up @@ -304,104 +373,166 @@ inline bool operator!=(std::nullptr_t, task_handle const& th) noexcept {
}

#if __TBB_PREVIEW_TASK_GROUP_EXTENSIONS
inline void task_dynamic_state::add_successor_node(successor_list_node* new_successor_node,
successor_list_node* current_successor_list_head)
inline void task_dynamic_state::add_notify_node(notify_list_node* new_notify_node,
notify_list_node* current_notify_list_head)
{
__TBB_ASSERT(new_successor_node != nullptr, nullptr);
__TBB_ASSERT(new_notify_node != nullptr, nullptr);

new_successor_node->next_node = current_successor_list_head;
new_notify_node->next_node = current_notify_list_head;

while (!m_successor_list_head.compare_exchange_strong(current_successor_list_head, new_successor_node)) {
while (!m_notify_list_head.compare_exchange_strong(current_notify_list_head, new_notify_node)) {
// Other thread updated the head of the list

if (represents_completed_task(current_successor_list_head)) {
// Current task has completed while we tried to insert the successor to the list
new_successor_node->successor_state->release_dependency();
new_successor_node->destroy();
if (represents_completed_task(current_notify_list_head)) {
// Current task has completed while we tried to insert the node to the list
new_notify_node->notify_on_completion();
break;
} else if (represents_transferred_completion(current_successor_list_head)) {
// Redirect successor to the task received the completion
} else if (represents_canceled_task(current_notify_list_head)) {
// Current task has canceled while we tried to insert the node to the list
new_notify_node->notify_on_cancellation();
break;
} else if (represents_transferred_completion(current_notify_list_head)) {
// Redirect notify_node to the task received the completion
task_dynamic_state* new_completion_point = m_new_completion_point.load(std::memory_order_relaxed);
__TBB_ASSERT(new_completion_point, "successor list is marked as transferred, but new dynamic state is not set");
new_completion_point->add_successor_node(new_successor_node, new_completion_point->m_successor_list_head.load(std::memory_order_acquire));
__TBB_ASSERT(new_completion_point, "notify list is marked as transferred, but new dynamic state is not set");
new_completion_point->add_notify_node(new_notify_node, new_completion_point->m_notify_list_head.load(std::memory_order_acquire));
break;
}

new_successor_node->next_node = current_successor_list_head;
new_notify_node->next_node = current_notify_list_head;
}
}


inline void task_dynamic_state::add_successor(task_handle& successor) {
successor_list_node* current_successor_list_head = m_successor_list_head.load(std::memory_order_acquire);
notify_list_node* current_notify_list_head = m_notify_list_head.load(std::memory_order_acquire);

if (!represents_completed_task(current_successor_list_head)) {
if (represents_transferred_completion(current_successor_list_head)) {
if (!represents_completed_task(current_notify_list_head) && !represents_canceled_task(current_notify_list_head)) {
if (represents_transferred_completion(current_notify_list_head)) {
// Redirect successor to the task received the completion
task_dynamic_state* new_completion_point = m_new_completion_point.load(std::memory_order_relaxed);
__TBB_ASSERT(new_completion_point, "successor list is marked as transferred, but new dynamic state is not set");
__TBB_ASSERT(new_completion_point, "notify list is marked as transferred, but new dynamic state is not set");
new_completion_point->add_successor(successor);
} else {
task_dynamic_state* successor_state = task_handle_accessor::get_task_dynamic_state(successor);
successor_state->register_dependency();

d1::small_object_allocator alloc;
successor_list_node* new_successor_node = alloc.new_object<successor_list_node>(successor_state, alloc);
add_successor_node(new_successor_node, current_successor_list_head);
notify_successor_node* new_successor_node = alloc.new_object<notify_successor_node>(successor_state, alloc);
add_notify_node(new_successor_node, current_notify_list_head);
}
}
}

inline bool task_dynamic_state::wait_for_completion(d1::task_group_context& ctx) {
notify_list_node* current_notify_list_head = m_notify_list_head.load(std::memory_order_acquire);
bool was_canceled = false;

if (!represents_completed_task(current_notify_list_head)) {
if (represents_canceled_task(current_notify_list_head)) {
was_canceled = true;
} else if (represents_transferred_completion(current_notify_list_head)) {
// Redirect waiter to the task received the completion
task_dynamic_state* new_completion_point = m_new_completion_point.load(std::memory_order_relaxed);
__TBB_ASSERT(new_completion_point, "notify list is marked as transferred, but new dynamic state is not set");
was_canceled = new_completion_point->wait_for_completion(ctx);
} else {
notify_waiter_node waiter_node;
add_notify_node(&waiter_node, current_notify_list_head);
d1::wait(waiter_node.task_wait_context, ctx);
was_canceled = waiter_node.was_canceled;
}
}

return was_canceled;
}

inline bool task_dynamic_state::run_self_and_wait_for_completion(d1::task_group_context& ctx) {
__TBB_ASSERT(!has_dependencies(), nullptr);
notify_list_node* current_notify_list_head = m_notify_list_head.load(std::memory_order_acquire);

__TBB_ASSERT(!represents_completed_task(current_notify_list_head), "non-submitted task cannot be completed");
__TBB_ASSERT(!represents_canceled_task(current_notify_list_head), "non-submitted task cannot be canceled");
__TBB_ASSERT(!represents_transferred_completion(current_notify_list_head), "non-submitted task completion cannot be transferred");

notify_waiter_node waiter_node;
add_notify_node(&waiter_node, current_notify_list_head);
d1::execute_and_wait(*get_task(), ctx, waiter_node.task_wait_context, ctx);
return waiter_node.was_canceled;
}

inline void task_dynamic_state::add_successor_list(successor_list_node* successor_list) {
if (successor_list == nullptr) return;
inline void task_dynamic_state::add_notify_list(notify_list_node* notify_list) {
if (notify_list == nullptr) return;

successor_list_node* last_node = successor_list;
notify_list_node* last_node = notify_list;

while (last_node->next_node != nullptr) {
last_node = last_node->next_node;
}

successor_list_node* current_successor_list_head = m_successor_list_head.load(std::memory_order_acquire);
last_node->next_node = current_successor_list_head;
notify_list_node* current_notify_list_head = m_notify_list_head.load(std::memory_order_acquire);
last_node->next_node = current_notify_list_head;

while (!m_successor_list_head.compare_exchange_strong(current_successor_list_head, successor_list)) {
__TBB_ASSERT(!represents_completed_task(current_successor_list_head) &&
!represents_transferred_completion(current_successor_list_head),
while (!m_notify_list_head.compare_exchange_strong(current_notify_list_head, notify_list)) {
__TBB_ASSERT(!represents_completed_task(current_notify_list_head) &&
!represents_canceled_task(current_notify_list_head) &&
!represents_transferred_completion(current_notify_list_head),
"Task receiving the completion was executed or completed");
// Other thread updated the head of the list
last_node->next_node = current_successor_list_head;
last_node->next_node = current_notify_list_head;
}
}

inline task_handle_task* task_dynamic_state::complete_and_try_get_successor() {
inline task_handle_task* task_dynamic_state::try_get_successor(notify_list_state_flag state_flag) {
__TBB_ASSERT(state_flag == COMPLETED_FLAG || state_flag == CANCELED_FLAG, "Unexpected state_flag");
notify_list_node* node = fetch_notify_list(state_flag);
task_handle_task* next_task = nullptr;
bool bypass_allowed = true;

while (node != nullptr) {
notify_list_node* next_node = node->next_node;

// Don't access node after the notification
notify_list_node::notify_result_type result = state_flag == COMPLETED_FLAG ?
node->notify_on_completion() :
node->notify_on_cancellation();
if (!result.second) bypass_allowed = false;
task_handle_task* successor_task = result.first;

if (next_task == nullptr) {
next_task = successor_task;
} else if (successor_task != nullptr) {
d1::spawn(*successor_task, successor_task->ctx());
}

node = next_node;
}

if (next_task && !bypass_allowed) {
d1::spawn(*next_task, next_task->ctx());
next_task = nullptr;
}
return next_task;
}

successor_list_node* node = m_successor_list_head.load(std::memory_order_acquire);
inline task_handle_task* task_dynamic_state::complete_and_try_get_successor() {
task_handle_task* next_task = nullptr;
notify_list_node* node = m_notify_list_head.load(std::memory_order_acquire);

// Doing a single check is enough since the this function is called after the task body and
// the state of the list cannot change to transferred
if (!represents_transferred_completion(node)) {
node = fetch_successor_list(COMPLETED_FLAG);

while (node != nullptr) {
task_dynamic_state* successor_state = node->successor_state;

if (successor_state->release_dependency()) {
task_handle_task* successor_task = successor_state->get_task();
if (next_task == nullptr) {
next_task = successor_task;
} else {
d1::spawn(*successor_task, successor_task->ctx());
}
}

successor_list_node* next_node = node->next_node;
node->destroy();
node = next_node;
}
next_task = try_get_successor(COMPLETED_FLAG);
}
return next_task;
}

inline task_handle_task* task_dynamic_state::cancel_and_try_get_successor() {
__TBB_ASSERT(!represents_transferred_completion(m_notify_list_head.load(std::memory_order_relaxed)),
"canceled task completion cannot be transferred");
return try_get_successor(CANCELED_FLAG);
}

inline void task_handle_task::transfer_completion_to(task_handle& receiving_task) {
__TBB_ASSERT(receiving_task, nullptr);
task_dynamic_state* current_state = m_dynamic_state.load(std::memory_order_relaxed);
Expand Down
Loading