diff --git a/rayon-core/Cargo.toml b/rayon-core/Cargo.toml
index 14230aa2b..3bf3ebc69 100644
--- a/rayon-core/Cargo.toml
+++ b/rayon-core/Cargo.toml
@@ -23,6 +23,10 @@ lazy_static = "1"
 [dependencies.crossbeam-deque]
 version = "0.2.0"
 
+# Also held back for rustc compatibility
+[dependencies.crossbeam]
+version = "0.3.0"
+
 [dev-dependencies]
 rand = "0.5"
 
diff --git a/rayon-core/src/lib.rs b/rayon-core/src/lib.rs
index b008786b9..934e9b6a6 100644
--- a/rayon-core/src/lib.rs
+++ b/rayon-core/src/lib.rs
@@ -31,6 +31,7 @@ use std::marker::PhantomData;
 use std::str::FromStr;
 use std::fmt;
 
+extern crate crossbeam;
 extern crate crossbeam_deque;
 #[macro_use]
 extern crate lazy_static;
@@ -140,11 +141,6 @@ pub struct ThreadPoolBuilder {
 
     /// Closure invoked on worker thread exit.
     exit_handler: Option<Box<ExitHandler>>,
-
-    /// If false, worker threads will execute spawned jobs in a
-    /// "depth-first" fashion. If true, they will do a "breadth-first"
-    /// fashion. Depth-first is the default.
-    breadth_first: bool,
 }
 
 /// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
@@ -305,35 +301,32 @@ impl ThreadPoolBuilder {
         self
     }
 
-    /// Suggest to worker threads that they execute spawned jobs in a
-    /// "breadth-first" fashion. Typically, when a worker thread is
-    /// idle or blocked, it will attempt to execute the job from the
-    /// *top* of its local deque of work (i.e., the job most recently
-    /// spawned). If this flag is set to true, however, workers will
-    /// prefer to execute in a *breadth-first* fashion -- that is,
-    /// they will search for jobs at the *bottom* of their local
-    /// deque. (At present, workers *always* steal from the bottom of
-    /// other worker's deques, regardless of the setting of this
-    /// flag.)
+    /// **(DEPRECATED)** Suggest to worker threads that they execute
+    /// spawned jobs in a "breadth-first" fashion.
     ///
-    /// If you think of the tasks as a tree, where a parent task
-    /// spawns its children in the tree, then this flag loosely
-    /// corresponds to doing a breadth-first traversal of the tree,
-    /// whereas the default would be to do a depth-first traversal.
+    /// Threads used to execute jobs from their local deque in
+    /// depth-first fashion by default, last-in first-out (LIFO).
+    /// This `breadth_first()` method switched the entire thread pool
+    /// to a breadth-first (FIFO) strategy instead. That helped some
+    /// spawn-heavy workloads run items in the same order they were
+    /// queued, but it had a bad effect on stack-oriented workloads
+    /// like parallel iterators -- see
+    /// [rayon#590](https://github.com/rayon-rs/rayon/issues/590).
     ///
-    /// **Note that this is an "execution hint".** Rayon's task
+    /// **Note that this was just an "execution hint".** Rayon's task
     /// execution is highly dynamic and the precise order in which
     /// independent tasks are executed is not intended to be
     /// guaranteed.
-    pub fn breadth_first(mut self) -> Self {
-        self.breadth_first = true;
+    ///
+    /// This `breadth_first()` method is now deprecated and has no
+    /// effect. Instead, we now **always** prioritize `spawn` jobs in
+    /// breadth-first/FIFO order, while keeping all other jobs in
+    /// depth-first/LIFO order.
+    #[deprecated(note = "spawns are now always breadth-first")]
+    pub fn breadth_first(self) -> Self {
         self
     }
 
-    fn get_breadth_first(&self) -> bool {
-        self.breadth_first
-    }
-
     /// Takes the current thread start callback, leaving `None`.
     fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
         self.start_handler.take()
@@ -473,8 +466,7 @@ impl fmt::Debug for ThreadPoolBuilder {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         let ThreadPoolBuilder { ref num_threads, ref get_thread_name,
                                 ref panic_handler, ref stack_size,
-                                ref start_handler, ref exit_handler,
-                                ref breadth_first } = *self;
+                                ref start_handler, ref exit_handler } = *self;
 
         // Just print `Some(<closure>)` or `None` to the debug
         // output.
@@ -496,7 +488,6 @@ impl fmt::Debug for ThreadPoolBuilder {
             .field("stack_size", &stack_size)
             .field("start_handler", &start_handler)
             .field("exit_handler", &exit_handler)
-            .field("breadth_first", &breadth_first)
             .finish()
     }
 }
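The deprecation note above states the new contract: `spawn` jobs always run breadth-first (FIFO), while everything else stays depth-first (LIFO). A minimal sketch of what that guarantees, assuming a single-threaded pool so the completion order is deterministic (the channel plumbing is illustrative, not part of the patch):

```rust
extern crate rayon;

use std::sync::mpsc::channel;

fn main() {
    // One worker thread makes the completion order observable.
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(1)
        .build()
        .unwrap();

    let (tx, rx) = channel();
    for i in 0..5 {
        let tx = tx.clone();
        // Spawns now always land in a FIFO queue, so these jobs run
        // in the order they were spawned.
        pool.spawn(move || tx.send(i).unwrap());
    }
    drop(tx);

    // `rx.iter()` ends once every job has run and dropped its sender.
    assert_eq!(rx.iter().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4]);
}
```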
diff --git a/rayon-core/src/registry.rs b/rayon-core/src/registry.rs
index 0fb8e2038..e6b868453 100644
--- a/rayon-core/src/registry.rs
+++ b/rayon-core/src/registry.rs
@@ -1,4 +1,5 @@
 use ::{ExitHandler, PanicHandler, StartHandler, ThreadPoolBuilder, ThreadPoolBuildError, ErrorKind};
+use crossbeam::sync::SegQueue;
 use crossbeam_deque::{Deque, Steal, Stealer};
 use job::{JobRef, StackJob};
 #[cfg(rayon_unstable)]
@@ -12,7 +13,7 @@ use std::any::Any;
 use std::cell::Cell;
 use std::collections::hash_map::DefaultHasher;
 use std::hash::Hasher;
-use std::sync::{Arc, Mutex, Once, ONCE_INIT};
+use std::sync::{Arc, Once, ONCE_INIT};
 use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
 use std::thread;
 use std::mem;
@@ -22,9 +23,8 @@ use util::leak;
 
 pub struct Registry {
     thread_infos: Vec<ThreadInfo>,
-    state: Mutex<RegistryState>,
     sleep: Sleep,
-    job_uninjector: Stealer<JobRef>,
+    injected_jobs: SegQueue<JobRef>,
     panic_handler: Option<Box<PanicHandler>>,
     start_handler: Option<Box<StartHandler>>,
     exit_handler: Option<Box<ExitHandler>>,
@@ -45,10 +45,6 @@ pub struct Registry {
     terminate_latch: CountLatch,
 }
 
-struct RegistryState {
-    job_injector: Deque<JobRef>,
-}
-
 /// ////////////////////////////////////////////////////////////////////////
 /// Initialization
 
@@ -98,10 +94,7 @@ impl<'a> Drop for Terminator<'a> {
 impl Registry {
     pub fn new(mut builder: ThreadPoolBuilder) -> Result<Arc<Registry>, ThreadPoolBuildError> {
         let n_threads = builder.get_num_threads();
-        let breadth_first = builder.get_breadth_first();
 
-        let inj_worker = Deque::new();
-        let inj_stealer = inj_worker.stealer();
         let workers: Vec<_> = (0..n_threads)
             .map(|_| Deque::new())
             .collect();
@@ -111,9 +104,8 @@ impl Registry {
             thread_infos: stealers.into_iter()
                                   .map(|s| ThreadInfo::new(s))
                                   .collect(),
-            state: Mutex::new(RegistryState::new(inj_worker)),
             sleep: Sleep::new(),
-            job_uninjector: inj_stealer,
+            injected_jobs: SegQueue::new(),
             terminate_latch: CountLatch::new(),
             panic_handler: builder.take_panic_handler(),
             start_handler: builder.take_start_handler(),
@@ -132,7 +124,7 @@
         if let Some(stack_size) = builder.get_stack_size() {
             b = b.stack_size(stack_size);
         }
-        if let Err(e) = b.spawn(move || unsafe { main_loop(worker, registry, index, breadth_first) }) {
+        if let Err(e) = b.spawn(move || unsafe { main_loop(worker, registry, index) }) {
             return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)))
         }
     }
@@ -291,34 +283,26 @@ impl Registry {
     /// you are not on a worker of this registry.
     pub fn inject(&self, injected_jobs: &[JobRef]) {
         log!(InjectJobs { count: injected_jobs.len() });
-        {
-            let state = self.state.lock().unwrap();
-
-            // It should not be possible for `state.terminate` to be true
-            // here. It is only set to true when the user creates (and
-            // drops) a `ThreadPool`; and, in that case, they cannot be
-            // calling `inject()` later, since they dropped their
-            // `ThreadPool`.
-            assert!(!self.terminate_latch.probe(), "inject() sees state.terminate as true");
-
-            for &job_ref in injected_jobs {
-                state.job_injector.push(job_ref);
-            }
+
+        // It should not be possible for the registry to be terminated
+        // here. The terminate latch is only set when the user creates
+        // (and drops) a `ThreadPool`; and, in that case, they cannot
+        // be calling `inject()` later, since they dropped their
+        // `ThreadPool`.
+        assert!(!self.terminate_latch.probe(), "inject() sees registry terminated");
+
+        for &job_ref in injected_jobs {
+            self.injected_jobs.push(job_ref);
         }
         self.sleep.tickle(usize::MAX);
     }
 
     fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
-        loop {
-            match self.job_uninjector.steal() {
-                Steal::Empty => return None,
-                Steal::Data(d) => {
-                    log!(UninjectedWork { worker: worker_index });
-                    return Some(d);
-                },
-                Steal::Retry => {},
-            }
+        let job = self.injected_jobs.try_pop();
+        if job.is_some() {
+            log!(UninjectedWork { worker: worker_index });
         }
+        job
     }
 
     /// If already in a worker-thread of this registry, just execute `op`.
@@ -416,14 +400,6 @@ pub struct RegistryId {
     addr: usize
 }
 
-impl RegistryState {
-    pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
-        RegistryState {
-            job_injector: job_injector,
-        }
-    }
-}
-
 struct ThreadInfo {
     /// Latch set once thread has started and we are entering into the
     /// main loop. Used to wait for worker threads to become primed,
@@ -457,9 +433,6 @@ pub struct WorkerThread {
 
     index: usize,
 
-    /// are these workers configured to steal breadth-first or not?
-    breadth_first: bool,
-
     /// A weak random number generator.
     rng: XorShift64Star,
 
@@ -522,17 +495,7 @@ impl WorkerThread {
     /// bottom.
     #[inline]
     pub unsafe fn take_local_job(&self) -> Option<JobRef> {
-        if !self.breadth_first {
-            self.worker.pop()
-        } else {
-            loop {
-                match self.worker.steal() {
-                    Steal::Empty => return None,
-                    Steal::Data(d) => return Some(d),
-                    Steal::Retry => {},
-                }
-            }
-        }
+        self.worker.pop()
     }
 
     /// Wait until the latch is set. Try to keep busy by popping and
@@ -632,11 +595,9 @@ impl WorkerThread {
 
 unsafe fn main_loop(worker: Deque<JobRef>,
                     registry: Arc<Registry>,
-                    index: usize,
-                    breadth_first: bool) {
+                    index: usize) {
     let worker_thread = WorkerThread {
         worker: worker,
-        breadth_first: breadth_first,
         index: index,
         rng: XorShift64Star::new(),
        registry: registry.clone(),
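The injector swap is the heart of this file: the `Mutex<RegistryState>` wrapping a `Deque`/`Stealer` pair becomes a single lock-free `SegQueue`, so `inject()` no longer serializes callers and `pop_injected_job()` never loops on `Steal::Retry`. A small sketch of the queue's semantics, assuming crossbeam 0.3 as pinned in the Cargo.toml hunk above:

```rust
extern crate crossbeam; // 0.3, per the Cargo.toml hunk above

use crossbeam::sync::SegQueue;

fn main() {
    // An unbounded, lock-free, multi-producer multi-consumer FIFO
    // queue: exactly what the global injector needs.
    let injected_jobs: SegQueue<u32> = SegQueue::new();
    injected_jobs.push(1);
    injected_jobs.push(2);

    // Strict first-in first-out, and `try_pop` simply returns `None`
    // when empty -- there is no transient "retry" state to spin on.
    assert_eq!(injected_jobs.try_pop(), Some(1));
    assert_eq!(injected_jobs.try_pop(), Some(2));
    assert_eq!(injected_jobs.try_pop(), None);
}
```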
diff --git a/rayon-core/src/scope/mod.rs b/rayon-core/src/scope/mod.rs
index 5f349b97e..99ca2c9f6 100644
--- a/rayon-core/src/scope/mod.rs
+++ b/rayon-core/src/scope/mod.rs
@@ -4,9 +4,10 @@
 //! [`scope()`]: fn.scope.html
 //! [`join()`]: ../join/join.fn.html
 
+use crossbeam::sync::SegQueue;
 use latch::{Latch, CountLatch};
 use log::Event::*;
-use job::HeapJob;
+use job::{HeapJob, Job, JobRef};
 use std::any::Any;
 use std::fmt;
 use std::marker::PhantomData;
@@ -33,6 +34,8 @@ pub struct Scope<'scope> {
     /// thread registry where `scope()` was executed.
     registry: Arc<Registry>,
 
+    queue: ScopeQueue,
+
     /// if some job panicked, the error is stored here; it will be
     /// propagated to the one who created the scope
     panic: AtomicPtr<Box<Any + Send + 'static>>,
@@ -267,6 +270,7 @@ pub fn scope<'scope, OP, R>(op: OP) -> R
         panic: AtomicPtr::new(ptr::null_mut()),
         job_completed_latch: CountLatch::new(),
         marker: PhantomData,
+        queue: ScopeQueue::new(),
     };
     let result = scope.execute_job_closure(op);
     scope.steal_till_jobs_complete(owner_thread);
@@ -333,13 +337,10 @@ impl<'scope> Scope<'scope> {
     {
         unsafe {
             self.job_completed_latch.increment();
-            let job_ref = Box::new(HeapJob::new(move || self.execute_job(body)))
-                .as_job_ref();
+            let job = Box::new(HeapJob::new(move || self.execute_job(body)));
 
-            // Since `Scope` implements `Sync`, we can't be sure
-            // that we're still in a thread of this pool, so we
-            // can't just push to the local worker thread.
-            self.registry.inject_or_push(job_ref);
+            // Use our private queue to execute in FIFO order.
+            self.queue.inject(&self.registry, job.as_job_ref());
         }
     }
 
@@ -415,3 +416,38 @@ impl<'scope> fmt::Debug for Scope<'scope> {
             .finish()
     }
 }
+
+/// Private queue to provide FIFO job priority.
+struct ScopeQueue {
+    inner: SegQueue<JobRef>,
+}
+
+impl ScopeQueue {
+    fn new() -> Self {
+        ScopeQueue {
+            inner: SegQueue::new(),
+        }
+    }
+
+    unsafe fn inject(&self, registry: &Registry, job_ref: JobRef) {
+        // A little indirection ensures that spawns are always prioritized in FIFO order. The
+        // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
+        // (FIFO), but either way they will end up popping from the front of this queue.
+        self.inner.push(job_ref);
+
+        // Since `Scope` implements `Sync`, we can't be sure that we're still in a thread of this
+        // pool, so we can't just push to the local worker thread.
+        registry.inject_or_push(JobRef::new(self));
+    }
+}
+
+impl Job for ScopeQueue {
+    unsafe fn execute(this: *const Self) {
+        // We "execute" a queue by executing its first job, FIFO.
+        (*this)
+            .inner
+            .try_pop()
+            .expect("job in scope queue")
+            .execute()
+    }
+}
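`ScopeQueue` is the subtle part: the real `JobRef` never lands in a worker deque. Each `spawn` pushes the job onto the scope's private FIFO queue and publishes an indirection job pointing at the queue itself; executing that indirection job, taken from either end of any deque, pops the *front* of the queue. A toy model of the trick in plain `std` types (illustrative only, not rayon's actual machinery):

```rust
use std::collections::VecDeque;

fn main() {
    // The scope's private queue holds real jobs in FIFO order; the
    // worker deque only holds opaque "tickets" (indirection jobs).
    let mut fifo: VecDeque<&str> = VecDeque::new();
    let mut deque: Vec<()> = Vec::new();

    for &job in &["first", "second", "third"] {
        fifo.push_back(job); // the real job
        deque.push(());      // the indirection ticket
    }

    // Even though the worker pops its deque LIFO, every ticket yields
    // the *oldest* queued job, so jobs run in spawn order.
    let mut ran = Vec::new();
    while deque.pop().is_some() {
        ran.push(fifo.pop_front().expect("job in scope queue"));
    }
    assert_eq!(ran, ["first", "second", "third"]);
}
```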
diff --git a/rayon-core/src/spawn/mod.rs b/rayon-core/src/spawn/mod.rs
index c118b1838..49e4db86f 100644
--- a/rayon-core/src/spawn/mod.rs
+++ b/rayon-core/src/spawn/mod.rs
@@ -87,7 +87,9 @@ pub unsafe fn spawn_in<F>(func: F, registry: &Arc<Registry>)
     // enqueued into some deque for later execution.
     let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
     let job_ref = HeapJob::as_job_ref(async_job);
-    registry.inject_or_push(job_ref);
+
+    // We always inject into the global queue to prioritize jobs in FIFO order.
+    registry.inject(&[job_ref]);
     mem::forget(abort_guard);
 }
 
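Before this patch, `spawn_in` used `inject_or_push`, so a spawn made from inside the pool went onto the worker's own deque and could be popped LIFO ahead of older spawns. Always injecting into the global FIFO queue closes that gap. A sketch of the inside-the-pool case this fixes, assuming a single worker thread (it mirrors the `threadpool_install_spawn_order` test below):

```rust
extern crate rayon;

use std::sync::mpsc::channel;

fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(1)
        .build()
        .unwrap();

    let (tx, rx) = channel();
    // Spawn from *inside* the pool: the old code would have pushed
    // these onto the worker's LIFO deque; now they are injected into
    // the global FIFO queue.
    pool.install(move || {
        for i in 0..5 {
            let tx = tx.clone();
            rayon::spawn(move || tx.send(i).unwrap());
        }
        // `tx` is dropped here, leaving only the clones in the jobs.
    });

    assert_eq!(rx.iter().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4]);
}
```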
diff --git a/rayon-core/src/thread_pool/test.rs b/rayon-core/src/thread_pool/test.rs
index 5c9a9ae79..56152fc60 100644
--- a/rayon-core/src/thread_pool/test.rs
+++ b/rayon-core/src/thread_pool/test.rs
@@ -1,5 +1,6 @@
 #![cfg(test)]
 
+use std::ops::Range;
 use std::sync::Arc;
 use std::sync::atomic::{AtomicUsize, Ordering};
 
@@ -200,3 +201,87 @@ fn check_thread_pool_new() {
     let pool = ThreadPool::new(Configuration::new().num_threads(22)).unwrap();
     assert_eq!(pool.current_num_threads(), 22);
 }
+
+/// Wait for a successful compare-and-swap
+fn wait_for_swap(x: &AtomicUsize, current: usize, new: usize) {
+    use std::{thread, time};
+
+    let start = time::Instant::now();
+    let max = time::Duration::from_secs(60);
+    while start.elapsed() < max {
+        let previous = x.compare_and_swap(current, new, Ordering::Relaxed);
+        if previous == current {
+            return;
+        }
+        thread::yield_now();
+    }
+
+    // That's too long!
+    panic!("Counter didn't reach the expected value!");
+}
+
+/// Spawn jobs that each increment the counter with a compare-and-swap
+fn pool_spawn_inc(pool: &ThreadPool, counter: &Arc<AtomicUsize>, range: Range<usize>) {
+    for i in range {
+        let counter = counter.clone();
+        pool.spawn(move || wait_for_swap(&counter, i, i + 1));
+    }
+}
+
+#[test]
+fn threadpool_spawn_order() {
+    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+
+    let counter = Arc::new(AtomicUsize::new(0));
+    pool_spawn_inc(&pool, &counter, 0..100);
+    assert_eq!(wait_for_counter(counter), 100);
+}
+
+#[test]
+fn threadpool_install_spawn_order() {
+    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+
+    let counter = Arc::new(AtomicUsize::new(0));
+    pool.install(|| pool_spawn_inc(&pool, &counter, 0..100));
+    assert_eq!(wait_for_counter(counter), 100);
+}
+
+/// Spawn scope jobs that each increment the counter with a compare-and-swap
+fn scope_spawn_inc(pool: &ThreadPool, counter: &AtomicUsize, range: Range<usize>) {
+    pool.scope(|scope| {
+        for i in range {
+            scope.spawn(move |_| wait_for_swap(counter, i, i + 1));
+        }
+    });
+}
+
+#[test]
+fn scope_spawn_order() {
+    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
+
+    let counter = AtomicUsize::new(0);
+    scope_spawn_inc(&pool, &counter, 0..100);
+    assert_eq!(counter.into_inner(), 100);
+}
+
+#[test]
+fn nested_scope_spawn_order() {
+    let pool = ThreadPoolBuilder::new().num_threads(4).build().unwrap();
+
+    let counter = AtomicUsize::new(0);
+    pool.scope(|scope| {
+        let pool = &pool;
+        let counter = &counter;
+        for i in 0..10 {
+            scope.spawn(move |_| {
+                let j = 10 * i;
+                join(
+                    || scope_spawn_inc(pool, counter, j..j + 5),
+                    || scope_spawn_inc(pool, counter, j + 5..j + 10),
+                );
+            });
+        }
+    });
+
+    assert_eq!(counter.into_inner(), 100);
+}
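These tests encode ordering directly in the counter: job *i* spins in `wait_for_swap` until the counter equals *i*, so on a single-threaded pool any out-of-FIFO execution deadlocks until the 60-second panic fires. The `threadpool_*` tests also lean on a `wait_for_counter` helper that this hunk doesn't define; presumably it waits until the pool has dropped its `Arc` clones and then reads the final value. A hypothetical sketch of such a helper (an assumption, not part of the patch):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical helper: wait until all spawned jobs have dropped their
/// `Arc` clones of the counter, then return its final value.
fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
    use std::{thread, time};

    let start = time::Instant::now();
    let max = time::Duration::from_secs(60);
    while Arc::get_mut(&mut counter).is_none() {
        assert!(start.elapsed() < max, "jobs never finished!");
        thread::yield_now();
    }

    // We are the sole owner now, so every job has completed.
    counter.load(Ordering::Relaxed)
}
```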