diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml
index 14230aa2b..3bf3ebc69 100644
--- a/rayon-core/Cargo.toml
+++ b/rayon-core/Cargo.toml
@@ -23,6 +23,10 @@ lazy_static = "1"
 [dependencies.crossbeam-deque]
 version = "0.2.0"
 
+# Also held back for rustc compatibility
+[dependencies.crossbeam]
+version = "0.3.0"
+
 [dev-dependencies]
 rand = "0.5"
 
diff --git a/rayon-core/src/job.rs b/rayon-core/src/job.rs
index 8ddc3371c..bcda77398 100644
--- a/rayon-core/src/job.rs
+++ b/rayon-core/src/job.rs
@@ -1,3 +1,4 @@
+use crossbeam::sync::SegQueue;
 use latch::Latch;
 use std::any::Any;
 use std::cell::UnsafeCell;
@@ -179,3 +180,35 @@ impl JobResult {
         }
     }
 }
+
+/// Indirect queue to provide FIFO job priority.
+pub struct JobFifo {
+    inner: SegQueue<JobRef>,
+}
+
+impl JobFifo {
+    pub fn new() -> Self {
+        JobFifo {
+            inner: SegQueue::new(),
+        }
+    }
+
+    pub unsafe fn push(&self, job_ref: JobRef) -> JobRef {
+        // A little indirection ensures that spawns are always prioritized in FIFO order. The
+        // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
+        // (FIFO), but either way they will end up popping from the front of this queue.
+        self.inner.push(job_ref);
+        JobRef::new(self)
+    }
+}
+
+impl Job for JobFifo {
+    unsafe fn execute(this: *const Self) {
+        // We "execute" a queue by executing its first job, FIFO.
+        (*this)
+            .inner
+            .try_pop()
+            .expect("job in fifo queue")
+            .execute()
+    }
+}
diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs
index 3c713080d..f845cae14 100644
--- a/rayon-core/src/lib.rs
+++ b/rayon-core/src/lib.rs
@@ -31,6 +31,7 @@ use std::io;
 use std::marker::PhantomData;
 use std::str::FromStr;
 
+extern crate crossbeam;
 extern crate crossbeam_deque;
 #[macro_use]
 extern crate lazy_static;
@@ -61,7 +62,8 @@ mod test;
 pub mod internal;
 pub use join::{join, join_context};
 pub use scope::{scope, Scope};
-pub use spawn::spawn;
+pub use scope::{scope_fifo, ScopeFifo};
+pub use spawn::{spawn, spawn_fifo};
 pub use thread_pool::current_thread_has_pending_tasks;
 pub use thread_pool::current_thread_index;
 pub use thread_pool::ThreadPool;
@@ -314,16 +316,17 @@ impl ThreadPoolBuilder {
         self
     }
 
-    /// Suggest to worker threads that they execute spawned jobs in a
-    /// "breadth-first" fashion. Typically, when a worker thread is
-    /// idle or blocked, it will attempt to execute the job from the
-    /// *top* of its local deque of work (i.e., the job most recently
-    /// spawned). If this flag is set to true, however, workers will
-    /// prefer to execute in a *breadth-first* fashion -- that is,
-    /// they will search for jobs at the *bottom* of their local
-    /// deque. (At present, workers *always* steal from the bottom of
-    /// other worker's deques, regardless of the setting of this
-    /// flag.)
+    /// **(DEPRECATED)** Suggest to worker threads that they execute
+    /// spawned jobs in a "breadth-first" fashion.
+    ///
+    /// Typically, when a worker thread is idle or blocked, it will
+    /// attempt to execute the job from the *top* of its local deque of
+    /// work (i.e., the job most recently spawned). If this flag is set
+    /// to true, however, workers will prefer to execute in a
+    /// *breadth-first* fashion -- that is, they will search for jobs at
+    /// the *bottom* of their local deque. (At present, workers *always*
+    /// steal from the bottom of other worker's deques, regardless of
+    /// the setting of this flag.)
     ///
     /// If you think of the tasks as a tree, where a parent task
     /// spawns its children in the tree, then this flag loosely
