Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ build = "build.rs"
[dependencies]
rand = "0.3"
num_cpus = "1.2"
coco = "0.1.0"
coco = "0.1.1"
libc = "0.2.16"
lazy_static = "0.2.2"
futures = { version = "0.1.7", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion rayon-core/src/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
// pushed on top of it in the stack, and we will have to pop
// those off to get to it.
while !job_b.latch.probe() {
if let Some(job) = worker_thread.pop() {
if let Some(job) = worker_thread.take_local_job() {
if job == job_b_ref {
// Found it! Let's run it.
//
Expand Down
57 changes: 45 additions & 12 deletions rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ mod unwind;
mod util;

pub use thread_pool::ThreadPool;
pub use thread_pool::current_thread_index;
pub use thread_pool::current_thread_has_pending_tasks;
pub use join::join;
pub use scope::{scope, Scope};
#[cfg(feature = "unstable")]
Expand All @@ -73,9 +75,10 @@ pub use spawn::spawn_future;
pub use future::RayonFuture;

/// Returns the number of threads in the current registry. If this
/// code is executing within the Rayon thread-pool, then this will be
/// the number of threads for the current thread-pool. Otherwise, it
/// will be the number of threads for the global thread-pool.
/// code is executing within a Rayon thread-pool, then this will be
/// the number of threads for the thread-pool of the current
/// thread. Otherwise, it will be the number of threads for the global
/// thread-pool.
///
/// This can be useful when trying to judge how many times to split
/// parallel work (the parallel iterator traits use this value
Expand Down Expand Up @@ -116,6 +119,11 @@ pub struct Configuration {

/// Closure invoked on worker thread exit.
exit_handler: Option<Box<ExitHandler>>,

/// If false, worker threads will execute spawned jobs in a
/// "depth-first" fashion. If true, they will execute them in a
/// "breadth-first" fashion. Depth-first is the default.
breadth_first: bool,
}

/// The type for a panic handling closure. Note that this same closure
Expand Down Expand Up @@ -239,6 +247,35 @@ impl Configuration {
self
}

/// Asks worker threads to run the jobs they spawn in a
/// "breadth-first" order rather than the default "depth-first"
/// order.
///
/// Normally, an idle or blocked worker takes its next job from the
/// *top* of its local deque -- that is, the job it spawned most
/// recently. With this flag set, a worker instead prefers the job
/// at the *bottom* of its local deque. (At present, stealing from
/// *other* workers' deques always takes from the bottom, whether
/// or not this flag is set.)
///
/// Picture the tasks as a tree in which each parent task spawns
/// its children: setting this flag loosely corresponds to a
/// breadth-first traversal of that tree, while the default loosely
/// corresponds to a depth-first traversal.
///
/// **Note that this is only an "execution hint".** Rayon's task
/// execution is highly dynamic, and the precise order in which
/// independent tasks run is not intended to be guaranteed.
pub fn breadth_first(self) -> Self {
    let mut config = self;
    config.breadth_first = true;
    config
}

fn get_breadth_first(&self) -> bool {
self.breadth_first
}

/// Takes the current thread start callback, leaving `None`.
fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
self.start_handler.take()
Expand Down Expand Up @@ -299,17 +336,12 @@ pub fn initialize(config: Configuration) -> Result<(), Box<Error>> {
Ok(())
}

/// This is a debugging API not really intended for end users. It
/// dumps some performance statistics using `println` by expanding
/// the `dump_stats!` macro. It is only compiled when the `unstable`
/// feature is enabled.
#[cfg(feature = "unstable")]
pub fn dump_stats() {
dump_stats!();
}

impl fmt::Debug for Configuration {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let Configuration { ref num_threads, ref get_thread_name, ref panic_handler, ref stack_size,
ref start_handler, ref exit_handler } = *self;
let Configuration { ref num_threads, ref get_thread_name,
ref panic_handler, ref stack_size,
ref start_handler, ref exit_handler,
ref breadth_first } = *self;

// Just print `Some("<closure>")` or `None` to the debug
// output.
Expand All @@ -328,6 +360,7 @@ impl fmt::Debug for Configuration {
.field("stack_size", &stack_size)
.field("start_handler", &start_handler)
.field("exit_handler", &exit_handler)
.field("breadth_first", &breadth_first)
.finish()
}
}
28 changes: 0 additions & 28 deletions rayon-core/src/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
//! variable, which is still supported for backwards compatibility.

use std::env;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};

#[derive(Debug)]
pub enum Event {
Expand Down Expand Up @@ -56,30 +55,3 @@ macro_rules! log {
if ::log::DUMP_LOGS { if *::log::LOG_ENV { println!("{:?}", $event); } }
}
}

/// Counter that the `stat_stolen!` macro increments; starts at zero.
pub static STOLEN_JOB: AtomicUsize = ATOMIC_USIZE_INIT;

// Adds one to `STOLEN_JOB` using sequentially consistent ordering.
// NOTE(review): presumably invoked each time a job is stolen -- the
// call sites are not visible here; confirm.
macro_rules! stat_stolen {
() => {
::log::STOLEN_JOB.fetch_add(1, ::std::sync::atomic::Ordering::SeqCst);
}
}

/// Counter that the `stat_popped!` macro increments; starts at zero.
pub static POPPED_JOB: AtomicUsize = ATOMIC_USIZE_INIT;

// Adds one to `POPPED_JOB` using sequentially consistent ordering.
// NOTE(review): presumably invoked each time a job is popped from a
// local deque -- the call sites are not visible here; confirm.
macro_rules! stat_popped {
() => {
::log::POPPED_JOB.fetch_add(1, ::std::sync::atomic::Ordering::SeqCst);
}
}

// Prints the current values of the `STOLEN_JOB` and `POPPED_JOB`
// counters to stdout, one per line. Each counter is read with its
// own sequentially consistent load, so the two values are not
// sampled atomically as a pair.
macro_rules! dump_stats {
() => {
{
let stolen = ::log::STOLEN_JOB.load(::std::sync::atomic::Ordering::SeqCst);
println!("Jobs stolen: {:?}", stolen);
let popped = ::log::POPPED_JOB.load(::std::sync::atomic::Ordering::SeqCst);
println!("Jobs popped: {:?}", popped);
}
}
}
37 changes: 29 additions & 8 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl<'a> Drop for Terminator<'a> {
impl Registry {
pub fn new(mut configuration: Configuration) -> Result<Arc<Registry>, Box<Error>> {
let n_threads = configuration.get_num_threads();
let breadth_first = configuration.get_breadth_first();

let (inj_worker, inj_stealer) = deque::new();
let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads).map(|_| deque::new()).unzip();
Expand Down Expand Up @@ -141,7 +142,7 @@ impl Registry {
if let Some(stack_size) = configuration.get_stack_size() {
b = b.stack_size(stack_size);
}
try!(b.spawn(move || unsafe { main_loop(worker, registry, index) }));
try!(b.spawn(move || unsafe { main_loop(worker, registry, index, breadth_first) }));
}

// Returning normally now, without termination.
Expand Down Expand Up @@ -349,9 +350,14 @@ impl ThreadInfo {
/// WorkerThread identifiers

pub struct WorkerThread {
/// the "worker" half of our local deque
worker: Worker<JobRef>,

index: usize,

/// is this worker configured to take its *local* jobs breadth-first or not?
breadth_first: bool,

/// A weak random number generator.
rng: UnsafeCell<rand::XorShiftRng>,

Expand Down Expand Up @@ -403,11 +409,22 @@ impl WorkerThread {
self.registry.sleep.tickle(self.index);
}

/// Pop a job from the top of the local stack, returning `None` if
/// nothing could be popped (e.g., because the job has been stolen).
#[inline]
pub unsafe fn pop(&self) -> Option<JobRef> {
self.worker.pop()
/// Returns true when this worker's local deque currently reports a
/// length of zero.
pub fn local_deque_is_empty(&self) -> bool {
    let queued = self.worker.len();
    queued == 0
}

/// Tries to take a job from this worker's own deque. In the default
/// (depth-first) configuration the job is popped from the top of
/// the stack; when the worker is configured for breadth-first
/// execution, the job is instead dequeued from the bottom.
#[inline]
pub unsafe fn take_local_job(&self) -> Option<JobRef> {
    if self.breadth_first {
        self.worker.steal()
    } else {
        self.worker.pop()
    }
}

/// Wait until the latch is set. Try to keep busy by popping and
Expand Down Expand Up @@ -436,7 +453,7 @@ impl WorkerThread {
// deques, and finally to injected jobs from the
// outside. The idea is to finish what we started before
// we take on something new.
if let Some(job) = self.pop()
if let Some(job) = self.take_local_job()
.or_else(|| self.steal())
.or_else(|| self.registry.pop_injected_job(self.index)) {
yields = self.registry.sleep.work_found(self.index, yields);
Expand Down Expand Up @@ -506,9 +523,13 @@ impl WorkerThread {

/// ////////////////////////////////////////////////////////////////////////

unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
unsafe fn main_loop(worker: Worker<JobRef>,
registry: Arc<Registry>,
index: usize,
breadth_first: bool) {
let worker_thread = WorkerThread {
worker: worker,
breadth_first: breadth_first,
index: index,
rng: UnsafeCell::new(rand::weak_rng()),
registry: registry.clone(),
Expand Down Expand Up @@ -538,7 +559,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
worker_thread.wait_until(&registry.terminate_latch);

// Should not be any work left in our queue.
debug_assert!(worker_thread.pop().is_none());
debug_assert!(worker_thread.take_local_job().is_none());

// let registry know we are done
registry.thread_infos[index].stopped.set();
Expand Down
89 changes: 89 additions & 0 deletions rayon-core/src/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl ThreadPool {
/// `num_threads()` method for details][snt]).
///
/// [snt]: struct.Configuration.html#method.num_threads
#[inline]
pub fn current_num_threads(&self) -> usize {
self.registry.num_threads()
}
Expand All @@ -155,6 +156,7 @@ impl ThreadPool {
/// restarted.
///
/// [snt]: struct.Configuration.html#method.num_threads
#[inline]
pub fn current_thread_index(&self) -> Option<usize> {
unsafe {
let curr = WorkerThread::current();
Expand All @@ -168,6 +170,41 @@ impl ThreadPool {
}
}

/// Reports whether the current worker thread has "local tasks"
/// pending. Returns `None` when the caller is not a Rayon worker
/// thread, or is a worker that belongs to a different thread-pool
/// than this one.
///
/// This can be useful as part of a heuristic for deciding whether
/// to spawn a new task or to run code directly on the current
/// thread, particularly in breadth-first schedulers. Keep in mind,
/// however, that the check is inherently racy: other worker threads
/// may be actively "stealing" tasks from our local deque.
///
/// **Background:** Rayon uses a [work-stealing] scheduler. The key
/// idea is that each thread has its own [deque] of tasks. Whenever
/// a new task is spawned -- whether through `join()`,
/// `Scope::spawn()`, or some other means -- that new task is pushed
/// onto the thread's *local* deque. Worker threads prefer to
/// execute their own tasks; if they run out, they will try to
/// "steal" tasks from other threads. That stealing is the source of
/// the race described above.
///
/// [work-stealing]: https://en.wikipedia.org/wiki/Work_stealing
/// [deque]: https://en.wikipedia.org/wiki/Double-ended_queue
#[inline]
pub fn current_thread_has_pending_tasks(&self) -> Option<bool> {
    unsafe {
        let curr = WorkerThread::current();
        // `||` short-circuits, so `curr` is only dereferenced once it
        // is known to be non-null.
        if curr.is_null() || (*curr).registry().id() != self.registry.id() {
            None
        } else {
            Some(!(*curr).local_deque_is_empty())
        }
    }
}

/// Execute `oper_a` and `oper_b` in the thread-pool and return
/// the results. Equivalent to `self.install(|| join(oper_a,
/// oper_b))`.
Expand Down Expand Up @@ -256,3 +293,55 @@ impl Drop for ThreadPool {
}
}

/// Returns the index of the calling thread within its current
/// thread-pool when called from a Rayon worker thread, and `None`
/// when called from any other thread.
///
/// A thread's index does not change over the thread's lifetime.
/// However, threads in distinct thread-pools may share the same
/// index.
///
/// See also: [the `ThreadPool::current_thread_index()` method][m].
///
/// [m]: struct.ThreadPool.html#method.current_thread_index
///
/// ### Future compatibility note
///
/// Currently, every thread-pool (including the global thread-pool)
/// has a fixed number of threads, but this may change in future
/// Rayon versions (see [the `num_threads()` method for
/// details][snt]). In that case, the index for a thread would still
/// not change during its lifetime, but thread indices may wind up
/// being reused if threads are terminated and restarted.
///
/// [snt]: struct.Configuration.html#method.num_threads
#[inline]
pub fn current_thread_index() -> Option<usize> {
    // `as_ref()` yields `None` for a null pointer. A non-null pointer
    // is dereferenced, exactly as the explicit null check did.
    unsafe { WorkerThread::current().as_ref().map(|worker| worker.index()) }
}

/// When called from a Rayon worker thread, reports whether that
/// thread's local deque still has pending tasks; when called from
/// any other thread, returns `None`. For more information, see [the
/// `ThreadPool::current_thread_has_pending_tasks()` method][m].
///
/// [m]: struct.ThreadPool.html#method.current_thread_has_pending_tasks
#[inline]
pub fn current_thread_has_pending_tasks() -> Option<bool> {
    // `as_ref()` yields `None` for a null pointer. A non-null pointer
    // is dereferenced, exactly as the explicit null check did.
    unsafe {
        WorkerThread::current()
            .as_ref()
            .map(|worker| !worker.local_deque_is_empty())
    }
}