@@ -13,8 +13,7 @@ import (
 
 // A spanSet is a set of *mspans.
 //
-// spanSet is safe for concurrent push operations *or* concurrent
-// pop operations, but not both simultaneously.
+// spanSet is safe for concurrent push and pop operations.
 type spanSet struct {
 	// A spanSet is a two-level data structure consisting of a
 	// growable spine that points to fixed-sized blocks. The spine
@@ -38,9 +37,18 @@ type spanSet struct {
 	spineLen uintptr // Spine array length, accessed atomically
 	spineCap uintptr // Spine array cap, accessed under lock
 
-	// index is the first unused slot in the logical concatenation
-	// of all blocks. It is accessed atomically.
-	index uint32
+	// index is the head and tail of the spanSet in a single field.
+	// The head and the tail both represent an index into the logical
+	// concatenation of all blocks, with the head always behind or
+	// equal to the tail (indicating an empty set). This field is
+	// always accessed atomically.
+	//
+	// The head and the tail are only 32 bits wide, which means we
+	// can only support up to 2^32 pushes before a reset. If every
+	// span in the heap were stored in this set, and each span were
+	// the minimum size (1 runtime page, 8 KiB), then roughly the
+	// smallest heap which would be unrepresentable is 32 TiB in size.
+	index headTailIndex
 }
 
 const (
@@ -64,10 +72,10 @@ type spanSetBlock struct {
 }
 
 // push adds span s to buffer b. push is safe to call concurrently
-// with other push operations, but NOT to call concurrently with pop.
+// with other push and pop operations.
 func (b *spanSet) push(s *mspan) {
 	// Obtain our slot.
-	cursor := uintptr(atomic.Xadd(&b.index, +1) - 1)
+	cursor := uintptr(b.index.incTail().tail() - 1)
 	top, bottom := cursor/spanSetBlockEntries, cursor%spanSetBlockEntries
 
 	// Do we need to add a block?
@@ -132,72 +140,115 @@ retry:
 }
 
 // pop removes and returns a span from buffer b, or nil if b is empty.
-// pop is safe to call concurrently with other pop operations, but NOT
-// to call concurrently with push.
+// pop is safe to call concurrently with other pop and push operations.
 func (b *spanSet) pop() *mspan {
-	cursor := atomic.Xadd(&b.index, -1)
-	if int32(cursor) < 0 {
-		atomic.Xadd(&b.index, +1)
-		return nil
+	var head, tail uint32
+claimLoop:
+	for {
+		headtail := b.index.load()
+		head, tail = headtail.split()
+		if head >= tail {
+			// The buf is empty, as far as we can tell.
+			return nil
+		}
+		// Check if the head position we want to claim is actually
+		// backed by a block.
+		spineLen := atomic.Loaduintptr(&b.spineLen)
+		if spineLen <= uintptr(head)/spanSetBlockEntries {
+			// We're racing with a spine growth and the allocation of
+			// a new block (and maybe a new spine!), and trying to grab
+			// the span at the index which is currently being pushed.
+			// Instead of spinning, let's just notify the caller that
+			// there's nothing currently here. Spinning on this is
+			// almost definitely not worth it.
+			return nil
+		}
+		// Try to claim the current head by CASing in an updated head.
+		// This may fail transiently due to a push which modifies the
+		// tail, so keep trying while the head isn't changing.
+		want := head
+		for want == head {
+			if b.index.cas(headtail, makeHeadTailIndex(want+1, tail)) {
+				break claimLoop
+			}
+			headtail = b.index.load()
+			head, tail = headtail.split()
+		}
+		// We failed to claim the spot we were after and the head changed,
+		// meaning a popper got ahead of us. Try again from the top because
+		// the buf may not be empty.
 	}
+	top, bottom := head/spanSetBlockEntries, head%spanSetBlockEntries
 
-	// There are no concurrent spine or block modifications during
-	// pop, so we can omit the atomics.
-	top, bottom := cursor/spanSetBlockEntries, cursor%spanSetBlockEntries
-	blockp := (**spanSetBlock)(add(b.spine, sys.PtrSize*uintptr(top)))
-	block := *blockp
-	s := block.spans[bottom]
-	// Clear the pointer for block(i).
-	block.spans[bottom] = nil
-
-	// If we're the last popper in the block, free the block.
-	if used := atomic.Xadd(&block.used, -1); used == 0 {
-		// Decrement spine length and clear the block's pointer.
-		atomic.Xadduintptr(&b.spineLen, ^uintptr(0) /* -1 */)
-		atomic.StorepNoWB(add(b.spine, sys.PtrSize*uintptr(top)), nil)
+	// We may be reading a stale spine pointer, but because the length
+	// grows monotonically and we've already verified it, we'll definitely
+	// be reading from a valid block.
+	spine := atomic.Loadp(unsafe.Pointer(&b.spine))
+	blockp := add(spine, sys.PtrSize*uintptr(top))
+
+	// Given that the spine length is correct, we know we will never
+	// see a nil block here, since the length is always updated after
+	// the block is set.
+	block := (*spanSetBlock)(atomic.Loadp(blockp))
+	s := (*mspan)(atomic.Loadp(unsafe.Pointer(&block.spans[bottom])))
+	for s == nil {
+		// We raced with the span actually being set, but given that we
+		// know a block for this span exists, the race window here is
+		// extremely small. Try again.
+		s = (*mspan)(atomic.Loadp(unsafe.Pointer(&block.spans[bottom])))
+	}
+	// Clear the pointer. This isn't strictly necessary, but defensively
+	// avoids accidentally re-using blocks which could lead to memory
+	// corruption. This way, we'll get a nil pointer access instead.
+	atomic.StorepNoWB(unsafe.Pointer(&block.spans[bottom]), nil)
+
+	// If we're the last possible popper in the block, free the block.
+	if used := atomic.Xadd(&block.used, -1); used == 0 && bottom == spanSetBlockEntries-1 {
+		// Clear the block's pointer.
+		atomic.StorepNoWB(blockp, nil)
 
 		// Return the block to the block pool.
 		spanSetBlockPool.free(block)
 	}
 	return s
 }
 
-// numBlocks returns the number of blocks in buffer b. numBlocks is
-// safe to call concurrently with any other operation. Spans that have
-// been pushed prior to the call to numBlocks are guaranteed to appear
-// in some block in the range [0, numBlocks()), assuming there are no
-// intervening pops. Spans that are pushed after the call may also
-// appear in these blocks.
-func (b *spanSet) numBlocks() int {
-	return int((atomic.Load(&b.index) + spanSetBlockEntries - 1) / spanSetBlockEntries)
-}
-
-// block returns the spans in the i'th block of buffer b. block is
-// safe to call concurrently with push. The block may contain nil
-// pointers that must be ignored, and each entry in the block must be
-// loaded atomically.
-func (b *spanSet) block(i int) []*mspan {
-	// Perform bounds check before loading spine address since
-	// push ensures the allocated length is at least spineLen.
-	if i < 0 || uintptr(i) >= atomic.Loaduintptr(&b.spineLen) {
-		throw("block index out of range")
+// reset resets a spanSet which is empty. It will also clean up
+// any left over blocks.
+//
+// Throws if the buf is not empty.
+//
+// reset may not be called concurrently with any other operations
+// on the span set.
+func (b *spanSet) reset() {
+	head, tail := b.index.load().split()
+	if head < tail {
+		print("head = ", head, ", tail = ", tail, "\n")
+		throw("attempt to clear non-empty span set")
 	}
+	top := head / spanSetBlockEntries
+	if uintptr(top) < b.spineLen {
+		// If the head catches up to the tail and the set is empty,
+		// we may not clean up the block containing the head and tail
+		// since it may be pushed into again. In order to avoid leaking
+		// memory since we're going to reset the head and tail, clean
+		// up such a block now, if it exists.
+		blockp := (**spanSetBlock)(add(b.spine, sys.PtrSize*uintptr(top)))
+		block := *blockp
+		if block != nil {
+			// Sanity check the used value.
+			if block.used != 0 {
+				throw("found used block in empty span set")
+			}
+			// Clear the pointer to the block.
+			atomic.StorepNoWB(unsafe.Pointer(blockp), nil)
 
-	// Get block i.
-	spine := atomic.Loadp(unsafe.Pointer(&b.spine))
-	blockp := add(spine, sys.PtrSize*uintptr(i))
-	block := (*spanSetBlock)(atomic.Loadp(blockp))
-
-	// Slice the block if necessary.
-	cursor := uintptr(atomic.Load(&b.index))
-	top, bottom := cursor/spanSetBlockEntries, cursor%spanSetBlockEntries
-	var spans []*mspan
-	if uintptr(i) < top {
-		spans = block.spans[:]
-	} else {
-		spans = block.spans[:bottom]
+			// Return the block to the block pool.
+			spanSetBlockPool.free(block)
+		}
 	}
-	return spans
+	b.index.reset()
+	atomic.Storeuintptr(&b.spineLen, 0)
 }
 
 // spanSetBlockPool is a global pool of spanSetBlocks.
@@ -221,3 +272,64 @@ func (p *spanSetBlockAlloc) alloc() *spanSetBlock {
 func (p *spanSetBlockAlloc) free(block *spanSetBlock) {
 	p.stack.push(&block.lfnode)
 }
+
+// headTailIndex represents a combined 32-bit head and 32-bit tail
+// of a queue into a single 64-bit value.
+type headTailIndex uint64
+
+// makeHeadTailIndex creates a headTailIndex value from a separate
+// head and tail.
+func makeHeadTailIndex(head, tail uint32) headTailIndex {
+	return headTailIndex(uint64(head)<<32 | uint64(tail))
+}
+
+// head returns the head of a headTailIndex value.
+func (h headTailIndex) head() uint32 {
+	return uint32(h >> 32)
+}
+
+// tail returns the tail of a headTailIndex value.
+func (h headTailIndex) tail() uint32 {
+	return uint32(h)
+}
+
+// split splits the headTailIndex value into its parts.
+func (h headTailIndex) split() (head uint32, tail uint32) {
+	return h.head(), h.tail()
+}
+
+// load atomically reads a headTailIndex value.
+func (h *headTailIndex) load() headTailIndex {
+	return headTailIndex(atomic.Load64((*uint64)(h)))
+}
+
+// cas atomically compares-and-swaps a headTailIndex value.
+func (h *headTailIndex) cas(old, new headTailIndex) bool {
+	return atomic.Cas64((*uint64)(h), uint64(old), uint64(new))
+}
+
+// incHead atomically increments the head of a headTailIndex.
+func (h *headTailIndex) incHead() headTailIndex {
+	return headTailIndex(atomic.Xadd64((*uint64)(h), (1 << 32)))
+}
+
+// decHead atomically decrements the head of a headTailIndex.
+func (h *headTailIndex) decHead() headTailIndex {
+	return headTailIndex(atomic.Xadd64((*uint64)(h), -(1 << 32)))
+}
+
+// incTail atomically increments the tail of a headTailIndex.
+func (h *headTailIndex) incTail() headTailIndex {
+	ht := headTailIndex(atomic.Xadd64((*uint64)(h), +1))
+	// Check for overflow.
+	if ht.tail() == 0 {
+		print("runtime: head = ", ht.head(), ", tail = ", ht.tail(), "\n")
+		throw("headTailIndex overflow")
+	}
+	return ht
+}
+
+// reset clears the headTailIndex to (0, 0).
+func (h *headTailIndex) reset() {
+	atomic.Store64((*uint64)(h), 0)
+}
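
The new index field packs a 32-bit head (high bits) and a 32-bit tail (low bits) into one 64-bit word, so a push can bump the tail with a plain 64-bit add of 1 and a pop can advance the head with a single CAS over the combined word. The 2^32 limit mentioned in the struct comment works out as stated: 2^32 pushes of minimum-size spans (8 KiB each) covers 2^32 × 2^13 = 2^45 bytes, i.e. 32 TiB. The standalone sketch below is not part of the change; it mirrors the encoding with the standard sync/atomic package instead of runtime/internal/atomic, purely to illustrate the packing:

package main

import (
	"fmt"
	"sync/atomic"
)

// headTailIndex mirrors the encoding added in this change: head in the
// high 32 bits, tail in the low 32 bits of a single uint64.
type headTailIndex uint64

func makeHeadTailIndex(head, tail uint32) headTailIndex {
	return headTailIndex(uint64(head)<<32 | uint64(tail))
}

func (h headTailIndex) head() uint32 { return uint32(h >> 32) }
func (h headTailIndex) tail() uint32 { return uint32(h) }

func main() {
	idx := uint64(makeHeadTailIndex(3, 7))

	// A push increments the tail: a plain add of 1 leaves the head in the
	// high bits untouched (until the tail overflows, which the runtime
	// version detects and throws on).
	atomic.AddUint64(&idx, 1)

	// A head increment adds 1<<32, leaving the tail alone.
	atomic.AddUint64(&idx, 1<<32)

	ht := headTailIndex(atomic.LoadUint64(&idx))
	fmt.Println(ht.head(), ht.tail()) // prints: 4 8
}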
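The interesting part of the new pop is the claim loop: a CAS on the combined word can fail either because another popper moved the head (retry from the top and recheck emptiness) or merely because a concurrent push moved the tail (retry the CAS with the fresh tail, since the head we want is still available). The following hypothetical, simplified sketch of just that claim pattern reuses the setup of the previous sketch (package main with sync/atomic) and leaves out the spine/block lookup:

// tryClaimHead tries to advance the head of a packed head/tail word by one.
// It returns the claimed index and true, or false if the set looks empty.
// Illustrative only: a simplified version of the claim loop in pop above.
func tryClaimHead(index *uint64) (uint32, bool) {
	for {
		old := atomic.LoadUint64(index)
		head, tail := uint32(old>>32), uint32(old)
		if head >= tail {
			// Empty, as far as we can tell.
			return 0, false
		}
		want := head
		for want == head {
			updated := uint64(want+1)<<32 | uint64(tail)
			if atomic.CompareAndSwapUint64(index, old, updated) {
				// Claimed slot `want`.
				return want, true
			}
			// The CAS failed; reload. If only the tail changed (a push),
			// the inner loop retries with the fresh value. If the head
			// changed, another popper won and we restart from the top.
			old = atomic.LoadUint64(index)
			head, tail = uint32(old>>32), uint32(old)
		}
	}
}

This mirrors the claimLoop in the diff; the real pop additionally checks spineLen before claiming, so it never commits to an index whose backing block has not been published yet.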