Skip to content

Commit fb3c393

Browse files
committed
Hardened Processor::current() by making it return an Optional
1 parent 25b27b8 commit fb3c393

File tree

10 files changed

+134
-136
lines changed

10 files changed

+134
-136
lines changed

Cargo.toml

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@ rand = "*"
1717
time = "*"
1818

1919
[dependencies]
20-
log = "^0.3.1"
21-
rand = "^0.3.10"
2220
chrono = "^0.2.15"
23-
libc = "^0.1.10"
21+
context = { git = "https://github.com/zonyitoo/context-rs.git" }
2422
deque = "^0.2.3"
23+
libc = "^0.1.10"
24+
log = "^0.3.1"
2525
mio = "^0.5.0"
26-
27-
[dependencies.context]
28-
git = "https://github.com/zonyitoo/context-rs.git"
26+
rand = "^0.3.10"

src/coroutine.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ extern "C" fn coroutine_initialize(_: usize, f: *mut libc::c_void) -> ! {
4040
func()
4141
};
4242

43-
let processor = Processor::current();
44-
processor.yield_with(Ok(State::Finished));
43+
Processor::current().unwrap().yield_with(Ok(State::Finished));
4544

4645
unreachable!("Should not reach here");
4746
}

src/lib.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,17 @@ pub fn sched() {
8181
/// Put the current coroutine to sleep for the specific amount of time
8282
#[inline]
8383
pub fn sleep_ms(ms: u64) {
84-
Scheduler::instance().sleep_ms(ms);
84+
if let Some(s) = Scheduler::instance() {
85+
s.sleep_ms(ms);
86+
}
8587
}
8688

8789
/// Put the current coroutine to sleep for the specific amount of time
8890
#[inline]
8991
pub fn sleep(duration: Duration) {
90-
Scheduler::instance().sleep(duration)
92+
if let Some(s) = Scheduler::instance() {
93+
s.sleep(duration);
94+
}
9195
}
9296

9397
/// Coroutine configuration. Provides detailed control over the properties and behavior of new coroutines.

src/net/tcp.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl TcpListener {
5656
}
5757

5858
loop {
59-
try!(Scheduler::instance().wait_event(&self.0, EventSet::readable()));
59+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::readable()));
6060

