Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion quantum/impl/quantum_context_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//#################################### IMPLEMENTATIONS #########################################
//##############################################################################################
#include <quantum/quantum_allocator.h>
#include <quantum/quantum_traits.h>

namespace Bloomberg {
namespace quantum {
Expand Down Expand Up @@ -1341,7 +1342,11 @@ void Context<RET>::waitAll(ICoroSync::Ptr sync) const
{
promise->getICoroFutureBase()->wait(sync);
}
catch(...) //catch all broken promises or any other exception
catch (const Traits::CoroutineStackUnwind&) {
//allow coroutine stack to unwind properly
throw;
}
catch (...) //catch all broken promises or any other exception
{
}
}
Expand Down
9 changes: 9 additions & 0 deletions quantum/quantum_dispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace quantum {
/// @class Dispatcher.
/// @brief Parallel execution engine used to run coroutines or IO tasks asynchronously.
/// This class is the main entry point into the library.
/// @warning Please read the following for [exception safety](https://www.boost.org/doc/libs/1_68_0/libs/coroutine2/doc/html/coroutine2/coroutine/asymmetric.html#coroutine2.coroutine.asymmetric.exceptions)
class Dispatcher : public ITerminate
{
public:
Expand Down Expand Up @@ -59,6 +60,8 @@ class Dispatcher : public ITerminate
/// @return A pointer to a thread context object.
/// @note This function is non-blocking and returns immediately. The returned thread context cannot be used to chain
/// further coroutines.
/// @warning User functions should *never* catch all exceptions with (...) as it may block coroutine stack
/// unwind exceptions from propagating which are not derived from std::exception (see link at the top)
template <class RET = Deprecated, class FUNC, class ... ARGS>
auto post(FUNC&& func, ARGS&&... args)->ThreadContextPtr<decltype(coroResult(func))>;

Expand All @@ -82,6 +85,8 @@ class Dispatcher : public ITerminate
/// @return A pointer to a thread context object.
/// @note This function is non-blocking and returns immediately. The returned thread context cannot be used to chain
/// further coroutines.
/// @warning User functions should *never* catch all exceptions with (...) as it may block coroutine stack
/// unwind exceptions from propagating which are not derived from std::exception (see link at the top).
template <class RET = Deprecated, class FUNC, class ... ARGS>
auto post(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
->ThreadContextPtr<decltype(coroResult(func))>;
Expand All @@ -102,6 +107,8 @@ class Dispatcher : public ITerminate
/// @return A pointer to a thread context object.
/// @note This function is non-blocking and returns immediately. The returned context can be used to chain other
/// coroutines which will run sequentially.
/// @warning User functions should *never* catch all exceptions with (...) as it may block coroutine stack
/// unwind exceptions from propagating which are not derived from std::exception (see link at the top).
template <class RET = Deprecated, class FUNC, class ... ARGS>
auto postFirst(FUNC&& func, ARGS&&... args)->ThreadContextPtr<decltype(coroResult(func))>;

Expand All @@ -125,6 +132,8 @@ class Dispatcher : public ITerminate
/// @return A pointer to a thread context object.
/// @note This function is non-blocking and returns immediately. The returned context can be used to chain other
/// coroutines which will run sequentially.
/// @warning User functions should *never* catch all exceptions with (...) as it may block coroutine stack
/// unwind exceptions from propagating which are not derived from std::exception (see link at the top).
template <class RET = Deprecated, class FUNC, class ... ARGS>
auto postFirst(int queueId, bool isHighPriority, FUNC&& func, ARGS&&... args)
->ThreadContextPtr<decltype(coroResult(func))>;
Expand Down
2 changes: 2 additions & 0 deletions quantum/quantum_traits.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ struct Traits
using BoostCoro = boost::coroutines2::coroutine<int&>;
using Yield = typename BoostCoro::pull_type;
using Coroutine = typename BoostCoro::push_type;
//NOTE: 'boost::coroutines2::detail::forced_unwind' is deprecated
using CoroutineStackUnwind = boost::context::detail::forced_unwind;

template <class IT>
using IsInputIterator = std::enable_if_t<std::is_convertible<typename std::iterator_traits<IT>::iterator_category, std::input_iterator_tag>::value>;
Expand Down
9 changes: 7 additions & 2 deletions quantum/util/impl/quantum_sequencer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <quantum/util/quantum_drain_guard.h>
#include <quantum/quantum_promise.h>
#include <quantum/quantum_traits.h>
#include <stdexcept>

namespace Bloomberg {
Expand Down Expand Up @@ -508,14 +509,18 @@ Sequencer<SequenceKey, Hash, KeyEqual, Allocator>::callPosted(
std::forward<FUNC>(func)(ctx, std::forward<ARGS>(args)...);
return ctx->set(Void{});
}
catch(std::exception& ex)
catch (const Traits::CoroutineStackUnwind&) {
//allow coroutine stack to unwind properly
throw;
}
catch (const std::exception& ex)
{
if (sequencer._exceptionCallback)
{
sequencer._exceptionCallback(std::current_exception(), opaque);
}
}
catch(...)
catch (...)
{
if (sequencer._exceptionCallback)
{
Expand Down
33 changes: 17 additions & 16 deletions quantum/util/impl/quantum_util_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//#################################### IMPLEMENTATIONS #########################################
//##############################################################################################
#include <quantum/util/quantum_future_joiner.h>
#include <quantum/quantum_traits.h>

namespace Bloomberg {
namespace quantum {
Expand All @@ -41,11 +42,11 @@ int bindCoro(Traits::Yield& yield,
yield.get() = rc;
return 0;
}
catch (const boost::context::detail::forced_unwind&)
{
catch (const Traits::CoroutineStackUnwind&) {
//allow coroutine stack to unwind properly
throw;
}
catch(const std::exception& ex)
catch (const std::exception& ex)
{
UNUSED(ex);
#ifdef __QUANTUM_PRINT_DEBUG
Expand All @@ -54,11 +55,11 @@ int bindCoro(Traits::Yield& yield,
#endif
ctx->setException(std::current_exception());
}
catch(...)
catch (...)
{
#ifdef __QUANTUM_PRINT_DEBUG
std::lock_guard<std::mutex> guard(Util::LogMutex());
std::cerr << "Caught unknown exception." << std::endl;
std::cerr << "Caught unknown exception in coroutine." << std::endl;
#endif
ctx->setException(std::current_exception());
}
Expand All @@ -81,11 +82,11 @@ int bindCoro2(Traits::Yield& yield,
yield.get() = rc;
return 0;
}
catch (const boost::context::detail::forced_unwind&)
{
catch (const Traits::CoroutineStackUnwind&) {
//allow coroutine stack to unwind properly
throw;
}
catch(const std::exception& ex)
catch (const std::exception& ex)
{
UNUSED(ex);
#ifdef __QUANTUM_PRINT_DEBUG
Expand All @@ -94,11 +95,11 @@ int bindCoro2(Traits::Yield& yield,
#endif
ctx->setException(std::current_exception());
}
catch(...)
catch (...)
{
#ifdef __QUANTUM_PRINT_DEBUG
std::lock_guard<std::mutex> guard(Util::LogMutex());
std::cerr << "Caught unknown exception." << std::endl;
std::cerr << "Caught unknown exception in coroutine." << std::endl;
#endif
ctx->setException(std::current_exception());
}
Expand All @@ -114,7 +115,7 @@ int bindIo(std::shared_ptr<Promise<RET>> promise,
{
return std::forward<CAPTURE>(capture)();
}
catch(const std::exception& ex)
catch (const std::exception& ex)
{
UNUSED(ex);
#ifdef __QUANTUM_PRINT_DEBUG
Expand All @@ -123,11 +124,11 @@ int bindIo(std::shared_ptr<Promise<RET>> promise,
#endif
promise->setException(std::current_exception());
}
catch(...)
catch (...)
{
#ifdef __QUANTUM_PRINT_DEBUG
std::lock_guard<std::mutex> guard(Util::LogMutex());
std::cerr << "Caught unknown exception." << std::endl;
std::cerr << "Caught unknown exception in IO task." << std::endl;
#endif
promise->setException(std::current_exception());
}
Expand All @@ -143,7 +144,7 @@ int bindIo2(std::shared_ptr<Promise<RET>> promise,
promise->set(std::forward<CAPTURE>(capture)());
return 0;
}
catch(const std::exception& ex)
catch (const std::exception& ex)
{
UNUSED(ex);
#ifdef __QUANTUM_PRINT_DEBUG
Expand All @@ -152,11 +153,11 @@ int bindIo2(std::shared_ptr<Promise<RET>> promise,
#endif
promise->setException(std::current_exception());
}
catch(...)
catch (...)
{
#ifdef __QUANTUM_PRINT_DEBUG
std::lock_guard<std::mutex> guard(Util::LogMutex());
std::cerr << "Caught unknown exception." << std::endl;
std::cerr << "Caught unknown exception in IO task." << std::endl;
#endif
promise->setException(std::current_exception());
}
Expand Down
4 changes: 2 additions & 2 deletions tests/quantum_sequencer_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,14 @@ TEST_P(SequencerTest, ExceptionHandler)
// the exception above must be thrown
ASSERT_TRUE(false);
}
catch(const std::exception& e)
catch (const std::exception& e)
{
EXPECT_EQ(e.what(), errorText);
ASSERT_NE(opaque, nullptr);
SequencerTestData::TaskId taskId = *static_cast<SequencerTestData::TaskId*>(opaque);
EXPECT_EQ(taskId % exceptionFrequency, 0);
}
catch(...)
catch (...)
{
ASSERT_TRUE(false);
}
Expand Down
12 changes: 6 additions & 6 deletions tests/quantum_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -829,9 +829,9 @@ TEST_P(PromiseTest, BufferedFutureException)
}
// Don't close the buffer but throw instead
try {
throw 5;
throw std::runtime_error("42");
}
catch (...)
catch (const std::exception&)
{
return ctx->setException(std::current_exception());
}
Expand All @@ -847,7 +847,7 @@ TEST_P(PromiseTest, BufferedFutureException)
if (isBufferClosed) break;
v.push_back(value);
}
catch (...) {
catch (const std::exception&) {
wasCaught = true;
break;
}
Expand Down Expand Up @@ -1000,14 +1000,14 @@ TEST_P(PromiseTest, SetExceptionInPromise)
Dispatcher& dispatcher = getDispatcher();
IThreadContext<int>::Ptr ctx = dispatcher.post([](ICoroContext<int>::Ptr ctx)->int{
try {
throw 5;
throw std::runtime_error("42");
}
catch (...)
catch (const std::exception&)
{
return ctx->setException(std::current_exception());
}
});
EXPECT_THROW(ctx->get(), int);
EXPECT_THROW(ctx->get(), std::runtime_error);
}

TEST_P(PromiseTest, FutureTimeout)
Expand Down