Skip to content

Commit 01464c1

Browse files
committed
WIP Attempt at updating to use crossbeam-deque 0.6.1.
See <rayon-rs#590> <rayon-rs#601>
1 parent df86443 commit 01464c1

File tree

4 files changed

+51
-29
lines changed

4 files changed

+51
-29
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ exclude = ["ci"]
1919

2020
[dependencies]
2121
rayon-core = { version = "1.4", path = "rayon-core" }
22-
crossbeam-deque = "0.2.0"
22+
crossbeam-deque = "0.6.1"
2323

2424
# This is a public dependency!
2525
[dependencies.either]

rayon-core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ lazy_static = "1"
2121
# This is deliberately not the latest version, because we want
2222
# to support older rustc than crossbeam-deque 0.3+ does.
2323
[dependencies.crossbeam-deque]
24-
version = "0.2.0"
24+
version = "0.6.1"
2525

2626
[dev-dependencies]
2727
rand = "0.5"

rayon-core/src/registry.rs

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use ::{ExitHandler, PanicHandler, StartHandler, ThreadPoolBuilder, ThreadPoolBuildError, ErrorKind};
2-
use crossbeam_deque::{Deque, Steal, Stealer};
2+
use crossbeam_deque::{self as deque, Worker, Pop, Steal, Stealer};
33
use job::{JobRef, StackJob};
44
#[cfg(rayon_unstable)]
55
use job::Job;
@@ -46,7 +46,7 @@ pub struct Registry {
4646
}
4747

4848
struct RegistryState {
49-
job_injector: Deque<JobRef>,
49+
job_injector: Worker<JobRef>,
5050
}
5151

