Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ lazy_static = "1"
[dependencies.crossbeam-deque]
version = "0.2.0"

# Also held back for rustc compatibility
[dependencies.crossbeam]
version = "0.3.0"

[dev-dependencies]
rand = "0.5"

Expand Down
49 changes: 20 additions & 29 deletions rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::marker::PhantomData;
use std::str::FromStr;
use std::fmt;

extern crate crossbeam;
extern crate crossbeam_deque;
#[macro_use]
extern crate lazy_static;
Expand Down Expand Up @@ -140,11 +141,6 @@ pub struct ThreadPoolBuilder {

/// Closure invoked on worker thread exit.
exit_handler: Option<Box<ExitHandler>>,

/// If false, worker threads will execute spawned jobs in a
/// "depth-first" fashion. If true, they will do a "breadth-first"
/// fashion. Depth-first is the default.
breadth_first: bool,
}

/// Contains the rayon thread pool configuration. Use [`ThreadPoolBuilder`] instead.
Expand Down Expand Up @@ -305,35 +301,32 @@ impl ThreadPoolBuilder {
self
}

/// Suggest to worker threads that they execute spawned jobs in a
/// "breadth-first" fashion. Typically, when a worker thread is
/// idle or blocked, it will attempt to execute the job from the
/// *top* of its local deque of work (i.e., the job most recently
/// spawned). If this flag is set to true, however, workers will
/// prefer to execute in a *breadth-first* fashion -- that is,
/// they will search for jobs at the *bottom* of their local
/// deque. (At present, workers *always* steal from the bottom of
/// other workers' deques, regardless of the setting of this
/// flag.)
/// **(DEPRECATED)** Suggest to worker threads that they execute
/// spawned jobs in a "breadth-first" fashion.
///
/// If you think of the tasks as a tree, where a parent task
/// spawns its children in the tree, then this flag loosely
/// corresponds to doing a breadth-first traversal of the tree,
/// whereas the default would be to do a depth-first traversal.
/// It used to be the case that threads would execute jobs from
/// their local deque in depth-first fashion by default, last-in
/// first-out (LIFO). This `breadth_first()` method changed to a
/// breadth-first strategy (FIFO) for the entire thread pool.
/// This can help some spawn workloads to prioritize items in the
/// same order they were queued, but this had a bad effect on
/// stack-oriented workloads like parallel iterators -- see
/// [rayon#590](https://github.com/rayon-rs/rayon/issues/590).
///
/// **Note that this is an "execution hint".** Rayon's task
/// **Note that this was just an "execution hint".** Rayon's task
/// execution is highly dynamic and the precise order in which
/// independent tasks are executed is not intended to be
/// guaranteed.
pub fn breadth_first(mut self) -> Self {
self.breadth_first = true;
///
/// This `breadth_first()` method is now deprecated and has no
/// effect. Instead, we now **always** prioritize `spawn` jobs in
/// breadth-first/FIFO order, while keeping all other jobs in
/// depth-first/LIFO order.
#[deprecated(note = "spawns are now always breadth-first")]
pub fn breadth_first(self) -> Self {
self
}

/// Returns the value of the `breadth_first` flag, i.e. whether
/// `breadth_first()` was called on this builder.
fn get_breadth_first(&self) -> bool {
self.breadth_first
}

/// Takes the current thread start callback, leaving `None`.
fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
self.start_handler.take()
Expand Down Expand Up @@ -473,8 +466,7 @@ impl fmt::Debug for ThreadPoolBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let ThreadPoolBuilder { ref num_threads, ref get_thread_name,
ref panic_handler, ref stack_size,
ref start_handler, ref exit_handler,
ref breadth_first } = *self;
ref start_handler, ref exit_handler } = *self;

// Just print `Some(<closure>)` or `None` to the debug
// output.
Expand All @@ -496,7 +488,6 @@ impl fmt::Debug for ThreadPoolBuilder {
.field("stack_size", &stack_size)
.field("start_handler", &start_handler)
.field("exit_handler", &exit_handler)
.field("breadth_first", &breadth_first)
.finish()
}
}
Expand Down
81 changes: 21 additions & 60 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use ::{ExitHandler, PanicHandler, StartHandler, ThreadPoolBuilder, ThreadPoolBuildError, ErrorKind};
use crossbeam::sync::SegQueue;
use crossbeam_deque::{Deque, Steal, Stealer};
use job::{JobRef, StackJob};
#[cfg(rayon_unstable)]
Expand All @@ -12,7 +13,7 @@ use std::any::Any;
use std::cell::Cell;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
use std::sync::{Arc, Mutex, Once, ONCE_INIT};
use std::sync::{Arc, Once, ONCE_INIT};
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use std::thread;
use std::mem;
Expand All @@ -22,9 +23,8 @@ use util::leak;

