Skip to content

Commit d43d91f

Browse files
committed
Close waits on doneSignal. Test Close no-op when called multiple times.
1 parent eee9d50 commit d43d91f

File tree

2 files changed

+45
-39
lines changed

2 files changed

+45
-39
lines changed

internal/queues/callback_queue.go

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,17 @@ var ErrQueueClosed = errors.New("queue is closed")
2121
type CallBackQueue struct {
2222
// The ordered list of callbacks to be processed.
2323
list *lists.List[*callBackRequest]
24-
// enqueueSignals signals when a new item is added. It must be a buffered
24+
// enqueueSignals are sent when a new item is added. It must be a buffered
2525
// channel to avoid missing signals.
2626
enqueueSignals chan struct{}
2727
// running prevents concurrent processCallBacks execution.
2828
running sync.Mutex
2929
// If false, the queue must not be used; return ErrQueueClosed.
3030
opened atomic.Bool
31-
// closeSignal is closed as a signal to shut down the queue processing.
31+
// closeSignal is closed to signal the queue to begin shutdown.
3232
closeSignal chan struct{}
33+
// doneSignal is closed to signal the queue is fully shutdown.
34+
doneSignal chan struct{}
3335
}
3436

3537
// newUnopenedCallBackQueue creates a queue in the closed state. Use
@@ -39,6 +41,7 @@ func newUnopenedCallBackQueue() *CallBackQueue {
3941
list: lists.NewList[*callBackRequest](),
4042
enqueueSignals: make(chan struct{}, 1),
4143
closeSignal: make(chan struct{}),
44+
doneSignal: make(chan struct{}),
4245
}
4346
}
4447

