Skip to content

Commit da10d37

Browse files
authored
coro::when_any (#298)
Adds a new construct that will return the first task's result upon its completion. All other task results will be discarded/detached/orphaned. There is two ways to currently invoke when_any, one with a std::stop_token that will signal to the other tasks that a task has already completed, this requires the user to check for that stop token requesting a stop, it is not automatic. The other way is fire and forget, all tasks will be required to complete but only the first tasks result will be used. This method isn't particularly recommended but the API is available in the case where a stop token isn't required. EMSCRIPTEN does not support std::stop_source|token so this new feature is currently disabled on that platform, I do not want to shim it in. Closes #279
1 parent fec6779 commit da10d37

File tree

11 files changed

+399
-4
lines changed

11 files changed

+399
-4
lines changed

.githooks/pre-commit

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,8 @@ template_contents=$(cat 'README.md')
8282
example_contents=$(cat 'examples/coro_when_all.cpp')
8383
echo "${template_contents/\$\{EXAMPLE_CORO_WHEN_ALL\}/$example_contents}" > README.md
8484

85+
template_contents=$(cat 'README.md')
86+
example_contents=$(cat 'examples/coro_when_any.cpp')
87+
echo "${template_contents/\$\{EXAMPLE_CORO_WHEN_ANY\}/$example_contents}" > README.md
88+
8589
git add README.md

.githooks/readme-template.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* Higher level coroutine constructs
1717
- [coro::sync_wait(awaitable)](#sync_wait)
1818
- [coro::when_all(awaitable...) -> awaitable](#when_all)
19+
- [coro::when_any(awaitable...) -> awaitable](#when_any)
1920
- [coro::task<T>](#task)
2021
- [coro::generator<T>](#generator)
2122
- [coro::event](#event)
@@ -70,7 +71,7 @@ Offload Result = 20
7071
```
7172

7273
### when_all
73-
The `when_all` construct can be used within coroutines to await a set of tasks, or it can be used outside coroutinne context in conjunction with `sync_wait` to await multiple tasks. Each task passed into `when_all` will initially be executed serially by the calling thread so it is recommended to offload the tasks onto a scheduler like `coro::thread_pool` or `coro::io_scheduler` so they can execute in parallel.
74+
The `when_all` construct can be used within coroutines to await a set of tasks, or it can be used outside coroutine context in conjunction with `sync_wait` to await multiple tasks. Each task passed into `when_all` will initially be executed serially by the calling thread so it is recommended to offload the tasks onto an executor like `coro::thread_pool` or `coro::io_scheduler` so they can execute in parallel.
7475

7576
```C++
7677
${EXAMPLE_CORO_WHEN_ALL}
@@ -87,6 +88,21 @@ $ ./examples/coro_when_all
8788
first: 1.21 second: 20
8889
```
8990

91+
### when_any
92+
The `when_any` construct can be used within coroutines to await a set of tasks and only return the result of the first task that completes. This can also be used outside of a coroutine context in conjunction with `sync_wait` to await the first result. Each task passed into `when_any` will initially be executed serially by the calling thread so it is recommended to offload the tasks onto an executor like `coro::thread_pool` or `coro::io_scheduler` so they can execute in parallel.
93+
94+
```C++
95+
${EXAMPLE_CORO_WHEN_ANY}
96+
```
97+
98+
Expected output:
99+
```bash
100+
$ ./examples/coro_when_any
101+
result = 1
102+
result = -1
103+
```
104+
105+
90106
### task
91107
The `coro::task<T>` is the main coroutine building block within `libcoro`. Use task to create your coroutines and `co_await` or `co_yield` tasks within tasks to perform asynchronous operations, lazily evaluation or even spreading work out across a `coro::thread_pool`. Tasks are lightweight and only begin execution upon awaiting them.
92108

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ set(LIBCORO_SOURCE_FILES
9393
include/coro/thread_pool.hpp src/thread_pool.cpp
9494
include/coro/time.hpp
9595
include/coro/when_all.hpp
96+
include/coro/when_any.hpp
9697
)
9798

9899
if(LIBCORO_FEATURE_NETWORKING)

README.md

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* Higher level coroutine constructs
1717
- [coro::sync_wait(awaitable)](#sync_wait)
1818
- [coro::when_all(awaitable...) -> awaitable](#when_all)
19+
- [coro::when_any(awaitable...) -> awaitable](#when_any)
1920
- [coro::task<T>](#task)
2021
- [coro::generator<T>](#generator)
2122
- [coro::event](#event)
@@ -101,7 +102,7 @@ Offload Result = 20
101102
```
102103

103104
### when_all
104-
The `when_all` construct can be used within coroutines to await a set of tasks, or it can be used outside coroutinne context in conjunction with `sync_wait` to await multiple tasks. Each task passed into `when_all` will initially be executed serially by the calling thread so it is recommended to offload the tasks onto a scheduler like `coro::thread_pool` or `coro::io_scheduler` so they can execute in parallel.
105+
The `when_all` construct can be used within coroutines to await a set of tasks, or it can be used outside coroutine context in conjunction with `sync_wait` to await multiple tasks. Each task passed into `when_all` will initially be executed serially by the calling thread so it is recommended to offload the tasks onto an executor like `coro::thread_pool` or `coro::io_scheduler` so they can execute in parallel.
105106

106107
```C++
107108
#include <coro/coro.hpp>
@@ -169,6 +170,68 @@ $ ./examples/coro_when_all
169170
first: 1.21 second: 20
170171
```
171172

173+
### when_any
174+
The `when_any` construct can be used within coroutines to await a set of tasks and only return the result of the first task that completes. This can also be used outside of a coroutine context in conjunction with `sync_wait` to await the first result. Each task passed into `when_any` will initially be executed serially by the calling thread so it is recommended to offload the tasks onto an executor like `coro::thread_pool` or `coro::io_scheduler` so they can execute in parallel.
175+
176+
```C++
177+
#include <coro/coro.hpp>
178+
#include <iostream>
179+
180+
int main()
181+
{
182+
// Create a scheduler to execute all tasks in parallel and also so we can
183+
// suspend a task to act like a timeout event.
184+
auto scheduler = coro::io_scheduler::make_shared();
185+
186+
// This task will behave like a long running task and will produce a valid result.
187+
auto make_long_running_task = [](std::shared_ptr<coro::io_scheduler> scheduler,
188+
std::chrono::milliseconds execution_time) -> coro::task<int64_t>
189+
{
190+
// Schedule the task to execute in parallel.
191+
co_await scheduler->schedule();
192+
// Fake doing some work...
193+
co_await scheduler->yield_for(execution_time);
194+
// Return the result.
195+
co_return 1;
196+
};
197+
198+
auto make_timeout_task = [](std::shared_ptr<coro::io_scheduler> scheduler) -> coro::task<int64_t>
199+
{
200+
// Schedule a timer to be fired so we know the task timed out.
201+
co_await scheduler->schedule_after(std::chrono::milliseconds{100});
202+
co_return -1;
203+
};
204+
205+
// Example showing the long running task completing first.
206+
{
207+
std::vector<coro::task<int64_t>> tasks{};
208+
tasks.emplace_back(make_long_running_task(scheduler, std::chrono::milliseconds{50}));
209+
tasks.emplace_back(make_timeout_task(scheduler));
210+
211+
auto result = coro::sync_wait(coro::when_any(std::move(tasks)));
212+
std::cout << "result = " << result << "\n";
213+
}
214+
215+
// Example showing the long running task timing out.
216+
{
217+
std::vector<coro::task<int64_t>> tasks{};
218+
tasks.emplace_back(make_long_running_task(scheduler, std::chrono::milliseconds{500}));
219+
tasks.emplace_back(make_timeout_task(scheduler));
220+
221+
auto result = coro::sync_wait(coro::when_any(std::move(tasks)));
222+
std::cout << "result = " << result << "\n";
223+
}
224+
}
225+
```
226+
227+
Expected output:
228+
```bash
229+
$ ./examples/coro_when_any
230+
result = 1
231+
result = -1
232+
```
233+
234+
172235
### task
173236
The `coro::task<T>` is the main coroutine building block within `libcoro`. Use task to create your coroutines and `co_await` or `co_yield` tasks within tasks to perform asynchronous operations, lazily evaluation or even spreading work out across a `coro::thread_pool`. Tasks are lightweight and only begin execution upon awaiting them.
174237

examples/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,10 @@ if(LIBCORO_FEATURE_NETWORKING)
6969
add_executable(coro_http_200_ok_server coro_http_200_ok_server.cpp)
7070
target_link_libraries(coro_http_200_ok_server PUBLIC libcoro)
7171
target_compile_options(coro_http_200_ok_server PUBLIC ${LIBCORO_EXAMPLE_OPTIONS})
72+
73+
if(NOT EMSCRIPTEN)
74+
add_executable(coro_when_any coro_when_any.cpp)
75+
target_link_libraries(coro_when_any PUBLIC libcoro)
76+
target_compile_options(coro_when_any PUBLIC ${LIBCORO_EXAMPLE_OPTIONS})
77+
endif()
7278
endif()

examples/coro_when_any.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#include <coro/coro.hpp>
2+
#include <iostream>
3+
4+
int main()
5+
{
6+
// Create a scheduler to execute all tasks in parallel and also so we can
7+
// suspend a task to act like a timeout event.
8+
auto scheduler = coro::io_scheduler::make_shared();
9+
10+
// This task will behave like a long running task and will produce a valid result.
11+
auto make_long_running_task = [](std::shared_ptr<coro::io_scheduler> scheduler,
12+
std::chrono::milliseconds execution_time) -> coro::task<int64_t>
13+
{
14+
// Schedule the task to execute in parallel.
15+
co_await scheduler->schedule();
16+
// Fake doing some work...
17+
co_await scheduler->yield_for(execution_time);
18+
// Return the result.
19+
co_return 1;
20+
};
21+
22+
auto make_timeout_task = [](std::shared_ptr<coro::io_scheduler> scheduler) -> coro::task<int64_t>
23+
{
24+
// Schedule a timer to be fired so we know the task timed out.
25+
co_await scheduler->schedule_after(std::chrono::milliseconds{100});
26+
co_return -1;
27+
};
28+
29+
// Example showing the long running task completing first.
30+
{
31+
std::vector<coro::task<int64_t>> tasks{};
32+
tasks.emplace_back(make_long_running_task(scheduler, std::chrono::milliseconds{50}));
33+
tasks.emplace_back(make_timeout_task(scheduler));
34+
35+
auto result = coro::sync_wait(coro::when_any(std::move(tasks)));
36+
std::cout << "result = " << result << "\n";
37+
}
38+
39+
// Example showing the long running task timing out.
40+
{
41+
std::vector<coro::task<int64_t>> tasks{};
42+
tasks.emplace_back(make_long_running_task(scheduler, std::chrono::milliseconds{500}));
43+
tasks.emplace_back(make_timeout_task(scheduler));
44+
45+
auto result = coro::sync_wait(coro::when_any(std::move(tasks)));
46+
std::cout << "result = " << result << "\n";
47+
}
48+
}

include/coro/coro.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,4 @@
4141
#include "coro/thread_pool.hpp"
4242
#include "coro/time.hpp"
4343
#include "coro/when_all.hpp"
44+
#include "coro/when_any.hpp"

include/coro/when_all.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,12 @@ class when_all_ready_awaitable
180180
when_all_ready_awaitable(when_all_ready_awaitable&& other) noexcept(
181181
std::is_nothrow_move_constructible_v<task_container_type>)
182182
: m_latch(std::move(other.m_latch)),
183-
m_tasks(std::move(m_tasks))
183+
m_tasks(std::move(other.m_tasks))
184184
{
185185
}
186186

187187
auto operator=(const when_all_ready_awaitable&) -> when_all_ready_awaitable& = delete;
188-
auto operator=(when_all_ready_awaitable&) -> when_all_ready_awaitable& = delete;
188+
auto operator=(when_all_ready_awaitable&&) -> when_all_ready_awaitable& = delete;
189189

190190
auto operator co_await() & noexcept
191191
{

include/coro/when_any.hpp

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
#pragma once
2+
3+
// EMSCRIPTEN does not currently support std::jthread or std::stop_source|token.
4+
#ifndef EMSCRIPTEN
5+
6+
#include "coro/concepts/awaitable.hpp"
7+
#include "coro/detail/task_self_deleting.hpp"
8+
#include "coro/event.hpp"
9+
#include "coro/mutex.hpp"
10+
#include "coro/task.hpp"
11+
12+
#include <atomic>
13+
#include <cassert>
14+
#include <coroutine>
15+
#include <stop_token>
16+
#include <utility>
17+
#include <vector>
18+
19+
namespace coro
20+
{
21+
22+
namespace detail
23+
{
24+
25+
template<concepts::awaitable awaitable, typename return_type>
26+
static auto make_when_any_task(
27+
awaitable a,
28+
coro::mutex& m,
29+
std::atomic<bool>& return_value_set,
30+
coro::event& notify,
31+
std::optional<return_type>& return_value) -> coro::task<void>
32+
{
33+
auto result = co_await static_cast<awaitable&&>(a);
34+
co_await m.lock();
35+
// Its important to only touch return_value and notify once since their lifetimes will be destroyed
36+
// after being set ane notified the first time.
37+
if (return_value_set.load(std::memory_order::acquire) == false)
38+
{
39+
return_value_set.store(true, std::memory_order::release);
40+
return_value = std::move(result);
41+
notify.set();
42+
}
43+
44+
co_return;
45+
}
46+
47+
template<
48+
std::ranges::range range_type,
49+
concepts::awaitable awaitable_type = std::ranges::range_value_t<range_type>,
50+
typename return_type = typename concepts::awaitable_traits<awaitable_type>::awaiter_return_type,
51+
typename return_type_base = std::remove_reference_t<return_type>>
52+
static auto make_when_any_controller_task(
53+
range_type awaitables, coro::event& notify, std::optional<return_type_base>& return_value)
54+
-> coro::detail::task_self_deleting
55+
{
56+
// These must live for as long as the longest running when_any task since each task tries to see
57+
// if it was the first to complete. Only the very first task to complete will set the return_value
58+
// and notify.
59+
coro::mutex m{};
60+
std::atomic<bool> return_value_set{false};
61+
62+
// This detatched task will maintain the lifetime of all the when_any tasks.
63+
std::vector<coro::task<void>> tasks{};
64+
65+
if constexpr (std::ranges::sized_range<range_type>)
66+
{
67+
tasks.reserve(std::size(awaitables));
68+
}
69+
70+
for (auto&& a : awaitables)
71+
{
72+
tasks.emplace_back(make_when_any_task<awaitable_type, return_type_base>(
73+
std::move(a), m, return_value_set, notify, return_value));
74+
}
75+
76+
co_await coro::when_all(std::move(tasks));
77+
co_return;
78+
}
79+
80+
} // namespace detail
81+
82+
template<
83+
std::ranges::range range_type,
84+
concepts::awaitable awaitable_type = std::ranges::range_value_t<range_type>,
85+
typename return_type = typename concepts::awaitable_traits<awaitable_type>::awaiter_return_type,
86+
typename return_type_base = std::remove_reference_t<return_type>>
87+
[[nodiscard]] auto when_any(std::stop_source stop_source, range_type awaitables) -> coro::task<return_type_base>
88+
{
89+
// Using an std::optional to prevent the need to default construct the type on the stack.
90+
std::optional<return_type_base> return_value{std::nullopt};
91+
coro::event notify{};
92+
93+
auto controller_task =
94+
detail::make_when_any_controller_task(std::forward<range_type>(awaitables), notify, return_value);
95+
controller_task.handle().resume();
96+
97+
co_await notify;
98+
stop_source.request_stop();
99+
co_return std::move(return_value.value());
100+
}
101+
102+
template<
103+
std::ranges::range range_type,
104+
concepts::awaitable awaitable_type = std::ranges::range_value_t<range_type>,
105+
typename return_type = typename concepts::awaitable_traits<awaitable_type>::awaiter_return_type,
106+
typename return_type_base = std::remove_reference_t<return_type>>
107+
[[nodiscard]] auto when_any(range_type awaitables) -> coro::task<return_type_base>
108+
{
109+
std::optional<return_type_base> return_value{std::nullopt};
110+
coro::event notify{};
111+
112+
auto controller_task =
113+
detail::make_when_any_controller_task(std::forward<range_type>(awaitables), notify, return_value);
114+
controller_task.handle().resume();
115+
116+
co_await notify;
117+
co_return std::move(return_value.value());
118+
}
119+
120+
} // namespace coro
121+
122+
#endif // EMSCRIPTEN

test/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ set(LIBCORO_TEST_SOURCE_FILES
1717
catch_amalgamated.cpp
1818
)
1919

20+
if(NOT EMSCRIPTEN)
21+
list(APPEND LIBCORO_TEST_SOURCE_FILES
22+
test_when_any.cpp
23+
)
24+
endif()
25+
2026
if(LIBCORO_FEATURE_NETWORKING)
2127
list(APPEND LIBCORO_TEST_SOURCE_FILES
2228
net/test_ip_address.cpp

0 commit comments

Comments
 (0)