diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go
index 5ece5ef7eb..249de73072 100644
--- a/pkg/manager/internal.go
+++ b/pkg/manager/internal.go
@@ -48,6 +48,8 @@ const (
 	defaultRenewDeadline = 10 * time.Second
 	defaultRetryPeriod   = 2 * time.Second
 
+	defaultRunnableTearDownTimeout = 10 * time.Second
+
 	defaultReadinessEndpoint = "/readyz"
 	defaultLivenessEndpoint  = "/healthz"
 )
@@ -114,11 +116,7 @@ type controllerManager struct {
 	started        bool
 	startedLeader  bool
 	healthzStarted bool
-
-	// NB(directxman12): we don't just use an error channel here to avoid the situation where the
-	// error channel is too small and we end up blocking some goroutines waiting to report their errors.
-	// errSignal lets us track when we should stop because an error occurred
-	errSignal *errSignaler
+	errChan        chan error
 
 	// internalStop is the stop channel *actually* used by everything involved
 	// with the manager as a stop channel, so that we can pass a stop channel
@@ -130,6 +128,9 @@ type controllerManager struct {
 	// It and `internalStop` should point to the same channel.
 	internalStopper chan<- struct{}
 
+	// stop procedure engaged. In other words, we should not add anything else to the manager
+	stopProcedureEngaged bool
+
 	startCache func(stop <-chan struct{}) error
 
 	// port is the port that the webhook server serves at.
@@ -152,57 +153,23 @@ type controllerManager struct {
 	// retryPeriod is the duration the LeaderElector clients should wait
 	// between tries of actions.
 	retryPeriod time.Duration
-}
-
-type errSignaler struct {
-	// errSignal indicates that an error occurred, when closed. It shouldn't
-	// be written to.
-	errSignal chan struct{}
-
-	// err is the received error
-	err error
-
-	mu sync.Mutex
-}
-
-func (r *errSignaler) SignalError(err error) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-
-	if err == nil {
-		// non-error, ignore
-		log.Error(nil, "SignalError called without an (with a nil) error, which should never happen, ignoring")
-		return
-	}
-
-	if r.err != nil {
-		// we already have an error, don't try again
-		return
-	}
-
-	// save the error and report it
-	r.err = err
-	close(r.errSignal)
-}
 
-func (r *errSignaler) Error() error {
-	r.mu.Lock()
-	defer r.mu.Unlock()
+	// waitForRunnable is holding the number of runnables currently running so that
+	// we can wait for them to exit before quitting the manager
+	waitForRunnable sync.WaitGroup
 
-	return r.err
-}
-
-func (r *errSignaler) GotError() chan struct{} {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-
-	return r.errSignal
+	// runnableTearDownTimeout is the duration given to runnable to stop
+	// before the manager actually returns on stop.
+	runnableTearDownTimeout time.Duration
 }
 
 // Add sets dependencies on i, and adds it to the list of Runnables to start.
 func (cm *controllerManager) Add(r Runnable) error {
 	cm.mu.Lock()
 	defer cm.mu.Unlock()
+	if cm.stopProcedureEngaged {
+		return fmt.Errorf("can't accept new runnable as stop procedure is already engaged")
+	}
 
 	// Set dependencies on the object
 	if err := cm.SetFields(r); err != nil {
@@ -222,11 +189,7 @@ func (cm *controllerManager) Add(r Runnable) error {
 
 	if shouldStart {
 		// If already started, start the controller
-		go func() {
-			if err := r.Start(cm.internalStop); err != nil {
-				cm.errSignal.SignalError(err)
-			}
-		}()
+		cm.startRunnable(r)
 	}
 
 	return nil
@@ -264,6 +227,9 @@ func (cm *controllerManager) SetFields(i interface{}) error {
 func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
 	cm.mu.Lock()
 	defer cm.mu.Unlock()
+	if cm.stopProcedureEngaged {
+		return fmt.Errorf("can't accept new healthCheck as stop procedure is already engaged")
+	}
 
 	if cm.healthzStarted {
 		return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
@@ -281,6 +247,9 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker)
 func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error {
 	cm.mu.Lock()
 	defer cm.mu.Unlock()
+	if cm.stopProcedureEngaged {
+		return fmt.Errorf("can't accept new ready check as stop procedure is already engaged")
+	}
 
 	if cm.healthzStarted {
 		return fmt.Errorf("unable to add new checker because readyz endpoint has already been created")
@@ -355,7 +324,7 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
 	go func() {
 		log.Info("starting metrics server", "path", metricsPath)
 		if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
-			cm.errSignal.SignalError(err)
+			cm.errChan <- err
 		}
 	}()
 
@@ -363,7 +332,7 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
 	select {
 	case <-stop:
 		if err := server.Shutdown(context.Background()); err != nil {
-			cm.errSignal.SignalError(err)
+			cm.errChan <- err
 		}
 	}
 }
@@ -385,7 +354,7 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
 	// Run server
 	go func() {
 		if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
-			cm.errSignal.SignalError(err)
+			cm.errChan <- err
 		}
 	}()
 	cm.healthzStarted = true
@@ -395,17 +364,18 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
 	select {
 	case <-stop:
 		if err := server.Shutdown(context.Background()); err != nil {
-			cm.errSignal.SignalError(err)
+			cm.errChan <- err
 		}
 	}
 }
 
 func (cm *controllerManager) Start(stop <-chan struct{}) error {
-	// join the passed-in stop channel as an upstream feeding into cm.internalStopper
-	defer close(cm.internalStopper)
+	// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
+	stopComplete := make(chan struct{})
+	defer close(stopComplete)
 
 	// initialize this here so that we reset the signal channel state on every start
-	cm.errSignal = &errSignaler{errSignal: make(chan struct{})}
+	cm.errChan = make(chan error)
 
 	// Metrics should be served whether the controller is leader or not.
 	// (If we don't serve metrics for non-leaders, prometheus will still scrape
@@ -424,6 +394,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
 	if cm.resourceLock != nil {
 		err := cm.startLeaderElection()
 		if err != nil {
+			if errStop := cm.engageStopProcedure(stopComplete); errStop != nil {
+				log.Error(errStop, "some runnables could not be stopped after error occurred in startLeaderElection.")
+			}
 			return err
 		}
 	} else {
@@ -433,10 +406,50 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
 	select {
 	case <-stop:
 		// We are done
+		return cm.engageStopProcedure(stopComplete)
+	case err := <-cm.errChan:
+		// Error starting or running a runnable
+		if errStop := cm.engageStopProcedure(stopComplete); errStop != nil {
+			log.Error(errStop, "some runnables could not be stopped after error occurred starting/running the manager.")
+		}
+		return err
+	}
+}
+func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) error {
+	cm.mu.Lock()
+	defer cm.mu.Unlock()
+	cm.stopProcedureEngaged = true
+	close(cm.internalStopper)
+	go func() {
+		for {
+			select {
+			case err, ok := <-cm.errChan:
+				if ok {
+					log.Error(err, "error received after stop sequence was engaged")
+				}
+			case <-stopComplete:
+				return
+			}
+		}
+	}()
+	return cm.waitForRunnableToEnd()
+}
+
+func (cm *controllerManager) waitForRunnableToEnd() error {
+	runnableTearDownTimer := time.NewTimer(cm.runnableTearDownTimeout)
+	defer runnableTearDownTimer.Stop()
+	allStopped := make(chan struct{})
+
+	go func() {
+		cm.waitForRunnable.Wait()
+		close(allStopped)
+	}()
+
+	select {
+	case <-allStopped:
 		return nil
-	case <-cm.errSignal.GotError():
-		// Error starting a controller
-		return cm.errSignal.Error()
+	case <-runnableTearDownTimer.C:
+		return fmt.Errorf("not all runnables have stopped within the proposed delay of %s", cm.runnableTearDownTimeout.String())
 	}
 }
 
@@ -453,7 +466,7 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
 		ctrl := c
 		go func() {
 			if err := ctrl.Start(cm.internalStop); err != nil {
-				cm.errSignal.SignalError(err)
+				cm.errChan <- err
 			}
 			// we use %T here because we don't have a good stand-in for "name",
 			// and the full runnable might not serialize (mutexes, etc)
@@ -473,14 +486,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
 		// Controllers block, but we want to return an error if any have an error starting.
 		// Write any Start errors to a channel so we can return them
 		ctrl := c
-		go func() {
-			if err := ctrl.Start(cm.internalStop); err != nil {
-				cm.errSignal.SignalError(err)
-			}
-			// we use %T here because we don't have a good stand-in for "name",
-			// and the full runnable might not serialize (mutexes, etc)
-			log.V(1).Info("leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl))
-		}()
+		cm.startRunnable(ctrl)
 	}
 
 	cm.startedLeader = true
@@ -497,7 +503,7 @@ func (cm *controllerManager) waitForCache() {
 	}
 	go func() {
 		if err := cm.startCache(cm.internalStop); err != nil {
-			cm.errSignal.SignalError(err)
+			cm.errChan <- err
 		}
 	}()
 
@@ -521,7 +527,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
 				// Most implementations of leader election log.Fatal() here.
 				// Since Start is wrapped in log.Fatal when called, we can just return
 				// an error here which will cause the program to exit.
-				cm.errSignal.SignalError(fmt.Errorf("leader election lost"))
+				cm.errChan <- fmt.Errorf("leader election lost")
 			},
 		},
 	})
