Skip to content

Commit 7728c9e

Browse files
committed
Support shutdown controllers and watches dynamically
Signed-off-by: FillZpp <[email protected]>
1 parent f3f16a2 commit 7728c9e

File tree

7 files changed

+322
-22
lines changed

7 files changed

+322
-22
lines changed

pkg/controller/controller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ type Controller interface {
8686

8787
// GetLogger returns this controller logger prefilled with basic information.
8888
GetLogger() logr.Logger
89+
90+
// Stop stops the controller and all its watches dynamically.
91+
// Note that it only triggers the stop; it does not wait for them all to be stopped.
92+
Stop() error
8993
}
9094

9195
// New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have

pkg/controller/controllertest/util.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package controllertest
1818

1919
import (
20+
"fmt"
21+
"sync"
2022
"time"
2123

2224
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -33,7 +35,12 @@ type FakeInformer struct {
3335
// RunCount is incremented each time RunInformersAndControllers is called
3436
RunCount int
3537

36-
handlers []cache.ResourceEventHandler
38+
handlers []*listenHandler
39+
mu sync.RWMutex
40+
}
41+
42+
// listenHandler wraps a cache.ResourceEventHandler in its own allocation. FakeInformer keeps
// pointers to these wrappers in its handlers slice, and RemoveEventHandler selects the entry
// to drop by comparing those pointers.
type listenHandler struct {
43+
cache.ResourceEventHandler
3744
}
3845

3946
// AddIndexers does nothing. TODO(community): Implement this.
@@ -58,8 +65,11 @@ func (f *FakeInformer) HasSynced() bool {
5865

5966
// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration.
6067
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
61-
f.handlers = append(f.handlers, handler)
62-
return nil, nil
68+
f.mu.Lock()
69+
defer f.mu.Unlock()
70+
lh := &listenHandler{ResourceEventHandler: handler}
71+
f.handlers = append(f.handlers, lh)
72+
return &lh, nil
6373
}
6474

6575
// Run implements the Informer interface. Increments f.RunCount.
@@ -69,20 +79,26 @@ func (f *FakeInformer) Run(<-chan struct{}) {
6979

7080
// Add fakes an Add event for obj.
7181
func (f *FakeInformer) Add(obj metav1.Object) {
82+
f.mu.RLock()
83+
defer f.mu.RUnlock()
7284
for _, h := range f.handlers {
7385
h.OnAdd(obj)
7486
}
7587
}
7688

7789
// Update fakes an Update event for obj.
7890
func (f *FakeInformer) Update(oldObj, newObj metav1.Object) {
91+
f.mu.RLock()
92+
defer f.mu.RUnlock()
7993
for _, h := range f.handlers {
8094
h.OnUpdate(oldObj, newObj)
8195
}
8296
}
8397

8498
// Delete fakes an Delete event for obj.
8599
func (f *FakeInformer) Delete(obj metav1.Object) {
100+
f.mu.RLock()
101+
defer f.mu.RUnlock()
86102
for _, h := range f.handlers {
87103
h.OnDelete(obj)
88104
}
@@ -95,6 +111,21 @@ func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEve
95111

96112
// RemoveEventHandler does nothing. TODO(community): Implement this.
97113
func (f *FakeInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error {
114+
lh, ok := handle.(*listenHandler)
115+
if !ok {
116+
return fmt.Errorf("invalid key type %t", handle)
117+
}
118+
119+
f.mu.Lock()
120+
defer f.mu.Unlock()
121+
handlers := make([]*listenHandler, 0, len(f.handlers))
122+
for _, h := range f.handlers {
123+
if h == lh {
124+
continue
125+
}
126+
handlers = append(handlers, h)
127+
}
128+
f.handlers = handlers
98129
return nil
99130
}
100131

pkg/internal/controller/controller.go

Lines changed: 64 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package controller
1818

1919
import (
2020
"context"
21-
"errors"
2221
"fmt"
2322
"sync"
2423
"time"
@@ -58,19 +57,28 @@ type Controller struct {
5857
// the Queue for processing
5958
Queue workqueue.RateLimitingInterface
6059

60+
// startedSources maintains a list of sources that have already started.
61+
startedSources []source.Source
62+
6163
// mu is used to synchronize Controller setup
6264
mu sync.Mutex
6365

6466
// Started is true if the Controller has been Started
6567
Started bool
6668

69+
// Stopped is true if the Controller has been Stopped
70+
Stopped bool
71+
6772
// ctx is the context that was passed to Start() and used when starting watches.
6873
//
6974
// According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
7075
// while we usually always strive to follow best practices, we consider this a legacy case and it should
7176
// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
7277
ctx context.Context
7378

79+
// cancel is the CancelFunc of the ctx, to stop the controller and its watches dynamically.
80+
cancel context.CancelFunc
81+
7482
// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
7583
// Defaults to 2 minutes if not set.
7684
CacheSyncTimeout time.Duration
@@ -123,6 +131,10 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
123131
c.mu.Lock()
124132
defer c.mu.Unlock()
125133

134+
if c.Stopped {
135+
return fmt.Errorf("can not start watch in a stopped controller")
136+
}
137+
126138
// Controller hasn't started yet, store the watches locally and return.
127139
//
128140
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
@@ -132,7 +144,11 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
132144
}
133145

134146
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
135-
return src.Start(c.ctx, evthdler, c.Queue, prct...)
147+
err := src.Start(c.ctx, evthdler, c.Queue, prct...)
148+
if err == nil {
149+
c.startedSources = append(c.startedSources, src)
150+
}
151+
return err
136152
}
137153

138154
// NeedLeaderElection implements the manager.LeaderElectionRunnable interface.
@@ -148,23 +164,21 @@ func (c *Controller) Start(ctx context.Context) error {
148164
// use an IIFE to get proper lock handling
149165
// but lock outside to get proper handling of the queue shutdown
150166
c.mu.Lock()
167+
if c.Stopped {
168+
return fmt.Errorf("can not restart a stopped controller, you should create a new one")
169+
}
151170
if c.Started {
152-
return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
171+
return fmt.Errorf("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
153172
}
154173

155174
c.initMetrics()
156175

157176
// Set the internal context.
158-
c.ctx = ctx
177+
c.ctx, c.cancel = context.WithCancel(ctx)
159178

160179
c.Queue = c.MakeQueue()
161-
go func() {
162-
<-ctx.Done()
163-
c.Queue.ShutDown()
164-
}()
165-
166180
wg := &sync.WaitGroup{}
167-
err := func() error {
181+
startErr := func() error {
168182
defer c.mu.Unlock()
169183

170184
// TODO(pwittrock): Reconsider HandleCrash
@@ -179,6 +193,7 @@ func (c *Controller) Start(ctx context.Context) error {
179193
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
180194
return err
181195
}
196+
c.startedSources = append(c.startedSources, watch.src)
182197
}
183198

184199
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
@@ -231,8 +246,28 @@ func (c *Controller) Start(ctx context.Context) error {
231246
c.Started = true
232247
return nil
233248
}()
234-
if err != nil {
235-
return err
249+
250+
defer func() {
251+
c.mu.Lock()
252+
defer c.mu.Unlock()
253+
c.Stopped = true
254+
255+
c.cancel()
256+
c.Queue.ShutDown()
257+
for _, src := range c.startedSources {
258+
if stoppableSrc, ok := src.(source.StoppableSource); ok {
259+
if err := stoppableSrc.Stop(); err != nil {
260+
c.LogConstructor(nil).Error(err, "Failed to stop watch source when controller stopping", "source", src)
261+
}
262+
} else {
263+
c.LogConstructor(nil).Info("Skip unstoppable watch source when controller stopping", "source", src)
264+
}
265+
}
266+
c.LogConstructor(nil).Info("All watch sources finished")
267+
}()
268+
269+
if startErr != nil {
270+
return startErr
236271
}
237272

238273
<-ctx.Done()
@@ -242,6 +277,23 @@ func (c *Controller) Start(ctx context.Context) error {
242277
return nil
243278
}
244279

280+
// Stop implements controller.Controller.
281+
func (c *Controller) Stop() error {
282+
c.mu.Lock()
283+
defer c.mu.Unlock()
284+
285+
if c.Stopped {
286+
return fmt.Errorf("can not stop a stopped controller")
287+
}
288+
if !c.Started {
289+
return fmt.Errorf("can not stop an unstarted controller")
290+
}
291+
c.Stopped = true
292+
c.cancel()
293+
294+
return nil
295+
}
296+
245297
// processNextWorkItem will read a single work item off the workqueue and
246298
// attempt to process it, by calling the reconcileHandler.
247299
func (c *Controller) processNextWorkItem(ctx context.Context) bool {

pkg/internal/controller/controller_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,33 @@ var _ = Describe("controller", func() {
311311
Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times"))
312312
})
313313

314+
It("should return an error if it gets start after stopped", func() {
315+
stoppedChan := make(chan struct{})
316+
go func() {
317+
Expect(ctrl.Start(context.TODO())).To(BeNil())
318+
close(stoppedChan)
319+
}()
320+
321+
// wait for started
322+
var started bool
323+
for !started {
324+
func() {
325+
ctrl.mu.Lock()
326+
defer ctrl.mu.Unlock()
327+
started = ctrl.Started
328+
}()
329+
}
330+
331+
err := ctrl.Stop()
332+
Expect(err).NotTo(BeNil())
333+
<-stoppedChan
334+
Expect(ctrl.Stopped).To(Equal(true))
335+
336+
err = ctrl.Start(context.TODO())
337+
Expect(err).NotTo(BeNil())
338+
Expect(err.Error()).To(Equal("can not restart a stopped controller, you should create a new one"))
339+
})
340+
314341
})
315342

316343
Describe("Processing queue items from a Controller", func() {

pkg/internal/source/kind.go

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync"
78
"time"
89

910
"k8s.io/apimachinery/pkg/api/meta"
1011
"k8s.io/apimachinery/pkg/runtime"
1112
"k8s.io/apimachinery/pkg/util/wait"
13+
toolscache "k8s.io/client-go/tools/cache"
1214
"k8s.io/client-go/util/workqueue"
1315
"sigs.k8s.io/controller-runtime/pkg/cache"
1416
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -28,6 +30,11 @@ type Kind struct {
2830
// contain an error, startup and syncing finished.
2931
started chan error
3032
startCancel func()
33+
34+
informer cache.Informer
35+
mu sync.Mutex
36+
canceled bool
37+
eventHandlerRegistration toolscache.ResourceEventHandlerRegistration
3138
}
3239

3340
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
@@ -41,21 +48,25 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
4148
return fmt.Errorf("must create Kind with a non-nil cache")
4249
}
4350

51+
ks.mu.Lock()
52+
defer ks.mu.Unlock()
53+
// If it has been canceled before start, just ignore it.
54+
if ks.canceled {
55+
return nil
56+
}
57+
4458
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
4559
// sync that informer (most commonly due to RBAC issues).
4660
ctx, ks.startCancel = context.WithCancel(ctx)
4761
ks.started = make(chan error)
4862
go func() {
49-
var (
50-
i cache.Informer
51-
lastErr error
52-
)
63+
var lastErr error
5364

5465
// Tries to get an informer until it returns true,
5566
// an error or the specified context is cancelled or expired.
5667
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
5768
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
58-
i, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
69+
ks.informer, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
5970
if lastErr != nil {
6071
kindMatchErr := &meta.NoKindMatchError{}
6172
switch {
@@ -79,7 +90,8 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
7990
return
8091
}
8192

82-
_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct))
93+
var err error
94+
ks.eventHandlerRegistration, err = ks.informer.AddEventHandler(NewEventHandler(ctx, queue, handler, prct))
8395
if err != nil {
8496
ks.started <- err
8597
return
@@ -115,3 +127,32 @@ func (ks *Kind) WaitForSync(ctx context.Context) error {
115127
return errors.New("timed out waiting for cache to be synced")
116128
}
117129
}
130+
131+
// Stop implements StoppableSource to stop it dynamically.
132+
func (ks *Kind) Stop() error {
133+
ks.mu.Lock()
134+
defer ks.mu.Unlock()
135+
if ks.canceled {
136+
return nil
137+
}
138+
ks.canceled = true
139+
140+
// Return if it has not been started.
141+
if ks.started == nil {
142+
return nil
143+
}
144+
145+
// Cancel if it is starting.
146+
select {
147+
case <-ks.started:
148+
default:
149+
ks.startCancel()
150+
// Wait for starting abort
151+
<-ks.started
152+
}
153+
154+
if ks.eventHandlerRegistration != nil {
155+
return ks.informer.RemoveEventHandler(ks.eventHandlerRegistration)
156+
}
157+
return nil
158+
}

0 commit comments

Comments
 (0)