1+ use crossbeam:: sync:: SegQueue ;
12use crossbeam_deque:: { Deque , Steal , Stealer } ;
23#[ cfg( rayon_unstable) ]
34use internal:: task:: Task ;
45#[ cfg( rayon_unstable) ]
56use job:: Job ;
6- use job:: { JobRef , StackJob } ;
7+ use job:: { JobFifo , JobRef , StackJob } ;
78use latch:: { CountLatch , Latch , LatchProbe , LockLatch , SpinLatch , TickleLatch } ;
89use log:: Event :: * ;
910use sleep:: Sleep ;
@@ -13,7 +14,7 @@ use std::collections::hash_map::DefaultHasher;
1314use std:: hash:: Hasher ;
1415use std:: mem;
1516use std:: sync:: atomic:: { AtomicUsize , Ordering , ATOMIC_USIZE_INIT } ;
16- use std:: sync:: { Arc , Mutex , Once , ONCE_INIT } ;
17+ use std:: sync:: { Arc , Once , ONCE_INIT } ;
1718use std:: thread;
1819use std:: usize;
1920use unwind;
@@ -22,9 +23,8 @@ use {ErrorKind, ExitHandler, PanicHandler, StartHandler, ThreadPoolBuildError, T
2223
2324pub struct Registry {
2425 thread_infos : Vec < ThreadInfo > ,
25- state : Mutex < RegistryState > ,
2626 sleep : Sleep ,
27- job_uninjector : Stealer < JobRef > ,
27+ injected_jobs : SegQueue < JobRef > ,
2828 panic_handler : Option < Box < PanicHandler > > ,
2929 start_handler : Option < Box < StartHandler > > ,
3030 exit_handler : Option < Box < ExitHandler > > ,
@@ -45,10 +45,6 @@ pub struct Registry {
4545 terminate_latch : CountLatch ,
4646}
4747
48- struct RegistryState {
49- job_injector : Deque < JobRef > ,
50- }
51-
5248/// ////////////////////////////////////////////////////////////////////////
5349/// Initialization
5450
@@ -104,16 +100,13 @@ impl Registry {
104100 let n_threads = builder. get_num_threads ( ) ;
105101 let breadth_first = builder. get_breadth_first ( ) ;
106102
107- let inj_worker = Deque :: new ( ) ;
108- let inj_stealer = inj_worker. stealer ( ) ;
109103 let workers: Vec < _ > = ( 0 ..n_threads) . map ( |_| Deque :: new ( ) ) . collect ( ) ;
110104 let stealers: Vec < _ > = workers. iter ( ) . map ( |d| d. stealer ( ) ) . collect ( ) ;
111105
112106 let registry = Arc :: new ( Registry {
113107 thread_infos : stealers. into_iter ( ) . map ( |s| ThreadInfo :: new ( s) ) . collect ( ) ,
114- state : Mutex :: new ( RegistryState :: new ( inj_worker) ) ,
115108 sleep : Sleep :: new ( ) ,
116- job_uninjector : inj_stealer ,
109+ injected_jobs : SegQueue :: new ( ) ,
117110 terminate_latch : CountLatch :: new ( ) ,
118111 panic_handler : builder. take_panic_handler ( ) ,
119112 start_handler : builder. take_start_handler ( ) ,
@@ -175,6 +168,18 @@ impl Registry {
175168 }
176169 }
177170
171+ /// Returns the current `WorkerThread` if it's part of this `Registry`.
172+ pub fn current_thread ( & self ) -> Option < & WorkerThread > {
173+ unsafe {
174+ if let Some ( worker) = WorkerThread :: current ( ) . as_ref ( ) {
175+ if worker. registry ( ) . id ( ) == self . id ( ) {
176+ return Some ( worker) ;
177+ }
178+ }
179+ None
180+ }
181+ }
182+
178183 /// Returns an opaque identifier for this registry.
179184 pub fn id ( & self ) -> RegistryId {
180185 // We can rely on `self` not to change since we only ever create
@@ -297,39 +302,31 @@ impl Registry {
297302 log ! ( InjectJobs {
298303 count: injected_jobs. len( )
299304 } ) ;
300- {
301- let state = self . state . lock ( ) . unwrap ( ) ;
302-
303- // It should not be possible for `state.terminate` to be true
304- // here. It is only set to true when the user creates (and
305- // drops) a `ThreadPool`; and, in that case, they cannot be
306- // calling `inject()` later, since they dropped their
307- // `ThreadPool`.
308- assert ! (
309- !self . terminate_latch. probe( ) ,
310- "inject() sees state.terminate as true"
311- ) ;
312-
313- for & job_ref in injected_jobs {
314- state. job_injector . push ( job_ref) ;
315- }
305+
306+ // It should not be possible for `state.terminate` to be true
307+ // here. It is only set to true when the user creates (and
308+ // drops) a `ThreadPool`; and, in that case, they cannot be
309+ // calling `inject()` later, since they dropped their
310+ // `ThreadPool`.
311+ assert ! (
312+ !self . terminate_latch. probe( ) ,
313+ "inject() sees state.terminate as true"
314+ ) ;
315+
316+ for & job_ref in injected_jobs {
317+ self . injected_jobs . push ( job_ref) ;
316318 }
317319 self . sleep . tickle ( usize:: MAX ) ;
318320 }
319321
320322 fn pop_injected_job ( & self , worker_index : usize ) -> Option < JobRef > {
321- loop {
322- match self . job_uninjector . steal ( ) {
323- Steal :: Empty => return None ,
324- Steal :: Data ( d) => {
325- log ! ( UninjectedWork {
326- worker: worker_index
327- } ) ;
328- return Some ( d) ;
329- }
330- Steal :: Retry => { }
331- }
323+ let job = self . injected_jobs . try_pop ( ) ;
324+ if job. is_some ( ) {
325+ log ! ( UninjectedWork {
326+ worker: worker_index
327+ } ) ;
332328 }
329+ job
333330 }
334331
335332 /// If already in a worker-thread of this registry, just execute `op`.
@@ -439,14 +436,6 @@ pub struct RegistryId {
439436 addr : usize ,
440437}
441438
442- impl RegistryState {
443- pub fn new ( job_injector : Deque < JobRef > ) -> RegistryState {
444- RegistryState {
445- job_injector : job_injector,
446- }
447- }
448- }
449-
450439struct ThreadInfo {
451440 /// Latch set once thread has started and we are entering into the
452441 /// main loop. Used to wait for worker threads to become primed,
@@ -478,6 +467,9 @@ pub struct WorkerThread {
478467 /// the "worker" half of our local deque
479468 worker : Deque < JobRef > ,
480469
470+ /// local queue used for `spawn_fifo` indirection
471+ fifo : JobFifo ,
472+
481473 index : usize ,
482474
483475 /// are these workers configured to steal breadth-first or not?
@@ -534,6 +526,11 @@ impl WorkerThread {
534526 self . registry . sleep . tickle ( self . index ) ;
535527 }
536528
529+ #[ inline]
530+ pub unsafe fn push_fifo ( & self , job : JobRef ) {
531+ self . push ( self . fifo . push ( job) ) ;
532+ }
533+
537534 #[ inline]
538535 pub fn local_deque_is_empty ( & self ) -> bool {
539536 self . worker . len ( ) == 0
@@ -663,6 +660,7 @@ unsafe fn main_loop(
663660) {
664661 let worker_thread = WorkerThread {
665662 worker : worker,
663+ fifo : JobFifo :: new ( ) ,
666664 breadth_first : breadth_first,
667665 index : index,
668666 rng : XorShift64Star :: new ( ) ,
0 commit comments