Skip to content

Commit 2f071a8

Browse files
committed
doc
1 parent 765c877 commit 2f071a8

File tree

4 files changed

+49
-16
lines changed

4 files changed

+49
-16
lines changed

README.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
"github.com/rrgmc/nbchanlist"
1919
)
2020

21-
func ExampleQueue() {
21+
func ExampleNewQueue() {
2222
q := nbchanlist.NewQueue[int]()
2323
q.Put(12) // never blocks
2424
q.Put(13)
@@ -37,6 +37,10 @@ func ExampleQueue() {
3737
fmt.Println("should never happen")
3838
}
3939
}
40+
41+
// Output:
42+
// 12
43+
// queue is closed
4044
}
4145
```
4246

example_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,8 @@ func ExampleNewQueue() {
2626
fmt.Println("should never happen")
2727
}
2828
}
29+
30+
// Output:
31+
// 12
32+
// queue is closed
2933
}

list.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
)
88

99
var (
10-
ErrClosed = errors.New("list is closed")
10+
ErrStopped = errors.New("stopped")
1111
)
1212

1313
// List is a non-blocking unbounded lock-free channel-based list for Golang.
@@ -28,11 +28,13 @@ func New[E any, Q ListType[E]](list Q) *List[E, Q] {
2828
}
2929

3030
// Get returns a channel to get items. The caller must check for it to be closed.
31+
// It can still be called even after Stop.
3132
func (q *List[E, Q]) Get() <-chan E {
3233
return q.out
3334
}
3435

3536
// GetCtx returns one item, or an error if the context is done or the list is closed.
37+
// It can still be called even after Stop.
3638
func (q *List[E, Q]) GetCtx(ctx context.Context) (E, error) {
3739
select {
3840
case <-ctx.Done():
@@ -41,7 +43,7 @@ func (q *List[E, Q]) GetCtx(ctx context.Context) (E, error) {
4143
case v, ok := <-q.Get():
4244
if !ok {
4345
var ret E
44-
return ret, ErrClosed
46+
return ret, ErrStopped
4547
}
4648
return v, nil
4749
}
@@ -53,13 +55,13 @@ func (q *List[E, Q]) Put(e E) {
5355
}
5456

5557
// PutCheck puts an element in the list. It never fails or blocks.
56-
// Returns ErrClosed if the list is closed.
58+
// Returns ErrStopped if the list stopped accepting new items.
5759
func (q *List[E, Q]) PutCheck(e E) error {
5860
if in := q.in.Load(); in != nil {
5961
*in <- e
6062
return nil
6163
}
62-
return ErrClosed
64+
return ErrStopped
6365
}
6466

6567
func (q *List[E, Q]) Size() int {
@@ -72,12 +74,21 @@ func (q *List[E, Q]) Size() int {
7274
}
7375
}
7476

75-
func (q *List[E, Q]) Closed() bool {
77+
func (q *List[E, Q]) Stopped() bool {
7678
return q.in.Load() == nil
7779
}
7880

81+
// Stop stops accepting new items. Get still works until the list is drained.
82+
func (q *List[E, Q]) Stop() {
83+
if in := q.in.Swap(nil); in != nil {
84+
close(*in)
85+
}
86+
}
87+
88+
// Close stops accepting new items and drains any existing ones, freeing all used resources.
7989
func (q *List[E, Q]) Close() {
80-
if old := q.in.Swap(nil); old != nil {
81-
close(*old)
90+
q.Stop()
91+
// drain items to stop goroutine.
92+
for range q.out {
8293
}
8394
}

queue_test.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,17 @@ func TestQueueCtx(t *testing.T) {
3232
assert.Equal(t, q.Size(), 0)
3333
}
3434

35-
func TestQueueClose(t *testing.T) {
35+
func TestQueueStop(t *testing.T) {
3636
q := NewQueue[int]()
3737
q.Put(12)
3838
q.Put(13)
39-
q.Close()
39+
q.Stop()
4040
q.Put(16)
41-
assert.Assert(t, q.Closed())
41+
assert.Assert(t, q.Stopped())
4242
expected := []int{12, 13}
43-
items, err := readNWithTimeout(q, 3)
44-
assert.ErrorIs(t, err, ErrClosed)
45-
assert.DeepEqual(t, expected, items)
43+
items, err := readNWithTimeout(q, 3) // can still Get after Stop.
44+
assert.ErrorIs(t, err, ErrStopped)
45+
assert.DeepEqual(t, expected, items) // still gets the expected items.
4646
}
4747

4848
func TestQueueTimeout(t *testing.T) {
@@ -55,6 +55,19 @@ func TestQueueTimeout(t *testing.T) {
5555
assert.DeepEqual(t, expected, items)
5656
}
5757

58+
func TestQueueClose(t *testing.T) {
59+
q := NewQueue[int]()
60+
q.Put(12)
61+
q.Put(13)
62+
q.Close()
63+
q.Put(16)
64+
assert.Assert(t, q.Stopped())
65+
var expected []int // Close drains all pending items.
66+
items, err := readNWithTimeout(q, 2) // can still get after close
67+
assert.ErrorIs(t, err, ErrStopped)
68+
assert.DeepEqual(t, expected, items)
69+
}
70+
5871
func TestQueueCtxTimeout(t *testing.T) {
5972
ctx := context.Background()
6073
q := NewQueue[int]()
@@ -98,8 +111,9 @@ func TestQueueConcurrency(t *testing.T) {
98111
}()
99112
}
100113
wgPut.Wait()
101-
q.Close()
114+
q.Stop()
102115
wgGet.Wait()
116+
q.Close()
103117

104118
assert.Assert(t, cmp2.Len(putItems, 10*10))
105119
assert.Assert(t, cmp2.Len(getItems, 10*10))
@@ -147,7 +161,7 @@ func readNWithTimeout[E any, Q ListType[E]](q *List[E, Q], n int) ([]E, error) {
147161
select {
148162
case item, ok := <-q.Get():
149163
if !ok {
150-
return ret, ErrClosed
164+
return ret, ErrStopped
151165
}
152166
ret = append(ret, item)
153167
case <-time.After(300 * time.Millisecond):

0 commit comments

Comments
 (0)