diff --git a/Cargo.lock b/Cargo.lock index ed3e30342f2f6..8163ecfb01c49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1753,12 +1753,13 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.2.0" +version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ab7905ea95c6d9af62940f9d7dd9596d54c334ae2c15300c482051292d5637f" +checksum = "d37fb7dc756218a0559bfc21e4381f03cbb696cdaf959e7e95e927496f0564cd" dependencies = [ "compiler_builtins", "libc", + "rustc-std-workspace-alloc", "rustc-std-workspace-core", ] @@ -5048,7 +5049,7 @@ dependencies = [ "dlmalloc", "fortanix-sgx-abi", "hashbrown", - "hermit-abi 0.2.0", + "hermit-abi 0.2.3", "libc", "miniz_oxide 0.4.0", "object 0.26.2", diff --git a/library/std/Cargo.toml b/library/std/Cargo.toml index 229e546e08549..e62b4e30dd853 100644 --- a/library/std/Cargo.toml +++ b/library/std/Cargo.toml @@ -42,7 +42,7 @@ dlmalloc = { version = "0.2.3", features = ['rustc-dep-of-std'] } fortanix-sgx-abi = { version = "0.3.2", features = ['rustc-dep-of-std'] } [target.'cfg(target_os = "hermit")'.dependencies] -hermit-abi = { version = "0.2.0", features = ['rustc-dep-of-std'] } +hermit-abi = { version = "0.2.3", features = ['rustc-dep-of-std'] } [target.wasm32-wasi.dependencies] wasi = { version = "0.11.0", features = ['rustc-dep-of-std'], default-features = false } diff --git a/library/std/src/sys/hermit/mod.rs b/library/std/src/sys/hermit/mod.rs index 60b7a973cc2c1..35d61fb7e089e 100644 --- a/library/std/src/sys/hermit/mod.rs +++ b/library/std/src/sys/hermit/mod.rs @@ -43,6 +43,7 @@ pub mod thread; pub mod thread_local_dtor; #[path = "../unsupported/thread_local_key.rs"] pub mod thread_local_key; +pub mod thread_parker; pub mod time; mod condvar; diff --git a/library/std/src/sys/hermit/mutex.rs b/library/std/src/sys/hermit/mutex.rs index eb15a04ffcffb..d7b7a0bb0568f 100644 --- a/library/std/src/sys/hermit/mutex.rs +++ b/library/std/src/sys/hermit/mutex.rs @@ -2,7 +2,6 @@ use crate::cell::UnsafeCell; use crate::collections::VecDeque; use crate::hint; use crate::ops::{Deref, DerefMut, Drop}; -use crate::ptr; use crate::sync::atomic::{AtomicUsize, Ordering}; use crate::sys::hermit::abi; diff --git a/library/std/src/sys/hermit/thread_parker.rs b/library/std/src/sys/hermit/thread_parker.rs new file mode 100644 index 0000000000000..f600bec591fc9 --- /dev/null +++ b/library/std/src/sys/hermit/thread_parker.rs @@ -0,0 +1,134 @@ +// Hermit has efficient thread parking primitives in the form of the `block_current_task`/ +// `wakeup_task` syscalls. `block_current_task` marks the current thread as blocked, which +// means the scheduler will not try to reschedule the task once it is switched away from. +// `wakeup_task` undoes this. Since Hermit is not pre-emptive, this means programs get to +// run code in between calling `block_current_task` and actually waiting (with a call to +// `yield_now`) without encountering any race conditions. +// +// The thread parker uses an atomic variable which is set one of three states: +// * EMPTY: the token has not been made available +// * NOTIFIED: the token is available +// * some pid: the thread with the specified PID is waiting or about to be waiting for +// the token +// Since `wakeup_task` requires the task to be actually waiting, the state needs to +// be checked in between preparing to park and actually parking to avoid deadlocks. +// If the state has changed, the thread resets its own state by calling `wakeup_task`. + +use super::abi; +use crate::pin::Pin; +use crate::sync::atomic::AtomicU64; +use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; +use crate::time::{Duration, Instant}; + +// These values are larger than u32::MAX, so they never conflict with the thread's `pid`. +const EMPTY: u64 = 0x1_0000_0000; +const NOTIFIED: u64 = 0x2_0000_0000; + +// Note about atomic memory orderings: +// Acquire ordering is necessary to obtain the token, as otherwise the parked thread +// could perform memory operations visible before it was unparked. Since once set, +// the token cannot be unset by other threads, the state can be reset with a relaxed +// store once it has been read with acquire ordering. +pub struct Parker { + state: AtomicU64, +} + +impl Parker { + /// Construct the thread parker. The UNIX parker implementation + /// requires this to happen in-place. + pub unsafe fn new(parker: *mut Parker) { + parker.write(Parker { state: AtomicU64::new(EMPTY) }) + } + + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park(self: Pin<&Self>) { + if self.state.load(Acquire) == NOTIFIED { + self.state.store(EMPTY, Relaxed); + return; + } + + let pid = abi::getpid(); + abi::block_current_task(); + if self.state.compare_exchange(EMPTY, pid as u64, Acquire, Acquire).is_ok() { + // Loop to avoid spurious wakeups. + loop { + abi::yield_now(); + + if self.state.load(Acquire) == NOTIFIED { + break; + } + + abi::block_current_task(); + + if self.state.load(Acquire) == NOTIFIED { + abi::wakeup_task(pid); + break; + } + } + } else { + abi::wakeup_task(pid); + } + + self.state.store(EMPTY, Relaxed); + } + + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. + pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { + if self.state.load(Acquire) == NOTIFIED { + self.state.store(EMPTY, Relaxed); + return; + } + + if dur < Duration::from_millis(1) { + // Spin on the token for sub-millisecond parking. + let now = Instant::now(); + let Some(deadline) = now.checked_add(dur) else { return; }; + loop { + abi::yield_now(); + + if self.state.load(Acquire) == NOTIFIED { + self.state.store(EMPTY, Relaxed); + return; + } else if Instant::now() >= deadline { + // Swap to provide acquire ordering even if the timeout occurred + // before the token was set. + self.state.swap(EMPTY, Acquire); + return; + } + } + } else { + let timeout = dur.as_millis().try_into().unwrap_or(u64::MAX); + let pid = abi::getpid(); + abi::block_current_task_with_timeout(timeout); + if self.state.compare_exchange(EMPTY, pid as u64, Acquire, Acquire).is_ok() { + abi::yield_now(); + + // Swap to provide acquire ordering even if the timeout occurred + // before the token was set. This situation can result in spurious + // wakeups on the next call to `park_timeout`, but it is better to let + // those be handled by the user rather than to do some perhaps unnecessary, + // but always expensive guarding. + self.state.swap(EMPTY, Acquire); + } else { + abi::wakeup_task(pid); + self.state.store(EMPTY, Relaxed); + } + } + } + + // This implementation doesn't require `Pin`, but other implementations do. + pub fn unpark(self: Pin<&Self>) { + // Use release ordering to synchonize with the parked thread. + let state = self.state.swap(NOTIFIED, Release); + + if !matches!(state, EMPTY | NOTIFIED) { + // SAFETY: `wakeup_task` is marked unsafe, but is actually safe to use + unsafe { + let pid = state as u32; + // This is a noop if the task is not blocked or has terminated, but + // that is fine. + abi::wakeup_task(pid); + } + } + } +} diff --git a/library/std/src/sys_common/thread_parker/mod.rs b/library/std/src/sys_common/thread_parker/mod.rs index cbd7832eb7a4c..4018e4956cee1 100644 --- a/library/std/src/sys_common/thread_parker/mod.rs +++ b/library/std/src/sys_common/thread_parker/mod.rs @@ -13,7 +13,7 @@ cfg_if::cfg_if! { } else if #[cfg(target_os = "solid_asp3")] { mod wait_flag; pub use wait_flag::Parker; - } else if #[cfg(any(windows, target_family = "unix"))] { + } else if #[cfg(any(windows, target_family = "unix", target_os = "hermit"))] { pub use crate::sys::thread_parker::Parker; } else { mod generic;