Skip to content

Commit 8c75c65

Browse files
committed
use context to close subscription
1 parent b7841ae commit 8c75c65

File tree

9 files changed

+141
-114
lines changed

9 files changed

+141
-114
lines changed

.github/workflows/test.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
name: Test
2+
23
on:
34
push:
45

56
pull_request:
67

8+
env:
9+
GODEBUG: tracebackancestors=1000
10+
711
jobs:
812

913
linux:

examples_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
package goob_test
22

33
import (
4+
"context"
45
"fmt"
6+
"time"
57

68
"github.com/ysmood/goob"
79
)
810

911
func Example_basic() {
12+
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
13+
defer cancel()
14+
1015
// create an observable instance
11-
ob := goob.New()
12-
defer ob.Close()
16+
ob := goob.New(ctx)
1317

14-
events := ob.Subscribe()
18+
events := ob.Subscribe(context.TODO())
1519

1620
// publish events without blocking
1721
ob.Publish(1)
@@ -21,10 +25,6 @@ func Example_basic() {
2125
// consume events
2226
for e := range events {
2327
fmt.Print(e)
24-
25-
if e.(int) == 3 {
26-
break
27-
}
2828
}
2929

3030
// Output: 123

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
module github.com/ysmood/goob
22

33
go 1.15
4+
5+
require github.com/ysmood/gotrace v0.6.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
github.com/ysmood/gotrace v0.6.0 h1:SyI1d4jclswLhg7SWTL6os3L1WOKeNn/ZtzVQF8QmdY=
2+
github.com/ysmood/gotrace v0.6.0/go.mod h1:TzhIG7nHDry5//eYZDYcTzuJLYQIkykJzCRIo4/dzQM=

goob.go

Lines changed: 23 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
package goob
22

33
import (
4+
"context"
45
"sync"
56
)
67

78
// Observable hub
89
type Observable struct {
10+
ctx context.Context
911
lock *sync.Mutex
10-
subscribers map[Subscriber]*Pipe
12+
subscribers map[Events]func(Event)
1113
}
1214

13-
// Subscriber type
14-
type Subscriber <-chan Event
15-
1615
// New observable instance
17-
func New() *Observable {
16+
func New(ctx context.Context) *Observable {
1817
ob := &Observable{
18+
ctx: ctx,
1919
lock: &sync.Mutex{},
20-
subscribers: map[Subscriber]*Pipe{},
20+
subscribers: map[Events]func(Event){},
2121
}
2222
return ob
2323
}
@@ -27,47 +27,36 @@ func (ob *Observable) Publish(e Event) {
2727
ob.lock.Lock()
2828
defer ob.lock.Unlock()
2929

30-
for _, p := range ob.subscribers {
31-
p.Write(e)
30+
for _, write := range ob.subscribers {
31+
write(e)
3232
}
3333
}
3434

3535
// Subscribe message
36-
func (ob *Observable) Subscribe() Subscriber {
36+
func (ob *Observable) Subscribe(ctx context.Context) Events {
3737
ob.lock.Lock()
3838
defer ob.lock.Unlock()
3939

40-
p := NewPipe()
40+
ctx, cancel := context.WithCancel(ctx)
4141

42-
if ob.subscribers == nil {
43-
p.Stop()
44-
} else {
45-
ob.subscribers[p.Events] = p
46-
}
42+
write, events := NewPipe(ctx)
4743

48-
return p.Events
49-
}
44+
ob.subscribers[events] = write
5045

51-
// Unsubscribe from observable
52-
func (ob *Observable) Unsubscribe(s Subscriber) {
53-
ob.lock.Lock()
54-
defer ob.lock.Unlock()
55-
if p, has := ob.subscribers[s]; has {
56-
p.Stop()
57-
delete(ob.subscribers, s)
58-
}
59-
}
46+
go func() {
47+
select {
48+
case <-ctx.Done():
49+
case <-ob.ctx.Done():
50+
}
6051

61-
// Close subscribers
62-
func (ob *Observable) Close() {
63-
ob.lock.Lock()
64-
defer ob.lock.Unlock()
52+
ob.lock.Lock()
53+
defer ob.lock.Unlock()
6554

66-
for _, p := range ob.subscribers {
67-
p.Stop()
68-
}
55+
delete(ob.subscribers, events)
56+
cancel()
57+
}()
6958

70-
ob.subscribers = nil
59+
return events
7160
}
7261

7362
// Len of the subscribers

goob_test.go

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package goob_test
22

33
import (
4+
"context"
45
"math/rand"
56
"reflect"
67
"runtime"
@@ -10,10 +11,11 @@ import (
1011
"time"
1112

1213
"github.com/ysmood/goob"
14+
"github.com/ysmood/gotrace"
1315
)
1416

1517
func checkLeak(t *testing.T) {
16-
// testleak.Check(t, 0)
18+
gotrace.CheckLeak(t, 0)
1719
}
1820

1921
type null struct{}
@@ -27,9 +29,11 @@ func eq(t *testing.T, expected, actual interface{}) {
2729
func TestNew(t *testing.T) {
2830
checkLeak(t)
2931

30-
ob := goob.New()
31-
defer ob.Close()
32-
s := ob.Subscribe()
32+
ctx, cancel := context.WithCancel(context.Background())
33+
defer t.Cleanup(cancel)
34+
35+
ob := goob.New(ctx)
36+
s := ob.Subscribe(ctx)
3337
size := 1000
3438

3539
expected := []int{}
@@ -51,31 +55,31 @@ func TestNew(t *testing.T) {
5155
eq(t, expected, result)
5256
}
5357

54-
func TestUnsubscribe(t *testing.T) {
58+
func TestCancel(t *testing.T) {
5559
checkLeak(t)
5660

57-
ob := goob.New()
58-
59-
s := ob.Subscribe()
60-
ob.Unsubscribe(s)
61+
ob := goob.New(context.Background())
6162

63+
ctx, cancel := context.WithCancel(context.Background())
64+
ob.Subscribe(ctx)
65+
cancel()
6266
time.Sleep(10 * time.Millisecond)
63-
6467
eq(t, ob.Len(), 0)
6568
}
6669

6770
func TestClosed(t *testing.T) {
6871
checkLeak(t)
6972

70-
ob := goob.New()
71-
ob.Subscribe()
72-
ob.Close()
73+
ctx, cancel := context.WithCancel(context.Background())
7374

74-
s := ob.Subscribe()
75+
ob := goob.New(ctx)
76+
ob.Subscribe(ctx)
77+
cancel()
78+
79+
s := ob.Subscribe(context.Background())
7580
_, ok := <-s
7681

7782
ob.Publish(1)
78-
ob.Unsubscribe(s)
7983

8084
eq(t, ok, false)
8185
eq(t, ob.Len(), 0)
@@ -84,11 +88,13 @@ func TestClosed(t *testing.T) {
8488
func TestMultipleConsumers(t *testing.T) {
8589
checkLeak(t)
8690

87-
ob := goob.New()
88-
defer ob.Close()
89-
s1 := ob.Subscribe()
90-
s2 := ob.Subscribe()
91-
s3 := ob.Subscribe()
91+
ctx, cancel := context.WithCancel(context.Background())
92+
defer t.Cleanup(cancel)
93+
94+
ob := goob.New(ctx)
95+
s1 := ob.Subscribe(ctx)
96+
s2 := ob.Subscribe(ctx)
97+
s3 := ob.Subscribe(ctx)
9298
size := 1000
9399

94100
expected := []int{}
@@ -136,9 +142,11 @@ func TestMultipleConsumers(t *testing.T) {
136142
func TestSlowConsumer(t *testing.T) {
137143
checkLeak(t)
138144

139-
ob := goob.New()
140-
defer ob.Close()
141-
s := ob.Subscribe()
145+
ctx, cancel := context.WithCancel(context.Background())
146+
defer t.Cleanup(cancel)
147+
148+
ob := goob.New(ctx)
149+
s := ob.Subscribe(ctx)
142150

143151
ob.Publish(1)
144152

@@ -160,9 +168,11 @@ func TestMonkey(t *testing.T) {
160168
run := func() {
161169
defer wg.Done()
162170

163-
ob := goob.New()
164-
defer ob.Close()
165-
s := ob.Subscribe()
171+
ctx, cancel := context.WithCancel(context.Background())
172+
defer cancel()
173+
174+
ob := goob.New(ctx)
175+
s := ob.Subscribe(ctx)
166176

167177
go func() {
168178
for range make([]null, size) {
@@ -198,9 +208,11 @@ func TestMonkey(t *testing.T) {
198208
}
199209

200210
func BenchmarkPublish(b *testing.B) {
201-
ob := goob.New()
202-
defer ob.Close()
203-
s := ob.Subscribe()
211+
ctx, cancel := context.WithCancel(context.Background())
212+
defer b.Cleanup(cancel)
213+
214+
ob := goob.New(ctx)
215+
s := ob.Subscribe(ctx)
204216

205217
for i := 0; i < runtime.NumCPU(); i++ {
206218
go func() {
@@ -219,9 +231,11 @@ func BenchmarkPublish(b *testing.B) {
219231
}
220232

221233
func BenchmarkConsume(b *testing.B) {
222-
ob := goob.New()
223-
defer ob.Close()
224-
s := ob.Subscribe()
234+
ctx, cancel := context.WithCancel(context.Background())
235+
defer b.Cleanup(cancel)
236+
237+
ob := goob.New(ctx)
238+
s := ob.Subscribe(ctx)
225239

226240
for i := 0; i < b.N; i++ {
227241
ob.Publish(nil)

pipe.go

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,23 @@
11
package goob
22

33
import (
4+
"context"
45
"sync"
56
)
67

78
// Event interface
89
type Event interface{}
910

10-
// Pipe the Event via Write to Events. Events uses an internal buffer so it won't block Write.
11-
// Call Stop to abort.
12-
type Pipe struct {
13-
Write func(Event)
14-
Events <-chan Event
15-
Stop func()
16-
}
11+
// Events channel
12+
type Events <-chan Event
1713

18-
// NewPipe instance
19-
func NewPipe() *Pipe {
14+
// NewPipe instance.
15+
// Pipe the Event via Write to Events. Events uses an internal buffer so it won't block Write.
16+
func NewPipe(ctx context.Context) (Write func(Event), Events <-chan Event) {
2017
events := make(chan Event)
2118
lock := sync.Mutex{}
2219
buf := []Event{} // using slice is faster than linked-list in general cases
2320
wait := make(chan struct{}, 1)
24-
stop := make(chan struct{})
2521

2622
write := func(e Event) {
2723
lock.Lock()
@@ -31,7 +27,7 @@ func NewPipe() *Pipe {
3127

3228
if len(wait) == 0 {
3329
select {
34-
case <-stop:
30+
case <-ctx.Done():
3531
return
3632
case wait <- struct{}{}:
3733
}
@@ -49,19 +45,19 @@ func NewPipe() *Pipe {
4945

5046
for _, e := range section {
5147
select {
52-
case <-stop:
48+
case <-ctx.Done():
5349
return
5450
case events <- e:
5551
}
5652
}
5753

5854
select {
59-
case <-stop:
55+
case <-ctx.Done():
6056
return
6157
case <-wait:
6258
}
6359
}
6460
}()
6561

66-
return &Pipe{write, events, func() { close(stop) }}
62+
return write, events
6763
}

0 commit comments

Comments
 (0)