4 changes: 4 additions & 0 deletions rayon-core/Cargo.toml
@@ -23,6 +23,10 @@ lazy_static = "1"
[dependencies.crossbeam-deque]
version = "0.2.0"

# Also held back for rustc compatibility
[dependencies.crossbeam]
version = "0.3.0"

[dev-dependencies]
rand = "0.5"

33 changes: 33 additions & 0 deletions rayon-core/src/job.rs
@@ -1,3 +1,4 @@
use crossbeam::sync::SegQueue;
use latch::Latch;
use std::any::Any;
use std::cell::UnsafeCell;
@@ -179,3 +180,35 @@ impl<T> JobResult<T> {
}
}
}

/// Indirect queue to provide FIFO job priority.
pub struct JobFifo {
inner: SegQueue<JobRef>,
}

impl JobFifo {
pub fn new() -> Self {
JobFifo {
inner: SegQueue::new(),
}
}

pub unsafe fn push(&self, job_ref: JobRef) -> JobRef {
// A little indirection ensures that spawns are always prioritized in FIFO order. The
// jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
// (FIFO), but either way they will end up popping from the front of this queue.
self.inner.push(job_ref);
JobRef::new(self)
}
}

impl Job for JobFifo {
unsafe fn execute(this: *const Self) {
// We "execute" a queue by executing its first job, FIFO.
(*this)
.inner
.try_pop()
.expect("job in fifo queue")
.execute()
}
}
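
To make the `JobFifo` indirection concrete, here is a minimal standalone sketch of the same idea using only the standard library. The `ToyWorker`, `Task`, and `run_one` names are hypothetical stand-ins: boxed closures replace `JobRef`, and plain `VecDeque`s replace the real work-stealing deque and `SegQueue`. It is illustrative only, not rayon's actual implementation.

```rust
use std::collections::VecDeque;

// Stand-ins for rayon's internals: boxed closures instead of `JobRef`,
// plain `VecDeque`s instead of the work-stealing deque and `SegQueue`.
enum Task {
    Direct(Box<dyn FnOnce()>), // ordinary spawn: runs with LIFO priority
    FromFifo,                  // marker: "run the next job from the FIFO queue"
}

struct ToyWorker {
    deque: VecDeque<Task>,
    fifo: VecDeque<Box<dyn FnOnce()>>,
}

impl ToyWorker {
    fn spawn(&mut self, job: impl FnOnce() + 'static) {
        self.deque.push_back(Task::Direct(Box::new(job)));
    }

    fn spawn_fifo(&mut self, job: impl FnOnce() + 'static) {
        // The real job goes to the back of the FIFO queue, and only a
        // marker goes onto the deque.
        self.fifo.push_back(Box::new(job));
        self.deque.push_back(Task::FromFifo);
    }

    fn run_one(&mut self, steal: bool) {
        let task = if steal {
            self.deque.pop_front() // a thief steals from the front (FIFO)
        } else {
            self.deque.pop_back() // the owner pops from the back (LIFO)
        };
        match task {
            Some(Task::Direct(job)) => job(),
            // However the marker is reached, it dequeues the *oldest*
            // pending FIFO job -- the same trick as `JobFifo::execute`.
            Some(Task::FromFifo) => (self.fifo.pop_front().expect("job in fifo queue"))(),
            None => {}
        }
    }
}

fn main() {
    let mut w = ToyWorker { deque: VecDeque::new(), fifo: VecDeque::new() };
    w.spawn(|| println!("plain spawn"));
    w.spawn_fifo(|| println!("fifo 1"));
    w.spawn_fifo(|| println!("fifo 2"));
    // LIFO popping reaches the markers in reverse order, but the jobs they
    // resolve to still come out first-in, first-out: "fifo 1", then "fifo 2",
    // then the plain spawn.
    w.run_one(false);
    w.run_one(false);
    w.run_one(false);
}
```

Whether a marker is popped from the back by its owner or stolen from the front by another worker, executing it always runs the oldest pending FIFO job, which is the property `JobFifo` provides above.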
33 changes: 22 additions & 11 deletions rayon-core/src/lib.rs
@@ -31,6 +31,7 @@ use std::io;
use std::marker::PhantomData;
use std::str::FromStr;

extern crate crossbeam;
extern crate crossbeam_deque;
#[macro_use]
extern crate lazy_static;
@@ -61,7 +62,8 @@ mod test;
pub mod internal;
pub use join::{join, join_context};
pub use scope::{scope, Scope};
pub use spawn::spawn;
pub use scope::{scope_fifo, ScopeFifo};
pub use spawn::{spawn, spawn_fifo};
pub use thread_pool::current_thread_has_pending_tasks;
pub use thread_pool::current_thread_index;
pub use thread_pool::ThreadPool;
@@ -314,16 +316,17 @@ impl ThreadPoolBuilder {
self
}

/// Suggest to worker threads that they execute spawned jobs in a
/// "breadth-first" fashion. Typically, when a worker thread is
/// idle or blocked, it will attempt to execute the job from the
/// *top* of its local deque of work (i.e., the job most recently
/// spawned). If this flag is set to true, however, workers will
/// prefer to execute in a *breadth-first* fashion -- that is,
/// they will search for jobs at the *bottom* of their local
/// deque. (At present, workers *always* steal from the bottom of
/// other worker's deques, regardless of the setting of this
/// flag.)
/// **(DEPRECATED)** Suggest to worker threads that they execute
/// spawned jobs in a "breadth-first" fashion.
///
/// Typically, when a worker thread is idle or blocked, it will
/// attempt to execute the job from the *top* of its local deque of
/// work (i.e., the job most recently spawned). If this flag is set
/// to true, however, workers will prefer to execute in a
/// *breadth-first* fashion -- that is, they will search for jobs at
/// the *bottom* of their local deque. (At present, workers *always*
/// steal from the bottom of other worker's deques, regardless of
/// the setting of this flag.)
///
/// If you think of the tasks as a tree, where a parent task
/// spawns its children in the tree, then this flag loosely
@@ -334,6 +337,14 @@ impl ThreadPoolBuilder {
/// execution is highly dynamic and the precise order in which
/// independent tasks are executed is not intended to be
/// guaranteed.
///
/// This `breadth_first()` method is now deprecated per [RFC #1],
/// and in the future its effect may be removed. Consider using
/// [`scope_fifo()`] for a similar effect.
///
/// [RFC #1]: https://github.com/rayon-rs/rfcs/blob/master/accepted/rfc0001-scope-scheduling.md
/// [`scope_fifo()`]: fn.scope_fifo.html
#[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
pub fn breadth_first(mut self) -> Self {
self.breadth_first = true;
self
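
Since `breadth_first()` is now deprecated in favor of the per-call-site FIFO APIs re-exported above, a brief sketch of how a caller might migrate. This uses only the `scope_fifo` and `spawn_fifo` entry points added in this diff; the loop body and messages are illustrative, not taken from the PR.

```rust
extern crate rayon_core; // assumes rayon-core (this crate) as a dependency

fn main() {
    // Instead of configuring the whole pool with `breadth_first()`, FIFO
    // ordering is now requested per call site. Within one worker thread,
    // `spawn_fifo` jobs are prioritized in the order they were spawned.
    rayon_core::scope_fifo(|s| {
        for i in 0..4 {
            s.spawn_fifo(move |_| println!("scoped fifo task {}", i));
        }
    }); // `scope_fifo` blocks until all jobs spawned in it have completed

    // The free-standing variant requires a `'static` closure and is
    // fire-and-forget, so in this toy example it may not get a chance to
    // run before the process exits.
    rayon_core::spawn_fifo(|| println!("detached fifo task"));
}
```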
92 changes: 45 additions & 47 deletions rayon-core/src/registry.rs
@@ -1,9 +1,10 @@
use crossbeam::sync::SegQueue;
use crossbeam_deque::{Deque, Steal, Stealer};
#[cfg(rayon_unstable)]
use internal::task::Task;
#[cfg(rayon_unstable)]
use job::Job;
use job::{JobRef, StackJob};
use job::{JobFifo, JobRef, StackJob};
use latch::{CountLatch, Latch, LatchProbe, LockLatch, SpinLatch, TickleLatch};
use log::Event::*;
use sleep::Sleep;
@@ -13,7 +14,7 @@ use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::sync::{Arc, Mutex, Once, ONCE_INIT};
use std::sync::{Arc, Once, ONCE_INIT};
use std::thread;
use std::usize;
use unwind;
@@ -22,9 +23,8 @@ use {ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, T

pub struct Registry {
thread_infos: Vec<ThreadInfo>,
state: Mutex<RegistryState>,
sleep: Sleep,
job_uninjector: Stealer<JobRef>,
injected_jobs: SegQueue<JobRef>,
panic_handler: Option<Box<PanicHandler>>,
start_handler: Option<Box<StartHandler>>,
exit_handler: Option<Box<ExitHandler>>,
@@ -45,10 +45,6 @@ pub struct Registry {
terminate_latch: CountLatch,
}

struct RegistryState {
job_injector: Deque<JobRef>,
}

/// ////////////////////////////////////////////////////////////////////////
/// Initialization
@@ -104,16 +100,13 @@ impl Registry {
let n_threads = builder.get_num_threads();
let breadth_first = builder.get_breadth_first();

let inj_worker = Deque::new();
let inj_stealer = inj_worker.stealer();
let workers: Vec<_> = (0..n_threads).map(|_| Deque::new()).collect();
let stealers: Vec<_> = workers.iter().map(|d| d.stealer()).collect();

let registry = Arc::new(Registry {
thread_infos: stealers.into_iter().map(|s| ThreadInfo::new(s)).collect(),
state: Mutex::new(RegistryState::new(inj_worker)),
sleep: Sleep::new(),
job_uninjector: inj_stealer,
injected_jobs: SegQueue::new(),
terminate_latch: CountLatch::new(),
panic_handler: builder.take_panic_handler(),
start_handler: builder.take_start_handler(),
@@ -175,6 +168,18 @@ impl Registry {
}
}

/// Returns the current `WorkerThread` if it's part of this `Registry`.
pub fn current_thread(&self) -> Option<&WorkerThread> {
unsafe {
if let Some(worker) = WorkerThread::current().as_ref() {
if worker.registry().id() == self.id() {
return Some(worker);
}
}
None
}
}

/// Returns an opaque identifier for this registry.
pub fn id(&self) -> RegistryId {
// We can rely on `self` not to change since we only ever create
@@ -297,39 +302,31 @@ impl Registry {
log!(InjectJobs {
count: injected_jobs.len()
});
{
let state = self.state.lock().unwrap();

// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
// drops) a `ThreadPool`; and, in that case, they cannot be
// calling `inject()` later, since they dropped their
// `ThreadPool`.
assert!(
!self.terminate_latch.probe(),
"inject() sees state.terminate as true"
);

for &job_ref in injected_jobs {
state.job_injector.push(job_ref);
}

// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
// drops) a `ThreadPool`; and, in that case, they cannot be
// calling `inject()` later, since they dropped their
// `ThreadPool`.
assert!(
!self.terminate_latch.probe(),
"inject() sees state.terminate as true"
);

for &job_ref in injected_jobs {
self.injected_jobs.push(job_ref);
}
self.sleep.tickle(usize::MAX);
}

fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
loop {
match self.job_uninjector.steal() {
Steal::Empty => return None,
Steal::Data(d) => {
log!(UninjectedWork {
worker: worker_index
});
return Some(d);
}
Steal::Retry => {}
}
let job = self.injected_jobs.try_pop();
if job.is_some() {
log!(UninjectedWork {
worker: worker_index
});
}
job
}

/// If already in a worker-thread of this registry, just execute `op`.
@@ -439,14 +436,6 @@ pub struct RegistryId {
addr: usize,
}

impl RegistryState {
pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
RegistryState {
job_injector: job_injector,
}
}
}

struct ThreadInfo {
/// Latch set once thread has started and we are entering into the
/// main loop. Used to wait for worker threads to become primed,
@@ -478,6 +467,9 @@ pub struct WorkerThread {
/// the "worker" half of our local deque
worker: Deque<JobRef>,

/// local queue used for `spawn_fifo` indirection
fifo: JobFifo,

index: usize,

/// are these workers configured to steal breadth-first or not?
@@ -534,6 +526,11 @@ impl WorkerThread {
self.registry.sleep.tickle(self.index);
}

#[inline]
pub unsafe fn push_fifo(&self, job: JobRef) {
self.push(self.fifo.push(job));
}

#[inline]
pub fn local_deque_is_empty(&self) -> bool {
self.worker.len() == 0
@@ -663,6 +660,7 @@ unsafe fn main_loop(
) {
let worker_thread = WorkerThread {
worker: worker,
fifo: JobFifo::new(),
breadth_first: breadth_first,
index: index,
rng: XorShift64Star::new(),
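
The registry change above swaps the `Mutex<RegistryState>`-guarded injector deque for a lock-free `SegQueue`. The semantics it relies on are `push` and the non-blocking `try_pop` from crossbeam 0.3 — the same two calls used in `inject()` and `pop_injected_job()`. A small self-contained sketch of those semantics follows; the thread-counting harness around it is illustrative.

```rust
extern crate crossbeam; // version 0.3, as pinned in Cargo.toml above

use crossbeam::sync::SegQueue;
use std::sync::Arc;
use std::thread;

fn main() {
    // `SegQueue` is an unbounded MPMC FIFO queue, so many threads can inject
    // jobs concurrently without the `Mutex` the old `RegistryState` needed.
    let queue = Arc::new(SegQueue::new());

    let handles: Vec<_> = (0..4)
        .map(|t| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                for i in 0..10 {
                    queue.push((t, i));
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // `try_pop` never blocks; it returns `None` once the queue is empty,
    // mirroring the new `pop_injected_job` above.
    let mut popped = 0;
    while let Some((_thread, _job)) = queue.try_pop() {
        popped += 1;
    }
    assert_eq!(popped, 40);
    assert!(queue.try_pop().is_none());
}
```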
8 changes: 4 additions & 4 deletions rayon-core/src/scope/internal.rs
@@ -1,6 +1,6 @@
#![cfg(rayon_unstable)]

use super::Scope;
use super::{Scope, ScopeBase};
use internal::task::{ScopeHandle, Task, ToScopeHandle};
use std::any::Any;
use std::mem;
@@ -16,16 +16,16 @@ impl<'scope> ToScopeHandle<'scope> for Scope<'scope> {

#[derive(Debug)]
pub struct LocalScopeHandle<'scope> {
scope: *const Scope<'scope>,
scope: *const ScopeBase<'scope>,
}

impl<'scope> LocalScopeHandle<'scope> {
/// Caller guarantees that `*scope` will remain valid
/// until the scope completes. Since we acquire a ref,
/// that means it will remain valid until we release it.
unsafe fn new(scope: &Scope<'scope>) -> Self {
scope.job_completed_latch.increment();
LocalScopeHandle { scope: scope }
scope.base.increment();
LocalScopeHandle { scope: &scope.base }
}
}