6161
match self.0.accept() {
6262
Ok(None) => {
@@ -188,7 +188,7 @@ impl io::Read for TcpStream {
188188
Err(ref err) if err.kind() == ErrorKind::NotConnected => {
189189
// If the socket is still still connecting, just register it into the loop
190190
debug!("Read: Going to register event, socket is not connected");
191-
try!(Scheduler::instance().wait_event(&self.0, EventSet::readable()));
191+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::readable()));
192192
debug!("Read: Got read event");
193193
try!(self.take_socket_error());
194194
}
@@ -200,7 +200,7 @@ impl io::Read for TcpStream {
200200

201201
loop {
202202
debug!("Read: Going to register event");
203-
try!(Scheduler::instance().wait_event(&self.0, EventSet::readable()));
203+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::readable()));
204204
debug!("Read: Got read event");
205205

206206
match self.0.try_read(buf) {
@@ -236,7 +236,7 @@ impl io::Write for TcpStream {
236236
Err(ref err) if err.kind() == ErrorKind::NotConnected => {
237237
// If the socket is still still connecting, just register it into the loop
238238
debug!("Write: Going to register event, socket is not connected");
239-
try!(Scheduler::instance().wait_event(&self.0, EventSet::writable()));
239+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::writable()));
240240
debug!("Write: Got write event");
241241
try!(self.take_socket_error());
242242
}
@@ -246,7 +246,7 @@ impl io::Write for TcpStream {
246246

247247
loop {
248248
debug!("Write: Going to register event");
249-
try!(Scheduler::instance().wait_event(&self.0, EventSet::writable()));
249+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::writable()));
250250
debug!("Write: Got write event");
251251

252252
match self.0.try_write(buf) {
@@ -273,7 +273,7 @@ impl io::Write for TcpStream {
273273

274274
loop {
275275
debug!("Write: Going to register event");
276-
try!(Scheduler::instance().wait_event(&self.0, EventSet::writable()));
276+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::writable()));
277277
debug!("Write: Got write event");
278278

279279
match self.0.flush() {

src/net/udp.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ impl UdpSocket {
6161
debug!("UdpSocket send_to WOULDBLOCK");
6262

6363
loop {
64-
try!(Scheduler::instance().wait_event(&self.0, EventSet::writable()));
64+
try!(Scheduler::instance()
65+
.unwrap()
66+
.wait_event(&self.0, EventSet::writable()));
6567

6668
match self.0.send_to(buf, &addr) {
6769
Ok(None) => {
@@ -97,7 +99,7 @@ impl UdpSocket {
9799
}
98100

99101
loop {
100-
try!(Scheduler::instance().wait_event(&self.0, EventSet::readable()));
102+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::readable()));
101103

102104
match try!(self.0.recv_from(buf)) {
103105
None => {

src/net/unix.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl Read for UnixStream {
123123

124124
loop {
125125
debug!("Read: Going to register event");
126-
try!(Scheduler::instance().wait_event(&self.0, EventSet::readable()));
126+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::readable()));
127127
debug!("Read: Got read event");
128128

129129
match self.0.try_read(buf) {
@@ -157,7 +157,7 @@ impl Write for UnixStream {
157157

158158
loop {
159159
debug!("Write: Going to register event");
160-
try!(Scheduler::instance().wait_event(&self.0, EventSet::writable()));
160+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::writable()));
161161
debug!("Write: Got write event");
162162

163163
match self.0.try_write(buf) {
@@ -184,7 +184,7 @@ impl Write for UnixStream {
184184

185185
loop {
186186
debug!("Write: Going to register event");
187-
try!(Scheduler::instance().wait_event(&self.0, EventSet::writable()));
187+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::writable()));
188188
debug!("Write: Got write event");
189189

190190
match self.0.flush() {
@@ -252,7 +252,7 @@ impl UnixListener {
252252
}
253253

254254
loop {
255-
try!(Scheduler::instance().wait_event(&self.0, EventSet::readable()));
255+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::readable()));
256256

257257
match self.0.accept() {
258258
Ok(None) => {
@@ -330,7 +330,7 @@ impl Read for PipeReader {
330330

331331
loop {
332332
debug!("Read: Going to register event");
333-
try!(Scheduler::instance().wait_event(&self.0, EventSet::readable()));
333+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::readable()));
334334
debug!("Read: Got read event");
335335

336336
match self.0.try_read(buf) {
@@ -399,7 +399,7 @@ impl Write for PipeWriter {
399399

400400
loop {
401401
debug!("Write: Going to register event");
402-
try!(Scheduler::instance().wait_event(&self.0, EventSet::writable()));
402+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::writable()));
403403
debug!("Write: Got write event");
404404

405405
match self.0.try_write(buf) {
@@ -426,7 +426,7 @@ impl Write for PipeWriter {
426426

427427
loop {
428428
debug!("Write: Going to register event");
429-
try!(Scheduler::instance().wait_event(&self.0, EventSet::writable()));
429+
try!(Scheduler::instance().unwrap().wait_event(&self.0, EventSet::writable()));
430430
debug!("Write: Got write event");
431431

432432
match self.0.flush() {

src/runtime/processor.rs

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ use std::boxed::FnBox;
2727
use std::cell::UnsafeCell;
2828
use std::io;
2929
use std::mem;
30-
use std::ptr;
3130
use std::sync::Arc;
3231
use std::sync::mpsc::{self, Sender, Receiver};
3332
use std::thread::{self, Builder};
@@ -41,7 +40,7 @@ use coroutine::{self, Coroutine, State, Handle, SendableCoroutinePtr};
4140
use options::Options;
4241
use scheduler::Scheduler;
4342

44-
thread_local!(static PROCESSOR: UnsafeCell<*mut Processor> = UnsafeCell::new(ptr::null_mut()));
43+
thread_local!(static PROCESSOR: UnsafeCell<Option<Processor>> = UnsafeCell::new(None));
4544

4645
#[derive(Debug)]
4746
pub struct ForceUnwind;
@@ -100,21 +99,27 @@ impl Processor {
10099
}
101100
}
102101

102+
fn set_and_get_tls(p: Processor) -> &'static mut Processor {
103+
PROCESSOR.with(move |proc_opt| unsafe {
104+
*proc_opt.get() = Some(p);
105+
(*proc_opt.get()).as_mut().unwrap()
106+
})
107+
}
108+
103109
#[inline]
104110
pub fn run_with_neighbors(processor_id: usize,
105111
sched: Arc<Scheduler>,
106112
neigh: Vec<Stealer<SendableCoroutinePtr>>)
107113
-> (thread::JoinHandle<()>,
108114
Sender<ProcMessage>,
109115
Stealer<SendableCoroutinePtr>) {
110-
let mut p = Processor::new_with_neighbors(processor_id, sched, neigh);
111-
let (msg, st) = (p.handle(), p.stealer());
116+
let p = Processor::new_with_neighbors(processor_id, sched, neigh);
117+
let msg = p.handle();
118+
let st = p.stealer();
112119
let hdl = Builder::new()
113120
.name(format!("Processor #{}", processor_id))
114121
.spawn(move || {
115-
// Set to thread local
116-
PROCESSOR.with(|proc_ptr| unsafe { *proc_ptr.get() = &mut p });
117-
122+
let mut p = Processor::set_and_get_tls(p);
118123
if let Err(err) = p.schedule() {
119124
panic!("Processor::schedule return Err: {:?}", err);
120125
}
@@ -135,15 +140,11 @@ impl Processor {
135140
where M: FnOnce() -> T + Send + 'static,
136141
T: Send + 'static
137142
{
138-
let mut p = Processor::new_with_neighbors(processor_id, sched, Vec::new());
143+
let p = Processor::new_with_neighbors(processor_id, sched, Vec::new());
139144
let (msg, st) = (p.handle(), p.stealer());
140145
let (tx, rx) = ::std::sync::mpsc::channel();
141146
let hdl = Builder::new().name(format!("Processor #{}", processor_id)).spawn(move|| {
142-
// Set to thread local
143-
PROCESSOR.with(|proc_ptr| unsafe {
144-
*proc_ptr.get() = &mut p
145-
});
146-
147+
let mut p = Processor::set_and_get_tls(p);
147148
let wrapper = move|| {
148149
let ret = unsafe { ::try(move|| f()) };
149150

@@ -164,16 +165,22 @@ impl Processor {
164165
&*self.scheduler
165166
}
166167

168+
/// Get the thread local processor
169+
#[inline]
170+
pub fn current() -> Option<&'static mut Processor> {
171+
PROCESSOR.with(|proc_opt| unsafe { (*proc_opt.get()).as_mut() })
172+
}
173+
167174
#[inline]
168175
// Get the current running coroutine
169-
pub unsafe fn running(&mut self) -> Option<*mut Coroutine> {
176+
pub fn running(&mut self) -> Option<*mut Coroutine> {
170177
self.cur_running
171178
}
172179

173-
/// Get the thread local processor
174180
#[inline]
175-
pub fn current() -> &'static mut Processor {
176-
PROCESSOR.with(|p| unsafe { &mut **p.get() })
181+
// Get the current running coroutine
182+
pub fn current_running() -> Option<*mut Coroutine> {
183+
Processor::current().and_then(Processor::running)
177184
}
178185

179186
#[inline]
@@ -188,7 +195,7 @@ impl Processor {
188195

189196
#[inline]
190197
// Call by scheduler
191-
pub unsafe fn ready(&mut self, coro_ptr: *mut Coroutine) {
198+
pub fn ready(&mut self, coro_ptr: *mut Coroutine) {
192199
self.has_ready_tasks = true;
193200
self.queue_worker.push(SendableCoroutinePtr(coro_ptr));
194201
}
@@ -211,7 +218,7 @@ impl Processor {
211218
}
212219

213220
#[inline]
214-
unsafe fn run_with_all_local_tasks(&mut self, coro_ptr: *mut Coroutine) {
221+
fn run_with_all_local_tasks(&mut self, coro_ptr: *mut Coroutine) {
215222
let mut hdl = coro_ptr;
216223
loop {
217224
let is_suspended = match self.resume(hdl) {
@@ -252,9 +259,7 @@ impl Processor {
252259
'outerloop: loop {
253260
// 1. Run all tasks in local queue
254261
if let Some(hdl) = self.queue_worker.pop() {
255-
unsafe {
256-
self.run_with_all_local_tasks(hdl.0);
257-
}
262+
self.run_with_all_local_tasks(hdl.0);
258263
} else {
259264
self.has_ready_tasks = false;
260265
}
@@ -266,10 +271,10 @@ impl Processor {
266271
ProcMessage::Shutdown => {
267272
self.destroy_all_coroutines();
268273
}
269-
ProcMessage::Ready(SendableCoroutinePtr(ptr)) => unsafe {
274+
ProcMessage::Ready(SendableCoroutinePtr(ptr)) => {
270275
self.ready(ptr);
271276
self.has_ready_tasks = true;
272-
},
277+
}
273278
}
274279
}
275280

@@ -288,9 +293,7 @@ impl Processor {
288293
for idx in (0..self.neighbor_stealers.len()).map(|x| (x + rand_idx) % total_stealers) {
289294
if let Stolen::Data(SendableCoroutinePtr(hdl)) = self.neighbor_stealers[idx]
290295
.steal() {
291-
unsafe {
292-
self.run_with_all_local_tasks(hdl);
293-
}
296+
self.run_with_all_local_tasks(hdl);
294297
continue 'outerloop;
295298
}
296299
}
@@ -312,9 +315,7 @@ impl Processor {
312315

313316
// 1. Drain the work queue.
314317
if let Some(hdl) = self.queue_worker.pop() {
315-
unsafe {
316-
self.run_with_all_local_tasks(hdl.0);
317-
}
318+
self.run_with_all_local_tasks(hdl.0);
318319
}
319320
}
320321

0 commit comments

Comments
 (0)