Skip to content

Commit 0a0845a

Browse files
committed
🐛 Fix thread safety in run_triggers
Problem: - Running all triggers with the `immediate` requeue policy is not thread-safe. Solution: - Use the policy functions to be thread safe, like `task_manager` does.
1 parent 6b711b3 commit 0a0845a

File tree

2 files changed

+59
-23
lines changed

2 files changed

+59
-23
lines changed

include/async/schedulers/trigger_manager.hpp

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -35,36 +35,26 @@ template <typename... Args> struct trigger_task {
3535
};
3636

3737
namespace run_policy {
38-
struct base {
39-
template <typename M, typename... Args>
40-
static auto run(auto &&q, auto &count, Args &&...args) {
41-
auto &task = q.front();
42-
conc::call_in_critical_section<M>([&]() {
43-
q.pop_front();
44-
task.pending = false;
45-
});
46-
task.run(std::forward<Args>(args)...);
47-
--count;
48-
}
49-
};
50-
51-
struct one : base {
38+
struct one {
5239
template <typename, typename M, typename... Args>
5340
static auto run(auto &&tasks, auto &count, Args &&...args) {
54-
decltype(auto) q =
55-
requeue_policy::immediate::template get_queue<0, M>(tasks);
56-
if (not std::empty(q)) {
57-
base::run<M>(q, count, std::forward<Args>(args)...);
41+
using RQP = requeue_policy::immediate;
42+
decltype(auto) q = RQP::template get_queue<0, M>(tasks);
43+
if (auto task = RQP::template pop<M>(q); task) {
44+
task->run(std::forward<Args>(args)...);
45+
--count;
5846
}
5947
}
6048
};
6149

62-
struct all : base {
63-
template <typename RQP, typename M>
64-
static auto run(auto &&tasks, auto &count, auto &&...args) {
50+
struct all {
51+
template <typename RQP, typename M, typename... Args>
52+
static auto run(auto &&tasks, auto &count, Args &&...args) {
6553
decltype(auto) q = RQP::template get_queue<0, M>(tasks);
66-
while (not std::empty(q)) {
67-
base::run<M>(q, count, args...);
54+
for (auto task = RQP::template pop<M>(q); task;
55+
task = RQP::template pop<M>(q)) {
56+
task->run(args...);
57+
--count;
6858
}
6959
}
7060
};

test/schedulers/trigger_scheduler.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
#include <catch2/catch_test_macros.hpp>
2020
#include <fmt/format.h>
2121

22+
#include <atomic>
2223
#include <concepts>
24+
#include <mutex>
2325
#include <string>
26+
#include <thread>
2427
#include <vector>
2528

2629
namespace {
@@ -164,12 +167,15 @@ namespace {
164167
std::vector<std::string> debug_events{};
165168

166169
struct debug_handler {
170+
std::mutex m;
171+
167172
template <stdx::ct_string C, stdx::ct_string S, typename Ctx>
168173
constexpr auto signal(auto &&...) {
169174
if constexpr (std::is_same_v<async::debug::tag_of<Ctx>,
170175
async::trigger_scheduler_sender_t>) {
171176
static_assert(
172177
boost::mp11::mp_empty<async::debug::children_of<Ctx>>::value);
178+
std::lock_guard lock{m};
173179
debug_events.push_back(
174180
fmt::format("{} {} {}", C, async::debug::name_of<Ctx>, S));
175181
}
@@ -287,3 +293,43 @@ TEMPLATE_TEST_CASE("triggered task can start a new task on the same trigger",
287293
CHECK(var == 42);
288294
CHECK(async::triggers<stdx::cts_t<name>>.empty());
289295
}
296+
297+
TEST_CASE("thread safety for immediate execution", "[trigger_scheduler]") {
298+
auto s = async::trigger_scheduler<"rqp_imm">{};
299+
300+
std::atomic<bool> ready1{};
301+
std::atomic<bool> ready2{};
302+
int var1{};
303+
int var2{};
304+
305+
auto start = [&](auto f) {
306+
async::sender auto sndr = async::start_on(s, async::just_result_of(f));
307+
CHECK(async::start_detached(sndr));
308+
};
309+
310+
auto t1 = std::thread{[&] {
311+
start([&] {
312+
var1 = 17;
313+
start([&] { ++var1; });
314+
});
315+
ready1 = true;
316+
ready1.notify_one();
317+
}};
318+
auto t2 = std::thread{[&] {
319+
start([&] { var2 = 42; });
320+
ready2 = true;
321+
ready2.notify_one();
322+
}};
323+
324+
ready1.wait(false);
325+
async::run_triggers<"rqp_imm", async::requeue_policy::immediate>();
326+
327+
ready2.wait(false);
328+
async::run_triggers<"rqp_imm", async::requeue_policy::immediate>();
329+
330+
t1.join();
331+
t2.join();
332+
CHECK(var1 == 18);
333+
CHECK(var2 == 42);
334+
CHECK(async::triggers<stdx::cts_t<"rqp_imm">>.empty());
335+
}

0 commit comments

Comments
 (0)