Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ lazy_static = "1"
[dependencies.crossbeam-deque]
version = "0.2.0"

# Also held back for rustc compatibility
[dependencies.crossbeam]
version = "0.3.0"

[dev-dependencies]
rand = "0.5"

Expand Down
33 changes: 33 additions & 0 deletions rayon-core/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crossbeam::sync::SegQueue;
use latch::Latch;
use std::any::Any;
use std::cell::UnsafeCell;
Expand Down Expand Up @@ -179,3 +180,35 @@ impl<T> JobResult<T> {
}
}
}

/// Indirect queue to provide FIFO job priority.
pub struct JobFifo {
inner: SegQueue<JobRef>,
}

impl JobFifo {
pub fn new() -> Self {
JobFifo {
inner: SegQueue::new(),
}
}

pub unsafe fn push(&self, job_ref: JobRef) -> JobRef {
// A little indirection ensures that spawns are always prioritized in FIFO order. The
// jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front
// (FIFO), but either way they will end up popping from the front of this queue.
self.inner.push(job_ref);
JobRef::new(self)
}
}

impl Job for JobFifo {
unsafe fn execute(this: *const Self) {
// We "execute" a queue by executing its first job, FIFO.
(*this)
.inner
.try_pop()
.expect("job in fifo queue")
.execute()
}
}
31 changes: 20 additions & 11 deletions rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::io;
use std::marker::PhantomData;
use std::str::FromStr;

extern crate crossbeam;
extern crate crossbeam_deque;
#[macro_use]
extern crate lazy_static;
Expand Down Expand Up @@ -61,7 +62,8 @@ mod test;
pub mod internal;
pub use join::{join, join_context};
pub use scope::{scope, Scope};
pub use spawn::spawn;
pub use scope::{scope_fifo, ScopeFifo};
pub use spawn::{spawn, spawn_fifo};
pub use thread_pool::current_thread_has_pending_tasks;
pub use thread_pool::current_thread_index;
pub use thread_pool::ThreadPool;
Expand Down Expand Up @@ -314,16 +316,17 @@ impl ThreadPoolBuilder {
self
}

