diff --git a/Cargo.toml b/Cargo.toml
index 3daa21ff8..3a3bb097e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -39,19 +39,24 @@
 optional = true
 
 blas-sys = { version = "0.6.5", optional = true, default-features = false }
 matrixmultiply = { version = "0.1.13" }
 
+rayon = { version = "0.6.0", optional = true }
+
 [dependencies.serde]
 version = "0.8.20"
 optional = true
 
+[dev-dependencies]
+num_cpus = "1.2"
+
 [features]
 blas = ["blas-sys"]
 # These features are used for testing
 blas-openblas-sys = ["blas"]
-test = ["blas-openblas-sys"]
+test = ["blas-openblas-sys", "rayon"]
 # This feature is used for docs
-docs = ["rustc-serialize", "serde"]
+docs = ["rustc-serialize", "serde", "rayon"]
 
 [profile.release]
 [profile.bench]
diff --git a/README.rst b/README.rst
index a771f307f..ff018ea78 100644
--- a/README.rst
+++ b/README.rst
@@ -69,6 +69,11 @@ your `Cargo.toml`.
   Uses ``blas-sys`` for pluggable backend, which needs to be configured
   separately.
 
+- ``rayon``
+
+  - Optional, compatible with Rust stable
+  - Implements rayon 0.6 parallelization.
+
 How to use with cargo::
 
     [dependencies]
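For a downstream crate, opting into the new feature would look something like
the following in its own manifest (the version numbers here are illustrative,
not part of this patch)::

    [dependencies]
    ndarray = { version = "0.7", features = ["rayon"] }
    rayon = "0.6"

A direct `rayon` dependency is only needed if the user also wants rayon's
prelude in scope themselves; `ndarray::parallel` re-exports it (see
`src/parallel/mod.rs` below).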
diff --git a/benches/rayon.rs b/benches/rayon.rs
new file mode 100644
index 000000000..f51435df2
--- /dev/null
+++ b/benches/rayon.rs
@@ -0,0 +1,114 @@
+
+#![feature(test)]
+
+extern crate num_cpus;
+extern crate test;
+use test::Bencher;
+
+#[macro_use(s)]
+extern crate ndarray;
+use ndarray::prelude::*;
+
+extern crate rayon;
+use rayon::prelude::*;
+
+const EXP_N: usize = 128;
+
+use std::cmp::max;
+
+fn set_threads() {
+    let n = max(1, num_cpus::get() / 2);
+    let cfg = rayon::Configuration::new().set_num_threads(n);
+    let _ = rayon::initialize(cfg);
+}
+
+#[bench]
+fn map_exp_regular(bench: &mut Bencher)
+{
+    let mut a = Array2::<f64>::zeros((EXP_N, EXP_N));
+    a.swap_axes(0, 1);
+    bench.iter(|| {
+        a.mapv_inplace(|x| x.exp());
+    });
+}
+
+#[bench]
+fn rayon_exp_regular(bench: &mut Bencher)
+{
+    set_threads();
+    let mut a = Array2::<f64>::zeros((EXP_N, EXP_N));
+    a.swap_axes(0, 1);
+    bench.iter(|| {
+        a.view_mut().into_par_iter().for_each(|x| *x = x.exp());
+    });
+}
+
+const FASTEXP: usize = 800;
+
+#[inline]
+fn fastexp(x: f64) -> f64 {
+    let x = 1. + x / 1024.;
+    x.powi(1024)
+}
+
+#[bench]
+fn map_fastexp_regular(bench: &mut Bencher)
+{
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    bench.iter(|| {
+        a.mapv_inplace(|x| fastexp(x))
+    });
+}
+
+#[bench]
+fn rayon_fastexp_regular(bench: &mut Bencher)
+{
+    set_threads();
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    bench.iter(|| {
+        a.view_mut().into_par_iter().for_each(|x| *x = fastexp(*x));
+    });
+}
+
+#[bench]
+fn map_fastexp_cut(bench: &mut Bencher)
+{
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    let mut a = a.slice_mut(s![.., ..-1]);
+    bench.iter(|| {
+        a.mapv_inplace(|x| fastexp(x))
+    });
+}
+
+#[bench]
+fn rayon_fastexp_cut(bench: &mut Bencher)
+{
+    set_threads();
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    let mut a = a.slice_mut(s![.., ..-1]);
+    bench.iter(|| {
+        a.view_mut().into_par_iter().for_each(|x| *x = fastexp(*x));
+    });
+}
+
+#[bench]
+fn map_fastexp_by_axis(bench: &mut Bencher)
+{
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    bench.iter(|| {
+        for mut sheet in a.axis_iter_mut(Axis(0)) {
+            sheet.mapv_inplace(fastexp)
+        }
+    });
+}
+
+#[bench]
+fn rayon_fastexp_by_axis(bench: &mut Bencher)
+{
+    set_threads();
+    let mut a = Array2::<f64>::zeros((FASTEXP, FASTEXP));
+    bench.iter(|| {
+        a.axis_iter_mut(Axis(0)).into_par_iter()
+            .for_each(|mut sheet| sheet.mapv_inplace(fastexp));
+    });
+}
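A note on the `fastexp` benchmarks above: they rely on the limit identity
e^x ≈ (1 + x/n)^n, here with n = 1024, which gives a relative error of roughly
x²/2n for moderate x while still doing enough per-element work for parallel
speedups to be measurable. A standalone sketch of that claim (the tolerance is
illustrative):

    fn fastexp(x: f64) -> f64 {
        let x = 1. + x / 1024.;
        x.powi(1024)
    }

    fn main() {
        // Relative error vs. the real exp stays around x^2/2048 for small x.
        for &x in &[0.0f64, 0.5, 1.0, 2.0] {
            let err = (fastexp(x) - x.exp()).abs() / x.exp();
            assert!(err < 1e-2);
            println!("x = {}: fastexp = {:.6}, exp = {:.6}", x, fastexp(x), x.exp());
        }
    }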
diff --git a/src/iterators/mod.rs b/src/iterators/mod.rs
index e379fb96f..4f91c4b61 100644
--- a/src/iterators/mod.rs
+++ b/src/iterators/mod.rs
@@ -21,6 +21,9 @@ use super::{
     Axis,
 };
 
+#[cfg(feature = "rayon")]
+pub mod par;
+
 /// Base for array iterators
 ///
 /// Iterator element type is `&'a A`.
diff --git a/src/iterators/par.rs b/src/iterators/par.rs
new file mode 100644
index 000000000..554577210
--- /dev/null
+++ b/src/iterators/par.rs
@@ -0,0 +1,225 @@
+
+
+use rayon::par_iter::ParallelIterator;
+use rayon::par_iter::IntoParallelIterator;
+use rayon::par_iter::IndexedParallelIterator;
+use rayon::par_iter::ExactParallelIterator;
+use rayon::par_iter::BoundedParallelIterator;
+use rayon::par_iter::internal::{Consumer, UnindexedConsumer};
+use rayon::par_iter::internal::bridge;
+use rayon::par_iter::internal::bridge_unindexed;
+use rayon::par_iter::internal::ProducerCallback;
+use rayon::par_iter::internal::Producer;
+use rayon::par_iter::internal::UnindexedProducer;
+#[cfg(rayon_fold_with)]
+use rayon::par_iter::internal::Folder;
+
+use super::AxisIter;
+use super::AxisIterMut;
+use imp_prelude::*;
+
+/// Wrapper type for parallelized implementations.
+///
+/// **Requires crate feature `"rayon"`**
+#[derive(Copy, Clone, Debug)]
+pub struct Parallel<I> {
+    iter: I,
+}
+
+macro_rules! par_iter_wrapper {
+    // thread_bounds are either Sync or Send + Sync
+    ($iter_name:ident, [$($thread_bounds:tt)*]) => {
+    /// This iterator can be turned into a parallel iterator (rayon crate).
+    ///
+    /// **Requires crate feature `"rayon"`**
+    impl<'a, A, D> IntoParallelIterator for $iter_name<'a, A, D>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        type Item = <Self as Iterator>::Item;
+        type Iter = Parallel<Self>;
+        fn into_par_iter(self) -> Self::Iter {
+            Parallel {
+                iter: self,
+            }
+        }
+    }
+
+    impl<'a, A, D> ParallelIterator for Parallel<$iter_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        type Item = <$iter_name<'a, A, D> as Iterator>::Item;
+        fn drive_unindexed<C>(self, consumer: C) -> C::Result
+            where C: UnindexedConsumer<Self::Item>
+        {
+            bridge(self, consumer)
+        }
+
+        fn opt_len(&mut self) -> Option<usize> {
+            Some(self.iter.len())
+        }
+    }
+
+    impl<'a, A, D> IndexedParallelIterator for Parallel<$iter_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        fn with_producer<Cb>(self, callback: Cb) -> Cb::Output
+            where Cb: ProducerCallback<Self::Item>
+        {
+            callback.callback(self.iter)
+        }
+    }
+
+    impl<'a, A, D> ExactParallelIterator for Parallel<$iter_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        fn len(&mut self) -> usize {
+            ExactSizeIterator::len(&self.iter)
+        }
+    }
+
+    impl<'a, A, D> BoundedParallelIterator for Parallel<$iter_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        fn upper_bound(&mut self) -> usize {
+            ExactSizeIterator::len(&self.iter)
+        }
+
+        fn drive<C>(self, consumer: C) -> C::Result
+            where C: Consumer<Self::Item>
+        {
+            bridge(self, consumer)
+        }
+    }
+
+    // This is the real magic, I guess
+
+    impl<'a, A, D> Producer for $iter_name<'a, A, D>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        fn cost(&mut self, len: usize) -> f64 {
+            // FIXME: No idea about what this is
+            len as f64
+        }
+
+        fn split_at(self, i: usize) -> (Self, Self) {
+            self.split_at(i)
+        }
+    }
+    }
+}
+
+par_iter_wrapper!(AxisIter, [Sync]);
+par_iter_wrapper!(AxisIterMut, [Send + Sync]);
+
+impl<'a, A, D> IntoParallelIterator for &'a Array<A, D>
+    where D: Dimension,
+          A: Sync
+{
+    type Item = &'a A;
+    type Iter = Parallel<ArrayView<'a, A, D>>;
+    fn into_par_iter(self) -> Self::Iter {
+        self.view().into_par_iter()
+    }
+}
+
+impl<'a, A, D> IntoParallelIterator for &'a RcArray<A, D>
+    where D: Dimension,
+          A: Sync
+{
+    type Item = &'a A;
+    type Iter = Parallel<ArrayView<'a, A, D>>;
+    fn into_par_iter(self) -> Self::Iter {
+        self.view().into_par_iter()
+    }
+}
+
+impl<'a, A, D> IntoParallelIterator for &'a mut Array<A, D>
+    where D: Dimension,
+          A: Sync + Send
+{
+    type Item = &'a mut A;
+    type Iter = Parallel<ArrayViewMut<'a, A, D>>;
+    fn into_par_iter(self) -> Self::Iter {
+        self.view_mut().into_par_iter()
+    }
+}
+
+impl<'a, A, D> IntoParallelIterator for &'a mut RcArray<A, D>
+    where D: Dimension,
+          A: Sync + Send + Clone,
+{
+    type Item = &'a mut A;
+    type Iter = Parallel<ArrayViewMut<'a, A, D>>;
+    fn into_par_iter(self) -> Self::Iter {
+        self.view_mut().into_par_iter()
+    }
+}
+
+macro_rules! par_iter_view_wrapper {
+    // thread_bounds are either Sync or Send + Sync
+    ($view_name:ident, [$($thread_bounds:tt)*]) => {
+    impl<'a, A, D> IntoParallelIterator for $view_name<'a, A, D>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        type Item = <Self as IntoIterator>::Item;
+        type Iter = Parallel<Self>;
+        fn into_par_iter(self) -> Self::Iter {
+            Parallel {
+                iter: self,
+            }
+        }
+    }
+
+
+    impl<'a, A, D> ParallelIterator for Parallel<$view_name<'a, A, D>>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        type Item = <$view_name<'a, A, D> as IntoIterator>::Item;
+        fn drive_unindexed<C>(self, consumer: C) -> C::Result
+            where C: UnindexedConsumer<Self::Item>
+        {
+            bridge_unindexed(self.iter, consumer)
+        }
+
+        fn opt_len(&mut self) -> Option<usize> {
+            Some(self.iter.len())
+        }
+    }
+
+    impl<'a, A, D> UnindexedProducer for $view_name<'a, A, D>
+        where D: Dimension,
+              A: $($thread_bounds)*,
+    {
+        fn can_split(&self) -> bool {
+            self.len() > 1
+        }
+
+        fn split(self) -> (Self, Self) {
+            let max_axis = self.max_stride_axis();
+            let mid = self.len_of(max_axis) / 2;
+            let (a, b) = self.split_at(max_axis, mid);
+            //println!("Split along axis {:?} at {}\nshapes {:?}, {:?}", max_axis, mid, a.shape(), b.shape());
+            (a, b)
+        }
+
+        #[cfg(rayon_fold_with)]
+        fn fold_with<F>(self, folder: F) -> F
+            where F: Folder<Self::Item>,
+        {
+            self.into_iter().fold(folder, move |f, elt| f.consume(elt))
+        }
+    }
+
+    }
+}
+
+par_iter_view_wrapper!(ArrayView, [Sync]);
+par_iter_view_wrapper!(ArrayViewMut, [Sync + Send]);
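The `UnindexedProducer` impl above is what lets whole array views parallelize:
`split` halves the view along its axis of greatest stride
(`max_stride_axis`), and rayon keeps splitting while `can_split` holds, so
worker threads receive large, mostly contiguous chunks. From the caller's
side it is just `into_par_iter()` on a view; a minimal sketch, assuming
ndarray is built with the `rayon` feature added by this patch:

    extern crate ndarray;
    extern crate rayon;

    use ndarray::prelude::*;
    use rayon::prelude::*;

    fn main() {
        let mut a = Array2::<f64>::zeros((8, 8));
        // Every element is visited exactly once, but in no guaranteed order.
        a.view_mut().into_par_iter().for_each(|x| *x = 1.);
        assert_eq!(a.scalar_sum(), 64.);
    }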
diff --git a/src/lib.rs b/src/lib.rs
index 5e049b13e..1ac9423a6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -62,6 +62,9 @@
 //! - Enable transparent BLAS support for matrix multiplication.
 //!   Uses ``blas-sys`` for pluggable backend, which needs to be configured
 //!   separately.
+//! - `rayon`
+//!   - Optional.
+//!   - Implements rayon 0.6 parallelization.
 //!
 
 #[cfg(feature = "serde")]
@@ -77,6 +80,8 @@ extern crate matrixmultiply;
 extern crate itertools;
 extern crate num_traits as libnum;
 extern crate num_complex;
+#[cfg(feature = "rayon")]
+extern crate rayon;
 
 use std::iter::Zip;
 use std::marker::PhantomData;
@@ -124,6 +129,8 @@ mod array_serde;
 mod array_serialize;
 mod arrayformat;
 mod data_traits;
+#[cfg(feature = "rayon")]
+pub mod parallel;
 
 pub use aliases::*;
diff --git a/src/parallel/mod.rs b/src/parallel/mod.rs
new file mode 100644
index 000000000..417900cc1
--- /dev/null
+++ b/src/parallel/mod.rs
@@ -0,0 +1,43 @@
+
+//! Parallelization features for ndarray.
+//!
+//! **Requires crate feature `"rayon"`**
+//!
+//! The array views and references to owned arrays all implement
+//! `IntoParallelIterator`; the default parallel iterators (each element by
+//! reference or mutable reference) have no ordering guarantee in their parallel
+//! implementations.
+//!
+//! `.axis_iter()` and `.axis_iter_mut()` also have parallel counterparts.
+//!
+//! # Examples
+//!
+//! Compute the exponential of each element in an array, parallelized.
+//!
+//! ```
+//! use ndarray::Array2;
+//! use ndarray::parallel::rayon_prelude::*;
+//!
+//! let mut a = Array2::<f64>::zeros((128, 128));
+//! a.par_iter_mut().for_each(|x| *x = x.exp());
+//! ```
+//!
+//! Use the parallel `.axis_iter()` to compute the sum of each row.
+//!
+//! ```
+//! use ndarray::Array;
+//! use ndarray::Axis;
+//! use ndarray::parallel::rayon_prelude::*;
+//!
+//! let a = Array::linspace(0., 63., 64).into_shape((4, 16)).unwrap();
+//! let mut sums = Vec::new();
+//! a.axis_iter(Axis(0))
+//!     .into_par_iter()
+//!     .map(|row| row.scalar_sum())
+//!     .collect_into(&mut sums);
+//!
+//! assert_eq!(sums, [120., 376., 632., 888.]);
+//! ```
+
+pub use rayon::prelude as rayon_prelude;
+pub use iterators::par::Parallel;
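The indexed axis iterators compose with the rest of rayon's prelude beyond
`collect_into`; for example, a parallel per-row reduction (a sketch assuming
the `rayon` feature; the approximate comparison allows for the reordered
floating-point additions of a parallel sum):

    extern crate ndarray;
    extern crate rayon;

    use ndarray::prelude::*;
    use rayon::prelude::*;

    fn main() {
        let a = Array::linspace(0., 63., 64).into_shape((4, 16)).unwrap();
        // Sum each row in parallel, then combine the partial sums.
        let total = a.axis_iter(Axis(0))
                     .into_par_iter()
                     .map(|row| row.scalar_sum())
                     .sum();
        assert!((total - a.scalar_sum()).abs() < 1e-9);
    }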
diff --git a/tests/rayon.rs b/tests/rayon.rs
new file mode 100644
index 000000000..8ff19ae84
--- /dev/null
+++ b/tests/rayon.rs
@@ -0,0 +1,43 @@
+#![cfg(feature = "rayon")]
+
+extern crate rayon;
+#[macro_use(s)] extern crate ndarray;
+
+use ndarray::prelude::*;
+
+use rayon::prelude::*;
+
+const M: usize = 1024 * 10;
+const N: usize = 100;
+
+#[test]
+fn test_axis_iter() {
+    let mut a = Array2::<f64>::zeros((M, N));
+    for (i, mut v) in a.axis_iter_mut(Axis(0)).enumerate() {
+        v.fill(i as _);
+    }
+    assert_eq!(a.axis_iter(Axis(0)).len(), M);
+    let s = a.axis_iter(Axis(0)).into_par_iter().map(|x| x.scalar_sum()).sum();
+    println!("{:?}", a.slice(s![..10, ..5]));
+    assert_eq!(s, a.scalar_sum());
+}
+
+#[test]
+fn test_axis_iter_mut() {
+    let mut a = Array::linspace(0., 1.0f64, M * N).into_shape((M, N)).unwrap();
+    let b = a.mapv(|x| x.exp());
+    a.axis_iter_mut(Axis(0)).into_par_iter().for_each(|mut v| v.mapv_inplace(|x| x.exp()));
+    println!("{:?}", a.slice(s![..10, ..5]));
+    assert!(a.all_close(&b, 0.001));
+}
+
+#[test]
+fn test_regular_iter() {
+    let mut a = Array2::<f64>::zeros((M, N));
+    for (i, mut v) in a.axis_iter_mut(Axis(0)).enumerate() {
+        v.fill(i as _);
+    }
+    let s = a.par_iter().map(|&x| x).sum();
+    println!("{:?}", a.slice(s![..10, ..5]));
+    assert_eq!(s, a.scalar_sum());
+}
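To exercise the new code paths locally, something like the following should
work (the bench invocation needs a nightly toolchain because of
`#![feature(test)]`)::

    cargo test --features rayon --test rayon
    cargo bench --features rayon --bench rayon

Note that the `test` feature now pulls in `rayon` as well, so CI runs built
with `--features test` cover these tests too.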