Skip to content

Commit 0759efb

Browse files
committed
Arc<Scheduler> -> *mut Scheduler
This commit introduces the distribution of Scheduler among Processors using a plain pointer. This can be done since we know that all Processor threads are joined anyway before Scheduler::run() ever returns.
1 parent fb3c393 commit 0759efb

File tree

2 files changed

+64
-36
lines changed

2 files changed

+64
-36
lines changed

src/runtime/processor.rs

Lines changed: 54 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ use std::boxed::FnBox;
2727
use std::cell::UnsafeCell;
2828
use std::io;
2929
use std::mem;
30-
use std::sync::Arc;
31-
use std::sync::mpsc::{self, Sender, Receiver};
30+
use std::ptr;
31+
use std::sync::mpsc::{self, Receiver, Sender};
3232
use std::thread::{self, Builder};
3333
use std::time::Duration;
3434

@@ -40,14 +40,23 @@ use coroutine::{self, Coroutine, State, Handle, SendableCoroutinePtr};
4040
use options::Options;
4141
use scheduler::Scheduler;
4242

43-
thread_local!(static PROCESSOR: UnsafeCell<Option<Processor>> = UnsafeCell::new(None));
43+
// TODO:
44+
// Reconsider making PROCESSOR a static object instead of a pointer.
45+
// This would simplify storage of Processor and improve performance of Processor::current()
46+
// Blockers:
47+
// - std::thread::LocalKeyState is unstable (to avoid triggering the lazy init.).
48+
// - Investigate if LLVM generates dynamic or non-dynamic ELF TLS.
49+
// "dynamic" TLS is allocated lazily at first access of the store (which is what we want)
50+
// instead of being allocated statically for every thread.
51+
// ELF TLS Paper: "ELF Handling For Thread-Local Storage".
52+
thread_local!(static PROCESSOR: UnsafeCell<*mut Processor> = UnsafeCell::new(ptr::null_mut()));
4453

4554
#[derive(Debug)]
4655
pub struct ForceUnwind;
4756