/// Suggest to worker threads that they execute spawned jobs in a
/// "breadth-first" fashion. Typically, when a worker thread is
/// idle or blocked, it will attempt to execute the job from the
/// *top* of its local deque of work (i.e., the job most recently
/// spawned). If this flag is set to true, however, workers will
/// prefer to execute in a *breadth-first* fashion -- that is,
/// they will search for jobs at the *bottom* of their local
/// deque. (At present, workers *always* steal from the bottom of
/// other worker's deques, regardless of the setting of this
/// flag.)
/// **(DEPRECATED)** Suggest to worker threads that they execute
/// spawned jobs in a "breadth-first" fashion.
///
/// Typically, when a worker thread is idle or blocked, it will
/// attempt to execute the job from the *top* of its local deque of
/// work (i.e., the job most recently spawned). If this flag is set
/// to true, however, workers will prefer to execute in a
/// *breadth-first* fashion -- that is, they will search for jobs at
/// the *bottom* of their local deque. (At present, workers *always*
/// steal from the bottom of other worker's deques, regardless of
/// the setting of this flag.)
///
/// If you think of the tasks as a tree, where a parent task
/// spawns its children in the tree, then this flag loosely
Expand All @@ -334,6 +337,12 @@ impl ThreadPoolBuilder {
/// execution is highly dynamic and the precise order in which
/// independent tasks are executed is not intended to be
/// guaranteed.
///
/// This `breadth_first()` method is now deprecated per [RFC #1],
/// and in the future its effect may be removed.
///
/// [RFC #1]: https://github.com/rayon-rs/rfcs/pull/1
#[deprecated(note = "use `scope_fifo` and `spawn_fifo` for similar effect")]
pub fn breadth_first(mut self) -> Self {
self.breadth_first = true;
self
Expand Down
92 changes: 45 additions & 47 deletions rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crossbeam::sync::SegQueue;
use crossbeam_deque::{Deque, Steal, Stealer};
#[cfg(rayon_unstable)]
use internal::task::Task;
#[cfg(rayon_unstable)]
use job::Job;
use job::{JobRef, StackJob};
use job::{JobFifo, JobRef, StackJob};
use latch::{CountLatch, Latch, LatchProbe, LockLatch, SpinLatch, TickleLatch};
use log::Event::*;
use sleep::Sleep;
Expand All @@ -13,7 +14,7 @@ use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::sync::{Arc, Mutex, Once, ONCE_INIT};
use std::sync::{Arc, Once, ONCE_INIT};
use std::thread;
use std::usize;
use unwind;
Expand All @@ -22,9 +23,8 @@ use {ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, T

pub struct Registry {
thread_infos: Vec<ThreadInfo>,
state: Mutex<RegistryState>,
sleep: Sleep,
job_uninjector: Stealer<JobRef>,
injected_jobs: SegQueue<JobRef>,
panic_handler: Option<Box<PanicHandler>>,
start_handler: Option<Box<StartHandler>>,
exit_handler: Option<Box<ExitHandler>>,
Expand All @@ -45,10 +45,6 @@ pub struct Registry {
terminate_latch: CountLatch,
}

struct RegistryState {
job_injector: Deque<JobRef>,
}

/// ////////////////////////////////////////////////////////////////////////
/// Initialization
Expand Down Expand Up @@ -104,16 +100,13 @@ impl Registry {
let n_threads = builder.get_num_threads();
let breadth_first = builder.get_breadth_first();

let inj_worker = Deque::new();
let inj_stealer = inj_worker.stealer();
let workers: Vec<_> = (0..n_threads).map(|_| Deque::new()).collect();
let stealers: Vec<_> = workers.iter().map(|d| d.stealer()).collect();

let registry = Arc::new(Registry {
thread_infos: stealers.into_iter().map(|s| ThreadInfo::new(s)).collect(),
state: Mutex::new(RegistryState::new(inj_worker)),
sleep: Sleep::new(),
job_uninjector: inj_stealer,
injected_jobs: SegQueue::new(),
terminate_latch: CountLatch::new(),
panic_handler: builder.take_panic_handler(),
start_handler: builder.take_start_handler(),
Expand Down Expand Up @@ -175,6 +168,18 @@ impl Registry {
}
}

/// Returns the current `WorkerThread` if it's part of this `Registry`.
pub fn current_thread(&self) -> Option<&WorkerThread> {
unsafe {
if let Some(worker) = WorkerThread::current().as_ref() {
if worker.registry().id() == self.id() {
return Some(worker);
}
}
None
}
}

/// Returns an opaque identifier for this registry.
pub fn id(&self) -> RegistryId {
// We can rely on `self` not to change since we only ever create
Expand Down Expand Up @@ -297,39 +302,31 @@ impl Registry {
log!(InjectJobs {
count: injected_jobs.len()
});
{
let state = self.state.lock().unwrap();

// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
// drops) a `ThreadPool`; and, in that case, they cannot be
// calling `inject()` later, since they dropped their
// `ThreadPool`.
assert!(
!self.terminate_latch.probe(),
"inject() sees state.terminate as true"
);

for &job_ref in injected_jobs {
state.job_injector.push(job_ref);
}

// It should not be possible for `state.terminate` to be true
// here. It is only set to true when the user creates (and
// drops) a `ThreadPool`; and, in that case, they cannot be
// calling `inject()` later, since they dropped their
// `ThreadPool`.
assert!(
!self.terminate_latch.probe(),
"inject() sees state.terminate as true"
);

for &job_ref in injected_jobs {
self.injected_jobs.push(job_ref);
}
self.sleep.tickle(usize::MAX);
}

fn pop_injected_job(&self, worker_index: usize) -> Option<JobRef> {
loop {
match self.job_uninjector.steal() {
Steal::Empty => return None,
Steal::Data(d) => {
log!(UninjectedWork {
worker: worker_index
});
return Some(d);
}
Steal::Retry => {}
}
let job = self.injected_jobs.try_pop();
if job.is_some() {
log!(UninjectedWork {
worker: worker_index
});
}
job
}

/// If already in a worker-thread of this registry, just execute `op`.
Expand Down Expand Up @@ -439,14 +436,6 @@ pub struct RegistryId {
addr: usize,
}

impl RegistryState {
pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
RegistryState {
job_injector: job_injector,
}
}
}

struct ThreadInfo {
/// Latch set once thread has started and we are entering into the
/// main loop. Used to wait for worker threads to become primed,
Expand Down Expand Up @@ -478,6 +467,9 @@ pub struct WorkerThread {
/// the "worker" half of our local deque
worker: Deque<JobRef>,

/// local queue used for `spawn_fifo` indirection
fifo: JobFifo,

index: usize,

/// are these workers configured to steal breadth-first or not?
Expand Down Expand Up @@ -534,6 +526,11 @@ impl WorkerThread {
self.registry.sleep.tickle(self.index);
}

#[inline]
pub unsafe fn push_fifo(&self, job: JobRef) {
self.push(self.fifo.push(job));
}

#[inline]
pub fn local_deque_is_empty(&self) -> bool {
self.worker.len() == 0
Expand Down Expand Up @@ -663,6 +660,7 @@ unsafe fn main_loop(
) {
let worker_thread = WorkerThread {
worker: worker,
fifo: JobFifo::new(),
breadth_first: breadth_first,
index: index,
rng: XorShift64Star::new(),
Expand Down
8 changes: 4 additions & 4 deletions rayon-core/src/scope/internal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![cfg(rayon_unstable)]

use super::Scope;
use super::{Scope, ScopeBase};
use internal::task::{ScopeHandle, Task, ToScopeHandle};
use std::any::Any;
use std::mem;
Expand All @@ -16,16 +16,16 @@ impl<'scope> ToScopeHandle<'scope> for Scope<'scope> {

#[derive(Debug)]
pub struct LocalScopeHandle<'scope> {
scope: *const Scope<'scope>,
scope: *const ScopeBase<'scope>,
}

impl<'scope> LocalScopeHandle<'scope> {
/// Caller guarantees that `*scope` will remain valid
/// until the scope completes. Since we acquire a ref,
/// that means it will remain valid until we release it.
unsafe fn new(scope: &Scope<'scope>) -> Self {
scope.job_completed_latch.increment();
LocalScopeHandle { scope: scope }
scope.base.increment();
LocalScopeHandle { scope: &scope.base }
}
}

Expand Down
Loading