From f1a40410ecce3c1b115e244c7e189e019e226c13 Mon Sep 17 00:00:00 2001 From: Mara Bos <m-ou.se@m-ou.se> Date: Wed, 6 Apr 2022 16:30:49 +0200 Subject: [PATCH 1/5] Return status from futex_wake(). --- library/std/src/sys/unix/futex.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/library/std/src/sys/unix/futex.rs b/library/std/src/sys/unix/futex.rs index c61d948fb601d..4231241a14224 100644 --- a/library/std/src/sys/unix/futex.rs +++ b/library/std/src/sys/unix/futex.rs @@ -69,14 +69,14 @@ pub fn futex_wait(futex: &AtomicI32, expected: i32, timeout: Option<Duration>) { } #[cfg(any(target_os = "linux", target_os = "android"))] -pub fn futex_wake(futex: &AtomicI32) { +pub fn futex_wake(futex: &AtomicI32) -> bool { unsafe { libc::syscall( libc::SYS_futex, futex as *const AtomicI32, libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG, 1, - ); + ) > 0 } } @@ -93,12 +93,10 @@ pub fn futex_wake_all(futex: &AtomicI32) { } #[cfg(target_os = "emscripten")] -pub fn futex_wake(futex: &AtomicI32) { +pub fn futex_wake(futex: &AtomicI32) -> bool { extern "C" { fn emscripten_futex_wake(addr: *const AtomicI32, count: libc::c_int) -> libc::c_int; } - unsafe { - emscripten_futex_wake(futex as *const AtomicI32, 1); - } + unsafe { emscripten_futex_wake(futex as *const AtomicI32, 1) > 0 } } From 6b2344b892b3fdbf0db3ee5c63f7531bad2f87e4 Mon Sep 17 00:00:00 2001 From: Mara Bos <m-ou.se@m-ou.se> Date: Wed, 6 Apr 2022 16:31:11 +0200 Subject: [PATCH 2/5] Add futex-based RwLock on Linux. --- .../std/src/sys/unix/locks/futex_rwlock.rs | 228 ++++++++++++++++++ library/std/src/sys/unix/locks/mod.rs | 4 +- 2 files changed, 230 insertions(+), 2 deletions(-) create mode 100644 library/std/src/sys/unix/locks/futex_rwlock.rs diff --git a/library/std/src/sys/unix/locks/futex_rwlock.rs b/library/std/src/sys/unix/locks/futex_rwlock.rs new file mode 100644 index 0000000000000..221d5af9a02d2 --- /dev/null +++ b/library/std/src/sys/unix/locks/futex_rwlock.rs @@ -0,0 +1,228 @@ +use crate::sync::atomic::{ + AtomicI32, + Ordering::{Acquire, Relaxed, Release}, +}; +use crate::sys::futex::{futex_wait, futex_wake, futex_wake_all}; + +pub type MovableRwLock = RwLock; + +pub struct RwLock { + // The state consists of a 30-bit reader counter, a 'readers waiting' flag, and a 'writers waiting' flag. + // All bits of the reader counter set means write locked. + // A reader count of zero means the lock is unlocked. + // See the constants below. + // Readers wait on this futex. + state: AtomicI32, + // The 'condition variable' to notify writers through. + // Incremented on every signal. + // Writers wait on this futex. + writer_notify: AtomicI32, +} + +const READ_LOCKED: i32 = 1; +const MAX_READERS: i32 = (1 << 30) - 2; +const WRITE_LOCKED: i32 = (1 << 30) - 1; +const READERS_WAITING: i32 = 1 << 30; +const WRITERS_WAITING: i32 = 1 << 31; + +fn readers(state: i32) -> i32 { + state & !(READERS_WAITING + WRITERS_WAITING) +} + +fn readers_waiting(state: i32) -> bool { + state & READERS_WAITING != 0 +} + +fn writers_waiting(state: i32) -> bool { + state & WRITERS_WAITING != 0 +} + +fn read_lockable(state: i32) -> bool { + readers(state) < MAX_READERS && !readers_waiting(state) && !writers_waiting(state) +} + +impl RwLock { + #[inline] + pub const fn new() -> Self { + Self { state: AtomicI32::new(0), writer_notify: AtomicI32::new(0) } + } + + #[inline] + pub unsafe fn destroy(&self) {} + + #[inline] + pub unsafe fn try_read(&self) -> bool { + self.state + .fetch_update(Acquire, Relaxed, |s| read_lockable(s).then(|| s + READ_LOCKED)) + .is_ok() + } + + #[inline] + pub unsafe fn read(&self) { + if let Err(s) = + self.state.fetch_update(Acquire, Relaxed, |s| read_lockable(s).then(|| s + READ_LOCKED)) + { + self.read_contended(s); + } + } + + #[inline] + pub unsafe fn try_write(&self) -> bool { + self.state + .fetch_update(Acquire, Relaxed, |s| (readers(s) == 0).then(|| s + WRITE_LOCKED)) + .is_ok() + } + + #[inline] + pub unsafe fn write(&self) { + if let Err(s) = self + .state + .fetch_update(Acquire, Relaxed, |s| (readers(s) == 0).then(|| s + WRITE_LOCKED)) + { + self.write_contended(s); + } + } + + #[inline] + pub unsafe fn read_unlock(&self) { + if self.state.fetch_sub(READ_LOCKED, Release) == READ_LOCKED + WRITERS_WAITING { + self.wake_after_read_unlock(); + } + } + + #[inline] + pub unsafe fn write_unlock(&self) { + if let Err(e) = self.state.compare_exchange(WRITE_LOCKED, 0, Release, Relaxed) { + self.write_unlock_contended(e); + } + } + + #[cold] + fn read_contended(&self, mut state: i32) { + loop { + if read_lockable(state) { + match self.state.compare_exchange(state, state + READ_LOCKED, Acquire, Relaxed) { + Ok(_) => return, // Locked! + Err(s) => { + state = s; + continue; + } + } + } + + if readers(state) == MAX_READERS { + panic!("too many active read locks on RwLock"); + } + + // Make sure the readers waiting bit is set before we go to sleep. + if !readers_waiting(state) { + if let Err(s) = + self.state.compare_exchange(state, state | READERS_WAITING, Relaxed, Relaxed) + { + state = s; + continue; + } + } + + // Wait for the state to change. + futex_wait(&self.state, state | READERS_WAITING, None); + + state = self.state.load(Relaxed); + } + } + + #[cold] + fn write_contended(&self, mut state: i32) { + loop { + // If it's unlocked, we try to lock it. + if readers(state) == 0 { + match self.state.compare_exchange( + state, + state | WRITE_LOCKED | WRITERS_WAITING, // Other threads might be waiting. + Acquire, + Relaxed, + ) { + Ok(_) => return, // Locked! + Err(s) => { + state = s; + continue; + } + } + } + + // Set the waiting bit indicating that we're waiting on it. + if !writers_waiting(state) { + match self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed) + { + Ok(_) => state |= WRITERS_WAITING, + Err(s) => { + state = s; + continue; + } + } + } + + // Examine the notification counter before we check if `state` has changed, + // to make sure we don't miss any notifications. + let seq = self.writer_notify.load(Acquire); + + // Don't go to sleep if the state has already changed. + let s = self.state.load(Relaxed); + if state != s { + state = s; + continue; + } + + // Wait for the state to change. + futex_wait(&self.writer_notify, seq, None); + + // Check out the new state. + state = self.state.load(Relaxed); + } + } + + #[cold] + fn wake_after_read_unlock(&self) { + // If this compare_exchange fails, another writer already locked, which + // will take care of waking up the next waiting writer. + if self.state.compare_exchange(WRITERS_WAITING, 0, Relaxed, Relaxed).is_ok() { + self.writer_notify.fetch_add(1, Release); + futex_wake(&self.writer_notify); + } + } + + #[cold] + fn write_unlock_contended(&self, mut state: i32) { + // If there are any waiting writers _or_ waiting readers, but not both (!), + // we turn off that bit while unlocking. + if readers_waiting(state) != writers_waiting(state) { + if self.state.compare_exchange(state, 0, Release, Relaxed).is_err() { + // The only way this can fail is if the other waiting bit was set too. + state |= READERS_WAITING | WRITERS_WAITING; + } + } + + // If both readers and writers are waiting, unlock but leave the readers waiting. + if readers_waiting(state) && writers_waiting(state) { + self.state.store(READERS_WAITING, Release); + } + + if writers_waiting(state) { + // Notify one writer, if any writer was waiting. + self.writer_notify.fetch_add(1, Release); + if !futex_wake(&self.writer_notify) { + // If there was no writer to wake up, maybe there's readers to wake up instead. + if readers_waiting(state) { + // If this compare_exchange fails, another writer already locked, which + // will take care of waking up the next waiting writer. + if self.state.compare_exchange(READERS_WAITING, 0, Relaxed, Relaxed).is_ok() { + futex_wake_all(&self.state); + } + } + } + } else if readers_waiting(state) { + // Notify all readers, if any reader was waiting. + futex_wake_all(&self.state); + } + } +} diff --git a/library/std/src/sys/unix/locks/mod.rs b/library/std/src/sys/unix/locks/mod.rs index 2b8dd168068b5..85afc939d2e89 100644 --- a/library/std/src/sys/unix/locks/mod.rs +++ b/library/std/src/sys/unix/locks/mod.rs @@ -4,13 +4,13 @@ cfg_if::cfg_if! { target_os = "android", ))] { mod futex; + mod futex_rwlock; #[allow(dead_code)] mod pthread_mutex; // Only used for PthreadMutexAttr, needed by pthread_remutex. mod pthread_remutex; // FIXME: Implement this using a futex - mod pthread_rwlock; // FIXME: Implement this using a futex pub use futex::{Mutex, MovableMutex, Condvar, MovableCondvar}; pub use pthread_remutex::ReentrantMutex; - pub use pthread_rwlock::{RwLock, MovableRwLock}; + pub use futex_rwlock::{RwLock, MovableRwLock}; } else { mod pthread_mutex; mod pthread_remutex; From faa9279745edec41adb7bcc1eb3fdeff446773e0 Mon Sep 17 00:00:00 2001 From: Mara Bos <m-ou.se@m-ou.se> Date: Thu, 7 Apr 2022 12:02:10 +0200 Subject: [PATCH 3/5] Add some comments to futex rwlock impl. --- library/std/src/sys/unix/locks/futex_rwlock.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/library/std/src/sys/unix/locks/futex_rwlock.rs b/library/std/src/sys/unix/locks/futex_rwlock.rs index 221d5af9a02d2..09d7b5213e914 100644 --- a/library/std/src/sys/unix/locks/futex_rwlock.rs +++ b/library/std/src/sys/unix/locks/futex_rwlock.rs @@ -75,17 +75,20 @@ impl RwLock { #[inline] pub unsafe fn write(&self) { - if let Err(s) = self - .state - .fetch_update(Acquire, Relaxed, |s| (readers(s) == 0).then(|| s + WRITE_LOCKED)) - { - self.write_contended(s); + if !self.try_write() { + self.write_contended(); } } #[inline] pub unsafe fn read_unlock(&self) { - if self.state.fetch_sub(READ_LOCKED, Release) == READ_LOCKED + WRITERS_WAITING { + let s = self.state.fetch_sub(READ_LOCKED, Release); + + // It's impossible for readers to be waiting if it was read locked. + debug_assert!(!readers_waiting(s)); + + // Wake up a writer if we were the last reader and there's a writer waiting. + if s == READ_LOCKED + WRITERS_WAITING { self.wake_after_read_unlock(); } } @@ -93,6 +96,7 @@ impl RwLock { #[inline] pub unsafe fn write_unlock(&self) { if let Err(e) = self.state.compare_exchange(WRITE_LOCKED, 0, Release, Relaxed) { + // Readers or writers (or both) are waiting. self.write_unlock_contended(e); } } @@ -100,6 +104,7 @@ impl RwLock { #[cold] fn read_contended(&self, mut state: i32) { loop { + // If we can lock it, lock it. if read_lockable(state) { match self.state.compare_exchange(state, state + READ_LOCKED, Acquire, Relaxed) { Ok(_) => return, // Locked! @@ -110,6 +115,7 @@ impl RwLock { } } + // Check for overflow. if readers(state) == MAX_READERS { panic!("too many active read locks on RwLock"); } From de4a29079bcf3553166933257a06f0c707a25af1 Mon Sep 17 00:00:00 2001 From: Mara Bos <m-ou.se@m-ou.se> Date: Thu, 7 Apr 2022 12:02:57 +0200 Subject: [PATCH 4/5] Spin in futex rwlock. --- .../std/src/sys/unix/locks/futex_rwlock.rs | 49 +++++++++++++++---- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/library/std/src/sys/unix/locks/futex_rwlock.rs b/library/std/src/sys/unix/locks/futex_rwlock.rs index 09d7b5213e914..d0fa5e5b92738 100644 --- a/library/std/src/sys/unix/locks/futex_rwlock.rs +++ b/library/std/src/sys/unix/locks/futex_rwlock.rs @@ -59,10 +59,8 @@ impl RwLock { #[inline] pub unsafe fn read(&self) { - if let Err(s) = - self.state.fetch_update(Acquire, Relaxed, |s| read_lockable(s).then(|| s + READ_LOCKED)) - { - self.read_contended(s); + if !self.try_read() { + self.read_contended(); } } @@ -102,7 +100,9 @@ impl RwLock { } #[cold] - fn read_contended(&self, mut state: i32) { + fn read_contended(&self) { + let mut state = self.spin_read(); + loop { // If we can lock it, lock it. if read_lockable(state) { @@ -133,12 +133,15 @@ impl RwLock { // Wait for the state to change. futex_wait(&self.state, state | READERS_WAITING, None); - state = self.state.load(Relaxed); + // Spin again after waking up. + state = self.spin_read(); } } #[cold] - fn write_contended(&self, mut state: i32) { + fn write_contended(&self) { + let mut state = self.spin_write(); + loop { // If it's unlocked, we try to lock it. if readers(state) == 0 { @@ -182,8 +185,8 @@ impl RwLock { // Wait for the state to change. futex_wait(&self.writer_notify, seq, None); - // Check out the new state. - state = self.state.load(Relaxed); + // Spin again after waking up. + state = self.spin_write(); } } @@ -231,4 +234,32 @@ impl RwLock { futex_wake_all(&self.state); } } + + /// Spin for a while, but stop directly at the given condition. + fn spin_until(&self, f: impl Fn(i32) -> bool) -> i32 { + let mut spin = 100; // Chosen by fair dice roll. + loop { + let state = self.state.load(Relaxed); + if f(state) || spin == 0 { + return state; + } + crate::hint::spin_loop(); + spin -= 1; + } + } + + fn spin_write(&self) -> i32 { + self.spin_until(|state| { + // Stop spinning when we can lock it, or when there's waiting + // writers, to keep things somewhat fair. + readers(state) == 0 || writers_waiting(state) + }) + } + + fn spin_read(&self) -> i32 { + self.spin_until(|state| { + // Stop spinning when it's unlocked or read locked, or when there's waiting threads. + readers(state) != WRITE_LOCKED || readers_waiting(state) || writers_waiting(state) + }) + } } From b656db29f2fd1fef9031d0ad1cb1e55bc357c8d4 Mon Sep 17 00:00:00 2001 From: Mara Bos <m-ou.se@m-ou.se> Date: Thu, 7 Apr 2022 12:19:25 +0200 Subject: [PATCH 5/5] Don't make writers spin when #readers changes in futex RwLock. --- library/std/src/sys/unix/locks/futex_rwlock.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/library/std/src/sys/unix/locks/futex_rwlock.rs b/library/std/src/sys/unix/locks/futex_rwlock.rs index d0fa5e5b92738..a3f316b0e6f29 100644 --- a/library/std/src/sys/unix/locks/futex_rwlock.rs +++ b/library/std/src/sys/unix/locks/futex_rwlock.rs @@ -161,13 +161,11 @@ impl RwLock { // Set the waiting bit indicating that we're waiting on it. if !writers_waiting(state) { - match self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed) + if let Err(s) = + self.state.compare_exchange(state, state | WRITERS_WAITING, Relaxed, Relaxed) { - Ok(_) => state |= WRITERS_WAITING, - Err(s) => { - state = s; - continue; - } + state = s; + continue; } } @@ -175,9 +173,10 @@ impl RwLock { // to make sure we don't miss any notifications. let seq = self.writer_notify.load(Acquire); - // Don't go to sleep if the state has already changed. + // Don't go to sleep if the lock has become available, or the + // writers waiting bit is no longer set. let s = self.state.load(Relaxed); - if state != s { + if readers(s) == 0 || !writers_waiting(s) { state = s; continue; }