2 changes: 1 addition & 1 deletion rayon-core/src/internal/task.rs

@@ -4,7 +4,7 @@ use std::sync::Arc;
 /// Represents a task that can be scheduled onto the Rayon
 /// thread-pool. Once a task is scheduler, it will execute exactly
 /// once (eventually).
-pub trait Task: Send {
+pub trait Task: Send + Sync {
     fn execute(this: Arc<Self>);
 }
 
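The extra `Sync` bound matters because tasks are shared across threads as `Arc<T>`, and `Arc<T>` is only `Send` when `T: Send + Sync`. A minimal standalone sketch of that rule (the `Task` trait and `PrintTask` type here are hypothetical stand-ins, not rayon-core's private items):

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical copy of the internal trait, for illustration only.
trait Task: Send + Sync {
    fn execute(this: Arc<Self>);
}

struct PrintTask(String);

impl Task for PrintTask {
    fn execute(this: Arc<Self>) {
        println!("{}", this.0);
    }
}

fn main() {
    let task = Arc::new(PrintTask("hello".into()));
    let shared = task.clone();
    // `Arc<PrintTask>` crosses this thread boundary only because
    // `PrintTask: Send + Sync`; without the `Sync` bound on the trait,
    // an implementer could define a task this line would reject.
    thread::spawn(move || Task::execute(shared)).join().unwrap();
    Task::execute(task);
}
```
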
27 changes: 19 additions & 8 deletions rayon-core/src/job.rs

@@ -16,6 +16,9 @@ pub enum JobResult<T> {
 /// deque while the main worker manages the bottom of the deque. This
 /// deque is managed by the `thread_pool` module.
 pub trait Job {
+    /// Unsafe: this may be called from a different thread than the one
+    /// which scheduled the job, so the implementer must ensure the
+    /// appropriate traits are met, whether `Send`, `Sync`, or both.
     unsafe fn execute(this: *const Self);
 }
 
@@ -61,14 +64,20 @@ impl JobRef {
 /// A job that will be owned by a stack slot. This means that when it
 /// executes it need not free any heap data, the cleanup occurs when
 /// the stack frame is later popped.
-pub struct StackJob<L: Latch, F, R> {
+pub struct StackJob<L, F, R>
+    where L: Latch + Sync,
+          F: FnOnce() -> R + Send,
+          R: Send
+{
     pub latch: L,
     func: UnsafeCell<Option<F>>,
     result: UnsafeCell<JobResult<R>>,
 }
 
-impl<L: Latch, F, R> StackJob<L, F, R>
-    where F: FnOnce() -> R + Send
+impl<L, F, R> StackJob<L, F, R>
+    where L: Latch + Sync,
+          F: FnOnce() -> R + Send,
+          R: Send
 {
     pub fn new(func: F, latch: L) -> StackJob<L, F, R> {
         StackJob {
@@ -91,8 +100,10 @@ impl<L: Latch, F, R> StackJob<L, F, R>
     }
 }
 
-impl<L: Latch, F, R> Job for StackJob<L, F, R>
-    where F: FnOnce() -> R
+impl<L, F, R> Job for StackJob<L, F, R>
+    where L: Latch + Sync,
+          F: FnOnce() -> R + Send,
+          R: Send
 {
     unsafe fn execute(this: *const Self) {
         let this = &*this;
@@ -114,13 +125,13 @@ impl<L: Latch, F, R> Job for StackJob<L, F, R>
 ///
 /// (Probably `StackJob` should be refactored in a similar fashion.)
 pub struct HeapJob<BODY>
-    where BODY: FnOnce()
+    where BODY: FnOnce() + Send
 {
     job: UnsafeCell<Option<BODY>>,
 }
 
 impl<BODY> HeapJob<BODY>
-    where BODY: FnOnce()
+    where BODY: FnOnce() + Send
 {
     pub fn new(func: BODY) -> Self {
         HeapJob { job: UnsafeCell::new(Some(func)) }
@@ -136,7 +147,7 @@ impl<BODY> HeapJob<BODY>
 }
 
 impl<BODY> Job for HeapJob<BODY>
-    where BODY: FnOnce()
+    where BODY: FnOnce() + Send
 {
     unsafe fn execute(this: *const Self) {
         let this: Box<Self> = mem::transmute(this);
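
The strengthened bounds encode the two thread crossings a job makes: the closure travels to whichever worker executes it (so `F: Send`), and the result travels back to the waiter (so `R: Send`). A minimal sketch of the same contract using `std::thread` in place of rayon's deque-and-latch machinery (`run_on_other_thread` is illustrative; the real `StackJob` avoids the `'static` bounds by blocking on a latch until the job completes):

```rust
use std::thread;

// The closure crosses into another thread: F must be Send.
// The result crosses back out: R must be Send.
fn run_on_other_thread<F, R>(func: F) -> R
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    thread::spawn(func).join().expect("job panicked")
}

fn main() {
    let answer = run_on_other_thread(|| 6 * 7);
    assert_eq!(answer, 42);
}
```
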
48 changes: 32 additions & 16 deletions rayon-core/src/registry.rs

@@ -326,6 +326,36 @@ impl Registry {
         stolen
     }
 
+    /// If already in a worker-thread of this registry, just execute `op`.
+    /// Otherwise, inject `op` in this thread-pool. Either way, block until `op`
+    /// completes and return its return value. If `op` panics, that panic will
+    /// be propagated as well.
+    pub fn in_worker<OP, R>(&self, op: OP) -> R
+        where OP: FnOnce(&WorkerThread) -> R + Send, R: Send
+    {
+        unsafe {
+            let worker_thread = WorkerThread::current();
+            if !worker_thread.is_null() && (*worker_thread).registry().id() == self.id() {
+                // Perfectly valid to give them a `&T`: this is the
+                // current thread, so we know the data structure won't be
+                // invalidated until we return.
+                op(&*worker_thread)
+            } else {
+                self.in_worker_cold(op)
+            }
+        }
+    }
+
+    #[cold]
+    unsafe fn in_worker_cold<OP, R>(&self, op: OP) -> R
+        where OP: FnOnce(&WorkerThread) -> R + Send, R: Send
+    {
+        let job = StackJob::new(|| in_worker(op), LockLatch::new());
+        self.inject(&[job.as_job_ref()]);
+        job.latch.wait();
+        job.into_result()
+    }
+
     /// Increment the terminate counter. This increment should be
     /// balanced by a call to `terminate`, which will decrement. This
     /// is used when spawning asynchronous work, which needs to
@@ -644,23 +674,9 @@ pub fn in_worker<OP, R>(op: OP) -> R
             // Perfectly valid to give them a `&T`: this is the
             // current thread, so we know the data structure won't be
            // invalidated until we return.
-            return op(&*owner_thread);
+            op(&*owner_thread)
         } else {
-            return in_worker_cold(op);
+            global_registry().in_worker_cold(op)
         }
     }
 }
-
-#[cold]
-unsafe fn in_worker_cold<OP, R>(op: OP) -> R
-    where OP: FnOnce(&WorkerThread) -> R + Send, R: Send
-{
-    // never run from a worker thread; just shifts over into worker threads
-    debug_assert!(WorkerThread::current().is_null());
-    let job = StackJob::new(|| in_worker(op), LockLatch::new());
-    global_registry().inject(&[job.as_job_ref()]);
-    job.latch.wait();
-    job.into_result()
-}
-
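At the public API level, the same-registry check in `in_worker` is what makes re-entrant calls on a single pool safe: a nested call notices it is already on one of this registry's workers and runs `op` inline instead of injecting a job and blocking on the latch. A usage sketch under the `Configuration` API used elsewhere in this PR (and exercised by the `self_install` test below):

```rust
use rayon_core as rayon;

fn main() {
    let pool = rayon::Configuration::new().num_threads(2).build().unwrap();
    let n = pool.install(|| {
        // We are now on one of `pool`'s workers, so this inner call
        // takes `in_worker`'s fast path and runs inline; before this
        // PR it would inject a job and block the worker.
        pool.install(|| 21 * 2)
    });
    assert_eq!(n, 42);
}
```
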
2 changes: 1 addition & 1 deletion rayon-core/src/scope/internal.rs

@@ -46,7 +46,7 @@ impl<'scope> Drop for LocalScopeHandle<'scope> {
 unsafe impl<'scope> ScopeHandle<'scope> for LocalScopeHandle<'scope> {
     unsafe fn spawn_task<T: Task + 'scope>(&self, task: Arc<T>) {
         let scope = &*self.scope;
-        (*scope.owner_thread).registry().submit_task(task);
+        scope.registry.submit_task(task);
     }
 
     fn ok(self) {
51 changes: 28 additions & 23 deletions rayon-core/src/scope/mod.rs

@@ -6,8 +6,9 @@ use std::fmt;
 use std::marker::PhantomData;
 use std::mem;
 use std::ptr;
+use std::sync::Arc;
 use std::sync::atomic::{AtomicPtr, Ordering};
-use registry::{in_worker, WorkerThread};
+use registry::{in_worker, WorkerThread, Registry};
 use unwind;
 
 #[cfg(test)]
@@ -21,7 +22,10 @@ pub struct Scope<'scope> {
     /// thread where `scope()` was executed (note that individual jobs
     /// may be executing on different worker threads, though they
     /// should always be within the same pool of threads)
-    owner_thread: *const WorkerThread,
+    owner_thread_index: usize,
+
+    /// thread registry where `scope()` was executed.
+    registry: Arc<Registry>,
 
     /// if some job panicked, the error is stored here; it will be
     /// propagated to the one who created the scope
@@ -30,9 +34,11 @@
     /// latch to set when the counter drops to zero (and hence this scope is complete)
     job_completed_latch: CountLatch,
 
-    /// you can think of a scope as containing a list of closures to
-    /// execute, all of which outlive `'scope`
-    marker: PhantomData<Box<FnOnce(&Scope<'scope>) + 'scope>>,
+    /// You can think of a scope as containing a list of closures to execute,
+    /// all of which outlive `'scope`. They're not actually required to be
+    /// `Sync`, but it's still safe to let the `Scope` implement `Sync` because
+    /// the closures are only *moved* across threads to be executed.
+    marker: PhantomData<Box<FnOnce(&Scope<'scope>) + Send + Sync + 'scope>>,
 }
 
 /// Create a "fork-join" scope `s` and invokes the closure with a
@@ -250,13 +256,14 @@ pub fn scope<'scope, OP, R>(op: OP) -> R
     in_worker(|owner_thread| {
         unsafe {
             let scope: Scope<'scope> = Scope {
-                owner_thread: owner_thread as *const WorkerThread as *mut WorkerThread,
+                owner_thread_index: owner_thread.index(),
+                registry: owner_thread.registry().clone(),
                 panic: AtomicPtr::new(ptr::null_mut()),
                 job_completed_latch: CountLatch::new(),
                 marker: PhantomData,
             };
             let result = scope.execute_job_closure(op);
-            scope.steal_till_jobs_complete();
+            scope.steal_till_jobs_complete(owner_thread);
             result.unwrap() // only None if `op` panicked, and that would have been propagated
         }
     })
@@ -269,20 +276,17 @@ impl<'scope> Scope<'scope> {
     /// own reference to `self` as argument. This can be used to
     /// inject new jobs into `self`.
     pub fn spawn<BODY>(&self, body: BODY)
-        where BODY: FnOnce(&Scope<'scope>) + 'scope
+        where BODY: FnOnce(&Scope<'scope>) + Send + 'scope
     {
         unsafe {
             self.job_completed_latch.increment();
             let job_ref = Box::new(HeapJob::new(move || self.execute_job(body)))
                 .as_job_ref();
-            let worker_thread = WorkerThread::current();
 
-            // the `Scope` is not send or sync, and we only give out
-            // pointers to it from within a worker thread
-            debug_assert!(!WorkerThread::current().is_null());
-
-            let worker_thread = &*worker_thread;
-            worker_thread.push(job_ref);
+            // Since `Scope` implements `Sync`, we can't be sure
+            // that we're still in a thread of this pool, so we
+            // can't just push to the local worker thread.
+            self.registry.inject_or_push(job_ref);
         }
     }
 
@@ -315,43 +319,44 @@
         let nil = ptr::null_mut();
         let mut err = Box::new(err); // box up the fat ptr
         if self.panic.compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed).is_ok() {
-            log!(JobPanickedErrorStored { owner_thread: (*self.owner_thread).index() });
+            log!(JobPanickedErrorStored { owner_thread: self.owner_thread_index });
             mem::forget(err); // ownership now transferred into self.panic
         } else {
-            log!(JobPanickedErrorNotStored { owner_thread: (*self.owner_thread).index() });
+            log!(JobPanickedErrorNotStored { owner_thread: self.owner_thread_index });
         }
 
 
         self.job_completed_latch.set();
     }
 
     unsafe fn job_completed_ok(&self) {
-        log!(JobCompletedOk { owner_thread: (*self.owner_thread).index() });
+        log!(JobCompletedOk { owner_thread: self.owner_thread_index });
         self.job_completed_latch.set();
     }
 
-    unsafe fn steal_till_jobs_complete(&self) {
+    unsafe fn steal_till_jobs_complete(&self, owner_thread: &WorkerThread) {
         // wait for job counter to reach 0:
-        (*self.owner_thread).wait_until(&self.job_completed_latch);
+        owner_thread.wait_until(&self.job_completed_latch);
 
         // propagate panic, if any occurred; at this point, all
         // outstanding jobs have completed, so we can use a relaxed
         // ordering:
         let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
         if !panic.is_null() {
-            log!(ScopeCompletePanicked { owner_thread: (*self.owner_thread).index() });
+            log!(ScopeCompletePanicked { owner_thread: owner_thread.index() });
             let value: Box<Box<Any + Send + 'static>> = mem::transmute(panic);
             unwind::resume_unwinding(*value);
         } else {
-            log!(ScopeCompleteNoPanic { owner_thread: (*self.owner_thread).index() });
+            log!(ScopeCompleteNoPanic { owner_thread: owner_thread.index() });
         }
     }
 }
 
 impl<'scope> fmt::Debug for Scope<'scope> {
     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
         fmt.debug_struct("Scope")
-            .field("owner_thread", &self.owner_thread)
+            .field("pool_id", &self.registry.id())
+            .field("owner_thread_index", &self.owner_thread_index)
             .field("panic", &self.panic)
             .field("job_completed_latch", &self.job_completed_latch)
             .finish()
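
With `Scope` now `Sync` and spawned closures required to be `Send`, jobs routed through `inject_or_push` may run on any worker, and the `&Scope` handed to each job can itself spawn more work. A small usage sketch with the public `scope` API (the variable names and arithmetic are illustrative):

```rust
use rayon_core as rayon;

fn main() {
    let mut left = 0;
    let mut right = 0;
    rayon::scope(|s| {
        // Each spawned closure must be `Send`: it may execute on any
        // worker in the pool, not just the thread that created the scope.
        s.spawn(|_| left = 1);
        s.spawn(|s| {
            // Nested spawn: this `&Scope` may be on a different thread
            // than the one that created it, which is what `Sync` permits.
            s.spawn(|_| right = 2);
        });
    });
    // `scope` blocks until all spawned jobs complete.
    assert_eq!(left + right, 3);
}
```
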
4 changes: 2 additions & 2 deletions rayon-core/src/scope/test.rs

@@ -69,12 +69,12 @@ fn divide_and_conquer_seq(counter: &AtomicUsize, size: usize) {
     }
 }
 
-struct Tree<T> {
+struct Tree<T: Send> {
     value: T,
     children: Vec<Tree<T>>,
 }
 
-impl<T> Tree<T> {
+impl<T: Send> Tree<T> {
     pub fn iter<'s>(&'s self) -> impl Iterator<Item = &'s T> + 's {
         once(&self.value)
             .chain(self.children.iter().flat_map(|c| c.iter()))
38 changes: 29 additions & 9 deletions rayon-core/src/thread_pool/mod.rs

@@ -1,8 +1,6 @@
 use Configuration;
-use latch::LockLatch;
 #[allow(unused_imports)]
 use log::Event::*;
-use job::StackJob;
 use join;
 use {scope, Scope};
 use spawn;
@@ -86,6 +84,32 @@ impl ThreadPool {
     /// thread-local data from the current thread will not be
     /// accessible.
     ///
+    /// # Warning: inter-pool deadlocks
+    ///
+    /// If a thread within a threadpool calls `install()` for some
+    /// other threadpool, that thread will block, unable to participate
+    /// in its own pool until that call is done. If the other pool were
+    /// to call `install()` back to the first, then they'll both be blocked.
+    ///
+    /// ```rust,ignore
+    /// # use rayon_core as rayon;
+    /// let pool1 = rayon::Configuration::new().num_threads(1).build().unwrap();
+    /// let pool2 = rayon::Configuration::new().num_threads(1).build().unwrap();
+    ///
+    /// pool1.install(|| {
+    ///     // this will block pool1's thread:
+    ///     pool2.install(|| {
+    ///         // this will block pool2's thread:
+    ///         pool1.install(|| {
+    ///             // there are no threads left to run this!
+    ///             println!("hello?");
+    ///         });
+    ///     });
+    /// });
+    /// ```
+    ///
+    /// (Note: Any blocking in rayon threads is generally discouraged.)
+    ///
     /// # Panics
     ///
     /// If `op` should panic, that panic will be propagated.
@@ -109,14 +133,10 @@
     /// }
     /// ```
     pub fn install<OP, R>(&self, op: OP) -> R
-        where OP: FnOnce() -> R + Send
+        where OP: FnOnce() -> R + Send,
+              R: Send
     {
-        unsafe {
-            let job_a = StackJob::new(op, LockLatch::new());
-            self.registry.inject(&[job_a.as_job_ref()]);
-            job_a.latch.wait();
-            job_a.into_result()
-        }
+        self.registry.in_worker(|_| op())
     }
 
     /// Returns the (current) number of threads in the thread pool.
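
Rewriting `install` on top of `Registry::in_worker` removes the duplicated inject-and-wait code and picks up the inline fast path for free. The new `R: Send` bound is needed because the result may now be produced on a worker thread and handed back across the pool boundary. A usage sketch:

```rust
use rayon_core as rayon;

fn main() {
    let pool = rayon::Configuration::new().num_threads(4).build().unwrap();
    // The closure runs inside the pool; the result returning to this
    // thread is why `install` now requires `R: Send`.
    let total: i32 = pool.install(|| (1..=100).sum());
    assert_eq!(total, 5050);
}
```
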
8 changes: 8 additions & 0 deletions rayon-core/src/thread_pool/test.rs

@@ -137,3 +137,11 @@ fn panic_thread_name() {
     assert_eq!(5, wait_for_counter(start_count));
     assert_eq!(5, wait_for_counter(exit_count));
 }
+
+#[test]
+fn self_install() {
+    let pool = Configuration::new().num_threads(1).build().unwrap();
+
+    // If the inner `install` blocks, then nothing will actually run it!
+    assert!(pool.install(|| pool.install(|| true)));
+}