diff --git a/.github/workflows/ci.cpu.yml b/.github/workflows/ci.cpu.yml index 48e063dbd..7bae17755 100644 --- a/.github/workflows/ci.cpu.yml +++ b/.github/workflows/ci.cpu.yml @@ -67,6 +67,8 @@ jobs: -DCMAKE_BUILD_TYPE=${{ matrix.build }} \ -DCMAKE_CXX_FLAGS="${{ matrix.cxxflags }}" \ -DSTDEXEC_ENABLE_TBB:BOOL=${{ !contains(matrix.cxxflags, '-fsanitize') }} \ + -DSTDEXEC_ENABLE_ASIO:BOOL=TRUE \ + -DSTDEXEC_ASIO_IMPLEMENTATION:STRING=boost \ ; # Compile @@ -148,6 +150,8 @@ jobs: cmake -S. -Bbuild -GNinja \ -DCMAKE_BUILD_TYPE=${{ matrix.build }} \ -DCMAKE_CXX_COMPILER=${{ matrix.compiler }} \ + -DSTDEXEC_ENABLE_ASIO:BOOL=TRUE \ + -DSTDEXEC_ASIO_IMPLEMENTATION:STRING=boost \ -DCMAKE_CXX_STANDARD=20 cmake --build build/ -v cd build diff --git a/.github/workflows/test-windows.ps1 b/.github/workflows/test-windows.ps1 index d940a7338..007cde49f 100644 --- a/.github/workflows/test-windows.ps1 +++ b/.github/workflows/test-windows.ps1 @@ -19,6 +19,10 @@ if (Test-Path -PathType Container $BuildDirectory) { } New-Item -ItemType Directory $BuildDirectory | Out-Null -Invoke-NativeCommand cmake -B $BuildDirectory -G Ninja "-DCMAKE_BUILD_TYPE=$Config" "-DCMAKE_MSVC_DEBUG_INFORMATION_FORMAT:STRING=Embedded" . +Invoke-NativeCommand cmake -B $BuildDirectory -G Ninja ` + "-DCMAKE_BUILD_TYPE=$Config" ` + "-DCMAKE_MSVC_DEBUG_INFORMATION_FORMAT:STRING=Embedded" ` + "-DSTDEXEC_ENABLE_ASIO:BOOL=TRUE" ` + "-DSTDEXEC_ASIO_IMPLEMENTATION:STRING=boost" . Invoke-NativeCommand cmake --build $BuildDirectory Invoke-NativeCommand ctest --test-dir $BuildDirectory diff --git a/.gitignore b/.gitignore index 46ecf5658..1e75f12a8 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ callgrind.* a.out *.code-workspace sanitizer-ignorelist.txt +/include/asioexec/asio_config.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index aed43a485..f1f629bc8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -433,6 +433,13 @@ if(STDEXEC_ENABLE_ASIO) set(STDEXEC_ASIO_CONFIG_FILE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/include/execpools/asio) configure_file(include/execpools/asio/asio_config.hpp.in ${STDEXEC_ASIO_CONFIG_FILE_PATH}/asio_config.hpp) + set(ASIOEXEC_USES_BOOST ${STDEXEC_ASIO_USES_BOOST}) + set(ASIOEXEC_USES_STANDALONE ${STDEXEC_ASIO_USES_STANDALONE}) + + file(GLOB_RECURSE asioexec_sources include/asioexec/*.hpp) + set(ASIOEXEC_ASIO_CONFIG_FILE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/include/asioexec) + configure_file(include/asioexec/asio_config.hpp.in ${ASIOEXEC_ASIO_CONFIG_FILE_PATH}/asio_config.hpp) + if(${STDEXEC_ASIO_USES_BOOST}) set(BOOST_ENABLE_COMPATIBILITY_TARGETS TRUE) rapids_cpm_find(Boost 1.86.0 @@ -449,6 +456,16 @@ if(STDEXEC_ENABLE_ASIO) STDEXEC::stdexec Boost::boost ) + + add_library(asioexec_boost INTERFACE ${asioexec_sources}) + list(APPEND stdexec_export_targets asioexec_boost) + add_library(STDEXEC::asioexec_boost ALIAS asioexec_boost) + + target_link_libraries(asioexec_boost + INTERFACE + STDEXEC::stdexec + Boost::boost + ) elseif(${STDEXEC_ASIO_USES_STANDALONE}) include(cmake/import_standalone_asio.cmake) import_standalone_asio( @@ -464,6 +481,16 @@ if(STDEXEC_ENABLE_ASIO) STDEXEC::stdexec asio ) + + add_library(asioexec_asio INTERFACE ${asioexec_sources}) + list(APPEND stdexec_export_targets asioexec_asio) + add_library(STDEXEC::asioexec_asio ALIAS asioexec_asio) + + target_link_libraries(asioexec_asio + INTERFACE + STDEXEC::stdexec + asio + ) else() message(FATAL_ERROR "ASIO implementation is not configured") endif() diff --git a/include/asioexec/asio_config.hpp.in b/include/asioexec/asio_config.hpp.in new file mode 100644 index 000000000..7b2b657db --- /dev/null +++ b/include/asioexec/asio_config.hpp.in @@ -0,0 +1,50 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2025 Robert Leahy. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + * + * Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#cmakedefine01 ASIOEXEC_USES_STANDALONE +#cmakedefine01 ASIOEXEC_USES_BOOST + +#if ASIOEXEC_USES_BOOST +# include +# include +# include +# include +# define ASIOEXEC_ASIO_NAMESPACE boost::asio +#elif ASIOEXEC_USES_STANDALONE +# include +# include +# define ASIOEXEC_ASIO_NAMESPACE asio +#endif + +namespace asioexec { +#if ASIOEXEC_USES_BOOST + namespace asio_impl = ::boost::asio; + using error_code = ::boost::system::error_code; + using error_condition = ::boost::system::error_condition; + namespace errc = ::boost::system::errc; + using system_error = ::boost::system::system_error; +#elif ASIOEXEC_USES_STANDALONE + namespace asio_impl = ::asio; + using error_code = std::error_code; + using error_condition = std::error_condition; + using errc = std::errc; + using system_error = std::system_error; +#endif +} diff --git a/include/asioexec/completion_token.hpp b/include/asioexec/completion_token.hpp new file mode 100644 index 000000000..8a96d4ec6 --- /dev/null +++ b/include/asioexec/completion_token.hpp @@ -0,0 +1,536 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2025 Robert Leahy. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + * + * Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace asioexec { + + namespace detail::completion_token { + + // The machinery from here down through completion_token::set_value is to + // work around the fact that Asio allows operations to declare signatures which + // aren't reflective of the actual cv- & ref-qualifications with which the + // values will actually be sent. This means that one or more conversions may be + // required to actually send the values correctly which may throw and which + // therefore must occur before the call to ::stdexec::set_value. + // + // The technique used to achieve this is to utilize an unevaluated call against + // overload_set to determine the matching Asio signature. Then + // completion_token::convert is used to transform the values actually sent by + // the Asio operation so that they match the selected completion signature + // (this may throw). The converted values may then be sent to the receiver with + // no possibility of throwing. + template + struct has_function_call_operator { + struct type { }; + + std::tuple<> operator()(type) const; + }; + + template + struct has_function_call_operator<::stdexec::set_value_t(Args...)> { + std::tuple operator()(Args...) const; + }; + + template + struct overload_set; + + template + struct overload_set<::stdexec::completion_signatures> + : has_function_call_operator... { + using has_function_call_operator::operator()...; + }; + + template + requires + std::is_same_v || + std::is_convertible_v + constexpr T&& convert(U&& u) noexcept { + return static_cast(u); + } + + template + constexpr std::remove_cvref_t convert(U&& u) { + return static_cast(u); + } + + template + constexpr void set_value_impl(Receiver&& r, std::index_sequence, Args&&... args) { + ::stdexec::set_value( + static_cast(r), + completion_token::convert>(static_cast(args))...); + } + + template + constexpr void set_value(Receiver&& r, Args&&... args) { + using tuple = decltype(std::declval>()(std::declval()...)); + completion_token::set_value_impl( + static_cast(r), + std::make_index_sequence>{}, + static_cast(args)...); + } + + template + struct signature; + + template + struct signature { + using type = ::stdexec::set_value_t(Args...); + }; + + template + using completion_signatures = ::stdexec::completion_signatures< + typename signature::type..., + ::stdexec::set_error_t(std::exception_ptr), + ::stdexec::set_stopped_t()>; + + struct stop_callback { + constexpr explicit stop_callback(asio_impl::cancellation_signal& signal) noexcept + : signal_(signal) { + } + + void operator()() && noexcept { + signal_.emit(asio_impl::cancellation_type::partial); + } + private: + asio_impl::cancellation_signal& signal_; + }; + + template + class completion_handler; + + template + struct operation_state_base { + class frame_; + + template + requires std::constructible_from + explicit operation_state_base(T&& t) noexcept( + std::is_nothrow_constructible_v&& + std::is_nothrow_default_constructible_v) + : r_(static_cast(t)) { + } + + Receiver r_; + asio_impl::cancellation_signal signal_; + std::recursive_mutex m_; + frame_* frames_{nullptr}; + std::exception_ptr ex_; + completion_handler* h_{nullptr}; + + class frame_ { + operation_state_base& self_; + std::unique_lock l_; + frame_* prev_; + public: + explicit frame_(operation_state_base& self) noexcept + : self_(self) + , l_(self.m_) + , prev_(self.frames_) { + self.frames_ = this; + } + + frame_(const frame_&) = delete; + + ~frame_() noexcept { + if (l_) { + STDEXEC_ASSERT(self_.frames_ == this); + self_.frames_ = prev_; + if (!self_.frames_ && !self_.h_) { + // We are the last frame and the handler is gone so it's up to us to + // finalize the operation + l_.unlock(); + if (self_.ex_) { + ::stdexec::set_error(static_cast(self_.r_), std::move(self_.ex_)); + } else { + ::stdexec::set_stopped(static_cast(self_.r_)); + } + } + } + } + + explicit operator bool() const noexcept { + return bool(l_); + } + + void release() noexcept { + auto ptr = this; + do { + STDEXEC_ASSERT(ptr->l_); + STDEXEC_ASSERT(self_.frames_ == ptr); + ptr = ptr->prev_; + self_.frames_->l_.unlock(); + self_.frames_->prev_ = nullptr; + self_.frames_ = ptr; + } while (ptr); + } + }; + + template + void run_(F&& f) noexcept { + const frame_ frame(*this); + try { + static_cast(f)(); + } catch (...) { + STDEXEC_ASSERT(frame); + // Do not overwrite the first exception encountered + if (!ex_) { + ex_ = std::current_exception(); + } + } + } + protected: + std::optional<::stdexec::stop_callback_for_t< + ::stdexec::stop_token_of_t<::stdexec::env_of_t>, + stop_callback>> + callback_; + }; + + template + class completion_handler { + operation_state_base* self_; + public: + explicit completion_handler(operation_state_base& self) noexcept + : self_(&self) { + STDEXEC_ASSERT(!self_->h_); + self_->h_ = this; + } + + completion_handler(completion_handler&& other) noexcept + : self_(std::exchange(other.self_, nullptr)) { + if (self_) { + const std::lock_guard l(self_->m_); + self_->h_ = this; + } + } + + completion_handler& operator=(const completion_handler&) = delete; + + ~completion_handler() noexcept { + if (self_) { + // When this goes out of scope it might send set stopped or set error, or + // it might defer that to the executor frames above us on the call stack + // (if any) + const typename operation_state_base::frame_ frame(*self_); + self_->h_ = nullptr; + } + } + + template + void operator()(Args&&... args) && noexcept { + STDEXEC_ASSERT(self_); + { + const std::lock_guard l(self_->m_); + if (self_->frames_) { + self_->frames_->release(); + } + STDEXEC_ASSERT(!self_->frames_); + } + if (self_->ex_) { + ::stdexec::set_error(static_cast(self_->r_), std::move(self_->ex_)); + } else { + try { + completion_token::set_value( + static_cast(self_->r_), static_cast(args)...); + } catch (...) { + ::stdexec::set_error(static_cast(self_->r_), std::current_exception()); + } + } + // Makes destructor a no op, the operation is complete so there's nothing + // more to do + self_ = nullptr; + } + + using cancellation_slot_type = asio_impl::cancellation_slot; + + auto get_cancellation_slot() const noexcept { + STDEXEC_ASSERT(self_); + return self_->signal_.slot(); + } + + operation_state_base& state() const noexcept { + STDEXEC_ASSERT(self_); + return *self_; + } + }; + + template + class operation_state : operation_state_base { + using base_ = operation_state_base; + Initiation init_; + Args args_; + public: + template + requires std::constructible_from && std::constructible_from + && std::constructible_from + explicit operation_state(T&& t, U&& u, V&& v) noexcept( + std::is_nothrow_constructible_v&& std::is_nothrow_constructible_v&& + std::is_nothrow_constructible_v) + : base_(static_cast(t)) + , init_(static_cast(u)) + , args_(static_cast(v)) { + } + + void start() & noexcept { + const typename base_::frame_ frame(*this); + try { + std::apply( + [&](auto&&... args) { + std::invoke( + static_cast(init_), + completion_handler(*this), + static_cast(args)...); + }, + std::move(args_)); + } catch (...) { + if (!base_::ex_) { + base_::ex_ = std::current_exception(); + } + return; + } + // In the case of an immediate completion *this may already be outside its + // lifetime so we can't proceed into the branch + if (frame) { + base_::callback_.emplace( + ::stdexec::get_stop_token(::stdexec::get_env(base_::r_)), + stop_callback(base_::signal_)); + } + } + }; + + template + class sender { + using args_type_ = std::tuple...>; + public: + using sender_concept = ::stdexec::sender_t; + + template + requires std::constructible_from + && std::constructible_from + explicit constexpr sender(T&& t, Us&&... us) noexcept( + std::is_nothrow_constructible_v&& + std::is_nothrow_constructible_v) + : init_(static_cast(t)) + , args_(static_cast(us)...) { + } + + template + requires std::is_copy_constructible_v + && std::is_copy_constructible_v + consteval Signatures get_completion_signatures(const Env&) const & noexcept { + return {}; + } + + template + requires std::is_move_constructible_v + && std::is_move_constructible_v + consteval Signatures get_completion_signatures(const Env&) && noexcept { + return {}; + } + + template + requires ::stdexec::receiver_of< + std::remove_cvref_t, + ::stdexec::completion_signatures_of_t>> + constexpr auto connect(Receiver&& receiver) const & noexcept( + std::is_nothrow_constructible_v< + operation_state, Initiation, args_type_>, + Receiver, + const Initiation&, + const args_type_&>) { + return operation_state, Initiation, args_type_>( + static_cast(receiver), init_, args_); + } + + template + requires ::stdexec::receiver_of< + std::remove_cvref_t, + ::stdexec::completion_signatures_of_t>> + constexpr auto connect(Receiver&& receiver) && noexcept( + std::is_nothrow_constructible_v< + operation_state, Initiation, args_type_>, + Receiver, + Initiation, + args_type_>) { + return operation_state, Initiation, args_type_>( + static_cast(receiver), + static_cast(init_), + static_cast(args_)); + } + private: + Initiation init_; + args_type_ args_; + }; + + template + class executor { + operation_state_base& self_; + Executor ex_; + + template + constexpr auto wrap_(F f) const noexcept(std::is_nothrow_move_constructible_v) { + return [&self = self_, f = std::move(f)]() mutable noexcept { + self.run_(std::move(f)); + }; + } + public: + explicit constexpr executor( + operation_state_base& self, + const Executor& ex) noexcept + : self_(self) + , ex_(ex) { + } + + template + requires requires { + asio_impl::query(std::declval(), std::declval()); + } + constexpr decltype(auto) query(const Query& q) const noexcept { + return asio_impl::query(ex_, q); + } + + template + requires requires { + asio_impl::prefer(std::declval(), std::declval()...); + } + constexpr decltype(auto) prefer(Args&&... args) const noexcept { + const auto ex = asio_impl::prefer(ex_, static_cast(args)...); + return executor>(self_, ex); + } + + template + requires requires { + asio_impl::require(std::declval(), std::declval()...); + } + constexpr decltype(auto) require(Args&&... args) const noexcept { + const auto ex = asio_impl::require(ex_, static_cast(args)...); + return executor>(self_, ex); + } + + template + void execute(T&& t) const noexcept { + self_.run_([&]() { ex_.execute(wrap_(static_cast(t))); }); + } + + constexpr void on_work_started() const noexcept + requires requires { std::declval().on_work_started(); } + { + ex_.on_work_started(); + } + + constexpr void on_work_finished() const noexcept + requires requires { std::declval().on_work_finished(); } + { + ex_.on_work_finished(); + } + + template + requires requires { + std::declval().dispatch( + std::declval(), std::declval()); + } + constexpr void dispatch(F&& f, const A& a) const noexcept { + self_.run_([&]() { ex_.dispatch(wrap_(static_cast(f)), a); }); + } + + template + requires requires { + std::declval().post(std::declval(), std::declval()); + } + constexpr void post(F&& f, const A& a) const noexcept { + self_.run_([&]() { ex_.post(wrap_(static_cast(f)), a); }); + } + + template + requires requires { + std::declval().defer( + std::declval(), std::declval()); + } + constexpr void defer(F&& f, const A& a) const noexcept { + self_.run_([&]() { ex_.defer(wrap_(static_cast(f)), a); }); + } + + constexpr bool operator==(const executor& rhs) const noexcept { + return (&self_ == &rhs.self_) && (ex_ == rhs.ex_); + } + + bool operator!=(const executor& rhs) const = default; + }; + + } // namespace detail::completion_token + + struct completion_token_t { }; + + inline const completion_token_t completion_token{}; + +} // namespace asioexec + +namespace ASIOEXEC_ASIO_NAMESPACE { + + template + struct async_result<::asioexec::completion_token_t, Signatures...> { + template + static constexpr auto + initiate(Initiation&& i, const ::asioexec::completion_token_t&, Args&&... args) { + return ::asioexec::detail::completion_token::sender< + ::asioexec::detail::completion_token::completion_signatures, + std::remove_cvref_t, + Args...>(static_cast(i), static_cast(args)...); + } + }; + + template + struct associated_executor< + ::asioexec::detail::completion_token::completion_handler, + Executor> { + using type = ::asioexec::detail::completion_token::executor; + + static type get( + const ::asioexec::detail::completion_token::completion_handler& h, + const Executor& ex) noexcept { + return type(h.state(), ex); + } + }; + + template + requires requires(const Receiver& r) { ::stdexec::get_allocator(::stdexec::get_env(r)); } + struct associated_allocator< + ::asioexec::detail::completion_token::completion_handler, + Allocator> { + using type = std::remove_cvref_t< + decltype(::stdexec::get_allocator(::stdexec::get_env(std::declval())))>; + + static type get( + const ::asioexec::detail::completion_token::completion_handler& h, + const Allocator&) noexcept { + return ::stdexec::get_allocator(::stdexec::get_env(h.state().r_)); + } + }; + +} // namespace ASIOEXEC_ASIO_NAMESPACE diff --git a/include/asioexec/use_sender.hpp b/include/asioexec/use_sender.hpp new file mode 100644 index 000000000..95b046df3 --- /dev/null +++ b/include/asioexec/use_sender.hpp @@ -0,0 +1,207 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2025 Robert Leahy. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + * + * Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace asioexec { + + namespace detail::use_sender { + + template + concept is_error_code = std::is_same_v, error_code> + || std::is_same_v, std::error_code>; + + template + std::exception_ptr to_exception_ptr(T t) noexcept { + using exception_type = + std::conditional_t, std::system_error, system_error>; + try { + return std::make_exception_ptr(exception_type(static_cast(t))); + } catch (...) { + return std::current_exception(); + } + } + + template + struct receiver { + template + requires std::constructible_from + constexpr explicit receiver(T&& t) noexcept(std::is_nothrow_constructible_v) + : r_(static_cast(t)) { + } + + using receiver_concept = ::stdexec::receiver_t; + + constexpr void set_stopped() && noexcept + requires ::stdexec:: + receiver_of> + { + ::stdexec::set_stopped(static_cast(r_)); + } + + void set_error(std::exception_ptr ex) && noexcept + requires ::stdexec::receiver_of< + Receiver, + ::stdexec::completion_signatures<::stdexec::set_error_t(std::exception_ptr)>> + { + ::stdexec::set_error(static_cast(r_), std::move(ex)); + } + + template + requires is_error_code + && ::stdexec::receiver_of< + Receiver, + ::stdexec::completion_signatures< + ::stdexec::set_value_t(Args...), + ::stdexec::set_error_t(std::exception_ptr), + ::stdexec::set_stopped_t()>> + void set_value(T&& t, Args&&... args) && noexcept { + if (!t) { + ::stdexec::set_value(static_cast(r_), static_cast(args)...); + return; + } + if ([&]() noexcept { + using type = std::remove_cvref_t; + if constexpr (std::is_same_v) { + if (t == asio_impl::error::operation_aborted) { + return true; + } + } + if constexpr (std::is_same_v>) { + return t == std::errc::operation_canceled; + } else { + return t == errc::operation_canceled; + } + }()) { + ::stdexec::set_stopped(static_cast(r_)); + return; + } + ::stdexec::set_error( + static_cast(r_), use_sender::to_exception_ptr(static_cast(t))); + } + + template + requires ::stdexec:: + receiver_of> + constexpr void set_value(Args&&... args) && noexcept { + ::stdexec::set_value(static_cast(r_), static_cast(args)...); + } + + constexpr decltype(auto) get_env() const noexcept { + return ::stdexec::get_env(r_); + } + private: + Receiver r_; + }; + + template + struct transform_set_value { + using type = ::stdexec::completion_signatures<::stdexec::set_value_t(Args...)>; + }; + + template + requires is_error_code + struct transform_set_value { + using type = ::stdexec::completion_signatures<::stdexec::set_value_t(Args...)>; + }; + + template + using transform_set_value_t = typename transform_set_value::type; + + template + using completion_signatures = ::stdexec::transform_completion_signatures< + Signatures, + ::stdexec::completion_signatures<>, + transform_set_value_t>; + + template + struct sender { + template + requires std::constructible_from + constexpr explicit sender(T&& t) noexcept(std::is_nothrow_constructible_v) + : s_(static_cast(t)) { + } + + using sender_concept = ::stdexec::sender_t; + + template + consteval completion_signatures<::stdexec::completion_signatures_of_t> + get_completion_signatures(const Env&) && noexcept { + return {}; + } + + template + consteval completion_signatures<::stdexec::completion_signatures_of_t> + get_completion_signatures(const Env&) const & noexcept { + return {}; + } + + template + requires ::stdexec::sender_to> + constexpr auto connect(Receiver r) const & noexcept( + std::is_nothrow_constructible_v, Receiver>&& noexcept( + ::stdexec::connect(std::declval(), std::declval>()))) { + return ::stdexec::connect(s_, receiver(static_cast(r))); + } + + template + requires ::stdexec::sender_to> + constexpr auto connect(Receiver r) && noexcept( + std::is_nothrow_constructible_v, Receiver>&& noexcept( + ::stdexec::connect(std::declval(), std::declval>()))) { + return ::stdexec::connect( + static_cast(s_), receiver(static_cast(r))); + } + private: + Sender s_; + }; + + template + explicit sender(Sender) -> sender; + + } // namespace detail::use_sender + + struct use_sender_t { }; + + inline const use_sender_t use_sender{}; + +} // namespace asioexec + +namespace ASIOEXEC_ASIO_NAMESPACE { + + template + struct async_result<::asioexec::use_sender_t, Signatures...> { + template + static constexpr auto + initiate(Initiation&& i, const ::asioexec::use_sender_t&, Args&&... args) { + return ::asioexec::detail::use_sender::sender( + async_result<::asioexec::completion_token_t, Signatures...>::initiate( + static_cast(i), ::asioexec::completion_token, static_cast(args)...)); + } + }; + +} // namespace ASIOEXEC_ASIO_NAMESPACE diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0f57eb4f8..2e243e9f5 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -104,6 +104,10 @@ if(STDEXEC_ENABLE_CUDA) add_subdirectory(nvexec) endif() +if(STDEXEC_ENABLE_ASIO) + add_subdirectory(asioexec) +endif() + # build failure tests: tests which parse a source file for an expected error # message icm_add_build_failure_test( diff --git a/test/asioexec/CMakeLists.txt b/test/asioexec/CMakeLists.txt new file mode 100644 index 000000000..ee8fab0dd --- /dev/null +++ b/test/asioexec/CMakeLists.txt @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2025 Robert Leahy. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +# +# Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://llvm.org/LICENSE.txt +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set(asioexec_test_sources + ../test_main.cpp + test_completion_token.cpp + test_use_sender.cpp + ) + +add_executable(test.asioexec ${asioexec_test_sources}) +target_link_libraries(test.asioexec + PUBLIC + STDEXEC::stdexec + $ + $ + stdexec_executable_flags + Catch2::Catch2 + PRIVATE + common_test_settings) + +catch_discover_tests(test.asioexec) diff --git a/test/asioexec/test_completion_token.cpp b/test/asioexec/test_completion_token.cpp new file mode 100644 index 000000000..848dfcce0 --- /dev/null +++ b/test/asioexec/test_completion_token.cpp @@ -0,0 +1,710 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2025 Robert Leahy. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + * + * Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace stdexec; +using namespace asioexec; + +namespace { + + // connect_shared and start_shared ensure the operation state's lifetime ends + // within the completion signal handing of the receiver thereby ensuring any + // use of the operation state by the operation after it's sent a completion + // signal is caught be AddressSanitizer + + template + class connect_shared_receiver { + Receiver r_; + std::shared_ptr& ptr_; + + template + void complete_(const Tag& tag, Args&&... args) noexcept { + CHECK(ptr_); + CHECK(ptr_.use_count() == 1); + tag(std::move(r_), std::forward(args)...); + ptr_.reset(); + } + public: + using receiver_concept = receiver_t; + + template + requires std::constructible_from + constexpr explicit connect_shared_receiver(T&& t, std::shared_ptr& ptr) noexcept + : r_(std::forward(t)) + , ptr_(ptr) { + } + + constexpr void set_stopped() && noexcept + requires ::stdexec:: + receiver_of> + { + complete_(::stdexec::set_stopped); + } + + template + requires ::stdexec:: + receiver_of> + constexpr void set_error(T&& t) && noexcept { + complete_(::stdexec::set_error, std::forward(t)); + } + + template + requires ::stdexec:: + receiver_of> + constexpr void set_value(Args&&... args) && noexcept { + complete_(::stdexec::set_value, std::forward(args)...); + } + + constexpr decltype(auto) get_env() const noexcept { + return ::stdexec::get_env(r_); + } + }; + + template + class connect_shared_operation_state { + using receiver_ = connect_shared_receiver>; + std::shared_ptr self_; + ::stdexec::connect_result_t op_; + public: + constexpr explicit connect_shared_operation_state(Sender&& s, Receiver&& r) + : op_( + ::stdexec::connect(std::forward(s), receiver_(std::forward(r), self_))) { + } + + void start(std::shared_ptr&& ptr) & noexcept { + CHECK(ptr.get() == this); + CHECK(ptr.use_count() == 1); + self_ = std::move(ptr); + ::stdexec::start(op_); + } + }; + + template + auto connect_shared(Sender&& sender, Receiver&& receiver) { + return std::make_shared>( + std::forward(sender), std::forward(receiver)); + } + + template + void + start_shared(std::shared_ptr>&& ptr) noexcept { + REQUIRE(ptr); + auto&& state = *ptr; + state.start(std::move(ptr)); + } + + TEST_CASE( + "Asio-based asynchronous operation ends with error when canceled", + "[asioexec][completion_token]") { + inplace_stop_source source; + + const struct { + auto query(const get_stop_token_t&) const noexcept { + return source_.get_token(); + } + + inplace_stop_source& source_; + } e{source}; + + error_code err; + asio_impl::io_context ctx; + asio_impl::system_timer t(ctx); + t.expires_after(std::chrono::years(1)); + auto sender = t.async_wait(completion_token); + static_assert(set_equivalent< + completion_signatures_of_t>, + completion_signatures< + set_stopped_t(), + set_error_t(std::exception_ptr), + set_value_t(error_code)>>); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + auto op = connect_shared(std::move(sender), expect_value_receiver_ex(e, err)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + source.request_stop(); + start_shared(std::move(op)); + CHECK(!err); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(err == asio_impl::error::operation_aborted); + } + + TEST_CASE("Posting may be redirected to a sender", "[asioexec][completion_token]") { + asio_impl::io_context ctx; + asio_impl::system_timer t(ctx); + t.expires_after(std::chrono::years(1)); + auto sender = asio_impl::post(ctx.get_executor(), completion_token); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + auto op = connect_shared(std::move(sender), expect_value_receiver<>{}); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + start_shared(std::move(op)); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + } + + template + decltype(auto) async_throw_from_initiation(CompletionToken&& token) { + using signature_type = void(); + return asio_impl::async_initiate( + [](const auto&) { throw std::logic_error("Test"); }, token); + } + + TEST_CASE( + "When initiating the asynchronous operation throws this causes the " + "sender to send a std::exception_ptr", + "[asioexec][completion_token]") { + std::exception_ptr ex; + auto op = + connect_shared(async_throw_from_initiation(completion_token), expect_error_receiver_ex(ex)); + CHECK(!ex); + start_shared(std::move(op)); + CHECK(ex); + } + + template + decltype(auto) async_throw_from_completion(const Executor& ex, CompletionToken&& token) { + using signature_type = void(); + return asio_impl::async_initiate( + [ex](auto h) { + const auto assoc = asio_impl::get_associated_executor(h, ex); + asio_impl::post(ex, asio_impl::bind_executor(assoc, [h = std::move(h)]() { + throw std::logic_error("Test"); + })); + }, + token); + } + + TEST_CASE( + "When a completion handler invoked as part of a composed " + "asynchronous operation throws that exception is captured and sent as a " + "std::exception_ptr", + "[asioexec][completion_token]") { + std::exception_ptr ex; + asio_impl::io_context ctx; + auto sender = async_throw_from_completion(ctx.get_executor(), completion_token); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + auto op = connect_shared(std::move(sender), expect_error_receiver_ex(ex)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + CHECK(!ex); + start_shared(std::move(op)); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(ex); + } + + TEST_CASE( + "When an operation is abandoned this is reported via a stopped " + "signal", + "[asioexec][completion_token]") { + bool stopped = false; + expect_stopped_receiver_ex r(stopped); + using sender_type = + decltype(std::declval().async_wait(completion_token)); + using operation_state_type = connect_result_t; + std::optional op; + { + asio_impl::io_context ctx; + asio_impl::system_timer t(ctx); + t.expires_after(std::chrono::years(1)); + + struct { + operator operation_state_type() { + return ::stdexec::connect(std::move(s_), std::move(r_)); + } + + sender_type s_; + decltype(r) r_; + } elide{t.async_wait(completion_token), std::move(r)}; + + op.emplace(std::move(elide)); + start(*op); + CHECK(!stopped); + } + CHECK(stopped); + } + + template + decltype(auto) async_indirect_completion_handler( + const Executor& ex, + std::shared_ptr& ptr, + CompletionToken&& token) { + using signature_type = void(); + return asio_impl::async_initiate( + [&ptr](auto h, const auto& ex) { + auto local = std::make_shared(std::move(h)); + const auto assoc = asio_impl::get_associated_executor(*local, ex); + ptr = local; + asio_impl::post(ex, asio_impl::bind_executor(assoc, [local]() mutable { + std::invoke(std::move(*local)); + local.reset(); + })); + }, + token, + ex); + } + + TEST_CASE( + "If the completion handler outlives completion of the operation " + "the receiver contract is still satisfied eagerly", + "[asioexec][completion_token]") { + bool invoked = false; + std::shared_ptr ptr; + asio_impl::io_context ctx; + auto sender = async_indirect_completion_handler(ctx.get_executor(), ptr, completion_token); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + CHECK(!ptr); + auto op = connect_shared(std::move(sender), expect_void_receiver_ex(invoked)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + CHECK(!ptr); + start_shared(std::move(op)); + CHECK(ptr); + CHECK(!invoked); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(invoked); + CHECK(ptr.use_count() == 1U); + } + + template + decltype(auto) async_indirect_completion_handler_throw_from_completion( + const Executor& ex, + std::shared_ptr& ptr, + CompletionToken&& token) { + using signature_type = void(); + return asio_impl::async_initiate( + [&ptr](auto h, const auto& ex) { + auto local = std::make_shared(std::move(h)); + const auto assoc = asio_impl::get_associated_executor(*local, ex); + ptr = local; + asio_impl::post(ex, asio_impl::bind_executor(assoc, [local]() mutable { + local.reset(); + throw std::logic_error("Test"); + })); + }, + token, + ex); + } + + TEST_CASE( + "If the completion handler outlives completion of the operation " + "satisfaction of the receiver contract is deferred until the end of the " + "completion handler's lifetime (this is necessary in situations where " + "asynchronous control flow bifurcates and one of the child operations ends " + "via exception)", + "[asioexec][completion_token]") { + std::exception_ptr ex; + std::shared_ptr ptr; + asio_impl::io_context ctx; + auto sender = async_indirect_completion_handler_throw_from_completion( + ctx.get_executor(), ptr, completion_token); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + CHECK(!ptr); + auto op = connect_shared(std::move(sender), expect_error_receiver_ex(ex)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + CHECK(!ptr); + start_shared(std::move(op)); + CHECK(ptr); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(!ex); + CHECK(ptr.use_count() == 1U); + ptr.reset(); + CHECK(ex); + } + + template + decltype(auto) async_multishot(CompletionToken&& token) { + using signature_type = void(); + return asio_impl::async_initiate( + [](auto&& h) { std::forward(h)(); }, token); + } + + TEST_CASE("When appropriate the yielded sender is multi-shot", "[asioexec][completion_token]") { + const auto sender = async_multishot(completion_token); + auto a = connect_shared(sender, expect_void_receiver{}); + auto b = connect_shared(sender, expect_void_receiver{}); + start_shared(std::move(a)); + start_shared(std::move(b)); + } + + template + decltype(auto) async_single_shot(CompletionToken&& token) { + using signature_type = void(); + return asio_impl::async_initiate( + [ptr = std::make_unique(5)](auto&& h) { std::forward(h)(); }, token); + } + + TEST_CASE("When appropriate the yielded sender is single shot", "[asioexec][completion_token]") { + auto sender = async_single_shot(completion_token); + static_assert(!::stdexec::sender_to>); + start_shared(connect_shared(std::move(sender), expect_void_receiver{})); + } + + TEST_CASE( + "When an operation is abandoned by the initiating function " + "set_stopped is sent immediately", + "[asioexec][completion_token]") { + const auto initiating_function = [&](auto&& token) { + return asio_impl::async_initiate([](auto&&) noexcept {}, token); + }; + start_shared(connect_shared(initiating_function(completion_token), expect_stopped_receiver{})); + } + + class thread { + asio_impl::io_context ctx_; + asio_impl::executor_work_guard g_; + std::thread t_; + public: + thread() + : g_(ctx_.get_executor()) + , t_([&]() noexcept { + try { + ctx_.run(); + } catch (...) { + FAIL("Exception thrown in background thread"); + } + }) { + } + + ~thread() noexcept { + g_.reset(); + if (t_.joinable()) { + t_.join(); + } + } + + void join() noexcept { + g_.reset(); + t_.join(); + } + + asio_impl::io_context& context() noexcept { + return ctx_; + } + }; + + template + struct ping_pong { + explicit ping_pong(asio_impl::io_context& a, asio_impl::io_context& b, CompletionHandler h) + : a_(a) + , a_g_(a.get_executor()) + , b_(b) + , b_g_(b.get_executor()) + , h_(std::move(h)) { + } + + void operator()() && { + if (i_ == 10000) { + a_g_.reset(); + b_g_.reset(); + std::invoke(std::move(h_)); + return; + } + const auto ex = [&]() noexcept { + if (i_ % 2) { + return a_.get_executor(); + } + return b_.get_executor(); + }(); + ++i_; + asio_impl::post(ex, std::move(*this)); + } + + asio_impl::io_context& a_; + asio_impl::executor_work_guard a_g_; + asio_impl::io_context& b_; + asio_impl::executor_work_guard b_g_; + CompletionHandler h_; + std::size_t i_{0}; + }; + + TEST_CASE( + "Intermediate completion handlers being passed between threads has " + "no effect on the transformation of an initiating function into a sender", + "[asioexec][completion_token]") { + thread a; + thread b; + const auto initiating_function = [&](auto&& token) { + return asio_impl::async_initiate( + [&a, &b](auto&& h) { + ping_pong>{ + a.context(), b.context(), std::forward(h)}(); + }, + token); + }; + start_shared(connect_shared(initiating_function(completion_token), expect_void_receiver{})); + a.join(); + b.join(); + } + + TEST_CASE( + "When the initiating function posts and then throws, and the " + "posted operation simply abandons the completion handler, the operation " + "completes after the post with the thrown error", + "[asioexec][completion_token]") { + std::exception_ptr ex; + asio_impl::io_context ctx; + const auto initiating_function = [&](auto&& token) { + return asio_impl::async_initiate( + [&](auto&& h) { + asio_impl::post(ctx.get_executor(), [h = std::move(h)]() noexcept {}); + throw std::logic_error("Test"); + }, + token); + }; + auto op = connect_shared(initiating_function(completion_token), expect_error_receiver_ex(ex)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + start_shared(std::move(op)); + CHECK(!ex); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(ex); + } + + TEST_CASE( + "When there are two parallel paths of asynchronous execution, and " + "one of them throws an exception, and the other is abandoned, the " + "operation completes with the thrown exception", + "[asioexec][completion_token]") { + for (unsigned u = 0; u < 2; ++u) { + std::exception_ptr ex; + asio_impl::io_context ctx; + const auto initiating_function = [&](auto&& token) { + return asio_impl::async_initiate( + [&](auto&& h) { + auto ptr = + std::make_shared>(std::forward(h)); + const auto ex = asio_impl::get_associated_executor(*ptr, ctx.get_executor()); + if (u) { + asio_impl::post(ex, [ptr]() { throw std::logic_error("Test"); }); + asio_impl::post(ex, [ptr = std::move(ptr)]() noexcept {}); + } else { + asio_impl::post(ex, [ptr]() noexcept {}); + asio_impl::post(ex, [ptr = std::move(ptr)]() { throw std::logic_error("Test"); }); + } + }, + token); + }; + auto op = connect_shared(initiating_function(completion_token), expect_error_receiver_ex(ex)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + start_shared(std::move(op)); + CHECK(!ex); + CHECK(ctx.poll_one()); + CHECK(!ctx.stopped()); + CHECK(!ex); + CHECK(ctx.poll_one()); + CHECK(ctx.stopped()); + CHECK(ex); + } + } + + TEST_CASE( + "When the Asio signature suggests the operation will send an " + "rvalue, but a const lvalue is sent, and the decay-copy throws, that " + "exception is sent as an error in completing the operation", + "[asioexec][completion_token]") { + class obj { + bool throw_; + public: + explicit obj(bool t = false) noexcept + : throw_(t) { + } + + obj(obj&&) = default; + + obj(const obj& other) + : throw_(other.throw_) { + if (throw_) { + throw std::logic_error("Test"); + } + } + + obj& operator=(const obj&) = delete; + + bool operator==(const obj& rhs) const noexcept { + return throw_ == rhs.throw_; + } + }; + + const obj expected; + std::exception_ptr ex; + const auto initiating_function = [](obj o, auto&& token) { + return asio_impl::async_initiate( + [o = std::move(o)](auto&& h) { + // Lambda isn't mutable and there's no std::move so o's type is + // const obj& + std::invoke(std::forward(h), o); + }, + token); + }; + auto a = + connect_shared(initiating_function(obj{}, completion_token), expect_value_receiver(expected)); + auto b = connect_shared( + initiating_function(obj(true), completion_token), expect_error_receiver_ex(ex)); + start_shared(std::move(a)); + CHECK(!ex); + start_shared(std::move(b)); + CHECK(ex); + } + + struct value_category_receiver { + using receiver_concept = receiver_t; + + void set_value(std::mutex&&) && noexcept { + CHECK(kind_ == kind::none); + kind_ = kind::rvalue; + } + + void set_value(std::mutex&) && noexcept { + CHECK(kind_ == kind::none); + kind_ = kind::mutable_lvalue; + } + + void set_value(const std::mutex&) && noexcept { + CHECK(kind_ == kind::none); + kind_ = kind::const_lvalue; + } + + void set_error(std::exception_ptr) && noexcept { + CHECK(kind_ == kind::none); + kind_ = kind::error; + } + + void set_stopped() && noexcept { + CHECK(kind_ == kind::none); + kind_ = kind::stopped; + } + enum class kind { + none, + rvalue, + mutable_lvalue, + const_lvalue, + error, + stopped + }; + + constexpr explicit value_category_receiver(kind& k) noexcept + : kind_(k) { + } + private: + kind& kind_; + }; + + TEST_CASE( + "When the operation declares separate rvalue and lvalue completion signatures they are " + "appropriately passed through", + "[asioexec][completion_token]") { + const auto initiating_function = [](const bool rvalue, auto&& token) { + return asio_impl::async_initiate( + [rvalue](auto&& h) { + std::mutex m; + if (rvalue) { + std::invoke(std::forward(h), std::move(m)); + } else { + std::invoke(std::forward(h), m); + } + }, + token); + }; + value_category_receiver::kind rvalue_kind{value_category_receiver::kind::none}; + value_category_receiver::kind lvalue_kind{value_category_receiver::kind::none}; + auto rvalue = connect_shared( + initiating_function(true, completion_token), value_category_receiver(rvalue_kind)); + auto lvalue = connect_shared( + initiating_function(false, completion_token), value_category_receiver(lvalue_kind)); + CHECK(rvalue_kind == value_category_receiver::kind::none); + start_shared(std::move(rvalue)); + CHECK(rvalue_kind == value_category_receiver::kind::rvalue); + CHECK(lvalue_kind == value_category_receiver::kind::none); + start_shared(std::move(lvalue)); + CHECK(lvalue_kind == value_category_receiver::kind::mutable_lvalue); + } + + TEST_CASE( + "When the operation declares separate rvalue and const lvalue completion signatures they are " + "appropriately passed through even if the lvalue is sent mutable", + "[asioexec][completion_token]") { + const auto initiating_function = [](const bool rvalue, auto&& token) { + return asio_impl::async_initiate( + [rvalue](auto&& h) { + std::mutex m; + if (rvalue) { + std::invoke(std::forward(h), std::move(m)); + } else { + std::invoke(std::forward(h), m); + } + }, + token); + }; + value_category_receiver::kind rvalue_kind{value_category_receiver::kind::none}; + value_category_receiver::kind lvalue_kind{value_category_receiver::kind::none}; + auto rvalue = connect_shared( + initiating_function(true, completion_token), value_category_receiver(rvalue_kind)); + auto lvalue = connect_shared( + initiating_function(false, completion_token), value_category_receiver(lvalue_kind)); + CHECK(rvalue_kind == value_category_receiver::kind::none); + start_shared(std::move(rvalue)); + CHECK(rvalue_kind == value_category_receiver::kind::rvalue); + CHECK(lvalue_kind == value_category_receiver::kind::none); + start_shared(std::move(lvalue)); + CHECK(lvalue_kind == value_category_receiver::kind::const_lvalue); + } + +} // namespace diff --git a/test/asioexec/test_use_sender.cpp b/test/asioexec/test_use_sender.cpp new file mode 100644 index 000000000..9440cf3c6 --- /dev/null +++ b/test/asioexec/test_use_sender.cpp @@ -0,0 +1,229 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2025 Robert Leahy. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + * + * Licensed under the Apache License, Version 2.0 with LLVM Exceptions (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://llvm.org/LICENSE.txt + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace stdexec; +using namespace asioexec; + +namespace { + + static_assert( + set_equivalent< + detail::use_sender::completion_signatures>, + completion_signatures>); + static_assert( + set_equivalent< + detail::use_sender::completion_signatures>, + completion_signatures>); + static_assert( + set_equivalent< + detail::use_sender::completion_signatures>, + completion_signatures>); + + TEST_CASE( + "Asio-based asynchronous operation ends with set_stopped when " + "cancellation occurs", + "[asioexec][use_sender]") { + bool stopped = false; + inplace_stop_source source; + + const struct { + auto query(const get_stop_token_t&) const noexcept { + return source_.get_token(); + } + + inplace_stop_source& source_; + } e{source}; + + asio_impl::io_context ctx; + asio_impl::system_timer t(ctx); + t.expires_after(std::chrono::years(1)); + auto sender = t.async_wait(use_sender); + static_assert( + set_equivalent< + ::stdexec::completion_signatures_of_t>, + completion_signatures>); + static_assert( + set_equivalent< + ::stdexec::completion_signatures_of_t>, + completion_signatures>); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + auto op = ::stdexec::connect(std::move(sender), expect_stopped_receiver_ex(e, stopped)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + source.request_stop(); + start(op); + CHECK(!stopped); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(stopped); + } + + TEST_CASE( + "std::errc::operation_canceled causes the operation to end with " + "set_stopped", + "[asioexec][use_sender]") { + const auto initiating_function = [](auto&& token) { + return asio_impl::async_initiate( + [](auto&& h) { + std::invoke(std::forward(h), make_error_code(std::errc::operation_canceled)); + }, + token); + }; + bool stopped = false; + auto op = + ::stdexec::connect(initiating_function(use_sender), expect_stopped_receiver_ex(stopped)); + CHECK(!stopped); + start(op); + CHECK(stopped); + } + + TEST_CASE( + "errc::operation_canceled (note in the case of Boost.Asio this is " + "boost::system::errc::operation_canceled) causes the operation to end with " + "set_stopped", + "[asioexec][use_sender]") { + const auto initiating_function = [](auto&& token) { + return asio_impl::async_initiate( + [](auto&& h) { + std::invoke(std::forward(h), make_error_code(errc::operation_canceled)); + }, + token); + }; + bool stopped = false; + auto op = + ::stdexec::connect(initiating_function(use_sender), expect_stopped_receiver_ex(stopped)); + CHECK(!stopped); + start(op); + CHECK(stopped); + } + + TEST_CASE( + "When an Asio-based asynchronous operation which could fail " + "completes successfully the success is reported via a value completion " + "signal", + "[asioexec][use_sender]") { + bool invoked = false; + asio_impl::io_context ctx; + asio_impl::system_timer t(ctx); + t.expires_after(std::chrono::milliseconds(1)); + auto sender = t.async_wait(use_sender); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + auto op = ::stdexec::connect(std::move(sender), expect_void_receiver_ex(invoked)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + start(op); + CHECK(!invoked); + CHECK(ctx.run()); + CHECK(invoked); + } + + TEST_CASE("Post works with use_sender", "[asioexec][use_sender]") { + bool invoked = false; + asio_impl::io_context ctx; + auto sender = asio_impl::post(ctx, use_sender); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + auto op = ::stdexec::connect(std::move(sender), expect_void_receiver_ex(invoked)); + CHECK(!ctx.poll()); + CHECK(ctx.stopped()); + ctx.restart(); + start(op); + CHECK(!invoked); + CHECK(ctx.poll()); + CHECK(ctx.stopped()); + CHECK(invoked); + } + + template + decltype(auto) async_error_code(CompletionToken&& token) { + using signature_type = void(error_code); + return asio_impl::async_initiate( + [](auto&& h) { + std::invoke( + std::forward(h), error_code(make_error_code(std::errc::not_enough_memory))); + }, + token); + } + + TEST_CASE( + "Error codes native to the version of Asio used are transformed " + "into a system_error", + "[asioexec][use_sender]") { + std::exception_ptr ex; + auto op = ::stdexec::connect(async_error_code(use_sender), expect_error_receiver_ex(ex)); + CHECK(!ex); + start(op); + REQUIRE(ex); + CHECK_THROWS_AS(std::rethrow_exception(std::move(ex)), system_error); + } + + template + decltype(auto) async_std_error_code(CompletionToken&& token) { + using signature_type = void(std::error_code); + return asio_impl::async_initiate( + [](auto&& h) { + std::invoke(std::forward(h), make_error_code(std::errc::not_enough_memory)); + }, + token); + } + + TEST_CASE( + "Standard error codes are transformed into a std::system_error " + "(note that in the case of standalone Asio the error code native to that " + "version of Asio a std::error_code are the same and therefore this is a " + "duplicate of another test)", + "[asioexec][use_sender]") { + std::exception_ptr ex; + auto op = ::stdexec::connect(async_std_error_code(use_sender), expect_error_receiver_ex(ex)); + CHECK(!ex); + start(op); + REQUIRE(ex); + CHECK_THROWS_AS(std::rethrow_exception(std::move(ex)), std::system_error); + } + +} // namespace