117117//! ```
118118
119119use crate :: loom:: cell:: UnsafeCell ;
120- use crate :: loom:: sync:: atomic:: AtomicUsize ;
120+ use crate :: loom:: sync:: atomic:: { AtomicBool , AtomicUsize } ;
121121use crate :: loom:: sync:: { Arc , Mutex , MutexGuard , RwLock , RwLockReadGuard } ;
122122use crate :: util:: linked_list:: { self , GuardedLinkedList , LinkedList } ;
123123use crate :: util:: WakeList ;
@@ -127,7 +127,7 @@ use std::future::Future;
127127use std:: marker:: PhantomPinned ;
128128use std:: pin:: Pin ;
129129use std:: ptr:: NonNull ;
130- use std:: sync:: atomic:: Ordering :: SeqCst ;
130+ use std:: sync:: atomic:: Ordering :: { Acquire , Relaxed , Release , SeqCst } ;
131131use std:: task:: { Context , Poll , Waker } ;
132132use std:: usize;
133133
@@ -354,7 +354,7 @@ struct Slot<T> {
354354/// An entry in the wait queue.
355355struct Waiter {
356356 /// True if queued.
357- queued : bool ,
357+ queued : AtomicBool ,
358358
359359 /// Task waiting on the broadcast channel.
360360 waker : Option < Waker > ,
@@ -369,7 +369,7 @@ struct Waiter {
369369impl Waiter {
370370 fn new ( ) -> Self {
371371 Self {
372- queued : false ,
372+ queued : AtomicBool :: new ( false ) ,
373373 waker : None ,
374374 pointers : linked_list:: Pointers :: new ( ) ,
375375 _p : PhantomPinned ,
@@ -897,15 +897,22 @@ impl<T> Shared<T> {
897897 ' outer: loop {
898898 while wakers. can_push ( ) {
899899 match list. pop_back_locked ( & mut tail) {
900- Some ( mut waiter) => {
901- // Safety: `tail` lock is still held.
902- let waiter = unsafe { waiter. as_mut ( ) } ;
903-
904- assert ! ( waiter. queued) ;
905- waiter. queued = false ;
906-
907- if let Some ( waker) = waiter. waker . take ( ) {
908- wakers. push ( waker) ;
900+ Some ( waiter) => {
901+ unsafe {
902+ // Safety: accessing `waker` is safe because
903+ // the tail lock is held.
904+ if let Some ( waker) = ( * waiter. as_ptr ( ) ) . waker . take ( ) {
905+ wakers. push ( waker) ;
906+ }
907+
908+ // Safety: `queued` is atomic.
909+ let queued = & ( * waiter. as_ptr ( ) ) . queued ;
910+ // `Relaxed` suffices because the tail lock is held.
911+ assert ! ( queued. load( Relaxed ) ) ;
912+ // `Release` is needed to synchronize with `Recv::drop`.
913+ // It is critical to set this variable **after** waker
914+ // is extracted, otherwise we may data race with `Recv::drop`.
915+ queued. store ( false , Release ) ;
909916 }
910917 }
911918 None => {
@@ -1104,8 +1111,13 @@ impl<T> Receiver<T> {
11041111 }
11051112 }
11061113
1107- if !( * ptr) . queued {
1108- ( * ptr) . queued = true ;
1114+ // If the waiter is not already queued, enqueue it.
1115+ // `Relaxed` order suffices: we have synchronized with
1116+ // all writers through the tail lock that we hold.
1117+ if !( * ptr) . queued . load ( Relaxed ) {
1118+ // `Relaxed` order suffices: all the readers will
1119+ // synchronize with this write through the tail lock.
1120+ ( * ptr) . queued . store ( true , Relaxed ) ;
11091121 tail. waiters . push_front ( NonNull :: new_unchecked ( & mut * ptr) ) ;
11101122 }
11111123 } ) ;
@@ -1357,7 +1369,7 @@ impl<'a, T> Recv<'a, T> {
13571369 Recv {
13581370 receiver,
13591371 waiter : UnsafeCell :: new ( Waiter {
1360- queued : false ,
1372+ queued : AtomicBool :: new ( false ) ,
13611373 waker : None ,
13621374 pointers : linked_list:: Pointers :: new ( ) ,
13631375 _p : PhantomPinned ,
@@ -1402,22 +1414,37 @@ where
14021414
14031415impl < ' a , T > Drop for Recv < ' a , T > {
14041416 fn drop ( & mut self ) {
1405- // Acquire the tail lock. This is required for safety before accessing
1406- // the waiter node.
1407- let mut tail = self . receiver . shared . tail . lock ( ) ;
1408-
1409- // safety: tail lock is held
1410- let queued = self . waiter . with ( |ptr| unsafe { ( * ptr) . queued } ) ;
1411-
1417+ // Safety: `waiter.queued` is atomic.
1418+ // Acquire ordering is required to synchronize with
1419+ // `Shared::notify_rx` before we drop the object.
1420+ let queued = self
1421+ . waiter
1422+ . with ( |ptr| unsafe { ( * ptr) . queued . load ( Acquire ) } ) ;
1423+
1424+ // If the waiter is queued, we need to unlink it from the waiters list.
1425+ // If not, no further synchronization is required, since the waiter
1426+ // is not in the list and, as such, is not shared with any other threads.
14121427 if queued {
1413- // Remove the node
1414- //
1415- // safety: tail lock is held and the wait node is verified to be in
1416- // the list.
1417- unsafe {
1418- self . waiter . with_mut ( |ptr| {
1419- tail. waiters . remove ( ( & mut * ptr) . into ( ) ) ;
1420- } ) ;
1428+ // Acquire the tail lock. This is required for safety before accessing
1429+ // the waiter node.
1430+ let mut tail = self . receiver . shared . tail . lock ( ) ;
1431+
1432+ // Safety: tail lock is held.
1433+ // `Relaxed` order suffices because we hold the tail lock.
1434+ let queued = self
1435+ . waiter
1436+ . with_mut ( |ptr| unsafe { ( * ptr) . queued . load ( Relaxed ) } ) ;
1437+
1438+ if queued {
1439+ // Remove the node
1440+ //
1441+ // safety: tail lock is held and the wait node is verified to be in
1442+ // the list.
1443+ unsafe {
1444+ self . waiter . with_mut ( |ptr| {
1445+ tail. waiters . remove ( ( & mut * ptr) . into ( ) ) ;
1446+ } ) ;
1447+ }
14211448 }
14221449 }
14231450 }
0 commit comments