Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 13 additions & 23 deletions include/async/schedulers/trigger_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,36 +35,26 @@ template <typename... Args> struct trigger_task {
};

namespace run_policy {
struct base {
template <typename M, typename... Args>
static auto run(auto &&q, auto &count, Args &&...args) {
auto &task = q.front();
conc::call_in_critical_section<M>([&]() {
q.pop_front();
task.pending = false;
});
task.run(std::forward<Args>(args)...);
--count;
}
};

struct one : base {
struct one {
template <typename, typename M, typename... Args>
static auto run(auto &&tasks, auto &count, Args &&...args) {
decltype(auto) q =
requeue_policy::immediate::template get_queue<0, M>(tasks);
if (not std::empty(q)) {
base::run<M>(q, count, std::forward<Args>(args)...);
using RQP = requeue_policy::immediate;
decltype(auto) q = RQP::template get_queue<0, M>(tasks);
if (auto task = RQP::template pop<M>(q); task) {
task->run(std::forward<Args>(args)...);
--count;
}
}
};

struct all : base {
template <typename RQP, typename M>
static auto run(auto &&tasks, auto &count, auto &&...args) {
struct all {
template <typename RQP, typename M, typename... Args>
static auto run(auto &&tasks, auto &count, Args &&...args) {
decltype(auto) q = RQP::template get_queue<0, M>(tasks);
while (not std::empty(q)) {
base::run<M>(q, count, args...);
for (auto task = RQP::template pop<M>(q); task;
task = RQP::template pop<M>(q)) {
task->run(args...);
--count;
}
}
};
Expand Down
46 changes: 46 additions & 0 deletions test/schedulers/trigger_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
#include <catch2/catch_test_macros.hpp>
#include <fmt/format.h>

#include <atomic>
#include <concepts>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

namespace {
Expand Down Expand Up @@ -164,12 +167,15 @@ namespace {
std::vector<std::string> debug_events{};

struct debug_handler {
std::mutex m;

template <stdx::ct_string C, stdx::ct_string S, typename Ctx>
constexpr auto signal(auto &&...) {
if constexpr (std::is_same_v<async::debug::tag_of<Ctx>,
async::trigger_scheduler_sender_t>) {
static_assert(
boost::mp11::mp_empty<async::debug::children_of<Ctx>>::value);
std::lock_guard lock{m};
debug_events.push_back(
fmt::format("{} {} {}", C, async::debug::name_of<Ctx>, S));
}
Expand Down Expand Up @@ -287,3 +293,43 @@ TEMPLATE_TEST_CASE("triggered task can start a new task on the same trigger",
CHECK(var == 42);
CHECK(async::triggers<stdx::cts_t<name>>.empty());
}

TEST_CASE("thread safety for immediate execution", "[trigger_scheduler]") {
auto s = async::trigger_scheduler<"rqp_imm">{};

std::atomic<bool> ready1{};
std::atomic<bool> ready2{};
int var1{};
int var2{};

auto start = [&](auto f) {
async::sender auto sndr = async::start_on(s, async::just_result_of(f));
CHECK(async::start_detached(sndr));
};

auto t1 = std::thread{[&] {
start([&] {
var1 = 17;
start([&] { ++var1; });
});
ready1 = true;
ready1.notify_one();
}};
auto t2 = std::thread{[&] {
start([&] { var2 = 42; });
ready2 = true;
ready2.notify_one();
}};

ready1.wait(false);
async::run_triggers<"rqp_imm", async::requeue_policy::immediate>();

ready2.wait(false);
async::run_triggers<"rqp_imm", async::requeue_policy::immediate>();

t1.join();
t2.join();
CHECK(var1 == 18);
CHECK(var2 == 42);
CHECK(async::triggers<stdx::cts_t<"rqp_imm">>.empty());
}