Skip to content

Commit 640908f

Browse files
authored
infra: raise MaxTasksReachedError in TaskGroup (#2866)
1 parent 7f1d9af commit 640908f

File tree

3 files changed

+50
-13
lines changed

3 files changed

+50
-13
lines changed

silkworm/infra/concurrency/task_group.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ void TaskGroup::spawn(const any_io_executor& executor, Task<void> task) {
2525
if (is_closed_) {
2626
throw SpawnAfterCloseError();
2727
}
28+
if (tasks_.size() == max_tasks_) {
29+
throw MaxTasksReachedError(max_tasks_);
30+
}
2831

2932
auto task_id = ++last_task_id_;
3033

@@ -102,7 +105,7 @@ void TaskGroup::on_complete(size_t task_id, const std::exception_ptr& ex_ptr) {
102105
// if a task threw during cancellation - rethrow from wait()
103106
auto result_ex_ptr = (ex_ptr && !is_cancelled) ? ex_ptr : std::exception_ptr{};
104107

105-
bool ok = completions_.try_send({task_id, result_ex_ptr});
108+
const bool ok = completions_.try_send({task_id, result_ex_ptr});
106109
if (!ok) {
107110
throw std::runtime_error("TaskGroup::on_complete: completions queue is full, unexpected max_tasks limit breach");
108111
}

silkworm/infra/concurrency/task_group.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ namespace silkworm::concurrency {
5454
class TaskGroup {
5555
public:
5656
TaskGroup(const boost::asio::any_io_executor& executor, size_t max_tasks)
57-
: completions_(executor, max_tasks),
57+
: max_tasks_(max_tasks),
58+
completions_(executor, max_tasks),
5859
exceptions_(executor, 1) {}
5960

6061
TaskGroup(const TaskGroup&) = delete;
@@ -64,6 +65,11 @@ class TaskGroup {
6465
public:
6566
SpawnAfterCloseError() : std::runtime_error("TaskGroup can't spawn after it was closed") {}
6667
};
68+
class MaxTasksReachedError : public std::runtime_error {
69+
public:
70+
explicit MaxTasksReachedError(size_t max_tasks)
71+
: std::runtime_error("TaskGroup can't spawn more than " + std::to_string(max_tasks) + " tasks") {}
72+
};
6773

6874
//! Similar to co_spawn, but also adds the task to this group until it completes.
6975
void spawn(const boost::asio::any_io_executor& executor, Task<void> task);
@@ -80,6 +86,7 @@ class TaskGroup {
8086
bool is_closed_{false};
8187
size_t last_task_id_{0};
8288
std::map<size_t, boost::asio::cancellation_signal> tasks_;
89+
size_t max_tasks_{0};
8390
concurrency::Channel<std::pair<size_t, std::exception_ptr>> completions_;
8491
concurrency::Channel<std::exception_ptr> exceptions_;
8592
};

silkworm/infra/concurrency/task_group_test.cpp

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,14 @@ static Task<void> async_throw() {
3838
throw TestException();
3939
}
4040

41-
static Task<void> wait_until_cancelled(bool* is_cancelled) {
41+
static Task<void> wait_until_cancelled(bool& is_cancelled) {
4242
try {
4343
auto executor = co_await this_coro::executor;
4444
steady_timer timer(executor);
4545
timer.expires_after(1h);
4646
co_await timer.async_wait(use_awaitable);
4747
} catch (const boost::system::system_error&) {
48-
*is_cancelled = true;
48+
is_cancelled = true;
4949
}
5050
}
5151

@@ -76,7 +76,7 @@ TEST_CASE("TaskGroup.1.wait_until_cancelled") {
7676
auto executor = runner.executor();
7777
TaskGroup group{executor, 1};
7878
bool is_cancelled = false;
79-
group.spawn(executor, wait_until_cancelled(&is_cancelled));
79+
group.spawn(executor, wait_until_cancelled(is_cancelled));
8080
CHECK_THROWS_AS(runner.run(group.wait() && async_throw()), TestException);
8181
CHECK(is_cancelled);
8282
}
@@ -86,9 +86,9 @@ TEST_CASE("TaskGroup.some.wait_until_cancelled") {
8686
auto executor = runner.executor();
8787
TaskGroup group{executor, 3};
8888
std::array<bool, 3> is_cancelled{};
89-
group.spawn(executor, wait_until_cancelled(&is_cancelled[0]));
90-
group.spawn(executor, wait_until_cancelled(&is_cancelled[1]));
91-
group.spawn(executor, wait_until_cancelled(&is_cancelled[2]));
89+
group.spawn(executor, wait_until_cancelled(is_cancelled[0]));
90+
group.spawn(executor, wait_until_cancelled(is_cancelled[1]));
91+
group.spawn(executor, wait_until_cancelled(is_cancelled[2]));
9292
CHECK_THROWS_AS(runner.run(group.wait() && async_throw()), TestException);
9393
CHECK(is_cancelled[0]);
9494
CHECK(is_cancelled[1]);
@@ -101,11 +101,11 @@ TEST_CASE("TaskGroup.some.mix") {
101101
TaskGroup group{executor, 6};
102102
std::array<bool, 3> is_cancelled{};
103103
group.spawn(executor, async_ok());
104-
group.spawn(executor, wait_until_cancelled(&is_cancelled[0]));
104+
group.spawn(executor, wait_until_cancelled(is_cancelled[0]));
105105
group.spawn(executor, async_ok());
106-
group.spawn(executor, wait_until_cancelled(&is_cancelled[1]));
106+
group.spawn(executor, wait_until_cancelled(is_cancelled[1]));
107107
group.spawn(executor, async_ok());
108-
group.spawn(executor, wait_until_cancelled(&is_cancelled[2]));
108+
group.spawn(executor, wait_until_cancelled(is_cancelled[2]));
109109
CHECK_THROWS_AS(runner.run(group.wait() && async_throw()), TestException);
110110
CHECK(is_cancelled[0]);
111111
CHECK(is_cancelled[1]);
@@ -156,11 +156,11 @@ TEST_CASE("TaskGroup.task_cancelled_exception_is_ignored") {
156156
TEST_CASE("TaskGroup.task_exception_during_cancellation_is_rethrown") {
157157
test_util::TaskRunner runner;
158158
auto executor = runner.executor();
159-
TaskGroup group{executor, 1};
159+
TaskGroup group{executor, 2};
160160

161161
auto task = [&]() -> Task<void> {
162162
bool is_cancelled = false;
163-
co_await wait_until_cancelled(&is_cancelled);
163+
co_await wait_until_cancelled(is_cancelled);
164164
if (is_cancelled) {
165165
throw std::runtime_error("exception_during_cancellation");
166166
}
@@ -181,4 +181,31 @@ TEST_CASE("TaskGroup.task_exception_during_cancellation_is_rethrown") {
181181
CHECK(runner.run(test()));
182182
}
183183

184+
TEST_CASE("TaskGroup.avoid_max_tasks_limit_breach") {
185+
log::init(log::Settings{
186+
.log_std_out = true,
187+
.log_nocolor = true,
188+
.log_threads = true,
189+
.log_verbosity = log::Level::kInfo,
190+
});
191+
test_util::TaskRunner runner;
192+
auto executor = runner.executor();
193+
const size_t max_tasks = 10;
194+
TaskGroup group{executor, max_tasks};
195+
196+
auto task = [&]() -> Task<void> {
197+
bool is_cancelled = false;
198+
co_await wait_until_cancelled(is_cancelled);
199+
if (is_cancelled) {
200+
throw std::runtime_error("exception_during_cancellation");
201+
}
202+
};
203+
// Spawn max_tasks tasks
204+
for (size_t i = 0; i < max_tasks; ++i) {
205+
REQUIRE_NOTHROW(group.spawn(executor, task()));
206+
}
207+
// Trying to spawn one more *must* trigger MaxTasksReachedError exception
208+
CHECK_THROWS_AS(group.spawn(executor, task()), TaskGroup::MaxTasksReachedError);
209+
}
210+
184211
} // namespace silkworm::concurrency

0 commit comments

Comments
 (0)