Skip to content

Commit c11c8f6

Browse files
committed
internal/task: use a lock-free queue
This changes task.Stack and task.Queue to use atomic compare-and-swap operations. With this change, they can also be executed safely from multiple threads/cores. Additionally, the queue is actually LIFO now (not sure what I was thinking before).
1 parent aa053b5 commit c11c8f6

File tree

1 file changed

+63
-76
lines changed

1 file changed

+63
-76
lines changed

src/internal/task/queue.go

Lines changed: 63 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,63 @@
11
package task
22

3-
import "runtime/interrupt"
3+
import (
4+
"sync/atomic"
5+
"unsafe"
6+
)
47

58
const asserts = false
69

710
// Queue is a FIFO container of tasks.
811
// The zero value is an empty queue.
912
type Queue struct {
10-
head, tail *Task
13+
// in is a stack used to buffer incoming tasks.
14+
in Stack
15+
16+
// out is a singly linked list of tasks in oldest-first order.
17+
// Once empty, it is refilled by dumping and flipping the input stack.
18+
out *Task
1119
}
1220

1321
// Push a task onto the queue.
22+
// This is atomic.
1423
func (q *Queue) Push(t *Task) {
15-
i := interrupt.Disable()
16-
if asserts && t.Next != nil {
17-
interrupt.Restore(i)
18-
panic("runtime: pushing a task to a queue with a non-nil Next pointer")
19-
}
20-
if q.tail != nil {
21-
q.tail.Next = t
22-
}
23-
q.tail = t
24-
t.Next = nil
25-
if q.head == nil {
26-
q.head = t
27-
}
28-
interrupt.Restore(i)
24+
q.in.Push(t)
2925
}
3026

3127
// Pop a task off of the queue.
28+
// This cannot be called concurrently.
3229
func (q *Queue) Pop() *Task {
33-
i := interrupt.Disable()
34-
t := q.head
35-
if t == nil {
36-
interrupt.Restore(i)
37-
return nil
38-
}
39-
q.head = t.Next
40-
if q.tail == t {
41-
q.tail = nil
42-
}
43-
t.Next = nil
44-
interrupt.Restore(i)
45-
return t
46-
}
30+
next := q.out
31+
if next == nil {
32+
// Dump the input stack.
33+
s := q.in.dump()
34+
35+
// Flip it.
36+
var prev *Task
37+
for t := s.top; t != nil; {
38+
next := t.Next
39+
t.Next = prev
40+
prev = t
41+
t = next
42+
}
43+
if prev == nil {
44+
// The queue is empty.
45+
return nil
46+
}
4747

48-
// Append pops the contents of another queue and pushes them onto the end of this queue.
49-
func (q *Queue) Append(other *Queue) {
50-
i := interrupt.Disable()
51-
if q.head == nil {
52-
q.head = other.head
53-
} else {
54-
q.tail.Next = other.head
48+
// Save it in the output list.
49+
next = prev
5550
}
56-
q.tail = other.tail
57-
other.head, other.tail = nil, nil
58-
interrupt.Restore(i)
51+
52+
q.out = next.Next
53+
next.Next = nil
54+
return next
5955
}
6056

6157
// Empty checks if the queue is empty.
58+
// This cannot be called concurrently with Pop.
6259
func (q *Queue) Empty() bool {
63-
i := interrupt.Disable()
64-
empty := q.head == nil
65-
interrupt.Restore(i)
66-
return empty
60+
return q.out == nil && atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.in.top))) == nil
6761
}
6862

6963
// Stack is a LIFO container of tasks.
@@ -74,51 +68,44 @@ type Stack struct {
7468
}
7569

7670
// Push a task onto the stack.
71+
// This is atomic.
7772
func (s *Stack) Push(t *Task) {
78-
i := interrupt.Disable()
7973
if asserts && t.Next != nil {
80-
interrupt.Restore(i)
8174
panic("runtime: pushing a task to a stack with a non-nil Next pointer")
8275
}
83-
s.top, t.Next = t, s.top
84-
interrupt.Restore(i)
76+
topPtr := (*unsafe.Pointer)(unsafe.Pointer(&s.top))
77+
doPush:
78+
top := atomic.LoadPointer(topPtr)
79+
t.Next = (*Task)(top)
80+
if !atomic.CompareAndSwapPointer(topPtr, top, unsafe.Pointer(t)) {
81+
goto doPush
82+
}
8583
}
8684

8785
// Pop a task off of the stack.
86+
// This is atomic.
8887
func (s *Stack) Pop() *Task {
89-
i := interrupt.Disable()
90-
t := s.top
91-
if t != nil {
92-
s.top = t.Next
93-
t.Next = nil
94-
}
95-
interrupt.Restore(i)
96-
return t
97-
}
98-
99-
// tail follows the chain of tasks.
100-
// If t is nil, returns nil.
101-
// Otherwise, returns the task in the chain where the Next field is nil.
102-
func (t *Task) tail() *Task {
103-
if t == nil {
88+
topPtr := (*unsafe.Pointer)(unsafe.Pointer(&s.top))
89+
doPop:
90+
top := atomic.LoadPointer(topPtr)
91+
if top == nil {
10492
return nil
10593
}
106-
for t.Next != nil {
107-
t = t.Next
94+
t := (*Task)(top)
95+
next := atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&t.Next)))
96+
if !atomic.CompareAndSwapPointer(topPtr, top, next) {
97+
goto doPop
10898
}
99+
t.Next = nil
109100
return t
110101
}
111102

112-
// Queue moves the contents of the stack into a queue.
113-
// Elements can be popped from the queue in the same order that they would be popped from the stack.
114-
func (s *Stack) Queue() Queue {
115-
i := interrupt.Disable()
116-
head := s.top
117-
s.top = nil
118-
q := Queue{
119-
head: head,
120-
tail: head.tail(),
103+
// dump the contents of the stack to another stack.
104+
func (s *Stack) dump() Stack {
105+
return Stack{
106+
top: (*Task)(atomic.SwapPointer(
107+
(*unsafe.Pointer)(unsafe.Pointer(&s.top)),
108+
nil,
109+
)),
121110
}
122-
interrupt.Restore(i)
123-
return q
124111
}

0 commit comments

Comments
 (0)