4857
/// Processing unit of a thread
4958
pub struct Processor {
50-
scheduler: Arc<Scheduler>,
59+
scheduler: *mut Scheduler,
5160

5261
main_coro: Handle,
5362
cur_running: Option<*mut Coroutine>,
@@ -70,7 +79,7 @@ unsafe impl Send for Processor {}
7079

7180
impl Processor {
7281
fn new_with_neighbors(_processor_id: usize,
73-
sched: Arc<Scheduler>,
82+
sched: *mut Scheduler,
7483
neigh: Vec<Stealer<SendableCoroutinePtr>>)
7584
-> Processor {
7685
let main_coro = unsafe { Coroutine::empty() };
@@ -99,28 +108,37 @@ impl Processor {
99108
}
100109
}
101110

102-
fn set_and_get_tls(p: Processor) -> &'static mut Processor {
103-
PROCESSOR.with(move |proc_opt| unsafe {
104-
*proc_opt.get() = Some(p);
105-
(*proc_opt.get()).as_mut().unwrap()
111+
fn set_tls(p: &mut Processor) {
112+
PROCESSOR.with(|proc_opt| unsafe {
113+
*proc_opt.get() = p;
114+
})
115+
}
116+
117+
fn unset_tls() {
118+
PROCESSOR.with(|proc_opt| unsafe {
119+
*proc_opt.get() = ptr::null_mut();
106120
})
107121
}
108122

109123
#[inline]
110124
pub fn run_with_neighbors(processor_id: usize,
111-
sched: Arc<Scheduler>,
125+
sched: *mut Scheduler,
112126
neigh: Vec<Stealer<SendableCoroutinePtr>>)
113127
-> (thread::JoinHandle<()>,
114128
Sender<ProcMessage>,
115129
Stealer<SendableCoroutinePtr>) {
116-
let p = Processor::new_with_neighbors(processor_id, sched, neigh);
130+
let mut p = Processor::new_with_neighbors(processor_id, sched, neigh);
117131
let msg = p.handle();
118132
let st = p.stealer();
133+
119134
let hdl = Builder::new()
120135
.name(format!("Processor #{}", processor_id))
121136
.spawn(move || {
122-
let mut p = Processor::set_and_get_tls(p);
123-
if let Err(err) = p.schedule() {
137+
Processor::set_tls(&mut p);
138+
let err = p.schedule();
139+
Processor::unset_tls();
140+
141+
if let Err(err) = err {
124142
panic!("Processor::schedule return Err: {:?}", err);
125143
}
126144
})
@@ -131,7 +149,7 @@ impl Processor {
131149

132150
#[inline]
133151
pub fn run_main<M, T>(processor_id: usize,
134-
sched: Arc<Scheduler>,
152+
sched: *mut Scheduler,
135153
f: M)
136154
-> (thread::JoinHandle<()>,
137155
Sender<ProcMessage>,
@@ -140,35 +158,49 @@ impl Processor {
140158
where M: FnOnce() -> T + Send + 'static,
141159
T: Send + 'static
142160
{
143-
let p = Processor::new_with_neighbors(processor_id, sched, Vec::new());
161+
let mut p = Processor::new_with_neighbors(processor_id, sched, Vec::new());
144162
let (msg, st) = (p.handle(), p.stealer());
145163
let (tx, rx) = ::std::sync::mpsc::channel();
146-
let hdl = Builder::new().name(format!("Processor #{}", processor_id)).spawn(move|| {
147-
let mut p = Processor::set_and_get_tls(p);
148-
let wrapper = move|| {
149-
let ret = unsafe { ::try(move|| f()) };
164+
165+
let hdl = Builder::new().name(format!("Processor #{}", processor_id)).spawn(move || {
166+
Processor::set_tls(&mut p);
167+
168+
let wrapper = move || {
169+
let ret = unsafe { ::try(move || f()) };
150170

151171
// No matter whether it is panicked or not, the result will be sent to the channel
152172
let _ = tx.send(ret); // Just ignore if it failed
153173
};
154174
p.spawn_opts(Box::new(wrapper), Options::default());
155175

156-
if let Err(err) = p.schedule() {
176+
let err = p.schedule();
177+
Processor::unset_tls();
178+
179+
if let Err(err) = err {
157180
panic!("Processor::schedule return Err: {:?}", err);
158181
}
159182
}).unwrap();
183+
160184
(hdl, msg, st, rx)
161185
}
162186

163187
#[inline]
164188
pub fn scheduler(&self) -> &Scheduler {
165-
&*self.scheduler
189+
unsafe { &*self.scheduler }
166190
}
167191

168192
/// Get the thread local processor
169193
#[inline]
170194
pub fn current() -> Option<&'static mut Processor> {
171-
PROCESSOR.with(|proc_opt| unsafe { (*proc_opt.get()).as_mut() })
195+
PROCESSOR.with(|proc_opt| unsafe {
196+
let p: *mut Processor = *proc_opt.get();
197+
198+
if p.is_null() {
199+
None
200+
} else {
201+
Some(&mut *p)
202+
}
203+
})
172204
}
173205

174206
#[inline]

src/scheduler.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
//! Global coroutine scheduler
2323
2424
use std::sync::atomic::{AtomicUsize, Ordering};
25-
use std::sync::Arc;
2625
use std::sync::mpsc::{Sender, TryRecvError};
2726
use std::default::Default;
2827
use std::any::Any;
@@ -249,11 +248,10 @@ impl Scheduler {
249248
}
250249

251250
/// Run the scheduler
252-
pub fn run<M, R>(self, main_fn: M) -> Result<R, Box<Any + Send + 'static>>
251+
pub fn run<M, R>(&mut self, main_fn: M) -> Result<R, Box<Any + Send + 'static>>
253252
where M: FnOnce() -> R + Send + 'static,
254253
R: Send + 'static
255254
{
256-
let the_sched = Arc::new(self);
257255
let mut handles = Vec::new();
258256

259257
let mut processor_handlers: Vec<Sender<ProcMessage>> = Vec::new();
@@ -262,7 +260,7 @@ impl Scheduler {
262260
// Run the main function
263261
let main_coro_hdl = {
264262
// The first worker
265-
let (hdl, msg, st, main_hdl) = Processor::run_main(0, the_sched.clone(), main_fn);
263+
let (hdl, msg, st, main_hdl) = Processor::run_main(0, self, main_fn);
266264
handles.push(hdl);
267265

268266
processor_handlers.push(msg);
@@ -273,9 +271,9 @@ impl Scheduler {
273271

274272
{
275273
// The others
276-
for tid in 1..the_sched.expected_worker_count {
274+
for tid in 1..self.expected_worker_count {
277275
let (hdl, msg, st) = Processor::run_with_neighbors(tid,
278-
the_sched.clone(),
276+
self,
279277
processor_stealers.clone());
280278

281279
for msg in processor_handlers.iter() {
@@ -294,10 +292,8 @@ impl Scheduler {
294292
// The scheduler loop
295293
loop {
296294
{
297-
let io_handler: &mut IoHandler = unsafe { &mut *the_sched.io_handler.get() };
298-
let event_loop: &mut EventLoop<IoHandler> = unsafe {
299-
&mut *the_sched.eventloop.get()
300-
};
295+
let io_handler: &mut IoHandler = unsafe { &mut *self.io_handler.get() };
296+
let event_loop: &mut EventLoop<IoHandler> = unsafe { &mut *self.eventloop.get() };
301297

302298
event_loop.run_once(io_handler, Some(100)).unwrap();
303299
}
@@ -312,14 +308,14 @@ impl Scheduler {
312308

313309
{
314310
let event_loop: &mut EventLoop<IoHandler> = unsafe {
315-
&mut *the_sched.eventloop.get()
316-
};
317-
let io_handler: &mut IoHandler = unsafe {
318-
&mut *the_sched.io_handler.get()
311+
&mut *self.eventloop.get()
319312
};
313+
let io_handler: &mut IoHandler = unsafe { &mut *self.io_handler.get() };
320314
io_handler.wakeup_all(event_loop);
321315
}
322316

317+
// NOTE: It's critical that all threads are joined since Processor
318+
// maintains a reference to this Scheduler using raw pointers.
323319
for hdl in handles {
324320
hdl.join().unwrap();
325321
}

0 commit comments

Comments
 (0)