diff --git a/DESCRIPTION b/DESCRIPTION index f01eba696..d51de8a09 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: RcppParallel Type: Package Title: Parallel Programming Tools for 'Rcpp' -Version: 5.1.9.9000 +Version: 5.1.10.9000 Authors@R: c( person("Kevin", "Ushey", role = c("aut", "cre"), email = "kevin@rstudio.com", comment = c(ORCID = "0000-0003-2880-7407")), diff --git a/NEWS.md b/NEWS.md index b571cbbf4..c57f63086 100644 --- a/NEWS.md +++ b/NEWS.md @@ -9,6 +9,12 @@ implementation. In practice, this implies that RcppParallel will now only provide a TBB backend with R (>= 4.2.0). +## RcppParallel 5.1.10 + +* Fixed an issue where packages linking to RcppParallel could inadvertently + depend on internals of the TBB library available during compilation, even + if the package did not explicitly use TBB itself. + ## RcppParallel 5.1.9 * RcppParallel no longer passes `-rpath` when building / linking on Windows. diff --git a/R/flags.R b/R/flags.R index 32f641840..6ae1f8280 100644 --- a/R/flags.R +++ b/R/flags.R @@ -1,32 +1,32 @@ #' Compilation flags for RcppParallel -#' +#' #' Output the compiler or linker flags required to build against RcppParallel. -#' +#' #' These functions are typically called from `Makevars` as follows: -#' +#' #' ``` #' PKG_LIBS += $(shell "${R_HOME}/bin/Rscript" -e "RcppParallel::LdFlags()") #' ``` -#' +#' #' On Windows, the flags ensure that the package links with the built-in TBB #' library. On Linux and macOS, the output is empty, because TBB is loaded #' dynamically on load by `RcppParallel`. -#' +#' #' \R packages using RcppParallel should also add the following to their #' `NAMESPACE` file: -#' +#' #' ``` #' importFrom(RcppParallel, RcppParallelLibs) #' ``` -#' +#' #' This is necessary to ensure that \pkg{RcppParallel} (and so, TBB) is loaded #' and available. -#' +#' #' @name flags #' @rdname flags #' @aliases RcppParallelLibs LdFlags CxxFlags -#' +#' #' @return Returns \code{NULL}, invisibly. 
These functions are called for #' their side effects (writing the associated flags to stdout). #' diff --git a/R/tbb.R b/R/tbb.R index 9f4fc8db5..f00f0322d 100644 --- a/R/tbb.R +++ b/R/tbb.R @@ -1,26 +1,26 @@ #' Get the Path to a TBB Library -#' +#' #' Retrieve the path to a TBB library. This can be useful for \R packages #' using RcppParallel that wish to use, or re-use, the version of TBB that #' RcppParallel has been configured to use. -#' +#' #' @param name #' The name of the TBB library to be resolved. Normally, this is one of #' `tbb`, `tbbmalloc`, or `tbbmalloc_proxy`. When `NULL`, the library #' path containing the TBB libraries is returned instead. -#' +#' #' @export tbbLibraryPath <- function(name = NULL) { - + # library paths for different OSes sysname <- Sys.info()[["sysname"]] - + # find root for TBB install tbbRoot <- Sys.getenv("TBB_LIB", unset = tbbRoot()) if (is.null(name)) return(tbbRoot) - + # form library names tbbLibNames <- list( "Darwin" = paste0("lib", name, ".dylib"), @@ -28,12 +28,12 @@ tbbLibraryPath <- function(name = NULL) { "SunOS" = paste0("lib", name, ".so"), "Linux" = paste0("lib", name, c(".so.2", ".so")) ) - + # skip systems that we know not to be compatible isCompatible <- !is_sparc() && !is.null(tbbLibNames[[sysname]]) if (!isCompatible) return(NULL) - + # find the request library (if any) libNames <- tbbLibNames[[sysname]] for (libName in libNames) { @@ -49,12 +49,12 @@ tbbLibraryPath <- function(name = NULL) { return(tbbName) } - + } tbbCxxFlags <- function() { - - if (!TBB_ENABLED) + + if (!TBB_ENABLED) return("-DRCPP_PARALLEL_USE_TBB=0") flags <- c("-DRCPP_PARALLEL_USE_TBB=1") @@ -66,7 +66,7 @@ tbbCxxFlags <- function() { flags <- c(flags, "-DTBB_USE_GCC_BUILTINS") } } - + # if TBB_INC is set, apply those library paths tbbInc <- Sys.getenv("TBB_INC", unset = TBB_INC) if (!file.exists(tbbInc)) { @@ -86,10 +86,10 @@ tbbCxxFlags <- function() { flags <- c(flags, paste0("-I", asBuildPath(tbbInc))) } - + # return flags as string 
paste(flags, collapse = " ") - + } # Return the linker flags required for TBB on this platform @@ -120,20 +120,20 @@ tbbLdFlags <- function() { fmt <- "-L%s -l%s -l%s" return(sprintf(fmt, asBuildPath(tbbLibraryPath()), TBB_NAME, TBB_MALLOC_NAME)) } - + # nothing required on other platforms "" - + } tbbRoot <- function() { - + if (nzchar(TBB_LIB)) return(TBB_LIB) - + rArch <- .Platform$r_arch parts <- c("lib", if (nzchar(rArch)) rArch) libDir <- paste(parts, collapse = "/") system.file(libDir, package = "RcppParallel") - + } diff --git a/R/zzz.R b/R/zzz.R index c30a2983a..28b50d333 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -43,7 +43,7 @@ loadTbbLibrary <- function(name) { # load RcppParallel library if available if (.Platform$OS.type != "windows") { - .dllInfo <<- library.dynam("RcppParallel", pkgname, libname) + .dllInfo <<- library.dynam("RcppParallel", pkgname, libname, local = FALSE) } } diff --git a/RcppParallel.Rproj b/RcppParallel.Rproj index dcbdb2bd7..e478960cd 100644 --- a/RcppParallel.Rproj +++ b/RcppParallel.Rproj @@ -13,8 +13,11 @@ Encoding: UTF-8 RnwWeave: Sweave LaTeX: pdfLaTeX +AutoAppendNewline: Yes +StripTrailingWhitespace: Yes + BuildType: Package PackageCleanBeforeInstall: No -PackageInstallArgs: --with-keep.source --clean +PackageInstallArgs: --with-keep.source PackageCheckArgs: --as-cran PackageRoxygenize: rd,collate,namespace diff --git a/inst/include/RcppParallel.h b/inst/include/RcppParallel.h index 45020af68..c7fdf0bc0 100644 --- a/inst/include/RcppParallel.h +++ b/inst/include/RcppParallel.h @@ -6,7 +6,7 @@ #include "RcppParallel/TinyThread.h" // Use TBB only where it's known to compile and work correctly -// (NOTE: Windows TBB is temporarily opt-in for packages for +// (NOTE: Windows TBB is temporarily opt-in for packages for // compatibility with CRAN packages not previously configured // to link to TBB in Makevars.win) #ifndef RCPP_PARALLEL_USE_TBB @@ -31,14 +31,14 @@ namespace RcppParallel { inline void parallelFor(std::size_t begin, - 
std::size_t end, + std::size_t end, Worker& worker, std::size_t grainSize = 1, int numThreads = -1) { - grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, 1u); + grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, std::size_t(1)); numThreads = resolveValue("RCPP_PARALLEL_NUM_THREADS", numThreads, -1); - + #if RCPP_PARALLEL_USE_TBB if (internal::backend() == internal::BACKEND_TBB) tbbParallelFor(begin, end, worker, grainSize, numThreads); @@ -51,14 +51,14 @@ inline void parallelFor(std::size_t begin, template inline void parallelReduce(std::size_t begin, - std::size_t end, + std::size_t end, Reducer& reducer, std::size_t grainSize = 1, int numThreads = -1) { - grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, 1); + grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, std::size_t(1)); numThreads = resolveValue("RCPP_PARALLEL_NUM_THREADS", numThreads, -1); - + #if RCPP_PARALLEL_USE_TBB if (internal::backend() == internal::BACKEND_TBB) tbbParallelReduce(begin, end, reducer, grainSize, numThreads); diff --git a/inst/include/RcppParallel/Common.h b/inst/include/RcppParallel/Common.h index 3b9d79781..7ab91a5ab 100644 --- a/inst/include/RcppParallel/Common.h +++ b/inst/include/RcppParallel/Common.h @@ -19,51 +19,63 @@ inline int resolveValue(const char* envvar, if (useRequestedValue) return requestedValue; - + // otherwise, try reading the default from associated envvar // if the environment variable is unset, use the default const char* var = getenv(envvar); if (var == NULL) return defaultValue; - + // try to convert the string to a number // if an error occurs during conversion, just use default errno = 0; char* end; long value = strtol(var, &end, 10); - + // check for conversion failure if (end == var || *end != '\0' || errno == ERANGE) return defaultValue; - - // okay, return the parsed environment variable value + + // okay, return the parsed environment variable value return value; } +// Tag type used for 
disambiguating splitting constructors +struct Split {}; + // Work executed within a background thread. We implement dynamic // dispatch using vtables so we can have a stable type to cast // to from the void* passed to the worker thread (required because // the tinythreads interface allows to pass only a void* to the // thread main rather than a generic type / template) - -struct Worker -{ +struct Worker +{ // construct and destruct (delete virtually) Worker() {} virtual ~Worker() {} - + // dispatch work over a range of values - virtual void operator()(std::size_t begin, std::size_t end) = 0; - - // disable copying and assignment + virtual void operator()(std::size_t begin, std::size_t end) = 0; + private: + // disable copying and assignment Worker(const Worker&); void operator=(const Worker&); }; -// Tag type used for disambiguating splitting constructors +// Used for controlling the stack size for threads / tasks within a scope. +class ThreadStackSizeControl +{ +public: + ThreadStackSizeControl(); + ~ThreadStackSizeControl(); + +private: + // COPYING: not copyable + ThreadStackSizeControl(const ThreadStackSizeControl&); + ThreadStackSizeControl& operator=(const ThreadStackSizeControl&); +}; -struct Split {}; } // namespace RcppParallel diff --git a/inst/include/RcppParallel/TBB.h b/inst/include/RcppParallel/TBB.h index 83f4a0c0b..f65c7cc5e 100644 --- a/inst/include/RcppParallel/TBB.h +++ b/inst/include/RcppParallel/TBB.h @@ -44,246 +44,114 @@ class task_scheduler_init { namespace RcppParallel { -namespace { +// This class is primarily used to implement type erasure. The goals here were: +// +// 1. Hide the tbb symbols / implementation details from client R packages. +// That is, they should get the tools they need only via RcppParallel. +// +// 2. Do this in a way that preserves binary compatibility with pre-existing +// classes that make use of parallelReduce(). +// +// 3. 
Ensure that those packages, when re-compiled without source changes, +// can still function as expected. +// +// The downside here is that all the indirection through std::function<> +// and the requirement for RTTI is probably expensive, but I couldn't find +// a better way forward that could also preserve binary compatibility with +// existing pre-built packages. +// +// Hopefully, in a future release, we can do away with this wrapper, once +// packages have been rebuilt and no longer implicitly depend on TBB internals. +struct ReducerWrapper { + + template + ReducerWrapper(T* reducer) + { + self_ = reinterpret_cast(reducer); + owned_ = false; -struct TBBWorker -{ - explicit TBBWorker(Worker& worker) : worker_(worker) {} - - void operator()(const tbb::blocked_range& r) const { - worker_(r.begin(), r.end()); - } + work_ = [&](void* self, std::size_t begin, std::size_t end) + { + (*reinterpret_cast(self))(begin, end); + }; -private: - Worker& worker_; -}; + split_ = [&](void* object, Split split) + { + return new T(*reinterpret_cast(object), split); + }; -template -struct TBBReducer -{ - explicit TBBReducer(Reducer& reducer) - : pSplitReducer_(NULL), reducer_(reducer) - { - } - - TBBReducer(TBBReducer& tbbReducer, tbb::split) - : pSplitReducer_(new Reducer(tbbReducer.reducer_, RcppParallel::Split())), - reducer_(*pSplitReducer_) - { - } - - virtual ~TBBReducer() { delete pSplitReducer_; } + join_ = [&](void* self, void* other) + { + (*reinterpret_cast(self)).join(*reinterpret_cast(other)); + }; - void operator()(const tbb::blocked_range& r) { - reducer_(r.begin(), r.end()); - } - - void join(const TBBReducer& tbbReducer) { - reducer_.join(tbbReducer.reducer_); + deleter_ = [&](void* object) + { + delete (T*) object; + }; } - -private: - Reducer* pSplitReducer_; - Reducer& reducer_; -}; -class TBBParallelForExecutor -{ -public: - - TBBParallelForExecutor(Worker& worker, - std::size_t begin, - std::size_t end, - std::size_t grainSize) - : worker_(worker), - 
begin_(begin), - end_(end), - grainSize_(grainSize) - { - } - - void operator()() const + ~ReducerWrapper() { - TBBWorker tbbWorker(worker_); - tbb::parallel_for( - tbb::blocked_range(begin_, end_, grainSize_), - tbbWorker - ); + if (owned_) + { + deleter_(self_); + self_ = nullptr; + } } - -private: - Worker& worker_; - std::size_t begin_; - std::size_t end_; - std::size_t grainSize_; -}; -template -class TBBParallelReduceExecutor -{ -public: - - TBBParallelReduceExecutor(Reducer& reducer, - std::size_t begin, - std::size_t end, - std::size_t grainSize) - : reducer_(reducer), - begin_(begin), - end_(end), - grainSize_(grainSize) + void operator()(std::size_t begin, std::size_t end) const { + work_(self_, begin, end); } - - void operator()() const - { - TBBReducer tbbReducer(reducer_); - tbb::parallel_reduce( - tbb::blocked_range(begin_, end_, grainSize_), - tbbReducer - ); - } - -private: - Reducer& reducer_; - std::size_t begin_; - std::size_t end_; - std::size_t grainSize_; -}; -class TBBArenaParallelForExecutor -{ -public: - - TBBArenaParallelForExecutor(tbb::task_group& group, - Worker& worker, - std::size_t begin, - std::size_t end, - std::size_t grainSize) - : group_(group), - worker_(worker), - begin_(begin), - end_(end), - grainSize_(grainSize) - { - } - - void operator()() const + ReducerWrapper(const ReducerWrapper& rhs, Split split) { - TBBParallelForExecutor executor(worker_, begin_, end_, grainSize_); - group_.run_and_wait(executor); - } - -private: - - tbb::task_group& group_; - Worker& worker_; - std::size_t begin_; - std::size_t end_; - std::size_t grainSize_; -}; + self_ = rhs.split_(rhs.self_, split); + owned_ = true; -template -class TBBArenaParallelReduceExecutor -{ -public: - - TBBArenaParallelReduceExecutor(tbb::task_group& group, - Reducer& reducer, - std::size_t begin, - std::size_t end, - std::size_t grainSize) - : group_(group), - reducer_(reducer), - begin_(begin), - end_(end), - grainSize_(grainSize) - { + work_ = rhs.work_; + split_ = 
rhs.split_; + join_ = rhs.join_; + deleter_ = rhs.deleter_; } - - void operator()() const - { - TBBParallelReduceExecutor executor(reducer_, begin_, end_, grainSize_); - group_.run_and_wait(executor); - } - -private: - - tbb::task_group& group_; - Reducer& reducer_; - std::size_t begin_; - std::size_t end_; - std::size_t grainSize_; -}; -class ThreadStackSizeControl -{ -public: - - ThreadStackSizeControl() - : control_(nullptr) + void join(const ReducerWrapper& rhs) const { - int stackSize = resolveValue("RCPP_PARALLEL_STACK_SIZE", 0, 0); - if (stackSize > 0) - { - control_ = new tbb::global_control( - tbb::global_control::thread_stack_size, - stackSize - ); - } - } - - ~ThreadStackSizeControl() - { - if (control_ != nullptr) - { - delete control_; - control_ = nullptr; - } + join_(self_, rhs.self_); } private: - - // COPYING: not copyable - ThreadStackSizeControl(const ThreadStackSizeControl&); - ThreadStackSizeControl& operator=(const ThreadStackSizeControl&); - - // private members - tbb::global_control* control_; - + void* self_ = nullptr; + bool owned_ = false; + + std::function work_; + std::function split_; + std::function join_; + std::function deleter_; }; - -} // anonymous namespace +void tbbParallelFor(std::size_t begin, + std::size_t end, + Worker& worker, + std::size_t grainSize = 1, + int numThreads = -1); -inline void tbbParallelFor(std::size_t begin, - std::size_t end, - Worker& worker, +void tbbParallelReduceImpl(std::size_t begin, + std::size_t end, + ReducerWrapper& wrapper, std::size_t grainSize = 1, - int numThreads = tbb::task_arena::automatic) -{ - ThreadStackSizeControl control; - - tbb::task_arena arena(numThreads); - tbb::task_group group; - - TBBArenaParallelForExecutor executor(group, worker, begin, end, grainSize); - arena.execute(executor); -} + int numThreads = -1); template -inline void tbbParallelReduce(std::size_t begin, - std::size_t end, - Reducer& reducer, - std::size_t grainSize = 1, - int numThreads = 
tbb::task_arena::automatic) +void tbbParallelReduce(std::size_t begin, + std::size_t end, + Reducer& reducer, + std::size_t grainSize = 1, + int numThreads = -1) { - ThreadStackSizeControl control; - - tbb::task_arena arena(numThreads); - tbb::task_group group; - - TBBArenaParallelReduceExecutor executor(group, reducer, begin, end, grainSize); - arena.execute(executor); + ReducerWrapper wrapper(&reducer); + tbbParallelReduceImpl(begin, end, wrapper, grainSize, numThreads); } } // namespace RcppParallel diff --git a/inst/skeleton/vector-sum.cpp b/inst/skeleton/vector-sum.cpp index 30622e519..57b541659 100644 --- a/inst/skeleton/vector-sum.cpp +++ b/inst/skeleton/vector-sum.cpp @@ -19,37 +19,37 @@ using namespace Rcpp; using namespace RcppParallel; struct Sum : public Worker -{ +{ // source vector const RVector input; - + // accumulated value double value; - + // constructors Sum(const NumericVector input) : input(input), value(0) {} Sum(const Sum& sum, Split) : input(sum.input), value(0) {} - + // accumulate just the element of the range I've been asked to void operator()(std::size_t begin, std::size_t end) { value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0); } - + // join my value with that of another Sum - void join(const Sum& rhs) { - value += rhs.value; + void join(const Sum& rhs) { + value += rhs.value; } }; // [[Rcpp::export]] double parallelVectorSum(NumericVector x) { - - // declare the SumBody instance + + // declare the SumBody instance Sum sum(x); - + // call parallel_reduce to start the work parallelReduce(0, x.length(), sum); - + // return the computed sum return sum.value; } diff --git a/inst/tests/cpp/innerproduct.cpp b/inst/tests/cpp/innerproduct.cpp index 7a1957205..fee3e41ce 100644 --- a/inst/tests/cpp/innerproduct.cpp +++ b/inst/tests/cpp/innerproduct.cpp @@ -19,43 +19,43 @@ double innerProduct(NumericVector x, NumericVector y) { using namespace RcppParallel; struct InnerProduct : public Worker -{ +{ // source vectors const 
RVector x; const RVector y; - + // product that I have accumulated double product; - + // constructors - InnerProduct(const NumericVector x, const NumericVector y) + InnerProduct(const NumericVector x, const NumericVector y) : x(x), y(y), product(0) {} - InnerProduct(const InnerProduct& innerProduct, Split) + InnerProduct(const InnerProduct& innerProduct, Split) : x(innerProduct.x), y(innerProduct.y), product(0) {} - + // process just the elements of the range I have been asked to void operator()(std::size_t begin, std::size_t end) { - product += std::inner_product(x.begin() + begin, - x.begin() + end, - y.begin() + begin, + product += std::inner_product(x.begin() + begin, + x.begin() + end, + y.begin() + begin, 0.0); } - + // join my value with that of another InnerProduct - void join(const InnerProduct& rhs) { - product += rhs.product; + void join(const InnerProduct& rhs) { + product += rhs.product; } }; // [[Rcpp::export]] double parallelInnerProduct(NumericVector x, NumericVector y) { - + // declare the InnerProduct instance that takes a pointer to the vector data InnerProduct innerProduct(x, y); - + // call paralleReduce to start the work parallelReduce(0, x.length(), innerProduct); - + // return the computed product return innerProduct.product; } diff --git a/inst/tests/cpp/sum.cpp b/inst/tests/cpp/sum.cpp index aec4895f9..db47c699e 100644 --- a/inst/tests/cpp/sum.cpp +++ b/inst/tests/cpp/sum.cpp @@ -3,7 +3,7 @@ * @author JJ Allaire * @license GPL (>= 2) */ - + #include #include @@ -12,37 +12,37 @@ using namespace RcppParallel; using namespace Rcpp; struct Sum : public Worker -{ +{ // source vector const RVector input; - + // accumulated value double value; - + // constructors Sum(const NumericVector input) : input(input), value(0) {} Sum(const Sum& sum, Split) : input(sum.input), value(0) {} - + // accumulate just the element of the range I have been asked to void operator()(std::size_t begin, std::size_t end) { value += std::accumulate(input.begin() + 
begin, input.begin() + end, 0.0); } - + // join my value with that of another Sum - void join(const Sum& rhs) { - value += rhs.value; - } + void join(const Sum& rhs) { + value += rhs.value; + } }; // [[Rcpp::export]] double parallelVectorSum(NumericVector x) { - - // declare the SumBody instance + + // declare the SumBody instance Sum sum(x); - + // call parallel_reduce to start the work parallelReduce(0, x.length(), sum); - + // return the computed sum return sum.value; } diff --git a/src/tbb.cpp b/src/tbb.cpp new file mode 100644 index 000000000..9aafc8c61 --- /dev/null +++ b/src/tbb.cpp @@ -0,0 +1,240 @@ + +#if RCPP_PARALLEL_USE_TBB + +#include +#include + +namespace RcppParallel { + +tbb::global_control* s_globalControl = nullptr; + +// TBB Tools ---- + +struct TBBWorker +{ + explicit TBBWorker(Worker& worker) : worker_(worker) {} + + void operator()(const tbb::blocked_range& r) const { + worker_(r.begin(), r.end()); + } + +private: + Worker& worker_; +}; + +ThreadStackSizeControl::ThreadStackSizeControl() +{ + int stackSize = resolveValue("RCPP_PARALLEL_STACK_SIZE", 0, 0); + if (stackSize > 0) + { + s_globalControl = new tbb::global_control( + tbb::global_control::thread_stack_size, + stackSize + ); + } +} + +ThreadStackSizeControl::~ThreadStackSizeControl() +{ + if (s_globalControl != nullptr) + { + delete s_globalControl; + s_globalControl = nullptr; + } +} + + +// TBB Parallel For ---- + +class TBBParallelForExecutor +{ +public: + + TBBParallelForExecutor(Worker& worker, + std::size_t begin, + std::size_t end, + std::size_t grainSize) + : worker_(worker), + begin_(begin), + end_(end), + grainSize_(grainSize) + { + } + + void operator()() const + { + TBBWorker tbbWorker(worker_); + tbb::parallel_for( + tbb::blocked_range(begin_, end_, grainSize_), + tbbWorker + ); + } + +private: + Worker& worker_; + std::size_t begin_; + std::size_t end_; + std::size_t grainSize_; +}; + +class TBBArenaParallelForExecutor +{ +public: + + 
TBBArenaParallelForExecutor(tbb::task_group& group, + Worker& worker, + std::size_t begin, + std::size_t end, + std::size_t grainSize) + : group_(group), + worker_(worker), + begin_(begin), + end_(end), + grainSize_(grainSize) + { + } + + void operator()() const + { + TBBParallelForExecutor executor(worker_, begin_, end_, grainSize_); + group_.run_and_wait(executor); + } + +private: + + tbb::task_group& group_; + Worker& worker_; + std::size_t begin_; + std::size_t end_; + std::size_t grainSize_; +}; + +void tbbParallelFor(std::size_t begin, + std::size_t end, + Worker& worker, + std::size_t grainSize, + int numThreads) +{ + ThreadStackSizeControl control; + + tbb::task_group group; + TBBArenaParallelForExecutor executor(group, worker, begin, end, grainSize); + + tbb::task_arena arena(numThreads); + arena.execute(executor); +} + + +// TBB Parallel Reduce ---- + +struct TBBReducer +{ + explicit TBBReducer(ReducerWrapper& reducer) + : pSplitReducer_(NULL), reducer_(reducer) + { + } + + TBBReducer(TBBReducer& tbbReducer, tbb::split) + : pSplitReducer_(new ReducerWrapper(tbbReducer.reducer_, RcppParallel::Split())), + reducer_(*pSplitReducer_) + { + } + + virtual ~TBBReducer() { delete pSplitReducer_; } + + void operator()(const tbb::blocked_range& r) + { + reducer_(r.begin(), r.end()); + } + + void join(const TBBReducer& tbbReducer) + { + reducer_.join(tbbReducer.reducer_); + } + +private: + ReducerWrapper* pSplitReducer_; + ReducerWrapper& reducer_; +}; + +class TBBParallelReduceExecutor +{ +public: + + TBBParallelReduceExecutor(ReducerWrapper& reducer, + std::size_t begin, + std::size_t end, + std::size_t grainSize) + : reducer_(reducer), + begin_(begin), + end_(end), + grainSize_(grainSize) + { + } + + void operator()() const + { + TBBReducer tbbReducer(reducer_); + tbb::parallel_reduce( + tbb::blocked_range(begin_, end_, grainSize_), + tbbReducer + ); + } + +private: + ReducerWrapper& reducer_; + std::size_t begin_; + std::size_t end_; + std::size_t grainSize_; 
+}; + +class TBBArenaParallelReduceExecutor +{ +public: + + TBBArenaParallelReduceExecutor(tbb::task_group& group, + ReducerWrapper& reducer, + std::size_t begin, + std::size_t end, + std::size_t grainSize) + : group_(group), + reducer_(reducer), + begin_(begin), + end_(end), + grainSize_(grainSize) + { + } + + void operator()() const + { + TBBParallelReduceExecutor executor(reducer_, begin_, end_, grainSize_); + group_.run_and_wait(executor); + } + +private: + + tbb::task_group& group_; + ReducerWrapper& reducer_; + std::size_t begin_; + std::size_t end_; + std::size_t grainSize_; +}; + +void tbbParallelReduceImpl(std::size_t begin, + std::size_t end, + ReducerWrapper& reducer, + std::size_t grainSize, + int numThreads) +{ + ThreadStackSizeControl control; + + tbb::task_group group; + TBBArenaParallelReduceExecutor executor(group, reducer, begin, end, grainSize); + + tbb::task_arena arena(numThreads); + arena.execute(executor); +} + +} // end namespace RcppParallel + +#endif /* RCPP_PARALLEL_USE_TBB */