Skip to content

Commit f3f7667

Browse files
committed
Port with_thread_data from word_lock into parking_lot
1 parent 2bc60d8 commit f3f7667

File tree

1 file changed

+115
-106
lines changed

1 file changed

+115
-106
lines changed

core/src/parking_lot.rs

Lines changed: 115 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,11 @@ impl ThreadData {
162162
}
163163
}
164164

165-
// Returns a ThreadData structure for the current thread
166-
unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {
165+
// Invokes the given closure with a reference to the current thread `ThreadData`.
166+
fn with_thread_data<F, T>(f: F) -> T
167+
where
168+
F: FnOnce(&ThreadData) -> T,
169+
{
167170
// Try to read from thread-local storage, but return None if the TLS has
168171
// already been destroyed.
169172
#[cfg(has_localkey_try_with)]
@@ -175,16 +178,22 @@ unsafe fn get_thread_data(local: &mut Option<ThreadData>) -> &ThreadData {
175178
panic::catch_unwind(|| key.with(|x| x as *const ThreadData)).ok()
176179
}
177180

178-
// Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
179-
// to construct. Try to use a thread-local version if possible.
180-
thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
181-
if let Some(tls) = try_get_tls(&THREAD_DATA) {
182-
return &*tls;
181+
let mut thread_data_ptr = ptr::null();
182+
// If ThreadData is expensive to construct, then we want to use a cached
183+
// version in thread-local storage if possible.
184+
if !ThreadParker::IS_CHEAP_TO_CONSTRUCT {
185+
thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
186+
if let Some(tls_thread_data) = try_get_tls(&THREAD_DATA) {
187+
thread_data_ptr = tls_thread_data;
188+
}
183189
}
184-
185190
// Otherwise just create a ThreadData on the stack
186-
*local = Some(ThreadData::new());
187-
local.as_ref().unwrap()
191+
let mut thread_data_storage = None;
192+
if thread_data_ptr.is_null() {
193+
thread_data_ptr = thread_data_storage.get_or_insert_with(ThreadData::new);
194+
}
195+
196+
f(unsafe { &*thread_data_ptr })
188197
}
189198

190199
impl Drop for ThreadData {
@@ -579,106 +588,106 @@ unsafe fn park_internal(
579588
timeout: Option<Instant>,
580589
) -> ParkResult {
581590
// Grab our thread data, this also ensures that the hash table exists
582-
let mut thread_data = None;
583-
let thread_data = get_thread_data(&mut thread_data);
591+
with_thread_data(|thread_data| {
584592

585-
// Lock the bucket for the given key
586-
let bucket = lock_bucket(key);
593+
// Lock the bucket for the given key
594+
let bucket = lock_bucket(key);
587595

588-
// If the validation function fails, just return
589-
if !validate() {
596+
// If the validation function fails, just return
597+
if !validate() {
598+
bucket.mutex.unlock();
599+
return ParkResult::Invalid;
600+
}
601+
602+
// Append our thread data to the queue and unlock the bucket
603+
thread_data.parked_with_timeout.set(timeout.is_some());
604+
thread_data.next_in_queue.set(ptr::null());
605+
thread_data.key.store(key, Ordering::Relaxed);
606+
thread_data.park_token.set(park_token);
607+
thread_data.parker.prepare_park();
608+
if !bucket.queue_head.get().is_null() {
609+
(*bucket.queue_tail.get()).next_in_queue.set(thread_data);
610+
} else {
611+
bucket.queue_head.set(thread_data);
612+
}
613+
bucket.queue_tail.set(thread_data);
590614
bucket.mutex.unlock();
591-
return ParkResult::Invalid;
592-
}
593615

594-
// Append our thread data to the queue and unlock the bucket
595-
thread_data.parked_with_timeout.set(timeout.is_some());
596-
thread_data.next_in_queue.set(ptr::null());
597-
thread_data.key.store(key, Ordering::Relaxed);
598-
thread_data.park_token.set(park_token);
599-
thread_data.parker.prepare_park();
600-
if !bucket.queue_head.get().is_null() {
601-
(*bucket.queue_tail.get()).next_in_queue.set(thread_data);
602-
} else {
603-
bucket.queue_head.set(thread_data);
604-
}
605-
bucket.queue_tail.set(thread_data);
606-
bucket.mutex.unlock();
616+
// Invoke the pre-sleep callback
617+
before_sleep();
618+
619+
// Park our thread and determine whether we were woken up by an unpark or by
620+
// our timeout. Note that this isn't precise: we can still be unparked since
621+
// we are still in the queue.
622+
let unparked = match timeout {
623+
Some(timeout) => thread_data.parker.park_until(timeout),
624+
None => {
625+
thread_data.parker.park();
626+
// call deadlock detection on_unpark hook
627+
deadlock::on_unpark(thread_data);
628+
true
629+
}
630+
};
607631

608-
// Invoke the pre-sleep callback
609-
before_sleep();
610-
611-
// Park our thread and determine whether we were woken up by an unpark or by
612-
// our timeout. Note that this isn't precise: we can still be unparked since
613-
// we are still in the queue.
614-
let unparked = match timeout {
615-
Some(timeout) => thread_data.parker.park_until(timeout),
616-
None => {
617-
thread_data.parker.park();
618-
// call deadlock detection on_unpark hook
619-
deadlock::on_unpark(thread_data);
620-
true
632+
// If we were unparked, return now
633+
if unparked {
634+
return ParkResult::Unparked(thread_data.unpark_token.get());
621635
}
622-
};
623-
624-
// If we were unparked, return now
625-
if unparked {
626-
return ParkResult::Unparked(thread_data.unpark_token.get());
627-
}
628636

629-
// Lock our bucket again. Note that the hashtable may have been rehashed in
630-
// the meantime. Our key may also have changed if we were requeued.
631-
let (key, bucket) = lock_bucket_checked(&thread_data.key);
637+
// Lock our bucket again. Note that the hashtable may have been rehashed in
638+
// the meantime. Our key may also have changed if we were requeued.
639+
let (key, bucket) = lock_bucket_checked(&thread_data.key);
632640

633-
// Now we need to check again if we were unparked or timed out. Unlike the
634-
// last check this is precise because we hold the bucket lock.
635-
if !thread_data.parker.timed_out() {
636-
bucket.mutex.unlock();
637-
return ParkResult::Unparked(thread_data.unpark_token.get());
638-
}
641+
// Now we need to check again if we were unparked or timed out. Unlike the
642+
// last check this is precise because we hold the bucket lock.
643+
if !thread_data.parker.timed_out() {
644+
bucket.mutex.unlock();
645+
return ParkResult::Unparked(thread_data.unpark_token.get());
646+
}
639647

640-
// We timed out, so we now need to remove our thread from the queue
641-
let mut link = &bucket.queue_head;
642-
let mut current = bucket.queue_head.get();
643-
let mut previous = ptr::null();
644-
while !current.is_null() {
645-
if current == thread_data {
646-
let next = (*current).next_in_queue.get();
647-
link.set(next);
648-
let mut was_last_thread = true;
649-
if bucket.queue_tail.get() == current {
650-
bucket.queue_tail.set(previous);
651-
} else {
652-
// Scan the rest of the queue to see if there are any other
653-
// entries with the given key.
654-
let mut scan = next;
655-
while !scan.is_null() {
656-
if (*scan).key.load(Ordering::Relaxed) == key {
657-
was_last_thread = false;
658-
break;
648+
// We timed out, so we now need to remove our thread from the queue
649+
let mut link = &bucket.queue_head;
650+
let mut current = bucket.queue_head.get();
651+
let mut previous = ptr::null();
652+
while !current.is_null() {
653+
if current == thread_data {
654+
let next = (*current).next_in_queue.get();
655+
link.set(next);
656+
let mut was_last_thread = true;
657+
if bucket.queue_tail.get() == current {
658+
bucket.queue_tail.set(previous);
659+
} else {
660+
// Scan the rest of the queue to see if there are any other
661+
// entries with the given key.
662+
let mut scan = next;
663+
while !scan.is_null() {
664+
if (*scan).key.load(Ordering::Relaxed) == key {
665+
was_last_thread = false;
666+
break;
667+
}
668+
scan = (*scan).next_in_queue.get();
659669
}
660-
scan = (*scan).next_in_queue.get();
661670
}
662-
}
663671

664-
// Callback to indicate that we timed out, and whether we were the
665-
// last thread on the queue.
666-
timed_out(key, was_last_thread);
667-
break;
668-
} else {
669-
link = &(*current).next_in_queue;
670-
previous = current;
671-
current = link.get();
672+
// Callback to indicate that we timed out, and whether we were the
673+
// last thread on the queue.
674+
timed_out(key, was_last_thread);
675+
break;
676+
} else {
677+
link = &(*current).next_in_queue;
678+
previous = current;
679+
current = link.get();
680+
}
672681
}
673-
}
674682

675-
// There should be no way for our thread to have been removed from the queue
676-
// if we timed out.
677-
debug_assert!(!current.is_null());
683+
// There should be no way for our thread to have been removed from the queue
684+
// if we timed out.
685+
debug_assert!(!current.is_null());
678686

679-
// Unlock the bucket, we are done
680-
bucket.mutex.unlock();
681-
ParkResult::TimedOut
687+
// Unlock the bucket, we are done
688+
bucket.mutex.unlock();
689+
ParkResult::TimedOut
690+
})
682691
}
683692

684693
/// Unparks one thread from the queue associated with the given key.
@@ -1149,7 +1158,7 @@ pub mod deadlock {
11491158

11501159
#[cfg(feature = "deadlock_detection")]
11511160
mod deadlock_impl {
1152-
use super::{get_hashtable, get_thread_data, lock_bucket, ThreadData, NUM_THREADS};
1161+
use super::{get_hashtable, with_thread_data, lock_bucket, ThreadData, NUM_THREADS};
11531162
use backtrace::Backtrace;
11541163
use petgraph;
11551164
use petgraph::graphmap::DiGraphMap;
@@ -1222,19 +1231,19 @@ mod deadlock_impl {
12221231
}
12231232

12241233
pub unsafe fn acquire_resource(key: usize) {
1225-
let mut thread_data = None;
1226-
let thread_data = get_thread_data(&mut thread_data);
1227-
(*thread_data.deadlock_data.resources.get()).push(key);
1234+
with_thread_data(|thread_data| {
1235+
(*thread_data.deadlock_data.resources.get()).push(key);
1236+
});
12281237
}
12291238

12301239
pub unsafe fn release_resource(key: usize) {
1231-
let mut thread_data = None;
1232-
let thread_data = get_thread_data(&mut thread_data);
1233-
let resources = &mut (*thread_data.deadlock_data.resources.get());
1234-
match resources.iter().rposition(|x| *x == key) {
1235-
Some(p) => resources.swap_remove(p),
1236-
None => panic!("key {} not found in thread resources", key),
1237-
};
1240+
with_thread_data(|thread_data| {
1241+
let resources = &mut (*thread_data.deadlock_data.resources.get());
1242+
match resources.iter().rposition(|x| *x == key) {
1243+
Some(p) => resources.swap_remove(p),
1244+
None => panic!("key {} not found in thread resources", key),
1245+
};
1246+
});
12381247
}
12391248

12401249
pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {

0 commit comments

Comments
 (0)