
Commit 01d7292

alvaroaleman and dbenque committed
✨ Implement graceful shutdown
Co-authored-by: david.benque <[email protected]>
1 parent 2f1457b commit 01d7292

File tree: 3 files changed (+407, −151 lines)


pkg/manager/internal.go

Lines changed: 130 additions & 99 deletions
@@ -18,9 +18,11 @@ package manager

 import (
     "context"
+    "errors"
     "fmt"
     "net"
     "net/http"
+    "os"
     "sync"
     "time"

@@ -44,9 +46,10 @@ import (

 const (
     // Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
-    defaultLeaseDuration = 15 * time.Second
-    defaultRenewDeadline = 10 * time.Second
-    defaultRetryPeriod   = 2 * time.Second
+    defaultLeaseDuration          = 15 * time.Second
+    defaultRenewDeadline          = 10 * time.Second
+    defaultRetryPeriod            = 2 * time.Second
+    defaultGracefulShutdownPeriod = 30 * time.Second

     defaultReadinessEndpoint = "/readyz"
     defaultLivenessEndpoint  = "/healthz"
@@ -118,11 +121,7 @@ type controllerManager struct {
     started        bool
     startedLeader  bool
     healthzStarted bool
-
-    // NB(directxman12): we don't just use an error channel here to avoid the situation where the
-    // error channel is too small and we end up blocking some goroutines waiting to report their errors.
-    // errSignal lets us track when we should stop because an error occurred
-    errSignal *errSignaler
+    errChan        chan error

     // internalStop is the stop channel *actually* used by everything involved
     // with the manager as a stop channel, so that we can pass a stop channel
@@ -134,6 +133,9 @@ type controllerManager struct {
     // It and `internalStop` should point to the same channel.
     internalStopper chan<- struct{}

+    // stop procedure engaged. In other words, we should not add anything else to the manager
+    stopProcedureEngaged bool
+
     // elected is closed when this manager becomes the leader of a group of
     // managers, either because it won a leader election or because no leader
     // election was configured.
@@ -161,57 +163,27 @@ type controllerManager struct {
     // retryPeriod is the duration the LeaderElector clients should wait
     // between tries of actions.
     retryPeriod time.Duration
-}

-type errSignaler struct {
-    // errSignal indicates that an error occurred, when closed. It shouldn't
-    // be written to.
-    errSignal chan struct{}
+    // waitForRunnable is holding the number of runnables currently running so that
+    // we can wait for them to exit before quitting the manager
+    waitForRunnable sync.WaitGroup

-    // err is the received error
-    err error
+    // gracefulShutdownTimeout is the duration given to runnable to stop
+    // before the manager actually returns on stop.
+    gracefulShutdownTimeout time.Duration

-    mu sync.Mutex
-}
-
-func (r *errSignaler) SignalError(err error) {
-    r.mu.Lock()
-    defer r.mu.Unlock()
-
-    if err == nil {
-        // non-error, ignore
-        log.Error(nil, "SignalError called without an (with a nil) error, which should never happen, ignoring")
-        return
-    }
-
-    if r.err != nil {
-        // we already have an error, don't try again
-        return
-    }
-
-    // save the error and report it
-    r.err = err
-    close(r.errSignal)
-}
-
-func (r *errSignaler) Error() error {
-    r.mu.Lock()
-    defer r.mu.Unlock()
-
-    return r.err
-}
-
-func (r *errSignaler) GotError() chan struct{} {
-    r.mu.Lock()
-    defer r.mu.Unlock()
-
-    return r.errSignal
+    // onStoppedLeading is called when the leader election lease is lost.
+    // It can be overridden for tests.
+    onStoppedLeading func()
 }

 // Add sets dependencies on i, and adds it to the list of Runnables to start.
 func (cm *controllerManager) Add(r Runnable) error {
     cm.mu.Lock()
     defer cm.mu.Unlock()
+    if cm.stopProcedureEngaged {
+        return errors.New("can't accept new runnable as stop procedure is already engaged")
+    }

     // Set dependencies on the object
     if err := cm.SetFields(r); err != nil {
@@ -231,11 +203,7 @@ func (cm *controllerManager) Add(r Runnable) error {

     if shouldStart {
         // If already started, start the controller
-        go func() {
-            if err := r.Start(cm.internalStop); err != nil {
-                cm.errSignal.SignalError(err)
-            }
-        }()
+        cm.startRunnable(r)
     }

     return nil
@@ -293,6 +261,10 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker)
     cm.mu.Lock()
     defer cm.mu.Unlock()

+    if cm.stopProcedureEngaged {
+        return errors.New("can't accept new healthCheck as stop procedure is already engaged")
+    }
+
     if cm.healthzStarted {
         return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
     }
@@ -310,6 +282,10 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker)
    cm.mu.Lock()
    defer cm.mu.Unlock()

+    if cm.stopProcedureEngaged {
+        return errors.New("can't accept new ready check as stop procedure is already engaged")
+    }
+
     if cm.healthzStarted {
         return fmt.Errorf("unable to add new checker because readyz endpoint has already been created")
     }
@@ -389,17 +365,18 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
         Handler: mux,
     }
     // Run the server
-    go func() {
+    cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error {
         log.Info("starting metrics server", "path", defaultMetricsEndpoint)
         if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
-            cm.errSignal.SignalError(err)
+            return err
         }
-    }()
+        return nil
+    }))

     // Shutdown the server when stop is closed
     <-stop
     if err := server.Shutdown(context.Background()); err != nil {
-        cm.errSignal.SignalError(err)
+        cm.errChan <- err
     }
 }

@@ -420,27 +397,39 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
         Handler: mux,
     }
     // Run server
-    go func() {
+    cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error {
         if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
-            cm.errSignal.SignalError(err)
+            return err
         }
-    }()
+        return nil
+    }))
     cm.healthzStarted = true
     cm.mu.Unlock()

     // Shutdown the server when stop is closed
     <-stop
     if err := server.Shutdown(context.Background()); err != nil {
-        cm.errSignal.SignalError(err)
+        cm.errChan <- err
     }
 }

-func (cm *controllerManager) Start(stop <-chan struct{}) error {
-    // join the passed-in stop channel as an upstream feeding into cm.internalStopper
-    defer close(cm.internalStopper)
+func (cm *controllerManager) Start(stop <-chan struct{}) (err error) {
+    // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
+    stopComplete := make(chan struct{})
+    defer close(stopComplete)
+    defer func() {
+        stopErr := cm.engageStopProcedure(stopComplete)
+        if stopErr != nil {
+            if err != nil {
+                err = fmt.Errorf("%w, afterwards waiting for graceful shutdown failed with err %v", err, stopErr)
+            } else {
+                err = stopErr
+            }
+        }
+    }()

     // initialize this here so that we reset the signal channel state on every start
-    cm.errSignal = &errSignaler{errSignal: make(chan struct{})}
+    cm.errChan = make(chan error)

     // Metrics should be served whether the controller is leader or not.
     // (If we don't serve metrics for non-leaders, prometheus will still scrape
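The rewritten Start uses a named return value so the deferred call to engageStopProcedure can fold a failed graceful shutdown into whatever error Start was already returning. Below is a minimal standalone sketch of that Go pattern; the run, doWork and cleanup helpers are hypothetical stand-ins, not part of this commit.

package main

import (
    "errors"
    "fmt"
)

// doWork stands in for the manager's main loop; cleanup stands in for
// engageStopProcedure. Both are made up for illustration.
func doWork() error  { return errors.New("runnable failed") }
func cleanup() error { return errors.New("graceful shutdown timed out") }

// run mirrors the shape of Start: the named return lets the deferred cleanup
// either supply the error (if none was being returned) or wrap the existing one.
func run() (err error) {
    defer func() {
        if cleanupErr := cleanup(); cleanupErr != nil {
            if err != nil {
                err = fmt.Errorf("%w, afterwards cleanup failed with err %v", err, cleanupErr)
            } else {
                err = cleanupErr
            }
        }
    }()
    return doWork()
}

func main() {
    fmt.Println(run())
}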
@@ -471,9 +460,51 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
     case <-stop:
         // We are done
         return nil
-    case <-cm.errSignal.GotError():
-        // Error starting a controller
-        return cm.errSignal.Error()
+    case err := <-cm.errChan:
+        // Error starting or running a runnable
+        return err
+    }
+}
+
+// engageStopProcedure signals all runnables to stop, reads potential errors
+// from the errChan and waits for them to end. It must not be called more than once.
+func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) error {
+    close(cm.internalStopper)
+    cm.mu.Lock()
+    defer cm.mu.Unlock()
+    cm.stopProcedureEngaged = true
+    go func() {
+        for {
+            select {
+            case err, ok := <-cm.errChan:
+                if ok {
+                    log.Error(err, "error received after stop sequence was engaged")
+                }
+            case <-stopComplete:
+                return
+            }
+        }
+    }()
+    return cm.waitForRunnableToEnd()
+}
+
+// waitForRunnableToEnd blocks until all runnables ended or the
+// tearDownTimeout was reached. In the latter case, an error is returned.
+func (cm *controllerManager) waitForRunnableToEnd() error {
+    gracefulShutdownTimer := time.NewTimer(cm.gracefulShutdownTimeout)
+    defer gracefulShutdownTimer.Stop()
+    allStopped := make(chan struct{})
+
+    go func() {
+        cm.waitForRunnable.Wait()
+        close(allStopped)
+    }()
+
+    select {
+    case <-allStopped:
+        return nil
+    case <-gracefulShutdownTimer.C:
+        return fmt.Errorf("not all runnables have stopped within the grace period of %s", cm.gracefulShutdownTimeout.String())
     }
 }
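waitForRunnableToEnd pairs a sync.WaitGroup with a timer so the wait on running runnables is bounded by gracefulShutdownTimeout. The same technique in isolation, as a rough sketch with illustrative names and durations that are not taken from this change:

package main

import (
    "fmt"
    "sync"
    "time"
)

// waitWithTimeout waits on a WaitGroup but gives up after a grace period,
// mirroring the structure of waitForRunnableToEnd above.
func waitWithTimeout(wg *sync.WaitGroup, grace time.Duration) error {
    allStopped := make(chan struct{})
    go func() {
        wg.Wait()
        close(allStopped)
    }()

    timer := time.NewTimer(grace)
    defer timer.Stop()

    select {
    case <-allStopped:
        return nil
    case <-timer.C:
        return fmt.Errorf("not all goroutines stopped within %s", grace)
    }
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(10 * time.Millisecond) // a runnable finishing its shutdown work
    }()
    fmt.Println(waitWithTimeout(&wg, 100*time.Millisecond)) // prints <nil>
}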

@@ -487,15 +518,7 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
     for _, c := range cm.nonLeaderElectionRunnables {
         // Controllers block, but we want to return an error if any have an error starting.
         // Write any Start errors to a channel so we can return them
-        ctrl := c
-        go func() {
-            if err := ctrl.Start(cm.internalStop); err != nil {
-                cm.errSignal.SignalError(err)
-            }
-            // we use %T here because we don't have a good stand-in for "name",
-            // and the full runnable might not serialize (mutexes, etc)
-            log.V(1).Info("non-leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl))
-        }()
+        cm.startRunnable(c)
     }
 }

@@ -509,15 +532,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
     for _, c := range cm.leaderElectionRunnables {
         // Controllers block, but we want to return an error if any have an error starting.
         // Write any Start errors to a channel so we can return them
-        ctrl := c
-        go func() {
-            if err := ctrl.Start(cm.internalStop); err != nil {
-                cm.errSignal.SignalError(err)
-            }
-            // we use %T here because we don't have a good stand-in for "name",
-            // and the full runnable might not serialize (mutexes, etc)
-            log.V(1).Info("leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl))
-        }()
+        cm.startRunnable(c)
     }

     cm.startedLeader = true
@@ -532,19 +547,30 @@ func (cm *controllerManager) waitForCache() {
     if cm.startCache == nil {
         cm.startCache = cm.cache.Start
     }
-    go func() {
-        if err := cm.startCache(cm.internalStop); err != nil {
-            cm.errSignal.SignalError(err)
-        }
-    }()
+    cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error {
+        return cm.startCache(stop)
+    }))

     // Wait for the caches to sync.
     // TODO(community): Check the return value and write a test
     cm.cache.WaitForCacheSync(cm.internalStop)
+    // TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse
+    // cm.started as check if we already started the cache so it must always become true.
+    // Making sure that the cache doesn't get started twice is needed to not get a "close
+    // of closed channel" panic
     cm.started = true
 }

 func (cm *controllerManager) startLeaderElection() (err error) {
+    if cm.onStoppedLeading == nil {
+        cm.onStoppedLeading = func() {
+            // We have to exit here, otherwise the graceful shutdown or anything
+            // else that keeps the binary running might allow controllers that
+            // need leader election to run after we lost the lease.
+            log.Info("leader election lost, exiting")
+            os.Exit(1)
+        }
+    }
     l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
         Lock:          cm.resourceLock,
         LeaseDuration: cm.leaseDuration,
@@ -555,12 +581,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
                 close(cm.elected)
                 cm.startLeaderElectionRunnables()
             },
-            OnStoppedLeading: func() {
-                // Most implementations of leader election log.Fatal() here.
-                // Since Start is wrapped in log.Fatal when called, we can just return
-                // an error here which will cause the program to exit.
-                cm.errSignal.SignalError(fmt.Errorf("leader election lost"))
-            },
+            OnStoppedLeading: cm.onStoppedLeading,
         },
     })
     if err != nil {
@@ -584,3 +605,13 @@ func (cm *controllerManager) startLeaderElection() (err error) {
 func (cm *controllerManager) Elected() <-chan struct{} {
     return cm.elected
 }
+
+func (cm *controllerManager) startRunnable(r Runnable) {
+    cm.waitForRunnable.Add(1)
+    go func() {
+        defer cm.waitForRunnable.Done()
+        if err := r.Start(cm.internalStop); err != nil {
+            cm.errChan <- err
+        }
+    }()
+}
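The grace period only helps if runnables actually return once the stop channel closes; otherwise waitForRunnable.Wait() never finishes and Start reports the timeout error shown above. A hedged sketch of a cooperative runnable matching the stop-channel signature used in this diff; the periodic-work body and durations are made up for illustration.

package main

import (
    "fmt"
    "time"
)

// runPeriodicWork loops until stop is closed, then returns promptly so the
// manager's startRunnable goroutine can call waitForRunnable.Done() well
// before the grace period expires.
func runPeriodicWork(stop <-chan struct{}) error {
    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            fmt.Println("doing periodic work")
        case <-stop:
            // Flush state or release resources here, then return.
            return nil
        }
    }
}

func main() {
    stop := make(chan struct{})
    go func() {
        time.Sleep(500 * time.Millisecond)
        close(stop) // simulate the manager engaging its stop procedure
    }()
    if err := runPeriodicWork(stop); err != nil {
        fmt.Println("runnable failed:", err)
    }
}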
