From 7fb48e31f10bd993133c7a8640fc690885f80885 Mon Sep 17 00:00:00 2001 From: Robert Leahy Date: Wed, 26 Mar 2025 22:00:02 -0400 Subject: [PATCH] asioexec::completion_token & ::use_sender Adds two completion tokens for interop with Asio (either Boost.Asio or standalone Asio). asioexec::completion_token performs the most basic transformations necessary to transform an Asio initiating function into a sender factory: - The initiating function returns a sender - Initiation is deferred until the above-mentioned sender is connected and the resulting operation state is started - The completion handler provided to the initiation (see asio:: async_initiate) has the following properties: - Invocation results in the arguments thereto being sent via a value completion signal (this means that errors transmitted via a leading error_code parameter (i.e. in Asio style) are delivered via the value channel, see below) - Abandonment thereof (i.e. allowing the lifetime of the completion handler, and all objects transitively derived by moving therefrom, to end without invoking any of them) results in a stopped completion signal - Any exception thrown from any intermediate completion handler, or the final completion handler, is sent via an error completion signal with a std::exception_ptr representing that exception (this is accomplished by wrapping the associated executor) - The cancellation slot is connected to a cancellation signal which is sent when a stop request is received via the receiver's associated stop token The fact that invocations of the completion handler are passed to the value channel untouched reflects the design intent that the above- described completion token perform only "the most basic transformations necessary." This means that the full context of partial success must be made available and since the error channel is unary this must be transmitted in the value channel. For a more ergonomic experience than that described above asioexec:: use_sender is also provided. This uses asioexec::completion_token to adapt an Asio initiating function into a sender factory and wraps the returned sender with an additional layer which performs the following transformations to value completion signals with a leading error_code parameter (note that when configured for standalone Asio std::error_code is matched whereas when configured for Boost.Asio both boost::system:: error_code and std::error_code are matched): - If that argument compares equal to errc::operation_cancelled transforms the value completion signal into a stopped completion signal, otherwise - If that argument is truthy transforms the value completion signal into an error completion signal with an appropriate std::exception_ptr (i.e. one which points to a std::system_error for std::error_code, boost::system::system_error for boost::system::error_code), otherwise - Sends the remainder of the arguments (i.e. all but the error_code) as a value completion signal --- .gitignore | 1 + CMakeLists.txt | 27 ++ include/asioexec/asio_config.hpp.in | 40 ++ include/asioexec/completion_token.hpp | 588 ++++++++++++++++++++++++ include/asioexec/use_sender.hpp | 220 +++++++++ test/CMakeLists.txt | 4 + test/asioexec/CMakeLists.txt | 18 + test/asioexec/test_completion_token.cpp | 585 +++++++++++++++++++++++ test/asioexec/test_use_sender.cpp | 243 ++++++++++ 9 files changed, 1726 insertions(+) create mode 100644 include/asioexec/asio_config.hpp.in create mode 100644 include/asioexec/completion_token.hpp create mode 100644 include/asioexec/use_sender.hpp create mode 100644 test/asioexec/CMakeLists.txt create mode 100644 test/asioexec/test_completion_token.cpp create mode 100644 test/asioexec/test_use_sender.cpp diff --git a/.gitignore b/.gitignore index 46ecf5658..1e75f12a8 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ callgrind.* a.out *.code-workspace sanitizer-ignorelist.txt +/include/asioexec/asio_config.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index aed43a485..f1f629bc8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -433,6 +433,13 @@ if(STDEXEC_ENABLE_ASIO) set(STDEXEC_ASIO_CONFIG_FILE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/include/execpools/asio) configure_file(include/execpools/asio/asio_config.hpp.in ${STDEXEC_ASIO_CONFIG_FILE_PATH}/asio_config.hpp) + set(ASIOEXEC_USES_BOOST ${STDEXEC_ASIO_USES_BOOST}) + set(ASIOEXEC_USES_STANDALONE ${STDEXEC_ASIO_USES_STANDALONE}) + + file(GLOB_RECURSE asioexec_sources include/asioexec/*.hpp) + set(ASIOEXEC_ASIO_CONFIG_FILE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/include/asioexec) + configure_file(include/asioexec/asio_config.hpp.in ${ASIOEXEC_ASIO_CONFIG_FILE_PATH}/asio_config.hpp) + if(${STDEXEC_ASIO_USES_BOOST}) set(BOOST_ENABLE_COMPATIBILITY_TARGETS TRUE) rapids_cpm_find(Boost 1.86.0 @@ -449,6 +456,16 @@ if(STDEXEC_ENABLE_ASIO) STDEXEC::stdexec Boost::boost ) + + add_library(asioexec_boost INTERFACE ${asioexec_sources}) + list(APPEND stdexec_export_targets asioexec_boost) + add_library(STDEXEC::asioexec_boost ALIAS asioexec_boost) + + target_link_libraries(asioexec_boost + INTERFACE + STDEXEC::stdexec + Boost::boost + ) elseif(${STDEXEC_ASIO_USES_STANDALONE}) include(cmake/import_standalone_asio.cmake) import_standalone_asio( @@ -464,6 +481,16 @@ if(STDEXEC_ENABLE_ASIO) STDEXEC::stdexec asio ) + + add_library(asioexec_asio INTERFACE ${asioexec_sources}) + list(APPEND stdexec_export_targets asioexec_asio) + add_library(STDEXEC::asioexec_asio ALIAS asioexec_asio) + + target_link_libraries(asioexec_asio + INTERFACE + STDEXEC::stdexec + asio + ) else() message(FATAL_ERROR "ASIO implementation is not configured") endif() diff --git a/include/asioexec/asio_config.hpp.in b/include/asioexec/asio_config.hpp.in new file mode 100644 index 000000000..6046a5e43 --- /dev/null +++ b/include/asioexec/asio_config.hpp.in @@ -0,0 +1,40 @@ +/* + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#cmakedefine01 ASIOEXEC_USES_STANDALONE +#cmakedefine01 ASIOEXEC_USES_BOOST + +#if ASIOEXEC_USES_BOOST +# include +# include +# include +# include +# define ASIOEXEC_ASIO_NAMESPACE boost::asio +#elif ASIOEXEC_USES_STANDALONE +# include +# include +# define ASIOEXEC_ASIO_NAMESPACE asio +#endif + +namespace asioexec { +#if ASIOEXEC_USES_BOOST + namespace asio_impl = ::boost::asio; + using error_code = ::boost::system::error_code; + using error_condition = ::boost::system::error_condition; + namespace errc = ::boost::system::errc; + using system_error = ::boost::system::system_error; +#elif ASIOEXEC_USES_STANDALONE + namespace asio_impl = ::asio; + using error_code = std::error_code; + using error_condition = std::error_condition; + using errc = std::errc; + using system_error = std::system_error; +#endif +} diff --git a/include/asioexec/completion_token.hpp b/include/asioexec/completion_token.hpp new file mode 100644 index 000000000..97729a3ea --- /dev/null +++ b/include/asioexec/completion_token.hpp @@ -0,0 +1,588 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace asioexec { + +namespace detail::completion_token { + +// The machinery from here down through completion_token::set_value is to +// workaround the fact that Asio allows operations to declare signatures which +// aren't reflective of the actual cv- & ref-qualifications with which the +// values will actually be sent. This means that one or more conversions may be +// required to actually send the values correctly which may throw and which +// therefore must occur before the call to ::stdexec::set_value. +// +// The technique used to achieve this is to utilize an unevaluated call against +// overload_set to determine the matching Asio signature. Then convert is used +// to transform the values actually sent by the Asio operation so that they +// match the selected completion signature (this may throw). The converted +// values may then be sent to the receiver with no possibility of throwing. +template +struct has_function_call_operator { + struct type {}; + std::tuple<> operator()(type) const; +}; +template +struct has_function_call_operator<::stdexec::set_value_t(Args...)> { + std::tuple operator()(Args...) const; +}; + +template +struct overload_set; +template +struct overload_set<::stdexec::completion_signatures> + : has_function_call_operator... +{ + using has_function_call_operator::operator()...; +}; + +template + requires std::is_same_v +constexpr T&& convert(U&& u) noexcept { + return static_cast(u); +} +template +constexpr std::remove_cvref_t convert(U&& u) { + return static_cast(u); +} + +template +constexpr void set_value_impl( + Receiver&& r, + std::index_sequence, + Args&&... args) +{ + ::stdexec::set_value( + static_cast(r), + completion_token::convert>( + static_cast(args))...); +} + +template +constexpr void set_value(Receiver&& r, Args&&... args) { + using tuple = decltype( + std::declval>()( + std::declval()...)); + completion_token::set_value_impl( + static_cast(r), + std::make_index_sequence>{}, + static_cast(args)...); +} + +template +struct signature; +template +struct signature { + using type = ::stdexec::set_value_t(Args...); +}; + +template +using completion_signatures = ::stdexec::completion_signatures< + typename signature::type..., + ::stdexec::set_error_t(std::exception_ptr), + ::stdexec::set_stopped_t()>; + +struct stop_callback { + constexpr explicit stop_callback(asio_impl::cancellation_signal& signal) + noexcept : signal_(signal) {} + void operator()() && noexcept { + signal_.emit(asio_impl::cancellation_type::partial); + } +private: + asio_impl::cancellation_signal& signal_; +}; + +template +class completion_handler; + +template +struct operation_state_base { + struct frame_; + template + requires std::constructible_from + explicit operation_state_base(T&& t) noexcept( + std::is_nothrow_constructible_v && + std::is_nothrow_default_constructible_v) + : r_(static_cast(t)) + {} + Receiver r_; + asio_impl::cancellation_signal signal_; + std::recursive_mutex m_; + frame_* frames_{nullptr}; + std::exception_ptr ex_; + completion_handler* h_{nullptr}; + class frame_ { + operation_state_base& self_; + std::unique_lock l_; + frame_* prev_; + public: + explicit frame_(operation_state_base& self) noexcept + : self_(self), + l_(self.m_), + prev_(self.frames_) + { + self.frames_ = this; + } + frame_(const frame_&) = delete; + ~frame_() noexcept { + if (l_) { + STDEXEC_ASSERT(self_.frames_ == this); + self_.frames_ = prev_; + if (!self_.frames_ && !self_.h_) { + // We are the last frame and the handler is gone so it's up to us to + // finalize the operation + l_.unlock(); + if (self_.ex_) { + ::stdexec::set_error( + static_cast(self_.r_), + std::move(self_.ex_)); + } else { + ::stdexec::set_stopped(static_cast(self_.r_)); + } + } + } + } + explicit operator bool() const noexcept { + return bool(l_); + } + void release() noexcept { + auto ptr = this; + do { + STDEXEC_ASSERT(ptr->l_); + STDEXEC_ASSERT(self_.frames_ == ptr); + ptr = ptr->prev_; + self_.frames_->l_.unlock(); + self_.frames_->prev_ = nullptr; + self_.frames_ = ptr; + } while (ptr); + } + }; + template + void run_(F&& f) noexcept { + const frame_ frame(*this); + try { + static_cast(f)(); + } catch (...) { + STDEXEC_ASSERT(frame); + // Do not overwrite the first exception encountered + if (!ex_) { + ex_ = std::current_exception(); + } + } + } +protected: + std::optional< + ::stdexec::stop_callback_for_t< + ::stdexec::stop_token_of_t< + ::stdexec::env_of_t>, + stop_callback>> callback_; +}; + +template +class completion_handler { + operation_state_base* self_; +public: + explicit completion_handler(operation_state_base& self) + noexcept + : self_(&self) + { + STDEXEC_ASSERT(!self_->h_); + self_->h_ = this; + } + completion_handler(completion_handler&& other) noexcept + : self_(std::exchange(other.self_, nullptr)) + { + if (self_) { + self_->h_ = this; + } + } + completion_handler& operator=(const completion_handler&) = delete; + ~completion_handler() noexcept { + if (self_) { + // When this goes out of scope it might send set stopped or set error, or + // it might defer that to the executor frames above us on the call stack + // (if any) + const typename operation_state_base::frame_ frame( + *self_); + self_->h_ = nullptr; + } + } + template + void operator()(Args&&... args) && noexcept { + STDEXEC_ASSERT(self_); + { + const std::lock_guard l(self_->m_); + if (self_->frames_) { + self_->frames_->release(); + } + STDEXEC_ASSERT(!self_->frames_); + } + if (self_->ex_) { + ::stdexec::set_error( + static_cast(self_->r_), + std::move(self_->ex_)); + } else { + try { + completion_token::set_value( + static_cast(self_->r_), + static_cast(args)...); + } catch (...) { + ::stdexec::set_error( + static_cast(self_->r_), + std::current_exception()); + } + } + // Makes destructor a no op, the operation is complete so there's nothing + // more to do + self_ = nullptr; + } + using cancellation_slot_type = asio_impl::cancellation_slot; + auto get_cancellation_slot() const noexcept { + STDEXEC_ASSERT(self_); + return self_->signal_.slot(); + } + operation_state_base& state() const noexcept { + STDEXEC_ASSERT(self_); + return *self_; + } +}; + +template< + typename Signatures, + typename Receiver, + typename Initiation, + typename Args> +class operation_state : operation_state_base { + using base_ = operation_state_base; + Initiation init_; + Args args_; +public: + template + requires + std::constructible_from && + std::constructible_from && + std::constructible_from + explicit operation_state(T&& t, U&& u, V&& v) noexcept( + std::is_nothrow_constructible_v && + std::is_nothrow_constructible_v && + std::is_nothrow_constructible_v) + : base_(static_cast(t)), + init_(static_cast(u)), + args_(static_cast(v)) + {} + void start() & noexcept { + const typename base_::frame_ frame(*this); + try { + std::apply( + [&](auto&&... args) { + std::invoke( + static_cast(init_), + completion_handler(*this), + static_cast(args)...); + }, + std::move(args_)); + } catch (...) { + if (!base_::ex_) { + base_::ex_ = std::current_exception(); + } + return; + } + // Removing this and attempting to simply construct the stop callback with + // the operation state seems to lead to missed cancellations, therefore + // it's likely that Asio cancellation signals aren't "sticky" in the same + // way that stop sources and tokens are + base_::callback_.emplace( + ::stdexec::get_stop_token(::stdexec::get_env(base_::r_)), + stop_callback(base_::signal_)); + } +}; + +template +class sender { + using args_type_ = std::tuple...>; +public: + using sender_concept = ::stdexec::sender_t; + template + requires + std::constructible_from && + std::constructible_from + explicit constexpr sender( + T&& t, + Us&&... us) noexcept( + std::is_nothrow_constructible_v && + std::is_nothrow_constructible_v) + : init_(static_cast(t)), + args_(static_cast(us)...) + {} + template + requires + std::is_copy_constructible_v && + std::is_copy_constructible_v + consteval Signatures get_completion_signatures(const Env&) const& noexcept { + return {}; + } + template + requires + std::is_move_constructible_v && + std::is_move_constructible_v + consteval Signatures get_completion_signatures(const Env&) && noexcept { + return {}; + } + template + requires ::stdexec::receiver_of< + std::remove_cvref_t, + ::stdexec::completion_signatures_of_t< + const sender&, + ::stdexec::env_of_t>> + constexpr auto connect(Receiver&& receiver) const& noexcept( + std::is_nothrow_constructible_v< + operation_state< + Signatures, + std::remove_cvref_t, + Initiation, + args_type_>, + Receiver, + const Initiation&, + const args_type_&>) + { + return operation_state< + Signatures, + std::remove_cvref_t, + Initiation, + args_type_>( + static_cast(receiver), + init_, + args_); + } + template + requires ::stdexec::receiver_of< + std::remove_cvref_t, + ::stdexec::completion_signatures_of_t< + sender, + ::stdexec::env_of_t>> + constexpr auto connect(Receiver&& receiver) && noexcept( + std::is_nothrow_constructible_v< + operation_state< + Signatures, + std::remove_cvref_t, + Initiation, + args_type_>, + Receiver, + Initiation, + args_type_>) + { + return operation_state< + Signatures, + std::remove_cvref_t, + Initiation, + args_type_>( + static_cast(receiver), + static_cast(init_), + static_cast(args_)); + } +private: + Initiation init_; + args_type_ args_; +}; + +template +class executor { + operation_state_base& self_; + Executor ex_; + template + constexpr auto wrap_(F f) const noexcept( + std::is_nothrow_move_constructible_v) + { + return [&self = self_, f = std::move(f)]() mutable noexcept { + self.run_(std::move(f)); + }; + } +public: + explicit constexpr executor( + operation_state_base& self, + const Executor& ex) noexcept + : self_(self), + ex_(ex) + {} + template + requires requires { + asio_impl::query( + std::declval(), + std::declval()); + } + constexpr decltype(auto) query(const Query& q) const noexcept { + return asio_impl::query(ex_, q); + } + template + requires requires { + asio_impl::prefer( + std::declval(), + std::declval()...); + } + constexpr decltype(auto) prefer(Args&&... args) const noexcept { + const auto ex = asio_impl::prefer(ex_, static_cast(args)...); + return executor< + Signatures, + Receiver, + std::remove_cvref_t>( + self_, + ex); + } + template + requires requires { + asio_impl::require( + std::declval(), + std::declval()...); + } + constexpr decltype(auto) require(Args&&... args) const noexcept { + const auto ex = asio_impl::require(ex_, static_cast(args)...); + return executor< + Signatures, + Receiver, + std::remove_cvref_t>( + self_, + ex); + } + template + void execute(T&& t) const noexcept { + self_.run_([&]() { + ex_.execute(wrap_(static_cast(t))); + }); + } + constexpr void on_work_started() const noexcept requires requires { + std::declval().on_work_started(); + } + { + ex_.on_work_started(); + } + constexpr void on_work_finished() const noexcept requires requires { + std::declval().on_work_finished(); + } + { + ex_.on_work_finished(); + } + template + requires requires { + std::declval().dispatch( + std::declval(), + std::declval()); + } + constexpr void dispatch(F&& f, const A& a) const noexcept { + self_.run_([&]() { + ex_.dispatch(wrap_(static_cast(f)), a); + }); + } + template + requires requires { + std::declval().post( + std::declval(), + std::declval()); + } + constexpr void post(F&& f, const A& a) const noexcept { + self_.run_([&]() { + ex_.post(wrap_(static_cast(f)), a); + }); + } + template + requires requires { + std::declval().defer( + std::declval(), + std::declval()); + } + constexpr void defer(F&& f, const A& a) const noexcept { + self_.run_([&]() { + ex_.defer(wrap_(static_cast(f)), a); + }); + } + constexpr bool operator==(const executor& rhs) const noexcept { + return (&self_ == &rhs.self_) && (ex_ == rhs.ex_); + } + bool operator!=(const executor& rhs) const = default; +}; + +} + +struct completion_token_t {}; +inline const completion_token_t completion_token{}; + +} + +namespace ASIOEXEC_ASIO_NAMESPACE { + +template +struct async_result<::asioexec::completion_token_t, Signatures...> { + template + static constexpr auto initiate( + Initiation&& i, + const ::asioexec::completion_token_t&, + Args&&... args) + { + return ::asioexec::detail::completion_token::sender< + ::asioexec::detail::completion_token::completion_signatures< + Signatures...>, + std::remove_cvref_t, + Args...>( + static_cast(i), + static_cast(args)...); + } +}; + +template +struct associated_executor< + ::asioexec::detail::completion_token::completion_handler< + Signatures, + Receiver>, + Executor> +{ + using type = ::asioexec::detail::completion_token::executor< + Signatures, + Receiver, + Executor>; + static type get( + const ::asioexec::detail::completion_token::completion_handler< + Signatures, + Receiver>& h, + const Executor& ex) noexcept + { + return type(h.state(), ex); + } +}; + +template + requires requires(const Receiver& r) { + ::stdexec::get_allocator(::stdexec::get_env(r)); + } +struct associated_allocator< + ::asioexec::detail::completion_token::completion_handler< + Signatures, + Receiver>, + Allocator> +{ + using type = std::remove_cvref_t< + decltype( + ::stdexec::get_allocator( + ::stdexec::get_env( + std::declval())))>; + static type get( + const ::asioexec::detail::completion_token::completion_handler< + Signatures, + Receiver>& h, + const Allocator&) noexcept + { + return ::stdexec::get_allocator( + ::stdexec::get_env( + h.state().r_)); + } +}; + +} diff --git a/include/asioexec/use_sender.hpp b/include/asioexec/use_sender.hpp new file mode 100644 index 000000000..7af4e90f1 --- /dev/null +++ b/include/asioexec/use_sender.hpp @@ -0,0 +1,220 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace asioexec { + +namespace detail::use_sender { + +template +concept is_error_code = + std::is_same_v< + std::remove_cvref_t, + error_code> || + std::is_same_v< + std::remove_cvref_t, + std::error_code>; + +template +std::exception_ptr to_exception_ptr(T t) noexcept { + using exception_type = std::conditional_t< + std::is_same_v, + std::system_error, + system_error>; + try { + return std::make_exception_ptr(exception_type(static_cast(t))); + } catch (...) { + return std::current_exception(); + } +} + +template +struct receiver { + template + requires std::constructible_from + constexpr explicit receiver(T&& t) noexcept( + std::is_nothrow_constructible_v) + : r_(static_cast(t)) + {} + using receiver_concept = ::stdexec::receiver_t; + constexpr void set_stopped() && noexcept requires + ::stdexec::receiver_of< + Receiver, + ::stdexec::completion_signatures< + ::stdexec::set_stopped_t()>> + { + ::stdexec::set_stopped(static_cast(r_)); + } + constexpr void set_error(std::exception_ptr ex) && noexcept requires + ::stdexec::receiver_of< + Receiver, + ::stdexec::completion_signatures< + ::stdexec::set_error_t(std::exception_ptr)>> + { + ::stdexec::set_error(static_cast(r_), std::move(ex)); + } + template + requires + is_error_code && + ::stdexec::receiver_of< + Receiver, + ::stdexec::completion_signatures< + ::stdexec::set_value_t(Args...), + ::stdexec::set_error_t(std::exception_ptr), + ::stdexec::set_stopped_t()>> + constexpr void set_value(T&& t, Args&&... args) && noexcept { + if (!t) { + ::stdexec::set_value( + static_cast(r_), + static_cast(args)...); + return; + } + if ([&]() noexcept { + using type = std::remove_cvref_t; + if constexpr (std::is_same_v) { + if (t == asio_impl::error::operation_aborted) { + return true; + } + } + if constexpr (std::is_same_v>) { + return t == std::errc::operation_canceled; + } else { + return t == errc::operation_canceled; + } + }()) { + ::stdexec::set_stopped(static_cast(r_)); + return; + } + ::stdexec::set_error( + static_cast(r_), + use_sender::to_exception_ptr(static_cast(t))); + } + template + requires ::stdexec::receiver_of< + Receiver, + ::stdexec::completion_signatures< + ::stdexec::set_value_t(Args...)>> + constexpr void set_value(Args&&... args) && noexcept { + ::stdexec::set_value( + static_cast(r_), + static_cast(args)...); + } + constexpr decltype(auto) get_env() const noexcept { + return ::stdexec::get_env(r_); + } +private: + Receiver r_; +}; + +template +struct transform_set_value { + using type = ::stdexec::completion_signatures< + ::stdexec::set_value_t(Args...)>; +}; +template + requires is_error_code +struct transform_set_value { + using type = ::stdexec::completion_signatures< + ::stdexec::set_value_t(Args...)>; +}; + +template +using transform_set_value_t = typename transform_set_value::type; + +template +using completion_signatures = ::stdexec::transform_completion_signatures< + Signatures, + ::stdexec::completion_signatures<>, + transform_set_value_t>; + +template +struct sender { + template + requires std::constructible_from + constexpr explicit sender(T&& t) noexcept( + std::is_nothrow_constructible_v) + : s_(static_cast(t)) + {} + using sender_concept = ::stdexec::sender_t; + template + consteval completion_signatures< + ::stdexec::completion_signatures_of_t< + Sender, + Env>> get_completion_signatures(const Env&) && noexcept + { + return {}; + } + template + consteval completion_signatures< + ::stdexec::completion_signatures_of_t< + const Sender&, + Env>> get_completion_signatures(const Env&) const& noexcept + { + return {}; + } + template + requires ::stdexec::sender_to> + constexpr auto connect(Receiver r) const& noexcept( + std::is_nothrow_constructible_v, Receiver> && + noexcept(::stdexec::connect( + std::declval(), + std::declval>()))) + { + return ::stdexec::connect( + s_, + receiver(static_cast(r))); + } + template + requires ::stdexec::sender_to> + constexpr auto connect(Receiver r) && noexcept( + std::is_nothrow_constructible_v, Receiver> && + noexcept(::stdexec::connect( + std::declval(), + std::declval>()))) + { + return ::stdexec::connect( + static_cast(s_), + receiver(static_cast(r))); + } +private: + Sender s_; +}; + +template +explicit sender(Sender) -> sender; + +} + +struct use_sender_t {}; +inline const use_sender_t use_sender{}; + +} + +namespace ASIOEXEC_ASIO_NAMESPACE { + +template +struct async_result<::asioexec::use_sender_t, Signatures...> { + template + static constexpr auto initiate( + Initiation&& i, + const ::asioexec::use_sender_t&, + Args&&... args) + { + return ::asioexec::detail::use_sender::sender( + async_result< + ::asioexec::completion_token_t, + Signatures...>::initiate( + static_cast(i), + ::asioexec::completion_token, + static_cast(args)...)); + } +}; + +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0f57eb4f8..2e243e9f5 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -104,6 +104,10 @@ if(STDEXEC_ENABLE_CUDA) add_subdirectory(nvexec) endif() +if(STDEXEC_ENABLE_ASIO) + add_subdirectory(asioexec) +endif() + # build failure tests: tests which parse a source file for an expected error # message icm_add_build_failure_test( diff --git a/test/asioexec/CMakeLists.txt b/test/asioexec/CMakeLists.txt new file mode 100644 index 000000000..199f9417b --- /dev/null +++ b/test/asioexec/CMakeLists.txt @@ -0,0 +1,18 @@ +set(asioexec_test_sources + ../test_main.cpp + test_completion_token.cpp + test_use_sender.cpp + ) + +add_executable(test.asioexec ${asioexec_test_sources}) +target_link_libraries(test.asioexec + PUBLIC + STDEXEC::stdexec + $ + $ + stdexec_executable_flags + Catch2::Catch2 + PRIVATE + common_test_settings) + +catch_discover_tests(test.asioexec) diff --git a/test/asioexec/test_completion_token.cpp b/test/asioexec/test_completion_token.cpp new file mode 100644 index 000000000..0c22fed2d --- /dev/null +++ b/test/asioexec/test_completion_token.cpp @@ -0,0 +1,585 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace stdexec; +using namespace asioexec; + +namespace { + + TEST_CASE("Asio-based asynchronous operation ends with error when canceled", + "[asioexec][completion_token]") + { + inplace_stop_source source; + const struct { + auto query(const get_stop_token_t&) const noexcept { + return source_.get_token(); + } + inplace_stop_source& source_; + } e{source}; + error_code err; + asio_impl::io_context ctx; + asio_impl::system_timer t(ctx); + t.expires_after(std::chrono::years(1)); + auto sender = t.async_wait(completion_token); + static_assert( + set_equivalent< + completion_signatures_of_t>, + completion_signatures< + set_stopped_t(), + set_error_t(std::exception_ptr), + set_value_t(error_code)>>); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + auto op = ::stdexec::connect( + std::move(sender), + expect_value_receiver_ex(e, err)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + source.request_stop(); + start(op); + CHECK(!err); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(err == asio_impl::error::operation_aborted); + } + + TEST_CASE("Posting may be redirected to a sender", + "[asioexec][completion_token]") + { + asio_impl::io_context ctx; + asio_impl::system_timer t(ctx); + t.expires_after(std::chrono::years(1)); + auto sender = asio_impl::post(ctx.get_executor(), completion_token); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + auto op = ::stdexec::connect( + std::move(sender), + expect_value_receiver<>{}); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + start(op); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + } + + template + decltype(auto) async_throw_from_initiation(CompletionToken&& token) { + using signature_type = void(); + return asio_impl::async_initiate( + [](const auto&) { + throw std::logic_error("Test"); + }, + token); + } + + TEST_CASE("When initiating the asynchronous operation throws this causes the " + "sender to send a std::exception_ptr", "[asioexec][completion_token]") + { + std::exception_ptr ex; + auto op = ::stdexec::connect( + async_throw_from_initiation(completion_token), + expect_error_receiver_ex(ex)); + CHECK(!ex); + ::stdexec::start(op); + CHECK(ex); + } + + template + decltype(auto) async_throw_from_completion( + const Executor& ex, + CompletionToken&& token) + { + using signature_type = void(); + return asio_impl::async_initiate( + [ex](auto h) { + const auto assoc = asio_impl::get_associated_executor(h, ex); + asio_impl::post( + ex, + asio_impl::bind_executor( + assoc, + [h = std::move(h)]() { throw std::logic_error("Test"); })); + }, + token); + } + + TEST_CASE("When a completion handler invoked as part of a composed " + "asynchronous operation throws that exception is captured and sent as a " + "std::exception_ptr", "[asioexec][completion_token]") + { + std::exception_ptr ex; + asio_impl::io_context ctx; + auto sender = async_throw_from_completion( + ctx.get_executor(), + completion_token); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + // Allows us to destroy the operation state in response to completion + // signal to ensure things are well behaved when the completion signal + // results in the destruction of the operation state + std::shared_ptr ptr; + struct : expect_error_receiver_ex { + void set_error(std::exception_ptr ex) && noexcept { + CHECK(ptr_); + ptr_.reset(); + expect_error_receiver_ex::set_error(std::move(ex)); + } + std::shared_ptr& ptr_; + } r{expect_error_receiver_ex(ex), ptr}; + using operation_state_type = connect_result_t< + decltype(sender), + decltype(r)>; + struct { + operator operation_state_type() { + return ::stdexec::connect(std::move(s_), std::move(r_)); + } + decltype(sender)& s_; + decltype(r)& r_; + } elide{sender, r}; + auto typed = std::make_shared(std::move(elide)); + auto&& op = *typed; + ptr = typed; + typed.reset(); + CHECK(ptr.use_count() == 1U); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + CHECK(!ex); + start(op); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(ex); + } + + TEST_CASE("When an operation is abandoned this is reported via a stopped " + "signal", "[asioexec][completion_token]") + { + bool stopped = false; + expect_stopped_receiver_ex r(stopped); + using sender_type = decltype( + std::declval().async_wait(completion_token)); + using operation_state_type = connect_result_t; + std::optional op; + { + asio_impl::io_context ctx; + asio_impl::system_timer t(ctx); + t.expires_after(std::chrono::years(1)); + struct { + operator operation_state_type() { + return ::stdexec::connect(std::move(s_), std::move(r_)); + } + sender_type s_; + decltype(r) r_; + } elide{t.async_wait(completion_token), std::move(r)}; + op.emplace(std::move(elide)); + start(*op); + CHECK(!stopped); + } + CHECK(stopped); + } + + template + decltype(auto) async_indirect_completion_handler( + const Executor& ex, + std::shared_ptr& ptr, + CompletionToken&& token) + { + using signature_type = void(); + return asio_impl::async_initiate( + [&ptr](auto h, const auto& ex) { + auto local = std::make_shared(std::move(h)); + const auto assoc = asio_impl::get_associated_executor(*local, ex); + ptr = local; + asio_impl::post( + ex, + asio_impl::bind_executor( + assoc, + [local]() mutable { + std::invoke(std::move(*local)); + local.reset(); + })); + }, + token, + ex); + } + + TEST_CASE("If the completion handler outlives completion of the operation " + "the receiver contract is still satisfied eagerly", + "[asioexec][completion_token]") + { + bool invoked = false; + std::shared_ptr ptr; + asio_impl::io_context ctx; + auto sender = async_indirect_completion_handler( + ctx.get_executor(), + ptr, + completion_token); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + CHECK(!ptr); + { + auto op = ::stdexec::connect( + std::move(sender), + expect_void_receiver_ex(invoked)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + CHECK(!ptr); + start(op); + CHECK(ptr); + CHECK(!invoked); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(invoked); + } + CHECK(ptr.use_count() == 1U); + } + + template + decltype(auto) async_indirect_completion_handler_throw_from_completion( + const Executor& ex, + std::shared_ptr& ptr, + CompletionToken&& token) + { + using signature_type = void(); + return asio_impl::async_initiate( + [&ptr](auto h, const auto& ex) { + auto local = std::make_shared(std::move(h)); + const auto assoc = asio_impl::get_associated_executor(*local, ex); + ptr = local; + asio_impl::post( + ex, + asio_impl::bind_executor( + assoc, + [local]() mutable { + local.reset(); + throw std::logic_error("Test"); + })); + }, + token, + ex); + } + + TEST_CASE("If the completion handler outlives completion of the operation " + "satisfaction of the receiver contract is deferred until the end of the " + "completion handler's lifetime (this is necessary in situations where " + "asynchronous control flow bifurcates and one of the child operations ends " + "via exception)", "[asioexec][completion_token]") + { + std::exception_ptr ex; + std::shared_ptr ptr; + asio_impl::io_context ctx; + auto sender = async_indirect_completion_handler_throw_from_completion( + ctx.get_executor(), + ptr, + completion_token); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + CHECK(!ptr); + auto op = ::stdexec::connect( + std::move(sender), + expect_error_receiver_ex(ex)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + CHECK(!ptr); + start(op); + CHECK(ptr); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(!ex); + CHECK(ptr.use_count() == 1U); + ptr.reset(); + CHECK(ex); + } + + template + decltype(auto) async_multishot(CompletionToken&& token) { + using signature_type = void(); + return asio_impl::async_initiate( + [](auto&& h) { + std::forward(h)(); + }, + token); + } + + TEST_CASE("When appropriate the yielded sender is multi-shot", + "[asioexec][completion_token]") + { + const auto sender = async_multishot(completion_token); + auto a = ::stdexec::connect(sender, expect_void_receiver{}); + auto b = ::stdexec::connect(sender, expect_void_receiver{}); + start(a); + start(b); + } + + template + decltype(auto) async_single_shot(CompletionToken&& token) { + using signature_type = void(); + return asio_impl::async_initiate( + [ptr = std::make_unique(5)](auto&& h) { + std::forward(h)(); + }, + token); + } + + TEST_CASE("When appropriate the yielded sender is single shot", + "[asioexec][completion_token]") + { + auto sender = async_single_shot(completion_token); + static_assert( + !::stdexec::sender_to< + const decltype(sender)&, + expect_void_receiver<>>); + auto op = ::stdexec::connect(std::move(sender), expect_void_receiver{}); + start(op); + } + + TEST_CASE("When an operation is abandoned by the initiating function " + "set_stopped is sent immediately", "[asioexec][completion_token]") + { + const auto initiating_function = [&](auto&& token) { + return asio_impl::async_initiate( + [](auto&&) noexcept {}, + token); + }; + auto op = ::stdexec::connect( + initiating_function(completion_token), + expect_stopped_receiver{}); + start(op); + } + + class thread { + asio_impl::io_context ctx_; + asio_impl::executor_work_guard g_; + std::thread t_; + public: + thread() + : g_(ctx_.get_executor()), + t_([&]() noexcept { + try { + ctx_.run(); + } catch (...) { + FAIL("Exception thrown in background thread"); + } + }) + {} + ~thread() noexcept { + g_.reset(); + if (t_.joinable()) { + t_.join(); + } + } + void join() noexcept { + g_.reset(); + t_.join(); + } + asio_impl::io_context& context() noexcept { + return ctx_; + } + }; + + template + struct ping_pong { + explicit ping_pong( + asio_impl::io_context& a, + asio_impl::io_context& b, + CompletionHandler h) + : a_(a), + a_g_(a.get_executor()), + b_(b), + b_g_(b.get_executor()), + h_(std::move(h)) + {} + void operator()() && { + if (i_ == 10000) { + a_g_.reset(); + b_g_.reset(); + std::invoke(std::move(h_)); + return; + } + const auto ex = [&]() noexcept { + if (i_ % 2) { + return a_.get_executor(); + } + return b_.get_executor(); + }(); + ++i_; + asio_impl::post(ex, std::move(*this)); + } + asio_impl::io_context& a_; + asio_impl::executor_work_guard a_g_; + asio_impl::io_context& b_; + asio_impl::executor_work_guard b_g_; + CompletionHandler h_; + std::size_t i_{0}; + }; + + TEST_CASE("Intermediate completion handlers being passed between threads has " + "no effect on the transformation of an initiating function into a sender", + "[asioexec][completion_token]") + { + thread a; + thread b; + const auto initiating_function = [&](auto&& token) { + return asio_impl::async_initiate( + [&a, &b](auto&& h) { + ping_pong>{ + a.context(), + b.context(), + std::forward(h)}(); + }, + token); + }; + auto op = ::stdexec::connect( + initiating_function(completion_token), + expect_void_receiver{}); + start(op); + a.join(); + b.join(); + } + + TEST_CASE("When the initiating function posts and then throws, and the " + "posted operation simply abandons the completion handler, the operation " + "completes after the post with the thrown error", + "[asioexec][completion_token]") + { + std::exception_ptr ex; + asio_impl::io_context ctx; + const auto initiating_function = [&](auto&& token) { + return asio_impl::async_initiate( + [&](auto&& h) { + asio_impl::post(ctx.get_executor(), [h = std::move(h)]() noexcept {}); + throw std::logic_error("Test"); + }, + token); + }; + auto op = ::stdexec::connect( + initiating_function(completion_token), + expect_error_receiver_ex(ex)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + start(op); + CHECK(!ex); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(ex); + } + + TEST_CASE("When there are two parallel paths of asynchronous execution, and " + "one of them throws an exception, and the other is abandoned, the " + "operation completes with the thrown exception", + "[asioexec][completion_token]") + { + for (unsigned u = 0; u < 2; ++u) { + std::exception_ptr ex; + asio_impl::io_context ctx; + const auto initiating_function = [&](auto&& token) { + return asio_impl::async_initiate( + [&](auto&& h) { + auto ptr = std::make_shared>( + std::forward(h)); + const auto ex = asio_impl::get_associated_executor( + *ptr, + ctx.get_executor()); + if (u) { + asio_impl::post( + ex, + [ptr]() { + throw std::logic_error("Test"); + }); + asio_impl::post(ex, [ptr = std::move(ptr)]() noexcept {}); + } else { + asio_impl::post(ex, [ptr]() noexcept {}); + asio_impl::post( + ex, + [ptr = std::move(ptr)]() { + throw std::logic_error("Test"); + }); + } + }, + token); + }; + auto op = ::stdexec::connect( + initiating_function(completion_token), + expect_error_receiver_ex(ex)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + start(op); + CHECK(!ex); + CHECK(ctx.poll_one()); + CHECK(!ctx.stopped()); + CHECK(!ex); + CHECK(ctx.poll_one()); + CHECK(ctx.stopped()); + CHECK(ex); + } + } + + TEST_CASE("When the Asio signature suggests the operation will send an " + "rvalue, but a const lvalue is sent, and the decay-copy throws, that " + "exception is sent as an error in completing the operation", + "[asioexec][completion_token]") + { + class obj { + bool throw_; + public: + explicit obj(bool t = false) noexcept : throw_(t) {} + obj(obj&&) = default; + obj(const obj& other) : throw_(other.throw_) { + if (throw_) { + throw std::logic_error("Test"); + } + } + obj& operator=(const obj&) = delete; + bool operator==(const obj& rhs) const noexcept { + return throw_ == rhs.throw_; + } + }; + const obj expected; + std::exception_ptr ex; + const auto initiating_function = [](obj o, auto&& token) { + return asio_impl::async_initiate( + [o = std::move(o)](auto&& h) { + // Lambda isn't mutable and there's no std::move so o's type is + // const obj& + std::invoke(std::forward(h), o); + }, + token); + }; + auto a = ::stdexec::connect( + initiating_function(obj{}, completion_token), + expect_value_receiver(expected)); + auto b = ::stdexec::connect( + initiating_function(obj(true), completion_token), + expect_error_receiver_ex(ex)); + start(a); + CHECK(!ex); + start(b); + CHECK(ex); + } + +} // namespace diff --git a/test/asioexec/test_use_sender.cpp b/test/asioexec/test_use_sender.cpp new file mode 100644 index 000000000..55fc311ad --- /dev/null +++ b/test/asioexec/test_use_sender.cpp @@ -0,0 +1,243 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace stdexec; +using namespace asioexec; + +namespace { + + static_assert( + set_equivalent< + detail::use_sender::completion_signatures< + completion_signatures< + set_value_t(std::error_code), + set_stopped_t(), + set_error_t(std::exception_ptr)>>, + completion_signatures< + set_value_t(), + set_stopped_t(), + set_error_t(std::exception_ptr)>>); + static_assert( + set_equivalent< + detail::use_sender::completion_signatures< + completion_signatures< + set_value_t(error_code), + set_stopped_t(), + set_error_t(std::exception_ptr)>>, + completion_signatures< + set_value_t(), + set_stopped_t(), + set_error_t(std::exception_ptr)>>); + static_assert( + set_equivalent< + detail::use_sender::completion_signatures< + completion_signatures< + set_value_t(error_code, int), + set_value_t(int), + set_stopped_t(), + set_error_t(std::exception_ptr)>>, + completion_signatures< + set_value_t(int), + set_stopped_t(), + set_error_t(std::exception_ptr)>>); + + TEST_CASE("Asio-based asynchronous operation ends with set_stopped when " + "cancellation occurs", "[asioexec][use_sender]") + { + bool stopped = false; + inplace_stop_source source; + const struct { + auto query(const get_stop_token_t&) const noexcept { + return source_.get_token(); + } + inplace_stop_source& source_; + } e{source}; + asio_impl::io_context ctx; + asio_impl::system_timer t(ctx); + t.expires_after(std::chrono::years(1)); + auto sender = t.async_wait(use_sender); + static_assert( + set_equivalent< + ::stdexec::completion_signatures_of_t< + decltype(sender), + ::stdexec::env<>>, + completion_signatures< + set_value_t(), + set_stopped_t(), + set_error_t(std::exception_ptr)>>); + static_assert( + set_equivalent< + ::stdexec::completion_signatures_of_t< + const decltype(sender)&, + ::stdexec::env<>>, + completion_signatures< + set_value_t(), + set_stopped_t(), + set_error_t(std::exception_ptr)>>); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + auto op = ::stdexec::connect( + std::move(sender), + expect_stopped_receiver_ex(e, stopped)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + source.request_stop(); + start(op); + CHECK(!stopped); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(stopped); + } + + TEST_CASE("std::errc::operation_canceled causes the operation to end with " + "set_stopped", "[asioexec][use_sender]") + { + const auto initiating_function = [](auto&& token) { + return asio_impl::async_initiate( + [](auto&& h) { + std::invoke( + std::forward(h), + make_error_code(std::errc::operation_canceled)); + }, + token); + }; + bool stopped = false; + auto op = ::stdexec::connect( + initiating_function(use_sender), + expect_stopped_receiver_ex(stopped)); + CHECK(!stopped); + start(op); + CHECK(stopped); + } + + TEST_CASE("errc::operation_canceled (note in the case of Boost.Asio this is " + "boost::system::errc::operation_canceled) causes the operation to end with " + "set_stopped", "[asioexec][use_sender]") + { + const auto initiating_function = [](auto&& token) { + return asio_impl::async_initiate( + [](auto&& h) { + std::invoke( + std::forward(h), + make_error_code(errc::operation_canceled)); + }, + token); + }; + bool stopped = false; + auto op = ::stdexec::connect( + initiating_function(use_sender), + expect_stopped_receiver_ex(stopped)); + CHECK(!stopped); + start(op); + CHECK(stopped); + } + + TEST_CASE("When an Asio-based asynchronous operation which could fail " + "completes successfully the success is reported via a value completion " + "signal", "[asioexec][use_sender]") + { + bool invoked = false; + asio_impl::io_context ctx; + asio_impl::system_timer t(ctx); + t.expires_after(std::chrono::milliseconds(1)); + auto sender = t.async_wait(use_sender); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + auto op = ::stdexec::connect( + std::move(sender), + expect_void_receiver_ex(invoked)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + start(op); + CHECK(!invoked); + CHECK(ctx.run()); + CHECK(invoked); + } + + TEST_CASE("Post works with use_sender", "[asioexec][use_sender]") { + bool invoked = false; + asio_impl::io_context ctx; + auto sender = asio_impl::post(ctx, use_sender); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + auto op = ::stdexec::connect( + std::move(sender), + expect_void_receiver_ex(invoked)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + start(op); + CHECK(!invoked); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(invoked); + } + + template + decltype(auto) async_error_code(CompletionToken&& token) { + using signature_type = void(error_code); + return asio_impl::async_initiate( + [](auto&& h) { + std::invoke( + std::forward(h), + error_code(make_error_code(std::errc::not_enough_memory))); + }, + token); + } + + TEST_CASE("Error codes native to the version of Asio used are transformed " + "into a system_error", "[asioexec][use_sender]") + { + std::exception_ptr ex; + auto op = ::stdexec::connect( + async_error_code(use_sender), + expect_error_receiver_ex(ex)); + CHECK(!ex); + start(op); + REQUIRE(ex); + CHECK_THROWS_AS(std::rethrow_exception(std::move(ex)), system_error); + } + + template + decltype(auto) async_std_error_code(CompletionToken&& token) { + using signature_type = void(std::error_code); + return asio_impl::async_initiate( + [](auto&& h) { + std::invoke( + std::forward(h), + make_error_code(std::errc::not_enough_memory)); + }, + token); + } + + TEST_CASE("Standard error codes are transformed into a std::system_error " + "(note that in the case of standalone Asio the error code native to that " + "version of Asio a std::error_code are the same and therefore this is a " + "duplicate of another test)", "[asioexec][use_sender]") + { + std::exception_ptr ex; + auto op = ::stdexec::connect( + async_std_error_code(use_sender), + expect_error_receiver_ex(ex)); + CHECK(!ex); + start(op); + REQUIRE(ex); + CHECK_THROWS_AS(std::rethrow_exception(std::move(ex)), std::system_error); + } + +} // namespace