@@ -542,3 +548,13 @@ func (cm *controllerManager) startLeaderElection() (err error) {
 	go l.Run(ctx)
 	return nil
 }
+
+func (cm *controllerManager) startRunnable(r Runnable) {
+	cm.waitForRunnable.Add(1)
+	go func() {
+		defer cm.waitForRunnable.Done()
+		if err := r.Start(cm.internalStop); err != nil {
+			cm.errChan <- err
+		}
+	}()
+}
diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go
index d30c4f0d37..ba0dce26f6 100644
--- a/pkg/manager/manager.go
+++ b/pkg/manager/manager.go
@@ -185,6 +185,10 @@ type Options struct {
 	// Use this to customize the event correlator and spam filter
 	EventBroadcaster record.EventBroadcaster
 
+	// RunnableTearDownTimeout is the duration given to runnable to stop
+	// before the manager actually returns on stop.
+	RunnableTearDownTimeout *time.Duration
+
 	// Dependency injection for testing
 	newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error)
 	newResourceLock     func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
@@ -289,27 +293,28 @@ func New(config *rest.Config, options Options) (Manager, error) {
 	stop := make(chan struct{})
 
 	return &controllerManager{
-		config:                config,
-		scheme:                options.Scheme,
-		cache:                 cache,
-		fieldIndexes:          cache,
-		client:                writeObj,
-		apiReader:             apiReader,
-		recorderProvider:      recorderProvider,
-		resourceLock:          resourceLock,
-		mapper:                mapper,
-		metricsListener:       metricsListener,
-		internalStop:          stop,
-		internalStopper:       stop,
-		port:                  options.Port,
-		host:                  options.Host,
-		certDir:               options.CertDir,
-		leaseDuration:         *options.LeaseDuration,
-		renewDeadline:         *options.RenewDeadline,
-		retryPeriod:           *options.RetryPeriod,
-		healthProbeListener:   healthProbeListener,
-		readinessEndpointName: options.ReadinessEndpointName,
-		livenessEndpointName:  options.LivenessEndpointName,
+		config:                  config,
+		scheme:                  options.Scheme,
+		cache:                   cache,
+		fieldIndexes:            cache,
+		client:                  writeObj,
+		apiReader:               apiReader,
+		recorderProvider:        recorderProvider,
+		resourceLock:            resourceLock,
+		mapper:                  mapper,
+		metricsListener:         metricsListener,
+		internalStop:            stop,
+		internalStopper:         stop,
+		port:                    options.Port,
+		host:                    options.Host,
+		certDir:                 options.CertDir,
+		leaseDuration:           *options.LeaseDuration,
+		renewDeadline:           *options.RenewDeadline,
+		retryPeriod:             *options.RetryPeriod,
+		healthProbeListener:     healthProbeListener,
+		readinessEndpointName:   options.ReadinessEndpointName,
+		livenessEndpointName:    options.LivenessEndpointName,
+		runnableTearDownTimeout: *options.RunnableTearDownTimeout,
 	}, nil
 }
 