@@ -50,8 +53,6 @@ func OpenCallBackQueue() *CallBackQueue {
5053
q.running.Lock()
5154
processCallBacksIsRunning := make(chan struct{})
5255
go func() {
53-
defer q.running.Unlock()
54-
defer q.Close()
5556
defer func() {
5657
if v := recover(); v != nil {
5758
// This should not happen, but if it does, this defer only adds
@@ -60,6 +61,8 @@ func OpenCallBackQueue() *CallBackQueue {
6061
panic(fmt.Sprintf("callback queue panicked: %v", v))
6162
}
6263
}()
64+
defer q.running.Unlock()
65+
defer close(q.doneSignal)
6366
q.opened.Store(true)
6467
close(processCallBacksIsRunning)
6568
q.processCallBacks()
@@ -78,12 +81,24 @@ func (q *CallBackQueue) Close() {
7881
if !q.opened.Swap(false) {
7982
return // The queue is already closed.
8083
}
81-
close(q.closeSignal)
82-
// Obtain the running lock to ensure the queue is finished processing before
83-
// returning.
84-
q.running.Lock()
85-
defer q.running.Unlock()
8684
q.list.Clear()
85+
close(q.closeSignal)
86+
<-q.doneSignal
87+
}
88+
89+
// Push adds a callback to the queue. It panics if cb is nil. It returns
90+
// ErrQueueClosed if the queue is closed.
91+
func (q *CallBackQueue) Push(cb CallBackFunc) error {
92+
if cb == nil {
93+
panic("nil callback function")
94+
}
95+
if !q.opened.Load() {
96+
return ErrQueueClosed
97+
}
98+
// Preserve order.
99+
q.list.PushBack(&callBackRequest{fn: cb, tm: time.Now()})
100+
q.signalEnqueue()
101+
return nil
87102
}
88103

89104
// processCallBacks is responsible for invoking callbacks from the list when it
@@ -146,21 +161,6 @@ func (q *CallBackQueue) invokeOneCallBack() {
146161
curr.fn(callbackCtx, time.Since(curr.tm))
147162
}
148163

149-
// Push adds a callback to the queue. It panics if cb is nil. It returns
150-
// ErrQueueClosed if the queue is closed.
151-
func (q *CallBackQueue) Push(cb CallBackFunc) error {
152-
if cb == nil {
153-
panic("nil callback function")
154-
}
155-
if !q.opened.Load() {
156-
return ErrQueueClosed
157-
}
158-
// Preserve order.
159-
q.list.PushBack(&callBackRequest{fn: cb, tm: time.Now()})
160-
q.signalEnqueue()
161-
return nil
162-
}
163-
164164
// CallBackFunc is a function type that represents a callback to be executed.
165165
// The ctx is canceled if the queue is closed while the callback is executing.
166166
// The delay is the time since the callback was added to the queue.

internal/queues/callback_queue_test.go

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ func assertErrorIs(t *testing.T, err error, target error, msg string) {
3838
func TestCallbackQueue_newUnopenedCallBackQueue(t *testing.T) {
3939
q := newUnopenedCallBackQueue()
4040
assertTrue(t, q != nil, "newUnopenedCallBackQueue should return a non-nil queue")
41-
assertTrue(t, !q.opened.Load(), "newUnopenedCallBackQueue should return a closed queue")
41+
assertTrue(t, !q.opened.Load(), "newUnopenedCallBackQueue should return an opened queue")
4242
assertTrue(t, q.list.Len() == 0, "newUnopenedCallBackQueue should return an empty queue")
43-
// check that the enqueueSignals channel is buffered
43+
// Check that the enqueueSignals channel is buffered.
4444
select {
4545
case q.enqueueSignals <- struct{}{}:
4646
default:
@@ -52,16 +52,19 @@ func TestCallbackQueue_OpenCallBackQueue(t *testing.T) {
5252
q := OpenCallBackQueue()
5353
assertTrue(t, q != nil, "OpenCallBackQueue should return a non-nil queue")
5454
assertTrue(t, q.opened.Load(), "OpenCallBackQueue should return an opened queue")
55+
assertTrue(t, !q.running.TryLock(), "OpenCallBackQueue should not have the running lock locked")
5556
q.Close()
5657
assertTrue(t, !q.opened.Load(), "OpenCallBackQueue should close the queue after Close() is called")
57-
// check that the queue was closed by acquiring the running lock.
58+
// Check that doneSignal is closed.
59+
<-q.doneSignal
60+
// Check that the running lock was released.
5861
q.running.Lock()
5962
defer q.running.Unlock()
6063
}
6164

6265
func TestCallbackQueue_processCallBacks_starts_and_cancels(t *testing.T) {
6366
q := newUnopenedCallBackQueue()
64-
// stage a callback to be processed
67+
// Stage a callback to be processed
6568
cbStarted := make(chan struct{})
6669
cbFinished := make(chan struct{})
6770
cb := func(ctx context.Context, d time.Duration) {
@@ -70,19 +73,19 @@ func TestCallbackQueue_processCallBacks_starts_and_cancels(t *testing.T) {
7073
close(cbFinished)
7174
}
7275
q.list.PushBack(&callBackRequest{fn: cb, tm: time.Now()})
73-
// run the processCallBacks method
76+
// Run the processCallBacks method
7477
go q.processCallBacks()
7578
<-cbStarted // wait for the callback to start processing
7679
assertTrue(t, q.list.Len() == 0, "Callback queue should be empty after processing")
7780
close(q.closeSignal)
78-
// the context was canceled when closeSignal was closed.
81+
// The context was canceled when closeSignal was closed.
7982
<-cbFinished
8083
}
8184

8285
func TestCallbackQueue_processCallBacks_auto_dequeues(t *testing.T) {
8386
q := newUnopenedCallBackQueue()
8487
go q.processCallBacks()
85-
// stage a callback to be processed
88+
// Stage a callback to be processed.
8689
n := 10
8790
var wg sync.WaitGroup
8891
wg.Add(n)
@@ -92,7 +95,7 @@ func TestCallbackQueue_processCallBacks_auto_dequeues(t *testing.T) {
9295
}
9396
q.list.PushBack(&callBackRequest{fn: cb, tm: time.Now()})
9497
}
95-
// only send one signal, the processCallBacks should dequeue the rest.
98+
// Only send one signal; processCallBacks should dequeue the rest.
9699
q.signalEnqueue()
97100
wg.Wait()
98101
assertTrue(t, q.list.Len() == 0, "Callback queue should be empty after processing all callbacks")
@@ -106,7 +109,7 @@ func TestCallbackQueue_Push_does_not_block(t *testing.T) {
106109
neverProcessed := func(ctx context.Context, d time.Duration) {}
107110
n := 100
108111
for range n {
109-
// push callbacks while there is nothing to dequeue.
112+
// Push callbacks while there is nothing to dequeue.
110113
err := q.Push(neverProcessed)
111114
assertNoError(t, err, "Push should not return an error")
112115
}
@@ -146,7 +149,7 @@ func TestCallbackQueue_Push_order_preserved(t *testing.T) {
146149
// Process callbacks.
147150
results := make([]string, len(expectedResults))
148151
for i, v := range expectedResults {
149-
i, v := i, v // Capture loop variables
152+
i, v := i, v // Capture loop variables.
150153
err := q.Push(func(_ context.Context, _ time.Duration) {
151154
defer wg.Done()
152155
results[i] = v
@@ -175,15 +178,18 @@ func TestCallbackQueue_Close(t *testing.T) {
175178
q.closeSignal <- struct{}{} // This should panic.
176179
}
177180

181+
func TestCallbackQueue_Close_multiple_calls_no_ops(t *testing.T) {
182+
q := OpenCallBackQueue()
183+
for range 10 {
184+
q.Close()
185+
}
186+
}
187+
178188
func TestCallbackQueue_Push_after_close_returns_ErrQueueClosed(t *testing.T) {
179189
q := OpenCallBackQueue()
180190
q.Close()
181-
var executed bool
182-
err := q.Push(func(_ context.Context, _ time.Duration) {
183-
executed = true
184-
})
191+
err := q.Push(func(_ context.Context, _ time.Duration) {})
185192
assertErrorIs(t, err, ErrQueueClosed, "Push should return an error after queue close")
186-
assertTrue(t, !executed, "Callback should not be executed after queue close")
187193
}
188194

189195
func TestCallbackQueue_Push_unopened_returns_ErrClosed(t *testing.T) {

0 commit comments

Comments
 (0)