5252
/// ////////////////////////////////////////////////////////////////////////
@@ -100,16 +100,21 @@ impl Registry {
100100
let n_threads = builder.get_num_threads();
101101
let breadth_first = builder.get_breadth_first();
102102

103-
let inj_worker = Deque::new();
104-
let inj_stealer = inj_worker.stealer();
105-
let workers: Vec<_> = (0..n_threads)
106-
.map(|_| Deque::new())
107-
.collect();
108-
let stealers: Vec<_> = workers.iter().map(|d| d.stealer()).collect();
103+
let (inj_worker, inj_stealer) = deque::fifo();
104+
let (workers, stealers) = (0..n_threads)
105+
.fold(
106+
(Vec::with_capacity(n_threads), Vec::with_capacity(n_threads)),
107+
|(mut workers, mut stealers), _| {
108+
let (worker, stealer) = deque::fifo();
109+
workers.push(worker);
110+
stealers.push(stealer);
111+
112+
(workers, stealers)
113+
});
109114

110115
let registry = Arc::new(Registry {
111-
thread_infos: stealers.into_iter()
112-
.map(|s| ThreadInfo::new(s))
116+
thread_infos: stealers.iter()
117+
.map(|s| ThreadInfo::new(s.clone()))
113118
.collect(),
114119
state: Mutex::new(RegistryState::new(inj_worker)),
115120
sleep: Sleep::new(),
@@ -123,7 +128,7 @@ impl Registry {
123128
// If we return early or panic, make sure to terminate existing threads.
124129
let t1000 = Terminator(&registry);
125130

126-
for (index, worker) in workers.into_iter().enumerate() {
131+
for (index, (worker, stealer)) in workers.into_iter().zip(stealers.into_iter()).enumerate() {
127132
let registry = registry.clone();
128133
let mut b = thread::Builder::new();
129134
if let Some(name) = builder.get_thread_name(index) {
@@ -132,7 +137,7 @@ impl Registry {
132137
if let Some(stack_size) = builder.get_stack_size() {
133138
b = b.stack_size(stack_size);
134139
}
135-
if let Err(e) = b.spawn(move || unsafe { main_loop(worker, registry, index, breadth_first) }) {
140+
if let Err(e) = b.spawn(move || unsafe { main_loop(worker, stealer, registry, index, breadth_first) }) {
136141
return Err(ThreadPoolBuildError::new(ErrorKind::IOError(e)))
137142
}
138143
}
@@ -417,7 +422,7 @@ pub struct RegistryId {
417422
}
418423

419424
impl RegistryState {
420-
pub fn new(job_injector: Deque<JobRef>) -> RegistryState {
425+
pub fn new(job_injector: Worker<JobRef>) -> RegistryState {
421426
RegistryState {
422427
job_injector: job_injector,
423428
}
@@ -453,7 +458,10 @@ impl ThreadInfo {
453458
454459
pub struct WorkerThread {
455460
/// the "worker" half of our local deque
456-
worker: Deque<JobRef>,
461+
worker: Worker<JobRef>,
462+
463+
/// a stealer to allow us to take jobs from the bottom of our deque
464+
stealer: Stealer<JobRef>,
457465

458466
index: usize,
459467

@@ -513,7 +521,7 @@ impl WorkerThread {
513521

514522
#[inline]
515523
pub fn local_deque_is_empty(&self) -> bool {
516-
self.worker.len() == 0
524+
self.worker.is_empty()
517525
}
518526

519527
/// Attempts to obtain a "local" job -- typically this means
@@ -523,10 +531,16 @@ impl WorkerThread {
523531
#[inline]
524532
pub unsafe fn take_local_job(&self) -> Option<JobRef> {
525533
if !self.breadth_first {
526-
self.worker.pop()
534+
loop {
535+
match self.worker.pop() {
536+
Pop::Empty => return None,
537+
Pop::Data(d) => return Some(d),
538+
Pop::Retry => {},
539+
}
540+
}
527541
} else {
528542
loop {
529-
match self.worker.steal() {
543+
match self.stealer.steal() {
530544
Steal::Empty => return None,
531545
Steal::Data(d) => return Some(d),
532546
Steal::Retry => {},
@@ -596,7 +610,14 @@ impl WorkerThread {
596610
/// local work to do.
597611
unsafe fn steal(&self) -> Option<JobRef> {
598612
// we only steal when we don't have any work to do locally
599-
debug_assert!(self.worker.pop().is_none());
613+
let has_work = loop {
614+
match self.worker.pop() {
615+
Pop::Empty => break false,
616+
Pop::Data(_) => break true,
617+
Pop::Retry => {},
618+
}
619+
};
620+
debug_assert!(!has_work);
600621

601622
// otherwise, try to steal
602623
let num_threads = self.registry.thread_infos.len();
@@ -630,12 +651,14 @@ impl WorkerThread {
630651

631652
/// ////////////////////////////////////////////////////////////////////////
632653
633-
unsafe fn main_loop(worker: Deque<JobRef>,
654+
unsafe fn main_loop(worker: Worker<JobRef>,
655+
stealer: Stealer<JobRef>,
634656
registry: Arc<Registry>,
635657
index: usize,
636658
breadth_first: bool) {
637659
let worker_thread = WorkerThread {
638660
worker: worker,
661+
stealer: stealer,
639662
breadth_first: breadth_first,
640663
index: index,
641664
rng: XorShift64Star::new(),

src/iter/par_bridge.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crossbeam_deque::{Deque, Stealer, Steal};
1+
use crossbeam_deque::{self as deque, Worker, Stealer, Steal};
22

33
use std::thread::yield_now;
44
use std::sync::{Mutex, TryLockError};
@@ -78,10 +78,9 @@ impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
7878
where C: UnindexedConsumer<Self::Item>
7979
{
8080
let split_count = AtomicUsize::new(current_num_threads());
81-
let deque = Deque::new();
82-
let stealer = deque.stealer();
81+
let (worker, stealer) = deque::fifo();
8382
let done = AtomicBool::new(false);
84-
let iter = Mutex::new((self.iter, deque));
83+
let iter = Mutex::new((self.iter, worker));
8584

8685
bridge_unindexed(IterParallelProducer {
8786
split_count: &split_count,
@@ -95,7 +94,7 @@ impl<Iter: Iterator + Send> ParallelIterator for IterBridge<Iter>
9594
struct IterParallelProducer<'a, Iter: Iterator + 'a> {
9695
split_count: &'a AtomicUsize,
9796
done: &'a AtomicBool,
98-
iter: &'a Mutex<(Iter, Deque<Iter::Item>)>,
97+
iter: &'a Mutex<(Iter, Worker<Iter::Item>)>,
9998
items: Stealer<Iter::Item>,
10099
}
101100

@@ -159,11 +158,11 @@ impl<'a, Iter: Iterator + Send + 'a> UnindexedProducer for IterParallelProducer<
159158
let count = current_num_threads();
160159
let count = (count * count) * 2;
161160

162-
let (ref mut iter, ref deque) = *guard;
161+
let (ref mut iter, ref worker) = *guard;
163162

164-
while deque.len() < count {
163+
while worker.len() < count {
165164
if let Some(it) = iter.next() {
166-
deque.push(it);
165+
worker.push(it);
167166
} else {
168167
self.done.store(true, Ordering::SeqCst);
169168
break;

0 commit comments

Comments
 (0)