@@ -409,5 +414,10 @@ func setOptionsDefaults(options Options) Options {
 		options.newHealthProbeListener = defaultHealthProbeListener
 	}
 
+	if options.RunnableTearDownTimeout == nil {
+		runnableTearDownTimeout := defaultRunnableTearDownTimeout
+		options.RunnableTearDownTimeout = &runnableTearDownTimeout
+	}
+
 	return options
 }
diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go
index 738cac7c8a..cda42ebbc9 100644
--- a/pkg/manager/manager_test.go
+++ b/pkg/manager/manager_test.go
@@ -21,6 +21,8 @@ import (
 	"io/ioutil"
 	"net"
 	"net/http"
+	"sync"
+	"time"
 
 	"github.com/go-logr/logr"
 	. "github.com/onsi/ginkgo"
@@ -241,17 +243,17 @@ var _ = Describe("manger.Manager", func() {
 		It("should Start each Component", func(done Done) {
 			m, err := New(cfg, options)
 			Expect(err).NotTo(HaveOccurred())
-			c1 := make(chan struct{})
+			var wgRunnableStarted sync.WaitGroup
+			wgRunnableStarted.Add(2)
 			Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
 				defer GinkgoRecover()
-				close(c1)
+				wgRunnableStarted.Done()
 				return nil
 			}))).To(Succeed())
 
-			c2 := make(chan struct{})
 			Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
 				defer GinkgoRecover()
-				close(c2)
+				wgRunnableStarted.Done()
 				return nil
 			}))).To(Succeed())
 
