Skip to content

Commit ec11737

Browse files
authored
Merge pull request #247 from elbeno/update-cpp20-branch
Update cpp20 branch
2 parents afdf674 + 8f96533 commit ec11737

File tree

8 files changed

+315
-95
lines changed

8 files changed

+315
-95
lines changed

.github/workflows/unit_tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ on:
55
workflow_dispatch:
66
merge_group:
77
push:
8-
branches: [ main ]
8+
branches: [ cpp20 ]
99
pull_request:
10-
branches: [ main ]
10+
branches: [ cpp20 ]
1111

1212
env:
1313
DEBIAN_FRONTEND: noninteractive

.github/workflows/usage_test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ on:
55
workflow_dispatch:
66
merge_group:
77
push:
8-
branches: [ main ]
8+
branches: [ cpp20 ]
99
pull_request:
10-
branches: [ main ]
10+
branches: [ cpp20 ]
1111

1212
env:
1313
DEBIAN_FRONTEND: noninteractive

include/async/schedulers/trigger_manager.hpp

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -35,36 +35,26 @@ template <typename... Args> struct trigger_task {
3535
};
3636

3737
namespace run_policy {
38-
struct base {
39-
template <typename M, typename... Args>
40-
static auto run(auto &&q, auto &count, Args &&...args) {
41-
auto &task = q.front();
42-
conc::call_in_critical_section<M>([&]() {
43-
q.pop_front();
44-
task.pending = false;
45-
});
46-
task.run(std::forward<Args>(args)...);
47-
--count;
48-
}
49-
};
50-
51-
struct one : base {
38+
struct one {
5239
template <typename, typename M, typename... Args>
5340
static auto run(auto &&tasks, auto &count, Args &&...args) {
54-
decltype(auto) q =
55-
requeue_policy::immediate::template get_queue<0, M>(tasks);
56-
if (not std::empty(q)) {
57-
base::run<M>(q, count, std::forward<Args>(args)...);
41+
using RQP = requeue_policy::immediate;
42+
decltype(auto) q = RQP::template get_queue<0, M>(tasks);
43+
if (auto task = RQP::template pop<M>(q); task) {
44+
task->run(std::forward<Args>(args)...);
45+
--count;
5846
}
5947
}
6048
};
6149

62-
struct all : base {
63-
template <typename RQP, typename M>
64-
static auto run(auto &&tasks, auto &count, auto &&...args) {
50+
struct all {
51+
template <typename RQP, typename M, typename... Args>
52+
static auto run(auto &&tasks, auto &count, Args &&...args) {
6553
decltype(auto) q = RQP::template get_queue<0, M>(tasks);
66-
while (not std::empty(q)) {
67-
base::run<M>(q, count, args...);
54+
for (auto task = RQP::template pop<M>(q); task;
55+
task = RQP::template pop<M>(q)) {
56+
task->run(args...);
57+
--count;
6858
}
6959
}
7060
};

include/async/when_all.hpp

Lines changed: 54 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,16 @@ template <typename E, typename... Sndrs>
171171
using error_senders = boost::mp11::mp_copy_if_q<boost::mp11::mp_list<Sndrs...>,
172172
is_error_sender<E>>;
173173

174+
template <typename Rcvr> struct base_op_state {
175+
[[no_unique_address]] Rcvr rcvr;
176+
// NOLINTNEXTLINE (readability-redundant-member-init)
177+
inplace_stop_source stop_source{};
178+
};
179+
174180
template <stdx::ct_string Name, typename Rcvr, typename... Sndrs>
175181
struct op_state
176-
: error_op_state<env_of_t<Rcvr>, error_senders<env_of_t<Rcvr>, Sndrs...>>,
182+
: base_op_state<Rcvr>,
183+
error_op_state<env_of_t<Rcvr>, error_senders<env_of_t<Rcvr>, Sndrs...>>,
177184
sub_op_state<op_state<Name, Rcvr, Sndrs...>, Rcvr, Sndrs,
178185
inplace_stop_token>... {
179186
template <typename S>
@@ -189,8 +196,8 @@ struct op_state
189196

190197
template <typename S, typename R>
191198
constexpr op_state(S &&s, R &&r)
192-
: sub_op_state_t<Sndrs>{std::forward<S>(s)}...,
193-
rcvr{std::forward<R>(r)} {}
199+
: base_op_state<Rcvr>{std::forward<R>(r)},
200+
sub_op_state_t<Sndrs>{std::forward<S>(s)}... {}
194201

195202
auto notify() -> void {
196203
if (--count == 0) {
@@ -200,14 +207,14 @@ struct op_state
200207

201208
template <typename... Args> auto notify_error(Args &&...args) -> void {
202209
this->store_error(std::forward<Args>(args)...);
203-
stop_source.request_stop();
210+
this->stop_source.request_stop();
204211
if (--count == 0) {
205212
complete();
206213
}
207214
}
208215

209216
auto notify_stopped() -> void {
210-
stop_source.request_stop();
217+
this->stop_source.request_stop();
211218
if (--count == 0) {
212219
complete();
213220
}
@@ -219,37 +226,40 @@ struct op_state
219226

220227
auto complete() -> void {
221228
stop_cb.reset();
222-
if (this->template release_error<Name, op_state>(std::move(rcvr))) {
223-
} else if (stop_source.stop_requested()) {
229+
if (this->template release_error<Name, op_state>(
230+
std::move(this->rcvr))) {
231+
} else if (this->stop_source.stop_requested()) {
224232
debug_signal<set_stopped_t::name,
225-
debug::erased_context_for<op_state>>(get_env(rcvr));
226-
set_stopped(std::move(rcvr));
233+
debug::erased_context_for<op_state>>(
234+
get_env(this->rcvr));
235+
set_stopped(std::move(this->rcvr));
227236
} else {
228237
using value_senders =
229238
boost::mp11::mp_copy_if<boost::mp11::mp_list<Sndrs...>,
230239
single_value_sender_t>;
231240
[&]<typename... Ss>(boost::mp11::mp_list<Ss...>) {
232241
debug_signal<set_value_t::name,
233242
debug::erased_context_for<op_state>>(
234-
get_env(rcvr));
243+
get_env(this->rcvr));
235244
stdx::tuple_cat(
236245
static_cast<sub_op_state_t<Ss> &&>(*this).load()...)
237246
.apply([&](auto &...args) {
238-
set_value(std::move(rcvr), std::move(args)...);
247+
set_value(std::move(this->rcvr), std::move(args)...);
239248
});
240249
}(value_senders{});
241250
}
242251
}
243252

244253
constexpr auto start() & -> void {
245254
debug_signal<"start", debug::erased_context_for<op_state>>(
246-
get_env(rcvr));
247-
stop_cb.emplace(async::get_stop_token(get_env(rcvr)),
248-
stop_callback_fn{std::addressof(stop_source)});
249-
if (stop_source.stop_requested()) {
255+
get_env(this->rcvr));
256+
stop_cb.emplace(async::get_stop_token(get_env(this->rcvr)),
257+
stop_callback_fn{std::addressof(this->stop_source)});
258+
if (this->stop_source.stop_requested()) {
250259
debug_signal<set_stopped_t::name,
251-
debug::erased_context_for<op_state>>(get_env(rcvr));
252-
set_stopped(std::move(rcvr));
260+
debug::erased_context_for<op_state>>(
261+
get_env(this->rcvr));
262+
set_stopped(std::move(this->rcvr));
253263
} else {
254264
count = sizeof...(Sndrs);
255265
(async::start(static_cast<sub_op_state_t<Sndrs> &>(*this).ops),
@@ -258,21 +268,24 @@ struct op_state
258268
}
259269

260270
[[nodiscard]] auto get_stop_token() const -> inplace_stop_token {
261-
return stop_source.get_token();
271+
return this->stop_source.get_token();
262272
}
263273

264274
using stop_callback_t =
265275
stop_callback_for_t<stop_token_of_t<env_of_t<Rcvr>>, stop_callback_fn>;
266276

267-
[[no_unique_address]] Rcvr rcvr;
268277
stdx::atomic<std::size_t> count;
269-
inplace_stop_source stop_source;
270278
std::optional<stop_callback_t> stop_cb{};
271279
};
272280

281+
template <typename Rcvr> struct nostop_base_op_state {
282+
[[no_unique_address]] Rcvr rcvr;
283+
};
284+
273285
template <stdx::ct_string Name, typename Rcvr, typename... Sndrs>
274286
struct nostop_op_state
275-
: error_op_state<env_of_t<Rcvr>, error_senders<env_of_t<Rcvr>, Sndrs...>>,
287+
: nostop_base_op_state<Rcvr>,
288+
error_op_state<env_of_t<Rcvr>, error_senders<env_of_t<Rcvr>, Sndrs...>>,
276289
sub_op_state<nostop_op_state<Name, Rcvr, Sndrs...>, Rcvr, Sndrs,
277290
never_stop_token>... {
278291
template <typename S>
@@ -284,8 +297,8 @@ struct nostop_op_state
284297

285298
template <typename S, typename R>
286299
constexpr nostop_op_state(S &&s, R &&r)
287-
: sub_op_state_t<Sndrs>{std::forward<S>(s)}...,
288-
rcvr{std::forward<R>(r)} {}
300+
: nostop_base_op_state<Rcvr>{std::forward<R>(r)},
301+
sub_op_state_t<Sndrs>{std::forward<S>(s)}... {}
289302

290303
auto notify() -> void {
291304
if (--count == 0) {
@@ -308,40 +321,42 @@ struct nostop_op_state
308321

309322
auto complete() -> void {
310323
if (this->template release_error<Name, nostop_op_state>(
311-
std::move(rcvr))) {
324+
std::move(this->rcvr))) {
312325
} else {
313326
using value_senders =
314327
boost::mp11::mp_copy_if<boost::mp11::mp_list<Sndrs...>,
315328
single_value_sender_t>;
316329
[&]<typename... Ss>(boost::mp11::mp_list<Ss...>) {
317330
debug_signal<set_value_t::name,
318331
debug::erased_context_for<nostop_op_state>>(
319-
get_env(rcvr));
332+
get_env(this->rcvr));
320333
stdx::tuple_cat(
321334
static_cast<sub_op_state_t<Ss> &&>(*this).load()...)
322335
.apply([&](auto &...args) {
323-
set_value(std::move(rcvr), std::move(args)...);
336+
set_value(std::move(this->rcvr), std::move(args)...);
324337
});
325338
}(value_senders{});
326339
}
327340
}
328341

329342
constexpr auto start() & -> void {
330343
debug_signal<"start", debug::erased_context_for<nostop_op_state>>(
331-
get_env(rcvr));
344+
get_env(this->rcvr));
332345
count = sizeof...(Sndrs);
333346
(async::start(static_cast<sub_op_state_t<Sndrs> &>(*this).ops), ...);
334347
}
335348

336-
[[nodiscard]] auto get_stop_token() const -> never_stop_token { return {}; }
349+
[[nodiscard]] static auto get_stop_token() -> never_stop_token {
350+
return {};
351+
}
337352

338-
[[no_unique_address]] Rcvr rcvr;
339353
stdx::atomic<std::size_t> count;
340354
};
341355

342356
template <stdx::ct_string Name, typename Rcvr, typename... Sndrs>
343357
struct sync_op_state
344-
: error_op_state<env_of_t<Rcvr>, error_senders<env_of_t<Rcvr>, Sndrs...>>,
358+
: nostop_base_op_state<Rcvr>,
359+
error_op_state<env_of_t<Rcvr>, error_senders<env_of_t<Rcvr>, Sndrs...>>,
345360
sub_op_state<sync_op_state<Name, Rcvr, Sndrs...>, Rcvr, Sndrs,
346361
never_stop_token>... {
347362
template <typename S>
@@ -353,8 +368,8 @@ struct sync_op_state
353368

354369
template <typename S, typename R>
355370
constexpr sync_op_state(S &&s, R &&r)
356-
: sub_op_state_t<Sndrs>{std::forward<S>(s)}...,
357-
rcvr{std::forward<R>(r)} {}
371+
: nostop_base_op_state<Rcvr>{std::forward<R>(r)},
372+
sub_op_state_t<Sndrs>{std::forward<S>(s)}... {}
358373

359374
auto notify() -> void {}
360375

@@ -370,27 +385,27 @@ struct sync_op_state
370385

371386
auto complete() -> void {
372387
if (this->template release_error<Name, sync_op_state>(
373-
std::move(rcvr))) {
388+
std::move(this->rcvr))) {
374389
} else {
375390
using value_senders =
376391
boost::mp11::mp_copy_if<boost::mp11::mp_list<Sndrs...>,
377392
single_value_sender_t>;
378393
[&]<typename... Ss>(boost::mp11::mp_list<Ss...>) {
379394
debug_signal<set_value_t::name,
380395
debug::erased_context_for<sync_op_state>>(
381-
get_env(rcvr));
396+
get_env(this->rcvr));
382397
stdx::tuple_cat(
383398
static_cast<sub_op_state_t<Ss> &&>(*this).load()...)
384399
.apply([&](auto &...args) {
385-
set_value(std::move(rcvr), std::move(args)...);
400+
set_value(std::move(this->rcvr), std::move(args)...);
386401
});
387402
}(value_senders{});
388403
}
389404
}
390405

391406
constexpr auto start() & -> void {
392407
debug_signal<"start", debug::erased_context_for<sync_op_state>>(
393-
get_env(rcvr));
408+
get_env(this->rcvr));
394409
(async::start(static_cast<sub_op_state_t<Sndrs> &>(*this).ops), ...);
395410
complete();
396411
}
@@ -399,9 +414,9 @@ struct sync_op_state
399414
return prop{completes_synchronously_t{}, std::true_type{}};
400415
}
401416

402-
[[nodiscard]] auto get_stop_token() const -> never_stop_token { return {}; }
403-
404-
[[no_unique_address]] Rcvr rcvr;
417+
[[nodiscard]] static auto get_stop_token() -> never_stop_token {
418+
return {};
419+
}
405420
};
406421

407422
template <typename S, typename R>

0 commit comments

Comments
 (0)