Skip to content
This repository was archived by the owner on Nov 5, 2018. It is now read-only.

Commit 60eed74

Browse files
authored
Use vector for dtors in thread and remove defer() (#32)
* Use vector for dtors in thread * Add tests * Fix semantics of defer and join * Rustfmt * Remove Scope::defer() * Update comments * Revise (@stjepang's comments)
1 parent 5380e22 commit 60eed74

1 file changed

Lines changed: 152 additions & 75 deletions

File tree

src/thread.rs

Lines changed: 152 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,17 @@
106106
/// ```
107107
///
108108
/// Much more straightforward.
109-
109+
use std::any::Any;
110110
use std::cell::RefCell;
111111
use std::fmt;
112+
use std::io;
112113
use std::marker::PhantomData;
113114
use std::mem;
114115
use std::ops::DerefMut;
115-
use std::panic::{self, AssertUnwindSafe};
116+
use std::panic;
116117
use std::rc::Rc;
117-
use std::thread;
118-
use std::io;
119118
use std::sync::{Arc, Mutex};
119+
use std::thread;
120120

121121
#[doc(hidden)]
122122
trait FnBox<T> {
@@ -152,50 +152,36 @@ where
152152
{
153153
let closure: Box<FnBox<T> + 'a> = Box::new(f);
154154
let closure: Box<FnBox<T> + Send> = mem::transmute(closure);
155-
builder.spawn(move || {
156-
closure.call_box()
157-
})
155+
builder.spawn(move || closure.call_box())
158156
}
159157

160158
pub struct Scope<'env> {
161-
/// The list of the deferred functions and thread join jobs.
162-
dtors: RefCell<Option<DtorChain<'env, thread::Result<()>>>>,
159+
/// The list of the thread join jobs.
160+
joins: RefCell<Vec<Box<FnBox<thread::Result<()>> + 'env>>>,
161+
/// Thread panics invoked so far.
162+
panics: RefCell<Vec<Box<Any + Send + 'static>>>,
163163
// !Send + !Sync
164164
_marker: PhantomData<*const ()>,
165165
}
166166

167-
struct DtorChain<'env, T> {
168-
dtor: Box<FnBox<T> + 'env>,
169-
next: Option<Box<DtorChain<'env, T>>>,
170-
}
171-
172-
impl<'env, T> DtorChain<'env, T> {
173-
pub fn pop(chain: &mut Option<DtorChain<'env, T>>) -> Option<Box<FnBox<T> + 'env>> {
174-
chain.take().map(|mut node| {
175-
*chain = node.next.take().map(|b| *b);
176-
node.dtor
177-
})
178-
}
179-
}
180-
181167
struct JoinState<T> {
182168
join_handle: thread::JoinHandle<()>,
183-
result: ScopedThreadResult<T>
169+
result: ScopedThreadResult<T>,
184170
}
185171

186172
impl<T> JoinState<T> {
187173
fn new(join_handle: thread::JoinHandle<()>, result: ScopedThreadResult<T>) -> JoinState<T> {
188174
JoinState {
189175
join_handle,
190-
result
176+
result,
191177
}
192178
}
193179

194180
fn join(self) -> thread::Result<T> {
195181
let result = self.result;
196-
self.join_handle.join().map(|_| {
197-
result.lock().unwrap().take().unwrap()
198-
})
182+
self.join_handle
183+
.join()
184+
.map(|_| result.lock().unwrap().take().unwrap())
199185
}
200186
}
201187

@@ -210,39 +196,49 @@ pub struct ScopedJoinHandle<'scope, T: 'scope> {
210196
unsafe impl<'scope, T> Send for ScopedJoinHandle<'scope, T> {}
211197
unsafe impl<'scope, T> Sync for ScopedJoinHandle<'scope, T> {}
212198

213-
/// Create a new `Scope` for [*scoped thread spawning*](struct.Scope.html#method.spawn).
199+
/// Creates a new `Scope` for [*scoped thread spawning*](struct.Scope.html#method.spawn).
214200
///
215-
/// In addition, you can [register ad-hoc functions](struct.Scope.html#method.defer) that are
216-
/// deferred to be run. No matter what happens, before the `Scope` is dropped, it is guaranteed that
217-
/// all the unjoined spawned scoped threads are joined and the deferred functions are run.
201+
/// No matter what happens, before the `Scope` is dropped, it is guaranteed that all the unjoined
202+
/// spawned scoped threads are joined.
218203
///
219-
/// `thread::scope()` returns `Ok(())` if all the unjoined spawned threads and the deferred
220-
/// functions did not panic. It returns `Err(e)` if one of them panics with `e`. If many of them
221-
/// panics, it is still guaranteed that all the threads are joined and all the functions are run,
222-
/// and `thread::scope()` returns `Err(e)` with `e` from a panicking thread or function.
204+
/// `thread::scope()` returns `Ok(())` if all the unjoined spawned threads did not panic. It returns
205+
/// `Err(e)` if one of them panics with `e`. If many of them panic, it is still guaranteed that all
206+
/// the threads are joined, and `thread::scope()` returns `Err(e)` with `e` from a panicking thread.
223207
///
224208
/// # Examples
225209
///
226210
/// Creating and using a scope:
227211
///
228212
/// ```
229213
/// crossbeam_utils::thread::scope(|scope| {
230-
/// scope.defer(|| println!("Exiting scope"));
214+
/// scope.spawn(|| println!("Exiting scope"));
231215
/// scope.spawn(|| println!("Running child thread in scope"));
232216
/// }).unwrap();
233-
/// // Prints messages
234217
/// ```
235218
pub fn scope<'env, F, R>(f: F) -> thread::Result<R>
236219
where
237220
F: FnOnce(&Scope<'env>) -> R,
238221
{
239222
let mut scope = Scope {
240-
dtors: RefCell::new(None),
223+
joins: RefCell::new(Vec::new()),
224+
panics: RefCell::new(Vec::new()),
241225
_marker: PhantomData,
242226
};
243-
let ret = f(&scope);
244-
scope.drop_all()?;
245-
Ok(ret)
227+
228+
// Executes the scoped function. Panic will be catched as `Err`.
229+
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| f(&scope)));
230+
231+
// Joins all the threads.
232+
scope.join_all();
233+
let panic = scope.panics.borrow_mut().pop();
234+
235+
// If any of the threads panicked, returns the panic's payload.
236+
if let Some(payload) = panic {
237+
return Err(payload);
238+
}
239+
240+
// Returns the result of the scoped function.
241+
result
246242
}
247243

248244
impl<'env> fmt::Debug for Scope<'env> {
@@ -259,37 +255,20 @@ impl<'scope, T> fmt::Debug for ScopedJoinHandle<'scope, T> {
259255

260256
impl<'env> Scope<'env> {
261257
// This method is carefully written in a transactional style, so that it can be called directly
262-
// and, if any dtor panics, can be resumed in the unwinding this causes. By initially running
263-
// the method outside of any destructor, we avoid any leakage problems due to
258+
// and, if any thread join panics, can be resumed in the unwinding this causes. By initially
259+
// running the method outside of any destructor, we avoid any leakage problems due to
264260
// @rust-lang/rust#14875.
265-
fn drop_all(&mut self) -> thread::Result<()> {
266-
let mut ret = Ok(());
267-
while let Some(dtor) = DtorChain::pop(&mut self.dtors.borrow_mut()) {
268-
ret = ret.and(dtor.call_box());
261+
//
262+
// FIXME(jeehoonkang): @rust-lang/rust#14875 is fixed, so maybe we can remove the above comment.
263+
// But I'd like to write tests to check it before removing the comment.
264+
fn join_all(&mut self) {
265+
let mut joins = self.joins.borrow_mut();
266+
for join in joins.drain(..) {
267+
let result = join.call_box();
268+
if let Err(payload) = result {
269+
self.panics.borrow_mut().push(payload);
270+
}
269271
}
270-
ret
271-
}
272-
273-
fn defer_inner<F>(&self, f: F)
274-
where
275-
F: (FnOnce() -> thread::Result<()>) + 'env,
276-
{
277-
let mut dtors = self.dtors.borrow_mut();
278-
*dtors = Some(DtorChain {
279-
dtor: Box::new(f),
280-
next: dtors.take().map(Box::new),
281-
});
282-
}
283-
284-
/// Schedule code to be executed when exiting the scope.
285-
///
286-
/// This is akin to having a destructor on the stack, except that it is *guaranteed* to be
287-
/// run. It is guaranteed that the function is called after all the spawned threads are joined.
288-
pub fn defer<F>(&self, f: F)
289-
where
290-
F: FnOnce() + 'env,
291-
{
292-
self.defer_inner(move || panic::catch_unwind(AssertUnwindSafe(f)));
293272
}
294273

295274
/// Create a scoped thread.
@@ -363,14 +342,14 @@ impl<'scope, 'env: 'scope> ScopedThreadBuilder<'scope, 'env> {
363342
let deferred_handle = Rc::new(RefCell::new(Some(join_state)));
364343
let my_handle = deferred_handle.clone();
365344

366-
self.scope.defer_inner(move || {
345+
self.scope.joins.borrow_mut().push(Box::new(move || {
367346
let state = deferred_handle.borrow_mut().deref_mut().take();
368347
if let Some(state) = state {
369348
state.join().map(|_| ())
370349
} else {
371350
Ok(())
372351
}
373-
});
352+
}));
374353

375354
Ok(ScopedJoinHandle {
376355
inner: my_handle,
@@ -407,9 +386,107 @@ impl<'scope, T> ScopedJoinHandle<'scope, T> {
407386

408387
impl<'env> Drop for Scope<'env> {
409388
fn drop(&mut self) {
410-
// Actually, there should be no deferred functions left to be run.
411-
self.drop_all().unwrap();
389+
// Note that `self.joins` can be non-empty when the code inside a `scope()` panics and
390+
// `drop()` is called in unwinding. Even if it's the case, we will join the unjoined
391+
// threads.
392+
//
393+
// We ignore panics from any threads because we're in course of unwinding anyway.
394+
self.join_all();
412395
}
413396
}
414397

415398
type ScopedThreadResult<T> = Arc<Mutex<Option<T>>>;
399+
400+
#[cfg(test)]
401+
mod tests {
402+
use super::*;
403+
use std::sync::atomic::AtomicUsize;
404+
use std::sync::atomic::Ordering;
405+
use std::{thread, time};
406+
407+
const TIMES: usize = 10;
408+
const SMALL_STACK_SIZE: usize = 20;
409+
410+
#[test]
411+
fn join() {
412+
let counter = AtomicUsize::new(0);
413+
scope(|scope| {
414+
let handle = scope.spawn(|| {
415+
counter.store(1, Ordering::Relaxed);
416+
});
417+
assert!(handle.join().is_ok());
418+
419+
let panic_handle = scope.spawn(|| {
420+
panic!("\"My honey is running out!\", said Pooh.");
421+
});
422+
assert!(panic_handle.join().is_err());
423+
}).unwrap();
424+
425+
// There should be sufficient synchronization.
426+
assert_eq!(1, counter.load(Ordering::Relaxed));
427+
}
428+
429+
#[test]
430+
fn counter() {
431+
let counter = AtomicUsize::new(0);
432+
scope(|scope| {
433+
for _ in 0..TIMES {
434+
scope.spawn(|| {
435+
counter.fetch_add(1, Ordering::Relaxed);
436+
});
437+
}
438+
}).unwrap();
439+
440+
assert_eq!(TIMES, counter.load(Ordering::Relaxed));
441+
}
442+
443+
#[test]
444+
fn counter_builder() {
445+
let counter = AtomicUsize::new(0);
446+
scope(|scope| {
447+
for i in 0..TIMES {
448+
scope
449+
.builder()
450+
.name(format!("child-{}", i))
451+
.stack_size(SMALL_STACK_SIZE)
452+
.spawn(|| {
453+
counter.fetch_add(1, Ordering::Relaxed);
454+
})
455+
.unwrap();
456+
}
457+
}).unwrap();
458+
459+
assert_eq!(TIMES, counter.load(Ordering::Relaxed));
460+
}
461+
462+
#[test]
463+
fn counter_panic() {
464+
let counter = AtomicUsize::new(0);
465+
let result = scope(|scope| {
466+
scope.spawn(|| {
467+
panic!("\"My honey is running out!\", said Pooh.");
468+
});
469+
thread::sleep(time::Duration::from_millis(100));
470+
471+
for _ in 0..TIMES {
472+
scope.spawn(|| {
473+
counter.fetch_add(1, Ordering::Relaxed);
474+
});
475+
}
476+
});
477+
478+
assert_eq!(TIMES, counter.load(Ordering::Relaxed));
479+
assert!(result.is_err());
480+
}
481+
482+
#[test]
483+
fn panic_twice() {
484+
let result = scope(|scope| {
485+
scope.spawn(|| {
486+
panic!();
487+
});
488+
panic!();
489+
});
490+
assert!(result.is_err());
491+
}
492+
}

0 commit comments

Comments
 (0)