@@ -259,8 +261,8 @@ var _ = Describe("manger.Manager", func() {
 				defer GinkgoRecover()
 				Expect(m.Start(stop)).NotTo(HaveOccurred())
 			}()
-			<-c1
-			<-c2
+
+			wgRunnableStarted.Wait()
 			close(done)
 		})
 
@@ -291,41 +293,133 @@ var _ = Describe("manger.Manager", func() {
 		It("should return an error if any Components fail to Start", func(done Done) {
 			m, err := New(cfg, options)
 			Expect(err).NotTo(HaveOccurred())
-			c1 := make(chan struct{})
 			Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
 				defer GinkgoRecover()
-				close(c1)
+				<-s
 				return nil
 			}))).To(Succeed())
 
-			c2 := make(chan struct{})
 			Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
 				defer GinkgoRecover()
-				close(c2)
 				return fmt.Errorf("expected error")
 			}))).To(Succeed())
 
-			c3 := make(chan struct{})
 			Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
 				defer GinkgoRecover()
-				close(c3)
 				return nil
 			}))).To(Succeed())
 
-			go func() {
+			defer GinkgoRecover()
+			Expect(m.Start(stop)).NotTo(Succeed()) //.To(HaveOccurred())
+
+			close(done)
+		})
+
+		It("should wait for runnables to stop", func(done Done) {
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+
+			var lock sync.Mutex
+			runnableDoneCount := 0
+			runnableDoneFunc := func() {
+				lock.Lock()
+				defer lock.Unlock()
+				runnableDoneCount++
+			}
+			var wgRunnableRunning sync.WaitGroup
+			wgRunnableRunning.Add(2)
+			Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
+				wgRunnableRunning.Done()
 				defer GinkgoRecover()
-				// NB(directxman12): this should definitely return an error. If it doesn't happen,
-				// it means someone was signaling "stop: error" with a nil "error".
-				Expect(m.Start(stop)).NotTo(Succeed())
-				close(done)
+				defer runnableDoneFunc()
+				<-s
+				return nil
+			}))).To(Succeed())
+
+			Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
+				wgRunnableRunning.Done()
+				defer GinkgoRecover()
+				defer runnableDoneFunc()
+				<-s
+				time.Sleep(300 * time.Millisecond) //slow closure simulation
+				return nil
+			}))).To(Succeed())
+
+			defer GinkgoRecover()
+			s := make(chan struct{})
+
+			var wgManagerRunning sync.WaitGroup
+			wgManagerRunning.Add(1)
+			go func() {
+				defer wgManagerRunning.Done()
+				Expect(m.Start(s)).NotTo(HaveOccurred())
+				Expect(runnableDoneCount).To(Equal(2))
 			}()
-			<-c1
-			<-c2
-			<-c3
+			wgRunnableRunning.Wait() // ensure that runnable are running
+			close(s)
+
+			wgManagerRunning.Wait() // wait for the manager clean exit before closing the test
+			close(done)
+		})
+
+		It("should return an error if any Components fail to Start and wait for runnables to stop", func(done Done) {
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+			defer GinkgoRecover()
+			var lock sync.Mutex
+			runnableDoneCount := 0
+			runnableDoneFunc := func() {
+				lock.Lock()
+				defer lock.Unlock()
+				runnableDoneCount++
+			}
+
+			Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
+				defer GinkgoRecover()
+				defer runnableDoneFunc()
+				return fmt.Errorf("expected error")
+			}))).To(Succeed())
+
+			Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
+				defer GinkgoRecover()
+				defer runnableDoneFunc()
+				<-s
+				return nil
+			}))).To(Succeed())
+
+			Expect(m.Start(stop)).To(HaveOccurred())
+			Expect(runnableDoneCount).To(Equal(2))
+
+			close(done)
 		})
 
-		It("should return an error if any non-leaderelection Components fail to Start", func() {
-			// TODO(mengqiy): implement this after resolving https://github.com/kubernetes-sigs/controller-runtime/issues/429
+		It("should refuse to add runnable if stop procedure is already engaged", func(done Done) {
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+			defer GinkgoRecover()
+
+			var wgRunnableRunning sync.WaitGroup
+			wgRunnableRunning.Add(1)
+			Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
+				wgRunnableRunning.Done()
+				defer GinkgoRecover()
+				<-s
+				return nil
+			}))).To(Succeed())
+
+			s := make(chan struct{})
+			go func() {
+				Expect(m.Start(s)).NotTo(HaveOccurred())
+			}()
+			wgRunnableRunning.Wait()
+			close(s)
+			time.Sleep(100 * time.Millisecond) // give some time for the stop chan closure to be caught by the manager
+			Expect(m.Add(RunnableFunc(func(s <-chan struct{}) error {
+				defer GinkgoRecover()
+				return nil
+			}))).NotTo(Succeed())
+
+			close(done)
 		})