@@ -22,50 +22,61 @@ use core::cell::UnsafeCell;
 
 use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
 
+use super::cache_aligned::CacheAligned;
+
 // Node within the linked list queue of messages to send
 struct Node<T> {
     // FIXME: this could be an uninitialized T if we're careful enough, and
     // that would reduce memory usage (and be a bit faster).
     // is it worth it?
     value: Option<T>,         // nullable for re-use of nodes
+    cached: bool,             // This node goes into the node cache
     next: AtomicPtr<Node<T>>, // next node in the queue
 }
 
 /// The single-producer single-consumer queue. This structure is not cloneable,
 /// but it can be safely shared in an Arc if it is guaranteed that there
 /// is only one popper and one pusher touching the queue at any one point in
 /// time.
-pub struct Queue<T> {
+pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
     // consumer fields
+    consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
+
+    // producer fields
+    producer: CacheAligned<Producer<T, ProducerAddition>>,
+}
+
+struct Consumer<T, Addition> {
     tail: UnsafeCell<*mut Node<T>>, // where to pop from
     tail_prev: AtomicPtr<Node<T>>,  // where to pop from
+    cache_bound: usize,             // maximum cache size
+    cached_nodes: AtomicUsize,      // number of nodes marked as cacheable
+    addition: Addition,
+}
 
-    // producer fields
+struct Producer<T, Addition> {
     head: UnsafeCell<*mut Node<T>>,      // where to push to
     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
-
-    // Cache maintenance fields. Additions and subtractions are stored
-    // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: usize,
-    cache_additions: AtomicUsize,
-    cache_subtractions: AtomicUsize,
+    addition: Addition,
 }
 
-unsafe impl<T: Send> Send for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
 
-unsafe impl<T: Send> Sync for Queue<T> { }
+unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
 
 impl<T> Node<T> {
     fn new() -> *mut Node<T> {
         Box::into_raw(box Node {
             value: None,
+            cached: false,
             next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
         })
     }
 }
 
 impl<T> Queue<T> {
+    #[cfg(test)]
     /// Creates a new queue.
     ///
     /// This is unsafe as the type system doesn't enforce a single
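
The `cache_aligned` module imported above is not part of this diff. As a rough sketch of what such a wrapper can look like (assuming a fixed 64-byte line size; the real module may choose the alignment per target), the following is enough to keep the producer and consumer halves on separate cache lines, and its `Deref` impl is why the code below can write both `self.producer.head` and `self.producer.0.head`:

```rust
use core::ops::{Deref, DerefMut};

/// Aligns its contents to a cache-line boundary so that two adjacent
/// `CacheAligned` values never share a line (avoids false sharing).
#[repr(align(64))]
pub(super) struct CacheAligned<T>(pub T);

impl<T> CacheAligned<T> {
    pub(super) fn new(t: T) -> Self {
        CacheAligned(t)
    }
}

impl<T> Deref for CacheAligned<T> {
    type Target = T;
    fn deref(&self) -> &T { &self.0 }
}

impl<T> DerefMut for CacheAligned<T> {
    fn deref_mut(&mut self) -> &mut T { &mut self.0 }
}
```
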
@@ -84,18 +95,60 @@ impl<T> Queue<T> {
     /// no bound. Otherwise, the cache will never grow larger than
     /// `bound` (although the queue itself could be much larger).
     pub unsafe fn new(bound: usize) -> Queue<T> {
+        Self::with_additions(bound, (), ())
+    }
+}
+
+impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
+
+    /// Creates a new queue with the given additional elements stored in the
+    /// producer and consumer portions of the queue.
+    ///
+    /// Due to the performance implications of cache contention, we wish to
+    /// keep fields used mainly by the producer on a separate cache line from
+    /// those used by the consumer. Since cache lines are usually 64 bytes, it
+    /// is unreasonably expensive to allocate one for small fields, so we
+    /// allow users to insert additional fields into the cache lines already
+    /// allocated by this queue for the producer and consumer.
+    ///
+    /// This is unsafe as the type system doesn't enforce a single
+    /// consumer-producer relationship. It also allows the consumer to `pop`
+    /// items while there is a `peek` active due to all methods having a
+    /// non-mutable receiver.
+    ///
+    /// # Arguments
+    ///
+    /// * `bound` - This queue is implemented with a linked list, and this
+    ///             means that a push is always a malloc. In order to
+    ///             amortize this cost, an internal cache of nodes is
+    ///             maintained to prevent a malloc from always being
+    ///             necessary. This bound is the limit on the size of the
+    ///             cache (if desired). If the value is 0, then the cache has
+    ///             no bound. Otherwise, the cache will never grow larger than
+    ///             `bound` (although the queue itself could be much larger).
+    pub unsafe fn with_additions(
+        bound: usize,
+        producer_addition: ProducerAddition,
+        consumer_addition: ConsumerAddition,
+    ) -> Self {
         let n1 = Node::new();
         let n2 = Node::new();
         (*n1).next.store(n2, Ordering::Relaxed);
         Queue {
-            tail: UnsafeCell::new(n2),
-            tail_prev: AtomicPtr::new(n1),
-            head: UnsafeCell::new(n2),
-            first: UnsafeCell::new(n1),
-            tail_copy: UnsafeCell::new(n1),
-            cache_bound: bound,
-            cache_additions: AtomicUsize::new(0),
-            cache_subtractions: AtomicUsize::new(0),
+            consumer: CacheAligned::new(Consumer {
+                tail: UnsafeCell::new(n2),
+                tail_prev: AtomicPtr::new(n1),
+                cache_bound: bound,
+                cached_nodes: AtomicUsize::new(0),
+                addition: consumer_addition,
+            }),
+            producer: CacheAligned::new(Producer {
+                head: UnsafeCell::new(n2),
+                first: UnsafeCell::new(n1),
+                tail_copy: UnsafeCell::new(n1),
+                addition: producer_addition,
+            }),
         }
     }
 
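
To make the split constructor concrete, here is a hypothetical caller (the `ProducerSide`/`ConsumerSide` types are illustrative, not part of this patch) that parks small per-endpoint state in the padding the queue has already paid for; the companion change to the blocking channel flavor stores its wakeup state in these slots the same way:

```rust
use std::sync::atomic::{AtomicBool, AtomicIsize};

// Illustrative additions: small hot fields that ride in the cache
// lines the queue has already allocated for each endpoint.
struct ProducerSide {
    pending_wake: AtomicBool, // producer-side signaling state (hypothetical)
}

struct ConsumerSide {
    steals: AtomicIsize, // consumer-side bookkeeping (hypothetical)
}

fn new_channel_queue<T>() -> Queue<T, ProducerSide, ConsumerSide> {
    // Unsafe contract: exactly one producer and one consumer may touch it.
    unsafe {
        Queue::with_additions(
            128, // cache at most 128 nodes for reuse
            ProducerSide { pending_wake: AtomicBool::new(false) },
            ConsumerSide { steals: AtomicIsize::new(0) },
        )
    }
}
```
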
@@ -109,35 +162,25 @@ impl<T> Queue<T> {
             assert!((*n).value.is_none());
             (*n).value = Some(t);
             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
-            (**self.head.get()).next.store(n, Ordering::Release);
-            *self.head.get() = n;
+            (**self.producer.head.get()).next.store(n, Ordering::Release);
+            *(&self.producer.head).get() = n;
         }
     }
 
     unsafe fn alloc(&self) -> *mut Node<T> {
         // First try to see if we can consume the 'first' node for our uses.
-        // We try to avoid as many atomic instructions as possible here, so
-        // the addition to cache_subtractions is not atomic (plus we're the
-        // only one subtracting from the cache).
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If the above fails, then update our copy of the tail and try
         // again.
-        *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
-        if *self.first.get() != *self.tail_copy.get() {
-            if self.cache_bound > 0 {
-                let b = self.cache_subtractions.load(Ordering::Relaxed);
-                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
-            }
-            let ret = *self.first.get();
-            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
+        *self.producer.0.tail_copy.get() =
+            self.consumer.tail_prev.load(Ordering::Acquire);
+        if *self.producer.first.get() != *self.producer.tail_copy.get() {
+            let ret = *self.producer.first.get();
+            *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
             return ret;
         }
         // If all of that fails, then we have to allocate a new node
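
The allocation path above tries the producer's stale view of the free list first, refreshes `tail_copy` from the consumer's `tail_prev` (the hunk's one `Acquire` load) and retries, and only then mallocs. A simplified, single-threaded rendering of the same three-step discipline, with the atomics on the list bound replaced by a plain parameter for clarity (illustrative only, not the patch's API):

```rust
// The producer's free list spans `first..tail_copy`; `tail_copy` is a
// lazily refreshed copy of the consumer-owned bound.
struct FreeList<T> {
    first: *mut Node<T>,     // oldest node the producer may reuse
    tail_copy: *mut Node<T>, // producer's stale copy of the consumer bound
}

impl<T> FreeList<T> {
    unsafe fn alloc(&mut self, refreshed_bound: *mut Node<T>) -> *mut Node<T> {
        // Step 1: reuse a node without any synchronization if our stale
        // view of the bound already allows it.
        if self.first != self.tail_copy {
            let ret = self.first;
            self.first = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // Step 2: refresh the bound (one Acquire load in the real code)
        // and retry the same check.
        self.tail_copy = refreshed_bound;
        if self.first != self.tail_copy {
            let ret = self.first;
            self.first = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // Step 3: the cache is empty; fall back to a fresh allocation.
        Node::new()
    }
}
```
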
@@ -153,27 +196,27 @@ impl<T> Queue<T> {
             // sentinel from where we should start popping from. Hence, look at
             // tail's next field and see if we can use it. If we do a pop, then
             // the current tail node is a candidate for going into the cache.
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { return None }
             assert!((*next).value.is_some());
             let ret = (*next).value.take();
 
-            *self.tail.get() = next;
-            if self.cache_bound == 0 {
-                self.tail_prev.store(tail, Ordering::Release);
+            *self.consumer.0.tail.get() = next;
+            if self.consumer.cache_bound == 0 {
+                self.consumer.tail_prev.store(tail, Ordering::Release);
             } else {
-                // FIXME: this is dubious with overflow.
-                let additions = self.cache_additions.load(Ordering::Relaxed);
-                let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
-                let size = additions - subtractions;
-
-                if size < self.cache_bound {
-                    self.tail_prev.store(tail, Ordering::Release);
-                    self.cache_additions.store(additions + 1, Ordering::Relaxed);
+                let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
+                if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
+                    self.consumer.cached_nodes.store(cached_nodes + 1, Ordering::Relaxed);
+                    (*tail).cached = true;
+                }
+
+                if (*tail).cached {
+                    self.consumer.tail_prev.store(tail, Ordering::Release);
                 } else {
-                    (*self.tail_prev.load(Ordering::Relaxed))
-                        .next.store(next, Ordering::Relaxed);
+                    (*self.consumer.tail_prev.load(Ordering::Relaxed))
+                        .next.store(next, Ordering::Relaxed);
                     // We have successfully erased all references to 'tail', so
                     // now we can safely drop it.
                     let _: Box<Node<T>> = Box::from_raw(tail);
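
The new scheme marks each node `cached` at most once, and the counter of marked nodes only ever grows; once `cache_bound` nodes carry the mark, later pops free their node instead of recycling it. This replaces the old additions/subtractions pair (and its overflow FIXME) with a monotonic count. A minimal single-threaded sketch of the intended invariant, assuming `with_additions` as introduced above:

```rust
fn bounded_cache_demo() {
    // Unsafe contract: this one thread acts as both producer and consumer.
    let q = unsafe { Queue::<u32, (), ()>::with_additions(2, (), ()) };
    for i in 0..8 {
        q.push(i);
        assert_eq!(q.pop(), Some(i));
    }
    // At most two nodes were ever marked `cached`; any node past the
    // bound is freed during pop instead of re-entering the free list.
}
```
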
@@ -194,17 +237,25 @@ impl<T> Queue<T> {
         // This is essentially the same as above with all the popping bits
         // stripped out.
         unsafe {
-            let tail = *self.tail.get();
+            let tail = *self.consumer.tail.get();
             let next = (*tail).next.load(Ordering::Acquire);
             if next.is_null() { None } else { (*next).value.as_mut() }
         }
     }
+
+    pub fn producer_addition(&self) -> &ProducerAddition {
+        &self.producer.addition
+    }
+
+    pub fn consumer_addition(&self) -> &ConsumerAddition {
+        &self.consumer.addition
+    }
 }
 
-impl<T> Drop for Queue<T> {
+impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
     fn drop(&mut self) {
         unsafe {
-            let mut cur = *self.first.get();
+            let mut cur = *self.producer.first.get();
             while !cur.is_null() {
                 let next = (*cur).next.load(Ordering::Relaxed);
                 let _n: Box<Node<T>> = Box::from_raw(cur);
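
The widened `Send`/`Sync` impls are what let one producer thread and one consumer thread share the queue through an `Arc`. A hypothetical harness (this module is std-internal, so user code cannot actually name `Queue`; upholding the single-producer/single-consumer contract remains the caller's job):

```rust
use std::sync::Arc;
use std::thread;

fn spsc_smoke() {
    // One pusher thread, one popper thread, sharing via Arc.
    let q = Arc::new(unsafe { Queue::<u64, (), ()>::with_additions(32, (), ()) });
    let producer = Arc::clone(&q);

    let t = thread::spawn(move || {
        for i in 0..1_000 {
            producer.push(i);
        }
    });

    let mut expected = 0;
    while expected < 1_000 {
        if let Some(v) = q.pop() {
            assert_eq!(v, expected); // FIFO order is preserved
            expected += 1;
        }
    }
    t.join().unwrap();
}
```
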