Skip to content

⚠ Support shutdown controllers and watches dynamically #2099

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ type Controller interface {

// GetLogger returns this controller logger prefilled with basic information.
GetLogger() logr.Logger

// Stop stops the controller and all its watches dynamically.
// Note that it will only trigger the stop but will not wait for them all stopped.
Stop() error
Comment on lines +89 to +92
Copy link
Member

@inteon inteon Jan 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FillZpp Why do we add a Stop function here instead of canceling the context that was passed to Start?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Not all of the implementations of Source can stop, e.g., Func or some custom types.
  2. Users have no way to cancel the context passed from internal controller to source, to only stop the single watch.

}

// New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have
Expand Down
37 changes: 34 additions & 3 deletions pkg/controller/controllertest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package controllertest

import (
"fmt"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,7 +35,12 @@ type FakeInformer struct {
// RunCount is incremented each time RunInformersAndControllers is called
RunCount int

handlers []cache.ResourceEventHandler
handlers []*listenHandler
mu sync.RWMutex
}

type listenHandler struct {
cache.ResourceEventHandler
}

// AddIndexers does nothing. TODO(community): Implement this.
Expand All @@ -58,8 +65,11 @@ func (f *FakeInformer) HasSynced() bool {

// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, handler)
return nil, nil
f.mu.Lock()
defer f.mu.Unlock()
lh := &listenHandler{ResourceEventHandler: handler}
f.handlers = append(f.handlers, lh)
return &lh, nil
}

// Run implements the Informer interface. Increments f.RunCount.
Expand All @@ -69,20 +79,26 @@ func (f *FakeInformer) Run(<-chan struct{}) {

// Add fakes an Add event for obj.
func (f *FakeInformer) Add(obj metav1.Object) {
f.mu.RLock()
defer f.mu.RUnlock()
for _, h := range f.handlers {
h.OnAdd(obj)
}
}

// Update fakes an Update event for obj.
func (f *FakeInformer) Update(oldObj, newObj metav1.Object) {
f.mu.RLock()
defer f.mu.RUnlock()
for _, h := range f.handlers {
h.OnUpdate(oldObj, newObj)
}
}

// Delete fakes an Delete event for obj.
func (f *FakeInformer) Delete(obj metav1.Object) {
f.mu.RLock()
defer f.mu.RUnlock()
for _, h := range f.handlers {
h.OnDelete(obj)
}
Expand All @@ -95,6 +111,21 @@ func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEve

// RemoveEventHandler does nothing. TODO(community): Implement this.
func (f *FakeInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error {
lh, ok := handle.(*listenHandler)
if !ok {
return fmt.Errorf("invalid key type %t", handle)
}

f.mu.Lock()
defer f.mu.Unlock()
handlers := make([]*listenHandler, 0, len(f.handlers))
for _, h := range f.handlers {
if h == lh {
continue
}
handlers = append(handlers, h)
}
f.handlers = handlers
return nil
}

Expand Down
76 changes: 64 additions & 12 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package controller

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -58,19 +57,28 @@ type Controller struct {
// the Queue for processing
Queue workqueue.RateLimitingInterface

// startedSources maintains a list of sources that have already started.
startedSources []source.Source

// mu is used to synchronize Controller setup
mu sync.Mutex

// Started is true if the Controller has been Started
Started bool

// Stopped is true if the Controller has been Stopped
Stopped bool

// ctx is the context that was passed to Start() and used when starting watches.
//
// According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
// while we usually always strive to follow best practices, we consider this a legacy case and it should
// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
ctx context.Context

// cancel is the CancelFunc of the ctx, to stop the controller and its watches dynamically.
cancel context.CancelFunc

// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
// Defaults to 2 minutes if not set.
CacheSyncTimeout time.Duration
Expand Down Expand Up @@ -123,6 +131,10 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
c.mu.Lock()
defer c.mu.Unlock()

if c.Stopped {
return fmt.Errorf("can not start watch in a stopped controller")
}

// Controller hasn't started yet, store the watches locally and return.
//
// These watches are going to be held on the controller struct until the manager or user calls Start(...).
Expand All @@ -132,7 +144,11 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc
}

c.LogConstructor(nil).Info("Starting EventSource", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
err := src.Start(c.ctx, evthdler, c.Queue, prct...)
if err == nil {
c.startedSources = append(c.startedSources, src)
}
return err
}

// NeedLeaderElection implements the manager.LeaderElectionRunnable interface.
Expand All @@ -148,23 +164,21 @@ func (c *Controller) Start(ctx context.Context) error {
// use an IIFE to get proper lock handling
// but lock outside to get proper handling of the queue shutdown
c.mu.Lock()
if c.Stopped {
return fmt.Errorf("can not restart a stopped controller, you should create a new one")
}
if c.Started {
return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
return fmt.Errorf("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
}

c.initMetrics()

// Set the internal context.
c.ctx = ctx
c.ctx, c.cancel = context.WithCancel(ctx)

c.Queue = c.MakeQueue()
go func() {
<-ctx.Done()
c.Queue.ShutDown()
}()

wg := &sync.WaitGroup{}
err := func() error {
startErr := func() error {
defer c.mu.Unlock()

// TODO(pwittrock): Reconsider HandleCrash
Expand All @@ -179,6 +193,7 @@ func (c *Controller) Start(ctx context.Context) error {
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
c.startedSources = append(c.startedSources, watch.src)
}

// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
Expand Down Expand Up @@ -231,8 +246,28 @@ func (c *Controller) Start(ctx context.Context) error {
c.Started = true
return nil
}()
if err != nil {
return err

defer func() {
c.mu.Lock()
defer c.mu.Unlock()
c.Stopped = true

c.cancel()
c.Queue.ShutDown()
for _, src := range c.startedSources {
if stoppableSrc, ok := src.(source.StoppableSource); ok {
if err := stoppableSrc.Stop(); err != nil {
c.LogConstructor(nil).Error(err, "Failed to stop watch source when controller stopping", "source", src)
}
} else {
c.LogConstructor(nil).Info("Skip unstoppable watch source when controller stopping", "source", src)
}
}
c.LogConstructor(nil).Info("All watch sources finished")
}()

if startErr != nil {
return startErr
}

<-ctx.Done()
Expand All @@ -242,6 +277,23 @@ func (c *Controller) Start(ctx context.Context) error {
return nil
}

// Stop implements controller.Controller.
func (c *Controller) Stop() error {
c.mu.Lock()
defer c.mu.Unlock()

if c.Stopped {
return fmt.Errorf("can not stop a stopped controller")
}
if !c.Started {
return fmt.Errorf("can not stop an unstarted controller")
}
c.Stopped = true
c.cancel()

return nil
}

// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the reconcileHandler.
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
Expand Down
27 changes: 27 additions & 0 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,33 @@ var _ = Describe("controller", func() {
Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times"))
})

It("should return an error if it gets start after stopped", func() {
stoppedChan := make(chan struct{})
go func() {
Expect(ctrl.Start(context.TODO())).To(BeNil())
close(stoppedChan)
}()

// wait for started
var started bool
for !started {
func() {
ctrl.mu.Lock()
defer ctrl.mu.Unlock()
started = ctrl.Started
}()
}

err := ctrl.Stop()
Expect(err).NotTo(BeNil())
<-stoppedChan
Expect(ctrl.Stopped).To(Equal(true))

err = ctrl.Start(context.TODO())
Expect(err).NotTo(BeNil())
Expect(err.Error()).To(Equal("can not restart a stopped controller, you should create a new one"))
})

})

Describe("Processing queue items from a Controller", func() {
Expand Down
53 changes: 47 additions & 6 deletions pkg/internal/source/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -28,6 +30,11 @@ type Kind struct {
// contain an error, startup and syncing finished.
started chan error
startCancel func()

informer cache.Informer
mu sync.Mutex
canceled bool
eventHandlerRegistration toolscache.ResourceEventHandlerRegistration
}

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
Expand All @@ -41,21 +48,25 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return fmt.Errorf("must create Kind with a non-nil cache")
}

ks.mu.Lock()
defer ks.mu.Unlock()
// If it has been canceled before start, just ignore it.
if ks.canceled {
return nil
}

// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
// sync that informer (most commonly due to RBAC issues).
ctx, ks.startCancel = context.WithCancel(ctx)
ks.started = make(chan error)
go func() {
var (
i cache.Informer
lastErr error
)
var lastErr error

// Tries to get an informer until it returns true,
// an error or the specified context is cancelled or expired.
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
ks.informer, lastErr = ks.Cache.GetInformer(ctx, ks.Type)
if lastErr != nil {
kindMatchErr := &meta.NoKindMatchError{}
switch {
Expand All @@ -79,7 +90,8 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return
}

_, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct))
var err error
ks.eventHandlerRegistration, err = ks.informer.AddEventHandler(NewEventHandler(ctx, queue, handler, prct))
if err != nil {
ks.started <- err
return
Expand Down Expand Up @@ -115,3 +127,32 @@ func (ks *Kind) WaitForSync(ctx context.Context) error {
return errors.New("timed out waiting for cache to be synced")
}
}

// Stop implements StoppableSource to stop it dynamically.
func (ks *Kind) Stop() error {
ks.mu.Lock()
defer ks.mu.Unlock()
if ks.canceled {
return nil
}
ks.canceled = true

// Return if it has not been started.
if ks.started == nil {
return nil
}

// Cancel if it is starting.
select {
case <-ks.started:
default:
ks.startCancel()
// Wait for starting abort
<-ks.started
}

if ks.eventHandlerRegistration != nil {
return ks.informer.RemoveEventHandler(ks.eventHandlerRegistration)
}
return nil
}
Loading