@@ -334,6 +337,14 @@ impl ThreadPoolBuilder {
     /// execution is highly dynamic and the precise order in which
     /// independent tasks are executed is not intended to be
     /// guaranteed.
+    ///
+    /// This `breadth_first()` method is now deprecated per [RFC #1],
+    /// and in the future its effect may be removed. Consider using
+    /// [`scope_fifo()`] for a similar effect.
+    ///
+    /// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
+    /// [`scope_fifo()`]: fn.scope_fifo.html
+    #[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
     pub fn breadth_first(mut self) -> Self {
         self.breadth_first = true;
         self
diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index a233768fb..9eed2c3e2 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -1,9 +1,10 @@
+use crossbeam::sync::SegQueue;
 use crossbeam_deque::{Deque, Steal, Stealer};
 #[cfg(rayon_unstable)]
 use internal::task::Task;
 #[cfg(rayon_unstable)]
 use job::Job;
-use job::{JobRef, StackJob};
+use job::{JobFifo, JobRef, StackJob};
 use latch::{CountLatch, Latch, LatchProbe, LockLatch, SpinLatch, TickleLatch};
 use log::Event::*;
 use sleep::Sleep;
@@ -13,7 +14,7 @@ use std::collections::hash_map::DefaultHasher;
 use std::hash::Hasher;
 use std::mem;
 use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
-use std::sync::{Arc, Mutex, Once, ONCE_INIT};
+use std::sync::{Arc, Once, ONCE_INIT};
 use std::thread;
 use std::usize;
 use unwind;
@@ -22,9 +23,8 @@ use {ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, T
 
 pub struct Registry {
     thread_infos: Vec<ThreadInfo>,
-    state: Mutex<RegistryState>,
     sleep: Sleep,
-    job_uninjector: Stealer<JobRef>,
+    injected_jobs: SegQueue<JobRef>,
     panic_handler: Option<Box<PanicHandler>>,
     start_handler: Option<Box<StartHandler>>,
     exit_handler: Option<Box<ExitHandler>>,
@@ -45,10 +45,6 @@ pub struct Registry {
     terminate_latch: CountLatch,
 }
 
-struct RegistryState {
-    job_injector: Deque<JobRef>,
-}
-
 /// ////////////////////////////////////////////////////////////////////////
 /// Initialization
 
@@ -104,16 +100,13 @@ impl Registry {
         let n_threads = builder.get_num_threads();
         let breadth_first = builder.get_breadth_first();
 
-        let inj_worker = Deque::new();
-        let inj_stealer = inj_worker.stealer();
         let workers: Vec<_> = (0..n_threads).map(|_| Deque::new()).collect();
         let stealers: Vec<_> = workers.iter().map(|d| d.stealer()).collect();
 
         let registry = Arc::new(Registry {
             thread_infos: stealers.into_iter().map(|s| ThreadInfo::new(s)).collect(),
-            state: Mutex::new(RegistryState::new(inj_worker)),
             sleep: Sleep::new(),
-            job_uninjector: inj_stealer,
+            injected_jobs: SegQueue::new(),
             terminate_latch: CountLatch::new(),
             panic_handler: builder.take_panic_handler(),
             start_handler: builder.take_start_handler(),
@@ -175,6 +168,18 @@ impl Registry {
         }
     }
 
+    /// Returns the current `WorkerThread` if it's part of this `Registry`.
+    pub fn current_thread(&self) -> Option<&WorkerThread> {
+        unsafe {
+            if let Some(worker) = WorkerThread::current().as_ref() {
+                if worker.registry().id() == self.id() {
+                    return Some(worker);
+                }
+            }
+            None
+        }
+    }
+
     /// Returns an opaque identifier for this registry.
     pub fn id(&self) -> RegistryId {
         // We can rely on `self` not to change since we only ever create
@@ -297,39 +302,31 @@ impl Registry {
         log!(InjectJobs {
             count: injected_jobs.len()
         });
-        {
-            let state = self.state.lock().unwrap();
-
-            // It should not be possible for `state.terminate` to be true
-            // here. It is only set to true when the user creates (and
-            // drops) a `ThreadPool`; and, in that case, they cannot be
-            // calling `inject()` later, since they dropped their
-            // `ThreadPool`.
-            assert!(
-                !self.terminate_latch.probe(),
-                "inject() sees state.terminate as true"
-            );
-
-            for &job_ref in injected_jobs {
-                state.job_injector.push(job_ref);
-            }
+
+        // It should not be possible for `state.terminate` to be true
+        // here. It is only set to true when the user creates (and
+        // drops) a `ThreadPool`; and, in that case, they cannot be
+        // calling `inject()` later, since they dropped their
+        // `ThreadPool`.
+        assert!(
+            !self.terminate_latch.probe(),
+            "inject() sees state.terminate as true"
+        );
+
+        for &job_ref in injected_jobs {
+            self.injected_jobs.push(job_ref);
         }
         self.sleep.tickle(usize::MAX);
     }
 
     fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
-        loop {
-            match self.job_uninjector.steal() {
-                Steal::Empty => return None,
-                Steal::Data(d) => {
-                    log!(UninjectedWork {
-                        worker: worker_index
-                    });
-                    return Some(d);
-                }
-                Steal::Retry => {}
-            }
+        let job = self.injected_jobs.try_pop();
+        if job.is_some() {
+            log!(UninjectedWork {
+                worker: worker_index
+            });
         }
+        job
     }
 
     /// If already in a worker-thread of this registry, just execute `op`.
@@ -439,14 +436,6 @@ pub struct RegistryId {
     addr: usize,
 }
 
-impl RegistryState {
-    pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
-        RegistryState {
-            job_injector: job_injector,
-        }
-    }
-}
-
 struct ThreadInfo {
     /// Latch set once thread has started and we are entering into the
     /// main loop. Used to wait for worker threads to become primed,
@@ -478,6 +467,9 @@ pub struct WorkerThread {
     /// the "worker" half of our local deque
     worker: Deque<JobRef>,
 
+    /// local queue used for `spawn_fifo` indirection
+    fifo: JobFifo,
+
     index: usize,
 
     /// are these workers configured to steal breadth-first or not?
@@ -534,6 +526,11 @@ impl WorkerThread {
         self.registry.sleep.tickle(self.index);
     }
 
+    #[inline]
+    pub unsafe fn push_fifo(&self, job: JobRef) {
+        self.push(self.fifo.push(job));
+    }
+
     #[inline]
     pub fn local_deque_is_empty(&self) -> bool {
         self.worker.len() == 0
@@ -663,6 +660,7 @@ unsafe fn main_loop(
 ) {
     let worker_thread = WorkerThread {
         worker: worker,
+        fifo: JobFifo::new(),
         breadth_first: breadth_first,
         index: index,
         rng: XorShift64Star::new(),
diff --git a/rayon-core/src/scope/internal.rs b/rayon-core/src/scope/internal.rs
index e9ff6a447..30e07920a 100644
--- a/rayon-core/src/scope/internal.rs
+++ b/rayon-core/src/scope/internal.rs
@@ -1,6 +1,6 @@
 #![cfg(rayon_unstable)]
 
-use super::Scope;
+use super::{Scope, ScopeBase};
 use internal::task::{ScopeHandle, Task, ToScopeHandle};
 use std::any::Any;
 use std::mem;
@@ -16,7 +16,7 @@ impl<'scope> ToScopeHandle<'scope> for Scope<'scope> {
 
 #[derive(Debug)]
 pub struct LocalScopeHandle<'scope> {
-    scope: *const Scope<'scope>,
+    scope: *const ScopeBase<'scope>,
 }
 
@@ -24,8 +24,8 @@ impl<'scope> LocalScopeHandle<'scope> {
     /// until the scope completes. Since we acquire a ref,
     /// that means it will remain valid until we release it.
     unsafe fn new(scope: &Scope<'scope>) -> Self {
-        scope.job_completed_latch.increment();
-        LocalScopeHandle { scope: scope }
+        scope.base.increment();
+        LocalScopeHandle { scope: &scope.base }
     }
 }
diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs
index 47ced9f91..7cf25177d 100644
--- a/rayon-core/src/scope/mod.rs
+++ b/rayon-core/src/scope/mod.rs
@@ -4,7 +4,7 @@
 //! [`scope()`]: fn.scope.html
 //! [`join()`]: ../join/join.fn.html
 
-use job::HeapJob;
+use job::{HeapJob, JobFifo};
 use latch::{CountLatch, Latch};
 use log::Event::*;
 use registry::{in_worker, Registry, WorkerThread};
@@ -21,10 +21,25 @@ mod internal;
 #[cfg(test)]
 mod test;
 
-///Represents a fork-join scope which can be used to spawn any number of tasks. See [`scope()`] for more information.
+/// Represents a fork-join scope which can be used to spawn any number of tasks.
+/// See [`scope()`] for more information.
 ///
 ///[`scope()`]: fn.scope.html
 pub struct Scope<'scope> {
+    base: ScopeBase<'scope>,
+}
+
+/// Represents a fork-join scope which can be used to spawn any number of tasks.
+/// Those spawned from the same thread are prioritized in relative FIFO order.
+/// See [`scope_fifo()`] for more information.
+///
+///[`scope_fifo()`]: fn.scope_fifo.html
+pub struct ScopeFifo<'scope> {
+    base: ScopeBase<'scope>,
+    fifos: Vec<JobFifo>,
+}
+
+struct ScopeBase<'scope> {
     /// thread where `scope()` was executed (note that individual jobs
     /// may be executing on different worker threads, though they
     /// should always be within the same pool of threads)
@@ -120,7 +135,7 @@ pub struct Scope<'scope> {
 ///         });
 ///     });
 /// });
-/// s.spawn(|s| { // task 2
+/// s.spawn(|s| { // task s.2
 /// });
 /// // point mid
 /// });
@@ -133,17 +148,18 @@ pub struct Scope<'scope> {
 /// | (start)
 /// |
 /// | (scope `s` created)
-/// +--------------------+ (task s.1)
-/// +-------+ (task s.2) |
-/// |       |            +---+ (task s.1.1)
-/// |       |            |   |
-/// |       |            |   | (scope `t` created)
-/// |       |            |   +----------------+ (task t.1)
-/// |       |            |   +---+ (task t.2) |
-/// | (mid) |            |   |   |            |
-/// :       |            |   + <-+------------+ (scope `t` ends)
-/// :       |            |   |
-/// |<------+------------+---+ (scope `s` ends)
+/// +-----------------------------------------------+ (task s.2)
+/// +-------+ (task s.1)                             |
+/// |       |                                        |
+/// |       +---+ (task s.1.1)                       |
+/// |       |   |                                    |
+/// |       |   | (scope `t` created)                |
+/// |       |   +----------------+ (task t.2)        |
+/// |       |   +---+ (task t.1) |                   |
+/// | (mid) |   |   |            |                   |
+/// :       |   + <-+------------+ (scope `t` ends)  |
+/// :       |   |                                    |
+/// |<------+---+-----------------------------------+ (scope `s` ends)
 /// |
 /// | (end)
 /// ```
@@ -156,6 +172,19 @@ pub struct Scope<'scope> {
 /// will be joined before that scope returns, which in turn occurs
 /// before the creating task (task `s.1.1` in this case) finishes.
 ///
+/// There is no guaranteed order of execution for spawns in a scope,
+/// given that other threads may steal tasks at any time. However, they
+/// are generally prioritized in a LIFO order on the thread from which
+/// they were spawned. So in this example, absent any stealing, we can
+/// expect `s.2` to execute before `s.1`, and `t.2` before `t.1`. Other
+/// threads always steal from the other end of the deque, like FIFO
+/// order. The idea is that "recent" tasks are most likely to be fresh
+/// in the local CPU's cache, while other threads can steal older
+/// "stale" tasks. For an alternate approach, consider
+/// [`scope_fifo()`] instead.
+///
+/// [`scope_fifo()`]: fn.scope_fifo.html
+///
 /// # Accessing stack data
 ///
 /// In general, spawned tasks may access stack data in place that
@@ -262,22 +291,109 @@ where
     R: Send,
 {
     in_worker(|owner_thread, _| {
-        unsafe {
-            let scope: Scope<'scope> = Scope {
-                owner_thread_index: owner_thread.index(),
-                registry: owner_thread.registry().clone(),
-                panic: AtomicPtr::new(ptr::null_mut()),
-                job_completed_latch: CountLatch::new(),
-                marker: PhantomData,
-            };
-            let result = scope.execute_job_closure(op);
-            scope.steal_till_jobs_complete(owner_thread);
-            result.unwrap() // only None if `op` panicked, and that would have been propagated
-        }
+        let scope = Scope::<'scope>::new(owner_thread);
+        unsafe { scope.base.complete(owner_thread, || op(&scope)) }
     })
 }
+
+/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
+/// closure with a reference to `s`. This closure can then spawn
+/// asynchronous tasks into `s`. Those tasks may run asynchronously with
+/// respect to the closure; they may themselves spawn additional tasks
+/// into `s`. When the closure returns, it will block until all tasks
+/// that have been spawned into `s` complete.
+///
+/// # Task execution
+///
+/// Tasks in a `scope_fifo()` run similarly to [`scope()`], but there's a
+/// difference in the order of execution. Consider a similar example:
+///
+/// [`scope()`]: fn.scope.html
+///
+/// ```rust
+/// # use rayon_core as rayon;
+/// // point start
+/// rayon::scope_fifo(|s| {
+///     s.spawn_fifo(|s| { // task s.1
+///         s.spawn_fifo(|s| { // task s.1.1
+///             rayon::scope_fifo(|t| {
+///                 t.spawn_fifo(|_| ()); // task t.1
+///                 t.spawn_fifo(|_| ()); // task t.2
+///             });
+///         });
+///     });
+///     s.spawn_fifo(|s| { // task s.2
+///     });
+///     // point mid
+/// });
+/// // point end
+/// ```
+///
+/// The various tasks that are run will execute roughly like so:
+///
+/// ```notrust
+/// | (start)
+/// |
+/// | (FIFO scope `s` created)
+/// +--------------------+ (task s.1)
+/// +-------+ (task s.2) |
+/// |       |            +---+ (task s.1.1)
+/// |       |            |   |
+/// |       |            |   | (FIFO scope `t` created)
+/// |       |            |   +----------------+ (task t.1)
+/// |       |            |   +---+ (task t.2) |
+/// | (mid) |            |   |   |            |
+/// :       |            |   + <-+------------+ (scope `t` ends)
+/// :       |            |   |
+/// |<------+------------+---+ (scope `s` ends)
+/// |
+/// | (end)
+/// ```
+///
+/// Under `scope_fifo()`, the spawns are prioritized in a FIFO order on
+/// the thread from which they were spawned, as opposed to `scope()`'s
+/// LIFO. So in this example, we can expect `s.1` to execute before
+/// `s.2`, and `t.1` before `t.2`. Other threads also steal tasks in
+/// FIFO order, as usual. Overall, this has roughly the same order as
+/// the now-deprecated [`breadth_first`] option, except the effect is
+/// isolated to a particular scope. If spawns are intermingled from any
+/// combination of `scope()` and `scope_fifo()`, or from different
+/// threads, their order is only specified with respect to spawns in the
+/// same scope and thread.
+///
+/// For more details on this design, see Rayon [RFC #1].
+///
+/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
+/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
+///
+/// # Panics
+///
+/// If a panic occurs, either in the closure given to `scope_fifo()` or
+/// in any of the spawned jobs, that panic will be propagated and the
+/// call to `scope_fifo()` will panic. If multiple panics occur, it is
+/// non-deterministic which of their panic values will propagate.
+/// Regardless, once a task is spawned using `scope.spawn_fifo()`, it
+/// will execute, even if the spawning task should later panic.
+/// `scope_fifo()` returns once all spawned jobs have completed, and any
+/// panics are propagated at that point.
+pub fn scope_fifo<'scope, OP, R>(op: OP) -> R
+where
+    OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope + Send,
+    R: Send,
+{
+    in_worker(|owner_thread, _| {
+        let scope = ScopeFifo::<'scope>::new(owner_thread);
+        unsafe { scope.base.complete(owner_thread, || op(&scope)) }
+    })
+}
 
 impl<'scope> Scope<'scope> {
+    fn new(owner_thread: &WorkerThread) -> Self {
+        Scope {
+            base: ScopeBase::new(owner_thread),
+        }
+    }
+
     /// Spawns a job into the fork-join scope `self`. This job will
     /// execute sometime before the fork-join scope completes. The
     /// job is specified as a closure, and this closure receives its
@@ -334,16 +450,95 @@ impl<'scope> Scope<'scope> {
     where
         BODY: FnOnce(&Scope<'scope>) + Send + 'scope,
     {
+        self.base.increment();
         unsafe {
-            self.job_completed_latch.increment();
-            let job_ref = Box::new(HeapJob::new(move || self.execute_job(body))).as_job_ref();
+            let job_ref = Box::new(HeapJob::new(move || {
+                self.base.execute_job(move || body(self))
+            }))
+            .as_job_ref();
 
-            // Since `Scope` implements `Sync`, we can't be sure
-            // that we're still in a thread of this pool, so we
-            // can't just push to the local worker thread.
-            self.registry.inject_or_push(job_ref);
+            // Since `Scope` implements `Sync`, we can't be sure that we're still in a
+            // thread of this pool, so we can't just push to the local worker thread.
+            self.base.registry.inject_or_push(job_ref);
         }
     }
+}
+
+impl<'scope> ScopeFifo<'scope> {
+    fn new(owner_thread: &WorkerThread) -> Self {
+        let num_threads = owner_thread.registry().num_threads();
+        ScopeFifo {
+            base: ScopeBase::new(owner_thread),
+            fifos: (0..num_threads).map(|_| JobFifo::new()).collect(),
+        }
+    }
+
+    /// Spawns a job into the fork-join scope `self`. This job will
+    /// execute sometime before the fork-join scope completes. The
+    /// job is specified as a closure, and this closure receives its
+    /// own reference to the scope `self` as argument. This can be
+    /// used to inject new jobs into `self`.
+    ///
+    /// # See also
+    ///
+    /// This method is akin to [`Scope::spawn()`], but with a FIFO
+    /// priority. The [`scope_fifo` function] has more details about
+    /// this distinction.
+    ///
+    /// [`Scope::spawn()`]: struct.Scope.html#method.spawn
+    /// [`scope_fifo` function]: fn.scope_fifo.html
+    pub fn spawn_fifo<BODY>(&self, body: BODY)
+    where
+        BODY: FnOnce(&ScopeFifo<'scope>) + Send + 'scope,
+    {
+        self.base.increment();
+        unsafe {
+            let job_ref = Box::new(HeapJob::new(move || {
+                self.base.execute_job(move || body(self))
+            }))
+            .as_job_ref();
+
+            // If we're in the pool, use our scope's private fifo for this thread to execute
+            // in a locally-FIFO order. Otherwise, just use the pool's global injector.
+            match self.base.registry.current_thread() {
+                Some(worker) => {
+                    let fifo = &self.fifos[worker.index()];
+                    worker.push(fifo.push(job_ref));
+                }
+                None => self.base.registry.inject(&[job_ref]),
+            }
+        }
+    }
+}
+
+impl<'scope> ScopeBase<'scope> {
+    /// Create the base of a new scope for the given worker thread
+    fn new(owner_thread: &WorkerThread) -> Self {
+        ScopeBase {
+            owner_thread_index: owner_thread.index(),
+            registry: owner_thread.registry().clone(),
+            panic: AtomicPtr::new(ptr::null_mut()),
+            job_completed_latch: CountLatch::new(),
+            marker: PhantomData,
+        }
+    }
+
+    fn increment(&self) {
+        self.job_completed_latch.increment();
+    }
+
+    /// Executes `func` as a job, either aborting or executing as
+    /// appropriate.
+    ///
+    /// Unsafe because it must be executed on a worker thread.
+    unsafe fn complete<FUNC, R>(&self, owner_thread: &WorkerThread, func: FUNC) -> R
+    where
+        FUNC: FnOnce() -> R,
+    {
+        let result = self.execute_job_closure(func);
+        self.steal_till_jobs_complete(owner_thread);
+        result.unwrap() // only None if `op` panicked, and that would have been propagated
+    }
 
     /// Executes `func` as a job, either aborting or executing as
     /// appropriate.
@@ -351,7 +546,7 @@ impl<'scope> Scope<'scope> {
     /// Unsafe because it must be executed on a worker thread.
     unsafe fn execute_job<FUNC>(&self, func: FUNC)
     where
-        FUNC: FnOnce(&Scope<'scope>) + 'scope,
+        FUNC: FnOnce(),
     {
         let _: Option<()> = self.execute_job_closure(func);
     }
@@ -363,9 +558,9 @@ impl<'scope> Scope<'scope> {
     /// Unsafe because this must be executed on a worker thread.
     unsafe fn execute_job_closure<FUNC, R>(&self, func: FUNC) -> Option<R>
     where
-        FUNC: FnOnce(&Scope<'scope>) -> R + 'scope,
+        FUNC: FnOnce() -> R,
     {
-        match unwind::halt_unwinding(move || func(self)) {
+        match unwind::halt_unwinding(func) {
             Ok(r) => {
                 self.job_completed_ok();
                 Some(r)
@@ -431,10 +626,22 @@ impl<'scope> Scope<'scope> {
 impl<'scope> fmt::Debug for Scope<'scope> {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
         fmt.debug_struct("Scope")
-            .field("pool_id", &self.registry.id())
-            .field("owner_thread_index", &self.owner_thread_index)
-            .field("panic", &self.panic)
-            .field("job_completed_latch", &self.job_completed_latch)
+            .field("pool_id", &self.base.registry.id())
+            .field("owner_thread_index", &self.base.owner_thread_index)
+            .field("panic", &self.base.panic)
+            .field("job_completed_latch", &self.base.job_completed_latch)
             .finish()
     }
 }
+
+impl<'scope> fmt::Debug for ScopeFifo<'scope> {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
+        fmt.debug_struct("ScopeFifo")
+            .field("num_fifos", &self.fifos.len())
+            .field("pool_id", &self.base.registry.id())
+            .field("owner_thread_index", &self.base.owner_thread_index)
+            .field("panic", &self.base.panic)
+            .field("job_completed_latch", &self.base.job_completed_latch)
+            .finish()
+    }
+}
diff --git a/rayon-core/src/scope/test.rs b/rayon-core/src/scope/test.rs
index 5f5446810..4ccce0866 100644
--- a/rayon-core/src/scope/test.rs
+++ b/rayon-core/src/scope/test.rs
@@ -6,7 +6,7 @@ use std::sync::Mutex;
 use std::vec;
 use unwind;
 use ThreadPoolBuilder;
-use {scope, Scope};
+use {scope, scope_fifo, Scope};
 
 #[test]
 fn scope_empty() {
@@ -266,3 +266,169 @@ fn panic_propagate_still_execute_4() {
         Err(_) => assert!(x, "panic in spawn tainted scope"),
     }
 }
+
+macro_rules! test_order {
test_order { + ($scope:ident => $spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + $scope(|scope| { + let vec = &vec; + for i in 0..10 { + scope.$spawn(move |scope| { + for j in 0..10 { + scope.$spawn(move |_| { + vec.lock().unwrap().push(i * 10 + j); + }); + } + }); + } + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +fn lifo_order() { + // In the absense of stealing, `scope()` runs its `spawn()` jobs in LIFO order. + let vec = test_order!(scope => spawn); + let expected: Vec = (0..100).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +fn fifo_order() { + // In the absense of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order. + let vec = test_order!(scope_fifo => spawn_fifo); + let expected: Vec = (0..100).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +macro_rules! test_nested_order { + ($outer_scope:ident => $outer_spawn:ident, + $inner_scope:ident => $inner_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + $outer_scope(|scope| { + let vec = &vec; + for i in 0..10 { + scope.$outer_spawn(move |_| { + $inner_scope(|scope| { + for j in 0..10 { + scope.$inner_spawn(move |_| { + vec.lock().unwrap().push(i * 10 + j); + }); + } + }); + }); + } + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +fn nested_lifo_order() { + // In the absense of stealing, `scope()` runs its `spawn()` jobs in LIFO order. + let vec = test_nested_order!(scope => spawn, scope => spawn); + let expected: Vec = (0..100).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +fn nested_fifo_order() { + // In the absense of stealing, `scope_fifo()` runs its `spawn_fifo()` jobs in FIFO order. + let vec = test_nested_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo); + let expected: Vec = (0..100).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +#[test] +fn nested_lifo_fifo_order() { + // LIFO on the outside, FIFO on the inside + let vec = test_nested_order!(scope => spawn, scope_fifo => spawn_fifo); + let expected: Vec = (0..10) + .rev() + .flat_map(|i| (0..10).map(move |j| i * 10 + j)) + .collect(); + assert_eq!(vec, expected); +} + +#[test] +fn nested_fifo_lifo_order() { + // FIFO on the outside, LIFO on the inside + let vec = test_nested_order!(scope_fifo => spawn_fifo, scope => spawn); + let expected: Vec = (0..10) + .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j)) + .collect(); + assert_eq!(vec, expected); +} + +macro_rules! spawn_push { + ($scope:ident . $spawn:ident, $vec:ident, $i:expr) => {{ + $scope.$spawn(move |_| $vec.lock().unwrap().push($i)); + }}; +} + +/// Test spawns pushing a series of numbers, interleaved +/// such that negative values are using an inner scope. +macro_rules! 
test_mixed_order { + ($outer_scope:ident => $outer_spawn:ident, + $inner_scope:ident => $inner_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + $outer_scope(|outer_scope| { + let vec = &vec; + spawn_push!(outer_scope.$outer_spawn, vec, 0); + $inner_scope(|inner_scope| { + spawn_push!(inner_scope.$inner_spawn, vec, -1); + spawn_push!(outer_scope.$outer_spawn, vec, 1); + spawn_push!(inner_scope.$inner_spawn, vec, -2); + spawn_push!(outer_scope.$outer_spawn, vec, 2); + spawn_push!(inner_scope.$inner_spawn, vec, -3); + }); + spawn_push!(outer_scope.$outer_spawn, vec, 3); + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +fn mixed_lifo_order() { + // NB: the end of the inner scope makes us execute some of the outer scope + // before they've all been spawned, so they're not perfectly LIFO. + let vec = test_mixed_order!(scope => spawn, scope => spawn); + let expected = vec![-3, 2, -2, 1, -1, 3, 0]; + assert_eq!(vec, expected); +} + +#[test] +fn mixed_fifo_order() { + let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope_fifo => spawn_fifo); + let expected = vec![-1, 0, -2, 1, -3, 2, 3]; + assert_eq!(vec, expected); +} + +#[test] +fn mixed_lifo_fifo_order() { + // NB: the end of the inner scope makes us execute some of the outer scope + // before they've all been spawned, so they're not perfectly LIFO. + let vec = test_mixed_order!(scope => spawn, scope_fifo => spawn_fifo); + let expected = vec![-1, 2, -2, 1, -3, 3, 0]; + assert_eq!(vec, expected); +} + +#[test] +fn mixed_fifo_lifo_order() { + let vec = test_mixed_order!(scope_fifo => spawn_fifo, scope => spawn); + let expected = vec![-3, 0, -2, 1, -1, 2, 3]; + assert_eq!(vec, expected); +} diff --git a/rayon-core/src/spawn/mod.rs b/rayon-core/src/spawn/mod.rs index 8dcd26693..c2194e3f7 100644 --- a/rayon-core/src/spawn/mod.rs +++ b/rayon-core/src/spawn/mod.rs @@ -24,6 +24,17 @@ use unwind; /// by a mutex, or some such thing). If you want to compute a result, /// consider `spawn_future()`. /// +/// There is no guaranteed order of execution for spawns, given that +/// other threads may steal tasks at any time. However, they are +/// generally prioritized in a LIFO order on the thread from which +/// they were spawned. Other threads always steal from the other end of +/// the deque, like FIFO order. The idea is that "recent" tasks are +/// most likely to be fresh in the local CPU's cache, while other +/// threads can steal older "stale" tasks. For an alternate approach, +/// consider [`spawn_fifo()`] instead. +/// +/// [`spawn_fifo()`]: fn.spawn_fifo.html +/// /// # Panic handling /// /// If this closure should panic, the resulting panic will be @@ -61,6 +72,21 @@ where /// /// Not a public API, but used elsewhere in Rayon. pub unsafe fn spawn_in(func: F, registry: &Arc) +where + F: FnOnce() + Send + 'static, +{ + // We assert that this does not hold any references (we know + // this because of the `'static` bound in the inferface); + // moreover, we assert that the code below is not supposed to + // be able to panic, and hence the data won't leak but will be + // enqueued into some deque for later execution. 
+    let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
+    let job_ref = spawn_job(func, registry);
+    registry.inject_or_push(job_ref);
+    mem::forget(abort_guard);
+}
+
+unsafe fn spawn_job<F>(func: F, registry: &Arc<Registry>) -> JobRef
 where
     F: FnOnce() + Send + 'static,
 {
@@ -68,7 +94,7 @@ where
     // executed. This ref is decremented at the (*) below.
     registry.increment_terminate_count();
 
-    let async_job = Box::new(HeapJob::new({
+    Box::new(HeapJob::new({
         let registry = registry.clone();
         move || {
             match unwind::halt_unwinding(func) {
@@ -79,16 +105,67 @@ where
             }
             registry.terminate(); // (*) permit registry to terminate now
         }
-    }));
+    }))
+    .as_job_ref()
+}
 
+/// Fires off a task into the Rayon threadpool in the "static" or
+/// "global" scope. Just like a standard thread, this task is not
+/// tied to the current stack frame, and hence it cannot hold any
+/// references other than those with `'static` lifetime. If you want
+/// to spawn a task that references stack data, use [the `scope_fifo()`
+/// function](fn.scope_fifo.html) to create a scope.
+///
+/// The behavior is essentially the same as [the `spawn`
+/// function](fn.spawn.html), except that calls from the same thread
+/// will be prioritized in FIFO order. This is similar to the now-
+/// deprecated [`breadth_first`] option, except the effect is isolated
+/// to relative `spawn_fifo` calls, not all threadpool tasks.
+///
+/// For more details on this design, see Rayon [RFC #1].
+///
+/// [`breadth_first`]: struct.ThreadPoolBuilder.html#method.breadth_first
+/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
+///
+/// # Panic handling
+///
+/// If this closure should panic, the resulting panic will be
+/// propagated to the panic handler registered in the `ThreadPoolBuilder`,
+/// if any. See [`ThreadPoolBuilder::panic_handler()`][ph] for more
+/// details.
+///
+/// [ph]: struct.ThreadPoolBuilder.html#method.panic_handler
+pub fn spawn_fifo<F>(func: F)
+where
+    F: FnOnce() + Send + 'static,
+{
+    // We assert that current registry has not terminated.
+    unsafe { spawn_fifo_in(func, &Registry::current()) }
+}
+
+/// Spawn an asynchronous FIFO job in `registry`.
+///
+/// Unsafe because `registry` must not yet have terminated.
+///
+/// Not a public API, but used elsewhere in Rayon.
+pub unsafe fn spawn_fifo_in<F>(func: F, registry: &Arc<Registry>)
+where
+    F: FnOnce() + Send + 'static,
+{
     // We assert that this does not hold any references (we know
     // this because of the `'static` bound in the interface);
     // moreover, we assert that the code below is not supposed to
     // be able to panic, and hence the data won't leak but will be
     // enqueued into some deque for later execution.
     let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
-    let job_ref = HeapJob::as_job_ref(async_job);
-    registry.inject_or_push(job_ref);
+    let job_ref = spawn_job(func, registry);
+
+    // If we're in the pool, use our thread's private fifo for this thread to execute
+    // in a locally-FIFO order. Otherwise, just use the pool's global injector.
+    match registry.current_thread() {
+        Some(worker) => worker.push_fifo(job_ref),
+        None => registry.inject(&[job_ref]),
+    }
     mem::forget(abort_guard);
 }
diff --git a/rayon-core/src/spawn/test.rs b/rayon-core/src/spawn/test.rs
index a94d4e15f..59a9496b8 100644
--- a/rayon-core/src/spawn/test.rs
+++ b/rayon-core/src/spawn/test.rs
@@ -3,7 +3,7 @@ use std::any::Any;
 use std::sync::mpsc::channel;
 use std::sync::Mutex;
 
-use super::spawn;
+use super::{spawn, spawn_fifo};
 use ThreadPoolBuilder;
 
 #[test]
@@ -141,3 +141,103 @@ fn custom_panic_handler_and_nested_spawn() {
         }
     }
 }
+
+macro_rules! test_order {
+    ($outer_spawn:ident, $inner_spawn:ident) => {{
+        let builder = ThreadPoolBuilder::new().num_threads(1);
+        let pool = builder.build().unwrap();
+        let (tx, rx) = channel();
+        pool.install(move || {
+            for i in 0..10 {
+                let tx = tx.clone();
+                $outer_spawn(move || {
+                    for j in 0..10 {
+                        let tx = tx.clone();
+                        $inner_spawn(move || {
+                            tx.send(i * 10 + j).unwrap();
+                        });
+                    }
+                });
+            }
+        });
+        rx.iter().collect::<Vec<i32>>()
+    }};
+}
+
+#[test]
+fn lifo_order() {
+    // In the absence of stealing, `spawn()` jobs on a thread will run in LIFO order.
+    let vec = test_order!(spawn, spawn);
+    let expected: Vec<i32> = (0..100).rev().collect(); // LIFO -> reversed
+    assert_eq!(vec, expected);
+}
+
+#[test]
+fn fifo_order() {
+    // In the absence of stealing, `spawn_fifo()` jobs on a thread will run in FIFO order.
+    let vec = test_order!(spawn_fifo, spawn_fifo);
+    let expected: Vec<i32> = (0..100).collect(); // FIFO -> natural order
+    assert_eq!(vec, expected);
+}
+
+#[test]
+fn lifo_fifo_order() {
+    // LIFO on the outside, FIFO on the inside
+    let vec = test_order!(spawn, spawn_fifo);
+    let expected: Vec<i32> = (0..10)
+        .rev()
+        .flat_map(|i| (0..10).map(move |j| i * 10 + j))
+        .collect();
+    assert_eq!(vec, expected);
+}
+
+#[test]
+fn fifo_lifo_order() {
+    // FIFO on the outside, LIFO on the inside
+    let vec = test_order!(spawn_fifo, spawn);
+    let expected: Vec<i32> = (0..10)
+        .flat_map(|i| (0..10).rev().map(move |j| i * 10 + j))
+        .collect();
+    assert_eq!(vec, expected);
+}
+
+macro_rules! spawn_send {
+    ($spawn:ident, $tx:ident, $i:expr) => {{
+        let tx = $tx.clone();
+        $spawn(move || tx.send($i).unwrap());
+    }};
+}
+
+/// Test mixed spawns pushing a series of numbers, interleaved
+/// such that negative values are using the second kind of spawn.
+macro_rules! test_mixed_order {
test_mixed_order { + ($pos_spawn:ident, $neg_spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + let (tx, rx) = channel(); + pool.install(move || { + spawn_send!($pos_spawn, tx, 0); + spawn_send!($neg_spawn, tx, -1); + spawn_send!($pos_spawn, tx, 1); + spawn_send!($neg_spawn, tx, -2); + spawn_send!($pos_spawn, tx, 2); + spawn_send!($neg_spawn, tx, -3); + spawn_send!($pos_spawn, tx, 3); + }); + rx.iter().collect::>() + }}; +} + +#[test] +fn mixed_lifo_fifo_order() { + let vec = test_mixed_order!(spawn, spawn_fifo); + let expected = vec![3, -1, 2, -2, 1, -3, 0]; + assert_eq!(vec, expected); +} + +#[test] +fn mixed_fifo_lifo_order() { + let vec = test_mixed_order!(spawn_fifo, spawn); + let expected = vec![0, -3, 1, -2, 2, -1, 3]; + assert_eq!(vec, expected); +} diff --git a/rayon-core/src/thread_pool/mod.rs b/rayon-core/src/thread_pool/mod.rs index d13f4e5f9..0e90c84aa 100644 --- a/rayon-core/src/thread_pool/mod.rs +++ b/rayon-core/src/thread_pool/mod.rs @@ -12,6 +12,7 @@ use std::sync::Arc; #[allow(deprecated)] use Configuration; use {scope, Scope}; +use {scope_fifo, ScopeFifo}; use {ThreadPoolBuildError, ThreadPoolBuilder}; mod internal; @@ -238,6 +239,21 @@ impl ThreadPool { self.install(|| scope(op)) } + /// Creates a scope that executes within this thread-pool. + /// Spawns from the same thread are prioritized in relative FIFO order. + /// Equivalent to `self.install(|| scope_fifo(...))`. + /// + /// See also: [the `scope_fifo()` function][scope_fifo]. + /// + /// [scope_fifo]: fn.scope_fifo.html + pub fn scope_fifo<'scope, OP, R>(&self, op: OP) -> R + where + OP: for<'s> FnOnce(&'s ScopeFifo<'scope>) -> R + 'scope + Send, + R: Send, + { + self.install(|| scope_fifo(op)) + } + /// Spawns an asynchronous task in this thread-pool. This task will /// run in the implicit, global scope, which means that it may outlast /// the current stack frame -- therefore, it cannot capture any references @@ -253,6 +269,22 @@ impl ThreadPool { // We assert that `self.registry` has not terminated. unsafe { spawn::spawn_in(op, &self.registry) } } + + /// Spawns an asynchronous task in this thread-pool. This task will + /// run in the implicit, global scope, which means that it may outlast + /// the current stack frame -- therefore, it cannot capture any references + /// onto the stack (you will likely need a `move` closure). + /// + /// See also: [the `spawn_fifo()` function defined on scopes][spawn_fifo]. + /// + /// [spawn_fifo]: struct.ScopeFifo.html#method.spawn_fifo + pub fn spawn_fifo(&self, op: OP) + where + OP: FnOnce() + Send + 'static, + { + // We assert that `self.registry` has not terminated. + unsafe { spawn::spawn_fifo_in(op, &self.registry) } + } } impl Drop for ThreadPool { diff --git a/rayon-core/src/thread_pool/test.rs b/rayon-core/src/thread_pool/test.rs index 071d1d50d..87f31def9 100644 --- a/rayon-core/src/thread_pool/test.rs +++ b/rayon-core/src/thread_pool/test.rs @@ -1,7 +1,8 @@ #![cfg(test)] use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::sync::mpsc::channel; +use std::sync::{Arc, Mutex}; use join; use thread_pool::ThreadPool; @@ -205,3 +206,67 @@ fn check_thread_pool_new() { let pool = ThreadPool::new(Configuration::new().num_threads(22)).unwrap(); assert_eq!(pool.current_num_threads(), 22); } + +macro_rules! 
test_scope_order { + ($scope:ident => $spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = builder.build().unwrap(); + pool.install(|| { + let vec = Mutex::new(vec![]); + pool.$scope(|scope| { + let vec = &vec; + for i in 0..10 { + scope.$spawn(move |_| { + vec.lock().unwrap().push(i); + }); + } + }); + vec.into_inner().unwrap() + }) + }}; +} + +#[test] +fn scope_lifo_order() { + let vec = test_scope_order!(scope => spawn); + let expected: Vec = (0..10).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +fn scope_fifo_order() { + let vec = test_scope_order!(scope_fifo => spawn_fifo); + let expected: Vec = (0..10).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} + +macro_rules! test_spawn_order { + ($spawn:ident) => {{ + let builder = ThreadPoolBuilder::new().num_threads(1); + let pool = &builder.build().unwrap(); + let (tx, rx) = channel(); + pool.install(move || { + for i in 0..10 { + let tx = tx.clone(); + pool.$spawn(move || { + tx.send(i).unwrap(); + }); + } + }); + rx.iter().collect::>() + }}; +} + +#[test] +fn spawn_lifo_order() { + let vec = test_spawn_order!(spawn); + let expected: Vec = (0..10).rev().collect(); // LIFO -> reversed + assert_eq!(vec, expected); +} + +#[test] +fn spawn_fifo_order() { + let vec = test_spawn_order!(spawn_fifo); + let expected: Vec = (0..10).collect(); // FIFO -> natural order + assert_eq!(vec, expected); +} diff --git a/src/lib.rs b/src/lib.rs index 1644ff8ed..a489cfa25 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -108,10 +108,11 @@ mod par_either; mod compile_fail; pub use rayon_core::current_num_threads; -pub use rayon_core::spawn; pub use rayon_core::FnContext; pub use rayon_core::ThreadPool; pub use rayon_core::ThreadPoolBuildError; pub use rayon_core::ThreadPoolBuilder; pub use rayon_core::{join, join_context}; pub use rayon_core::{scope, Scope}; +pub use rayon_core::{scope_fifo, ScopeFifo}; +pub use rayon_core::{spawn, spawn_fifo};