Skip to content

Commit 0cebcae

Browse files
committed
only allow starting a source once
Signed-off-by: Tim Ramlot <[email protected]>
1 parent a4e8aca commit 0cebcae

File tree

10 files changed

+396
-170
lines changed

10 files changed

+396
-170
lines changed

pkg/controller/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ var _ = Describe("controller.Controller", func() {
7777

7878
ctx, cancel := context.WithCancel(context.Background())
7979
watchChan := make(chan event.GenericEvent, 1)
80-
watch := &source.Channel{Source: watchChan}
80+
watch := source.Channel(source.NewChannelBroadcaster(watchChan))
8181
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}
8282

8383
reconcileStarted := make(chan struct{})

pkg/internal/controller/controller_test.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,7 @@ var _ = Describe("controller", func() {
226226
Object: p,
227227
}
228228

229-
ins := &source.Channel{Source: ch}
230-
ins.DestBufferSize = 1
229+
ins := source.Channel(source.NewChannelBroadcaster(ch), source.WithDestBufferSize(1))
231230

232231
// send the event to the channel
233232
ch <- evt
@@ -249,18 +248,35 @@ var _ = Describe("controller", func() {
249248
<-processed
250249
})
251250

252-
It("should error when channel source is not specified", func() {
251+
It("should error when ChannelBroadcaster is not specified", func() {
253252
ctx, cancel := context.WithCancel(context.Background())
254253
defer cancel()
255254

256-
ins := &source.Channel{}
255+
ins := source.Channel(nil)
257256
ctrl.startWatches = []watchDescription{{
258257
src: ins,
259258
}}
260259

261260
e := ctrl.Start(ctx)
262261
Expect(e).To(HaveOccurred())
263-
Expect(e.Error()).To(ContainSubstring("must specify Channel.Source"))
262+
Expect(e.Error()).To(ContainSubstring("must create Channel with a non-nil Broadcaster"))
263+
})
264+
265+
It("should error when channel eventhandler is not specified", func() {
266+
ctx, cancel := context.WithCancel(context.Background())
267+
defer cancel()
268+
269+
ins := source.Channel(
270+
source.NewChannelBroadcaster(make(<-chan event.GenericEvent)),
271+
)
272+
ctrl.startWatches = []watchDescription{{
273+
src: ins,
274+
handler: nil,
275+
}}
276+
277+
e := ctrl.Start(ctx)
278+
Expect(e).To(HaveOccurred())
279+
Expect(e.Error()).To(ContainSubstring("must create Channel with a non-nil EventHandler"))
264280
})
265281

266282
It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {

pkg/source/example_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ func ExampleChannel() {
4343
events := make(chan event.GenericEvent)
4444

4545
err := ctrl.Watch(
46-
&source.Channel{Source: events},
46+
source.Channel(
47+
source.NewChannelBroadcaster(events),
48+
),
4749
&handler.EnqueueRequestForObject{},
4850
)
4951
if err != nil {

pkg/source/internal/channel.go

Lines changed: 58 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -27,30 +27,25 @@ import (
2727
"sigs.k8s.io/controller-runtime/pkg/predicate"
2828
)
2929

30-
const (
31-
// defaultBufferSize is the default number of event notifications that can be buffered.
32-
defaultBufferSize = 1024
33-
)
30+
// ChannelOptions contains the options for the Channel source.
31+
type ChannelOptions struct {
32+
// DestBufferSize is the specified buffer size of dest channels.
33+
// Default to 1024 if not specified.
34+
DestBufferSize int
35+
}
3436

3537
// Channel is used to provide a source of events originating outside the cluster
3638
// (e.g. GitHub Webhook callback). Channel requires the user to wire the external
3739
// source (eh.g. http handler) to write GenericEvents to the underlying channel.
3840
type Channel struct {
39-
// once ensures the event distribution goroutine will be performed only once
40-
once sync.Once
41-
42-
// Source is the source channel to fetch GenericEvents
43-
Source <-chan event.GenericEvent
41+
Options ChannelOptions
4442

45-
// dest is the destination channels of the added event handlers
46-
dest []chan event.GenericEvent
47-
48-
// DestBufferSize is the specified buffer size of dest channels.
49-
// Default to 1024 if not specified.
50-
DestBufferSize int
43+
// Broadcaster contains the source channel for events.
44+
Broadcaster *ChannelBroadcaster
5145

52-
// destLock is to ensure the destination channels are safely added/removed
53-
destLock sync.Mutex
46+
mu sync.Mutex
47+
// isStarted is true if the source has been started. A source can only be started once.
48+
isStarted bool
5449
}
5550

5651
func (cs *Channel) String() string {
@@ -63,88 +58,70 @@ func (cs *Channel) Start(
6358
handler handler.EventHandler,
6459
queue workqueue.RateLimitingInterface,
6560
prct ...predicate.Predicate) error {
66-
// Source should have been specified by the user.
67-
if cs.Source == nil {
68-
return fmt.Errorf("must specify Channel.Source")
61+
// Broadcaster should have been specified by the user.
62+
if cs.Broadcaster == nil {
63+
return fmt.Errorf("must create Channel with a non-nil Broadcaster")
6964
}
70-
71-
// use default value if DestBufferSize not specified
72-
if cs.DestBufferSize == 0 {
73-
cs.DestBufferSize = defaultBufferSize
65+
if handler == nil {
66+
return fmt.Errorf("must create Channel with a non-nil EventHandler")
7467
}
7568

76-
dst := make(chan event.GenericEvent, cs.DestBufferSize)
77-
78-
cs.destLock.Lock()
79-
cs.dest = append(cs.dest, dst)
80-
cs.destLock.Unlock()
69+
cs.mu.Lock()
70+
defer cs.mu.Unlock()
71+
if cs.isStarted {
72+
return fmt.Errorf("cannot start an already started Channel source")
73+
}
74+
cs.isStarted = true
8175

82-
cs.once.Do(func() {
83-
// Distribute GenericEvents to all EventHandler / Queue pairs Watching this source
84-
go cs.syncLoop(ctx)
85-
})
76+
// Create a destination channel for the event handler
77+
// and add it to the list of destinations
78+
destination := make(chan event.GenericEvent, cs.Options.DestBufferSize)
79+
cs.Broadcaster.AddListener(destination)
8680

8781
go func() {
88-
for evt := range dst {
89-
shouldHandle := true
90-
for _, p := range prct {
91-
if !p.Generic(evt) {
92-
shouldHandle = false
93-
break
94-
}
95-
}
96-
97-
if shouldHandle {
98-
func() {
99-
ctx, cancel := context.WithCancel(ctx)
100-
defer cancel()
101-
handler.Generic(ctx, evt, queue)
102-
}()
103-
}
104-
}
82+
// Remove the listener and wait for the broadcaster
83+
// to stop sending events to the destination channel.
84+
defer cs.Broadcaster.RemoveListener(destination)
85+
86+
cs.processReceivedEvents(
87+
ctx,
88+
destination,
89+
queue,
90+
handler,
91+
prct,
92+
)
10593
}()
10694

10795
return nil
10896
}
10997

110-
func (cs *Channel) doStop() {
111-
cs.destLock.Lock()
112-
defer cs.destLock.Unlock()
113-
114-
for _, dst := range cs.dest {
115-
close(dst)
116-
}
117-
}
118-
119-
func (cs *Channel) distribute(evt event.GenericEvent) {
120-
cs.destLock.Lock()
121-
defer cs.destLock.Unlock()
122-
123-
for _, dst := range cs.dest {
124-
// We cannot make it under goroutine here, or we'll meet the
125-
// race condition of writing message to closed channels.
126-
// To avoid blocking, the dest channels are expected to be of
127-
// proper buffer size. If we still see it blocked, then
128-
// the controller is thought to be in an abnormal state.
129-
dst <- evt
130-
}
131-
}
132-
133-
func (cs *Channel) syncLoop(ctx context.Context) {
98+
func (cs *Channel) processReceivedEvents(
99+
ctx context.Context,
100+
destination <-chan event.GenericEvent,
101+
queue workqueue.RateLimitingInterface,
102+
eventHandler handler.EventHandler,
103+
predicates []predicate.Predicate,
104+
) {
105+
eventloop:
134106
for {
135107
select {
136108
case <-ctx.Done():
137-
// Close destination channels
138-
cs.doStop()
139109
return
140-
case evt, stillOpen := <-cs.Source:
110+
case event, stillOpen := <-destination:
141111
if !stillOpen {
142-
// if the source channel is closed, we're never gonna get
143-
// anything more on it, so stop & bail
144-
cs.doStop()
145112
return
146113
}
147-
cs.distribute(evt)
114+
115+
// Check predicates against the event first
116+
// and continue the outer loop if any of them fail.
117+
for _, p := range predicates {
118+
if !p.Generic(event) {
119+
continue eventloop
120+
}
121+
}
122+
123+
// Call the event handler with the event.
124+
eventHandler.Generic(ctx, event, queue)
148125
}
149126
}
150127
}

0 commit comments

Comments
 (0)