diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index bd29ba9a57..42b94a63a2 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -86,6 +86,10 @@ type Controller interface { // GetLogger returns this controller logger prefilled with basic information. GetLogger() logr.Logger + + // Stop stops the controller and all its watches dynamically. + // Note that it will only trigger the stop but will not wait for them all stopped. + Stop() error } // New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have diff --git a/pkg/controller/controllertest/util.go b/pkg/controller/controllertest/util.go index 17641e9c02..5733396751 100644 --- a/pkg/controller/controllertest/util.go +++ b/pkg/controller/controllertest/util.go @@ -17,6 +17,8 @@ limitations under the License. package controllertest import ( + "fmt" + "sync" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,7 +35,12 @@ type FakeInformer struct { // RunCount is incremented each time RunInformersAndControllers is called RunCount int - handlers []cache.ResourceEventHandler + handlers []*listenHandler + mu sync.RWMutex +} + +type listenHandler struct { + cache.ResourceEventHandler } // AddIndexers does nothing. TODO(community): Implement this. @@ -58,8 +65,11 @@ func (f *FakeInformer) HasSynced() bool { // AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration. func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { - f.handlers = append(f.handlers, handler) - return nil, nil + f.mu.Lock() + defer f.mu.Unlock() + lh := &listenHandler{ResourceEventHandler: handler} + f.handlers = append(f.handlers, lh) + return &lh, nil } // Run implements the Informer interface. Increments f.RunCount. @@ -69,6 +79,8 @@ func (f *FakeInformer) Run(<-chan struct{}) { // Add fakes an Add event for obj. func (f *FakeInformer) Add(obj metav1.Object) { + f.mu.RLock() + defer f.mu.RUnlock() for _, h := range f.handlers { h.OnAdd(obj) } @@ -76,6 +88,8 @@ func (f *FakeInformer) Add(obj metav1.Object) { // Update fakes an Update event for obj. func (f *FakeInformer) Update(oldObj, newObj metav1.Object) { + f.mu.RLock() + defer f.mu.RUnlock() for _, h := range f.handlers { h.OnUpdate(oldObj, newObj) } @@ -83,6 +97,8 @@ func (f *FakeInformer) Update(oldObj, newObj metav1.Object) { // Delete fakes an Delete event for obj. func (f *FakeInformer) Delete(obj metav1.Object) { + f.mu.RLock() + defer f.mu.RUnlock() for _, h := range f.handlers { h.OnDelete(obj) } @@ -95,6 +111,21 @@ func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEve // RemoveEventHandler does nothing. TODO(community): Implement this. func (f *FakeInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error { + lh, ok := handle.(*listenHandler) + if !ok { + return fmt.Errorf("invalid key type %t", handle) + } + + f.mu.Lock() + defer f.mu.Unlock() + handlers := make([]*listenHandler, 0, len(f.handlers)) + for _, h := range f.handlers { + if h == lh { + continue + } + handlers = append(handlers, h) + } + f.handlers = handlers return nil } diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 969eeeb7d2..2bdc6678b4 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -18,7 +18,6 @@ package controller import ( "context" - "errors" "fmt" "sync" "time" @@ -58,12 +57,18 @@ type Controller struct { // the Queue for processing Queue workqueue.RateLimitingInterface + // startedSources maintains a list of sources that have already started. + startedSources []source.Source + // mu is used to synchronize Controller setup mu sync.Mutex // Started is true if the Controller has been Started Started bool + // Stopped is true if the Controller has been Stopped + Stopped bool + // ctx is the context that was passed to Start() and used when starting watches. // // According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context, @@ -71,6 +76,9 @@ type Controller struct { // undergo a major refactoring and redesign to allow for context to not be stored in a struct. ctx context.Context + // cancel is the CancelFunc of the ctx, to stop the controller and its watches dynamically. + cancel context.CancelFunc + // CacheSyncTimeout refers to the time limit set on waiting for cache to sync // Defaults to 2 minutes if not set. CacheSyncTimeout time.Duration @@ -123,6 +131,10 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc c.mu.Lock() defer c.mu.Unlock() + if c.Stopped { + return fmt.Errorf("can not start watch in a stopped controller") + } + // Controller hasn't started yet, store the watches locally and return. // // These watches are going to be held on the controller struct until the manager or user calls Start(...). @@ -132,7 +144,11 @@ func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prc } c.LogConstructor(nil).Info("Starting EventSource", "source", src) - return src.Start(c.ctx, evthdler, c.Queue, prct...) + err := src.Start(c.ctx, evthdler, c.Queue, prct...) + if err == nil { + c.startedSources = append(c.startedSources, src) + } + return err } // NeedLeaderElection implements the manager.LeaderElectionRunnable interface. @@ -148,23 +164,21 @@ func (c *Controller) Start(ctx context.Context) error { // use an IIFE to get proper lock handling // but lock outside to get proper handling of the queue shutdown c.mu.Lock() + if c.Stopped { + return fmt.Errorf("can not restart a stopped controller, you should create a new one") + } if c.Started { - return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times") + return fmt.Errorf("controller was started more than once. This is likely to be caused by being added to a manager multiple times") } c.initMetrics() // Set the internal context. - c.ctx = ctx + c.ctx, c.cancel = context.WithCancel(ctx) c.Queue = c.MakeQueue() - go func() { - <-ctx.Done() - c.Queue.ShutDown() - }() - wg := &sync.WaitGroup{} - err := func() error { + startErr := func() error { defer c.mu.Unlock() // TODO(pwittrock): Reconsider HandleCrash @@ -179,6 +193,7 @@ func (c *Controller) Start(ctx context.Context) error { if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } + c.startedSources = append(c.startedSources, watch.src) } // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches @@ -231,8 +246,28 @@ func (c *Controller) Start(ctx context.Context) error { c.Started = true return nil }() - if err != nil { - return err + + defer func() { + c.mu.Lock() + defer c.mu.Unlock() + c.Stopped = true + + c.cancel() + c.Queue.ShutDown() + for _, src := range c.startedSources { + if stoppableSrc, ok := src.(source.StoppableSource); ok { + if err := stoppableSrc.Stop(); err != nil { + c.LogConstructor(nil).Error(err, "Failed to stop watch source when controller stopping", "source", src) + } + } else { + c.LogConstructor(nil).Info("Skip unstoppable watch source when controller stopping", "source", src) + } + } + c.LogConstructor(nil).Info("All watch sources finished") + }() + + if startErr != nil { + return startErr } <-ctx.Done() @@ -242,6 +277,23 @@ func (c *Controller) Start(ctx context.Context) error { return nil } +// Stop implements controller.Controller. +func (c *Controller) Stop() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.Stopped { + return fmt.Errorf("can not stop a stopped controller") + } + if !c.Started { + return fmt.Errorf("can not stop an unstarted controller") + } + c.Stopped = true + c.cancel() + + return nil +} + // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the reconcileHandler. func (c *Controller) processNextWorkItem(ctx context.Context) bool { diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 9024556fd0..aa922a6126 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -311,6 +311,33 @@ var _ = Describe("controller", func() { Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times")) }) + It("should return an error if it gets start after stopped", func() { + stoppedChan := make(chan struct{}) + go func() { + Expect(ctrl.Start(context.TODO())).To(BeNil()) + close(stoppedChan) + }() + + // wait for started + var started bool + for !started { + func() { + ctrl.mu.Lock() + defer ctrl.mu.Unlock() + started = ctrl.Started + }() + } + + err := ctrl.Stop() + Expect(err).NotTo(BeNil()) + <-stoppedChan + Expect(ctrl.Stopped).To(Equal(true)) + + err = ctrl.Start(context.TODO()) + Expect(err).NotTo(BeNil()) + Expect(err.Error()).To(Equal("can not restart a stopped controller, you should create a new one")) + }) + }) Describe("Processing queue items from a Controller", func() { diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index 2e765acce8..19cf6ac758 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -4,11 +4,13 @@ import ( "context" "errors" "fmt" + "sync" "time" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" @@ -28,6 +30,11 @@ type Kind struct { // contain an error, startup and syncing finished. started chan error startCancel func() + + informer cache.Informer + mu sync.Mutex + canceled bool + eventHandlerRegistration toolscache.ResourceEventHandlerRegistration } // Start is internal and should be called only by the Controller to register an EventHandler with the Informer @@ -41,21 +48,25 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return fmt.Errorf("must create Kind with a non-nil cache") } + ks.mu.Lock() + defer ks.mu.Unlock() + // If it has been canceled before start, just ignore it. + if ks.canceled { + return nil + } + // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not // sync that informer (most commonly due to RBAC issues). ctx, ks.startCancel = context.WithCancel(ctx) ks.started = make(chan error) go func() { - var ( - i cache.Informer - lastErr error - ) + var lastErr error // Tries to get an informer until it returns true, // an error or the specified context is cancelled or expired. if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) { // Lookup the Informer from the Cache and add an EventHandler which populates the Queue - i, lastErr = ks.Cache.GetInformer(ctx, ks.Type) + ks.informer, lastErr = ks.Cache.GetInformer(ctx, ks.Type) if lastErr != nil { kindMatchErr := &meta.NoKindMatchError{} switch { @@ -79,7 +90,8 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return } - _, err := i.AddEventHandler(NewEventHandler(ctx, queue, handler, prct)) + var err error + ks.eventHandlerRegistration, err = ks.informer.AddEventHandler(NewEventHandler(ctx, queue, handler, prct)) if err != nil { ks.started <- err return @@ -115,3 +127,32 @@ func (ks *Kind) WaitForSync(ctx context.Context) error { return errors.New("timed out waiting for cache to be synced") } } + +// Stop implements StoppableSource to stop it dynamically. +func (ks *Kind) Stop() error { + ks.mu.Lock() + defer ks.mu.Unlock() + if ks.canceled { + return nil + } + ks.canceled = true + + // Return if it has not been started. + if ks.started == nil { + return nil + } + + // Cancel if it is starting. + select { + case <-ks.started: + default: + ks.startCancel() + // Wait for starting abort + <-ks.started + } + + if ks.eventHandlerRegistration != nil { + return ks.informer.RemoveEventHandler(ks.eventHandlerRegistration) + } + return nil +} diff --git a/pkg/source/source.go b/pkg/source/source.go index 5fb7c439b6..34005f2f30 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -21,6 +21,7 @@ import ( "fmt" "sync" + toolscache "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -57,6 +58,12 @@ type SyncingSource interface { WaitForSync(ctx context.Context) error } +// StoppableSource is a source that can be stopped after starting. +type StoppableSource interface { + Source + Stop() error +} + // Kind creates a KindSource with the given cache provider. func Kind(cache cache.Cache, object client.Object) SyncingSource { return &internal.Kind{Type: object, Cache: cache} @@ -191,6 +198,10 @@ func (cs *Channel) syncLoop(ctx context.Context) { type Informer struct { // Informer is the controller-runtime Informer Informer cache.Informer + + mu sync.Mutex + canceled bool + eventHandlerRegistration toolscache.ResourceEventHandlerRegistration } var _ Source = &Informer{} @@ -204,7 +215,14 @@ func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, que return fmt.Errorf("must specify Informer.Informer") } - _, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, handler, prct)) + is.mu.Lock() + defer is.mu.Unlock() + if is.canceled { + return nil + } + + var err error + is.eventHandlerRegistration, err = is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, handler, prct)) if err != nil { return err } @@ -215,6 +233,18 @@ func (is *Informer) String() string { return fmt.Sprintf("informer source: %p", is.Informer) } +// Stop implements StoppableSource to stop it dynamically. +func (is *Informer) Stop() error { + is.mu.Lock() + defer is.mu.Unlock() + is.canceled = true + + if is.eventHandlerRegistration != nil { + return is.Informer.RemoveEventHandler(is.eventHandlerRegistration) + } + return nil +} + var _ Source = Func(nil) // Func is a function that implements Source. diff --git a/pkg/source/source_integration_test.go b/pkg/source/source_integration_test.go index 594d3c9a9c..6ae13e0edd 100644 --- a/pkg/source/source_integration_test.go +++ b/pkg/source/source_integration_test.go @@ -186,6 +186,121 @@ var _ = Describe("Source", func() { deleteEvt.Object.SetResourceVersion("") Expect(deleteEvt.Object).To(Equal(deleted)) }) + + It("should not get events after stopped", func() { + var created, updated *appsv1.Deployment + var err error + + // Get the client and Deployment used to create events + client := clientset.AppsV1().Deployments(ns) + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: "deployment-name-2"}, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + }, + }, + } + + // Create an event handler to verify the events + newHandler := func(c chan interface{}) handler.Funcs { + return handler.Funcs{ + CreateFunc: func(ctx context.Context, evt event.CreateEvent, rli workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(rli).To(Equal(q)) + c <- evt + }, + UpdateFunc: func(ctx context.Context, evt event.UpdateEvent, rli workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(rli).To(Equal(q)) + c <- evt + }, + DeleteFunc: func(ctx context.Context, evt event.DeleteEvent, rli workqueue.RateLimitingInterface) { + defer GinkgoRecover() + Expect(rli).To(Equal(q)) + c <- evt + }, + } + } + handler1 := newHandler(c1) + handler2 := newHandler(c2) + + // Create 2 instances + Expect(instance1.Start(ctx, handler1, q)).To(Succeed()) + Expect(instance2.Start(ctx, handler2, q)).To(Succeed()) + + By("Creating a Deployment and expecting the CreateEvent.") + created, err = client.Create(ctx, deployment, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + Expect(created).NotTo(BeNil()) + + // Check first CreateEvent + evt := <-c1 + createEvt, ok := evt.(event.CreateEvent) + Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.CreateEvent{})) + Expect(createEvt.Object).To(Equal(created)) + + // Check second CreateEvent + evt = <-c2 + createEvt, ok = evt.(event.CreateEvent) + Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.CreateEvent{})) + Expect(createEvt.Object).To(Equal(created)) + + By("Stop the second kind source") + err = instance2.(source.StoppableSource).Stop() + Expect(err).NotTo(HaveOccurred()) + + By("Updating a Deployment and expecting the UpdateEvent.") + updated = created.DeepCopy() + updated.Labels = map[string]string{"biz": "buz"} + updated, err = client.Update(ctx, updated, metav1.UpdateOptions{}) + Expect(err).NotTo(HaveOccurred()) + + // Check first UpdateEvent + evt = <-c1 + updateEvt, ok := evt.(event.UpdateEvent) + Expect(ok).To(BeTrue(), fmt.Sprintf("expect %T to be %T", evt, event.UpdateEvent{})) + + Expect(updateEvt.ObjectNew).To(Equal(updated)) + + Expect(updateEvt.ObjectOld).To(Equal(created)) + + // Check second UpdateEvent should not receive + waitCtx1, cancel1 := context.WithTimeout(context.Background(), time.Second) + defer cancel1() + select { + case <-c2: + Fail("kind2 is expected to be stopped") + case <-waitCtx1.Done(): + } + + By("Stop the first kind source") + err = instance1.(source.StoppableSource).Stop() + Expect(err).NotTo(HaveOccurred()) + + By("Deleting a Deployment and expecting the Delete.") + err = client.Delete(ctx, created.Name, metav1.DeleteOptions{}) + Expect(err).NotTo(HaveOccurred()) + + waitCtx2, cancel2 := context.WithTimeout(context.Background(), time.Second) + defer cancel2() + select { + case <-c1: + Fail("kind1 is expected to be stopped") + case <-waitCtx2.Done(): + } + }) }) // TODO(pwittrock): Write this test