pub struct Registry {
thread_infos: Vec<ThreadInfo>,
state: Mutex<RegistryState>,
sleep: Sleep,
job_uninjector: Stealer<JobRef>,
injected_jobs: SegQueue<JobRef>,
panic_handler: Option<Box<PanicHandler>>,
start_handler: Option<Box<StartHandler>>,
exit_handler: Option<Box<ExitHandler>>,
Expand All @@ -45,10 +45,6 @@ pub struct Registry {
terminate_latch: CountLatch,
}

/// State stored behind the `state` mutex of a `Registry`.
struct RegistryState {
/// Deque that `Registry::inject` pushes externally injected jobs onto
/// while holding the `state` lock.
job_injector: Deque<JobRef>,
}

/// ////////////////////////////////////////////////////////////////////////
/// Initialization

Expand Down Expand Up @@ -98,10 +94,7 @@ impl<'a> Drop for Terminator<'a> {
impl Registry {
pub fn new(mut builder: ThreadPoolBuilder) -> Result<Arc<Registry>, ThreadPoolBuildError> {
let n_threads = builder.get_num_threads();
let breadth_first = builder.get_breadth_first();

let inj_worker = Deque::new();
let inj_stealer = inj_worker.stealer();
let workers: Vec<_> = (0..n_threads)
.map(|_| Deque::new())
.collect();
Expand All @@ -111,9 +104,8 @@ impl Registry {
thread_infos: stealers.into_iter()
.map(|s| ThreadInfo::new(s))
.collect(),
state: Mutex::new(RegistryState::new(inj_worker)),
sleep: Sleep::new(),
job_uninjector: inj_stealer,
injected_jobs: SegQueue::new(),
terminate_latch: CountLatch::new(),
panic_handler: builder.take_panic_handler(),
start_handler: builder.take_start_handler(),
Expand All @@ -132,7 +124,7 @@ impl Registry {
if let Some(stack_size) = builder.get_stack_size() {
b = b.stack_size(stack_size);
}
if let Err(e) = b.spawn(move || unsafe { main_loop(worker, registry, index, breadth_first) }) {
if let Err(e) = b.spawn(move || unsafe { main_loop(worker, registry, index) }) {
return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)))
}
}
Expand Down Expand Up @@ -291,34 +283,26 @@ impl Registry {
/// you are not on a worker of this registry.
pub fn inject(&self, injected_jobs: &[JobRef]) {
log!(InjectJobs { count: injected_jobs.len() });
{
let state = self.state.lock().unwrap();

// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
// drops) a `ThreadPool`; and, in that case, they cannot be
// calling `inject()` later, since they dropped their
// `ThreadPool`.
assert!(!self.terminate_latch.probe(), "inject() sees state.terminate as true");

for &job_ref in injected_jobs {
state.job_injector.push(job_ref);
}

// It should not be possible for `terminate_latch` to be set
// here. It is only set when the user creates (and
// drops) a `ThreadPool`; and, in that case, they cannot be
// calling `inject()` later, since they dropped their
// `ThreadPool`.
assert!(!self.terminate_latch.probe(), "inject() sees state.terminate as true");

for &job_ref in injected_jobs {
self.injected_jobs.push(job_ref);
}
self.sleep.tickle(usize::MAX);
}

fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
loop {
match self.job_uninjector.steal() {
Steal::Empty => return None,
Steal::Data(d) => {
log!(UninjectedWork { worker: worker_index });
return Some(d);
},
Steal::Retry => {},
}
let job = self.injected_jobs.try_pop();
if job.is_some() {
log!(UninjectedWork { worker: worker_index });
}
job
}

/// If already in a worker-thread of this registry, just execute `op`.
Expand Down Expand Up @@ -416,14 +400,6 @@ pub struct RegistryId {
addr: usize
}

impl RegistryState {
/// Builds a `RegistryState` that owns the given injection deque.
pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
RegistryState {
job_injector: job_injector,
}
}
}

struct ThreadInfo {
/// Latch set once thread has started and we are entering into the
/// main loop. Used to wait for worker threads to become primed,
Expand Down Expand Up @@ -457,9 +433,6 @@ pub struct WorkerThread {

index: usize,

/// are these workers configured to steal breadth-first or not?
breadth_first: bool,

/// A weak random number generator.
rng: XorShift64Star,

Expand Down Expand Up @@ -522,17 +495,7 @@ impl WorkerThread {
/// bottom.
#[inline]
pub unsafe fn take_local_job(&self) -> Option<JobRef> {
if !self.breadth_first {
self.worker.pop()
} else {
loop {
match self.worker.steal() {
Steal::Empty => return None,
Steal::Data(d) => return Some(d),
Steal::Retry => {},
}
}
}
self.worker.pop()
}

/// Wait until the latch is set. Try to keep busy by popping and
Expand Down Expand Up @@ -632,11 +595,9 @@ impl WorkerThread {

unsafe fn main_loop(worker: Deque<JobRef>,
registry: Arc<Registry>,
index: usize,
breadth_first: bool) {
index: usize) {
let worker_thread = WorkerThread {
worker: worker,
breadth_first: breadth_first,
index: index,
rng: XorShift64Star::new(),
registry: registry.clone(),
Expand Down
50 changes: 43 additions & 7 deletions rayon-core/src/scope/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
//! [`scope()`]: fn.scope.html
//! [`join()`]: ../join/join.fn.html

use crossbeam::sync::SegQueue;
use latch::{Latch, CountLatch};
use log::Event::*;
use job::HeapJob;
use job::{HeapJob, Job, JobRef};
use std::any::Any;
use std::fmt;
use std::marker::PhantomData;
Expand All @@ -33,6 +34,8 @@ pub struct Scope<'scope> {
/// thread registry where `scope()` was executed.
registry: Arc<Registry>,

queue: ScopeQueue,

/// if some job panicked, the error is stored here; it will be
/// propagated to the one who created the scope
panic: AtomicPtr<Box<Any + Send + 'static>>,
Expand Down Expand Up @@ -267,6 +270,7 @@ pub fn scope<'scope, OP, R>(op: OP) -> R
panic: AtomicPtr::new(ptr::null_mut()),
job_completed_latch: CountLatch::new(),
marker: PhantomData,
queue: ScopeQueue::new(),
};
let result = scope.execute_job_closure(op);
scope.steal_till_jobs_complete(owner_thread);
Expand Down Expand Up @@ -333,13 +337,10 @@ impl<'scope> Scope<'scope> {
{
unsafe {
self.job_completed_latch.increment();
let job_ref = Box::new(HeapJob::new(move || self.execute_job(body)))
.as_job_ref();
let job = Box::new(HeapJob::new(move || self.execute_job(body)));

// Since `Scope` implements `Sync`, we can't be sure
// that we're still in a thread of this pool, so we
// can't just push to the local worker thread.
self.registry.inject_or_push(job_ref);
// Use our private queue to execute in FIFO order.
self.queue.inject(&self.registry, job.as_job_ref());
}
}

Expand Down Expand Up @@ -415,3 +416,38 @@ impl<'scope> fmt::Debug for Scope<'scope> {
.finish()
}
}

/// Private queue to provide FIFO job priority.
///
/// Spawned jobs are pushed onto `inner` by `inject`, and each execution
/// of the queue (through its `Job` impl) pops and runs the job at the
/// front.
struct ScopeQueue {
/// Pending jobs; pushed by `inject`, popped by `Job::execute`.
inner: SegQueue<JobRef>,
}

impl ScopeQueue {
    /// Creates a queue with no pending jobs.
    fn new() -> Self {
        let pending = SegQueue::new();
        ScopeQueue { inner: pending }
    }

    /// Enqueues `job_ref` on this queue and hands the registry a proxy job
    /// that refers to the queue itself.
    ///
    /// # Safety
    ///
    /// NOTE(review): the proxy `JobRef` is built from `self`, so presumably
    /// this queue must stay alive until that proxy has been executed --
    /// confirm against the contract of `JobRef::new`.
    unsafe fn inject(&self, registry: &Registry, job_ref: JobRef) {
        // The real job goes into our own queue first. Whatever order the
        // proxies are later taken in -- popped from the back of a thread's
        // deque (LIFO) or stolen from its front (FIFO) -- each one pops the
        // *front* of this queue, so spawns always run in FIFO order.
        self.inner.push(job_ref);

        // `Scope` implements `Sync`, so we may no longer be on a thread of
        // this pool; let the registry decide between injecting and pushing
        // rather than pushing to a local worker thread directly.
        let proxy = JobRef::new(self);
        registry.inject_or_push(proxy);
    }
}

impl Job for ScopeQueue {
    /// "Executes" the queue by running the job at its front (FIFO).
    ///
    /// Panics with "job in scope queue" if the queue is empty.
    unsafe fn execute(this: *const Self) {
        let queue = &*this;
        let next_job = queue.inner.try_pop().expect("job in scope queue");
        next_job.execute()
    }
}
4 changes: 3 additions & 1 deletion rayon-core/src/spawn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ pub unsafe fn spawn_in<F>(func: F, registry: &Arc<Registry>)
// enqueued into some deque for later execution.
let abort_guard = unwind::AbortIfPanic; // just in case we are wrong, and code CAN panic
let job_ref = HeapJob::as_job_ref(async_job);
registry.inject_or_push(job_ref);

// We always inject in the global queue to prioritize jobs in FIFO order.
registry.inject(&[job_ref]);
mem::forget(abort_guard);
}

Expand Down
Loading