Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rayon-core/src/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
// pushed on top of it in the stack, and we will have to pop
// those off to get to it.
while !job_b.latch.probe() {
if let Some(job) = worker_thread.pop() {
if let Some(job) = worker_thread.take_local_job() {
if job == job_b_ref {
// Found it! Let's run it.
//
Expand Down
32 changes: 30 additions & 2 deletions rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ pub struct Configuration {

/// Closure invoked on worker thread exit.
exit_handler: Option<Box<ExitHandler>>,

/// If false, worker threads will execute spawned jobs in a
/// "depth-first" fashion. If true, they will execute them in a
/// "breadth-first" fashion. Depth-first is the default.
breadth_first: bool,
}

/// The type for a panic handling closure. Note that this same closure
Expand Down Expand Up @@ -239,6 +244,26 @@ impl Configuration {
self
}

/// Configure worker threads to execute spawned jobs in a
/// "breadth-first" fashion. Typically, when a worker thread is
/// idle or blocked, it will attempt to execute the job from the
/// *top* of its local deque of work (i.e., the job most recently
/// spawned). If this flag is set to true, however, workers will
/// prefer to execute in a *breadth-first* fashion -- that is,
/// they will search for jobs at the *bottom* of their local
/// deque.
///
/// At present, workers *always* steal from the bottom of other
/// workers' deques, regardless of the setting of this flag.
///
/// This is a builder-style method: it consumes the configuration,
/// sets the flag to `true`, and returns the updated configuration
/// so that calls can be chained. This method can only turn the
/// flag on; it never sets it back to `false`.
pub fn breadth_first(mut self) -> Self {
self.breadth_first = true;
self
}

/// Returns the current value of the `breadth_first` flag, i.e.
/// `true` if breadth-first execution of local jobs was requested
/// for the worker threads and `false` otherwise.
fn get_breadth_first(&self) -> bool {
self.breadth_first
}

/// Takes the current thread start callback, leaving `None`.
fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
self.start_handler.take()
Expand Down Expand Up @@ -308,8 +333,10 @@ pub fn dump_stats() {

impl fmt::Debug for Configuration {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let Configuration { ref num_threads, ref get_thread_name, ref panic_handler, ref stack_size,
ref start_handler, ref exit_handler } = *self;
let Configuration { ref num_threads, ref get_thread_name,
ref panic_handler, ref stack_size,
ref start_handler, ref exit_handler,
ref breadth_first } = *self;

// Just print `Some("<closure>")` or `None` to the debug
// output.
Expand All @@ -328,6 +355,7 @@ impl fmt::Debug for Configuration {
.field("stack_size", &stack_size)
.field("start_handler", &start_handler)
.field("exit_handler", &exit_handler)
.field("breadth_first", &breadth_first)
.finish()
}
}
46 changes: 38 additions & 8 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl<'a> Drop for Terminator<'a> {
impl Registry {
pub fn new(mut configuration: Configuration) -> Result<Arc<Registry>, Box<Error>> {
let n_threads = configuration.get_num_threads();
let breadth_first = configuration.get_breadth_first();

let (inj_worker, inj_stealer) = deque::new();
let (workers, stealers): (Vec<_>, Vec<_>) = (0..n_threads).map(|_| deque::new()).unzip();
Expand Down Expand Up @@ -141,7 +142,7 @@ impl Registry {
if let Some(stack_size) = configuration.get_stack_size() {
b = b.stack_size(stack_size);
}
try!(b.spawn(move || unsafe { main_loop(worker, registry, index) }));
try!(b.spawn(move || unsafe { main_loop(worker, registry, index, breadth_first) }));
}

// Returning normally now, without termination.
Expand Down Expand Up @@ -349,9 +350,17 @@ impl ThreadInfo {
/// WorkerThread identifiers

pub struct WorkerThread {
/// the "worker" half of our local deque
worker: Worker<JobRef>,

/// a "stealer" half of our local deque; used in BFS mode
stealer: Stealer<JobRef>,

index: usize,

/// are these workers configured to steal breadth-first or not?
breadth_first: bool,

/// A weak random number generator.
rng: UnsafeCell<rand::XorShiftRng>,

Expand Down Expand Up @@ -403,11 +412,27 @@ impl WorkerThread {
self.registry.sleep.tickle(self.index);
}

/// Pop `job` from top of stack, returning `false` if it has been
/// stolen.
#[inline]
pub unsafe fn pop(&self) -> Option<JobRef> {
self.worker.pop()
pub fn local_deque_is_empty(&self) -> bool {
self.worker.len() == 0
}

#[inline]
pub fn breadth_first(&self) -> bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method used anywhere?

self.breadth_first
}

/// Attempts to obtain a "local" job -- typically this means
/// popping from the top of the stack, though if we are configured
/// for breadth-first execution, it would mean dequeuing from the
/// bottom.
#[inline]
pub unsafe fn take_local_job(&self) -> Option<JobRef> {
if !self.breadth_first {
self.worker.pop()
} else {
self.stealer.steal()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just published a new version of Coco that introduces Worker::steal method. Now you can call self.worker.steal() here. This change will give some performance wins in breadth-first mode. I'd be curious to know how much :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, nice!

}
}

/// Wait until the latch is set. Try to keep busy by popping and
Expand Down Expand Up @@ -436,7 +461,7 @@ impl WorkerThread {
// deques, and finally to injected jobs from the
// outside. The idea is to finish what we started before
// we take on something new.
if let Some(job) = self.pop()
if let Some(job) = self.take_local_job()
.or_else(|| self.steal())
.or_else(|| self.registry.pop_injected_job(self.index)) {
yields = self.registry.sleep.work_found(self.index, yields);
Expand Down Expand Up @@ -506,9 +531,14 @@ impl WorkerThread {

/// ////////////////////////////////////////////////////////////////////////

unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
unsafe fn main_loop(worker: Worker<JobRef>,
registry: Arc<Registry>,
index: usize,
breadth_first: bool) {
let worker_thread = WorkerThread {
worker: worker,
stealer: registry.thread_infos[index].stealer.clone(),
breadth_first: breadth_first,
index: index,
rng: UnsafeCell::new(rand::weak_rng()),
registry: registry.clone(),
Expand Down Expand Up @@ -538,7 +568,7 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usiz
worker_thread.wait_until(&registry.terminate_latch);

// Should not be any work left in our queue.
debug_assert!(worker_thread.pop().is_none());
debug_assert!(worker_thread.take_local_job().is_none());

// let registry know we are done
registry.thread_infos[index].stopped.set();
Expand Down
34 changes: 34 additions & 0 deletions rayon-core/src/scope/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,40 @@ impl<'scope> Scope<'scope> {
}
}

/// If `body` would be the next task to execute, then invoke it directly;
/// otherwise, spawn normally. This is an optimization that you can use for
/// the final task that a given thread will spawn into a scope before
/// completing.
///
/// The precise behavior depends on whether we are in DFS or BFS
/// mode. If this thread is in depth-first mode, then this is
/// equivalent to just invoking `body()` directly. Otherwise, in
/// breadth-first mode, check if the local deque is empty. If so,
/// then there are no higher-priority tasks, so just call `body()`
/// directly. Otherwise, spawn normally.
///
/// If you know that your threads are in depth-first mode, you are
/// better off just calling `body()` directly than using this
/// function.
pub fn spawn_tail<BODY>(&self, body: BODY)
    where BODY: FnOnce(&Scope<'scope>) + 'scope
{
    let worker_thread = WorkerThread::current();

    // We must always be in a worker thread, since the scope is
    // neither send nor sync, and hence only travels between
    // threads when we pass the pointer to spawned tasks. Check the
    // pointer we already fetched rather than looking it up again.
    debug_assert!(!worker_thread.is_null());

    // SAFETY: relies on the invariant described above -- a scope is
    // only ever reachable from a worker thread, so the pointer to
    // the current worker is expected to be non-null here (checked
    // by the assertion in debug builds). Only the dereference needs
    // `unsafe`; the rest of the logic is safe code.
    let worker_thread = unsafe { &*worker_thread };

    // In depth-first mode the freshly spawned job would be popped
    // next anyway; in breadth-first mode that is only true when no
    // other local jobs are queued ahead of it.
    if !worker_thread.breadth_first() || worker_thread.local_deque_is_empty() {
        body(self);
    } else {
        self.spawn(body);
    }
}

#[cfg(feature = "unstable")]
pub fn spawn_future<F>(&self, future: F) -> RayonFuture<F::Item, F::Error>
where F: Future + Send + 'scope
Expand Down