🐛 Wait for runnables to stop fix for #350 and #429 #664

Closed
164 changes: 90 additions & 74 deletions pkg/manager/internal.go
@@ -48,6 +48,8 @@ const (
defaultRenewDeadline = 10 * time.Second
defaultRetryPeriod = 2 * time.Second

defaultRunnableTearDownTimeout = 10 * time.Second

defaultReadinessEndpoint = "/readyz"
defaultLivenessEndpoint = "/healthz"
)
@@ -114,11 +116,7 @@ type controllerManager struct {
started bool
startedLeader bool
healthzStarted bool

// NB(directxman12): we don't just use an error channel here to avoid the situation where the
// error channel is too small and we end up blocking some goroutines waiting to report their errors.
// errSignal lets us track when we should stop because an error occurred
errSignal *errSignaler
errChan chan error

// internalStop is the stop channel *actually* used by everything involved
// with the manager as a stop channel, so that we can pass a stop channel
Expand All @@ -130,6 +128,9 @@ type controllerManager struct {
// It and `internalStop` should point to the same channel.
internalStopper chan<- struct{}

// stop procedure engaged. In other words, we should not add anything else to the manager
stopProcedureEngaged bool

startCache func(stop <-chan struct{}) error

// port is the port that the webhook server serves at.
@@ -152,57 +153,23 @@ type controllerManager struct {
// retryPeriod is the duration the LeaderElector clients should wait
// between tries of actions.
retryPeriod time.Duration
}

type errSignaler struct {
// errSignal indicates that an error occurred, when closed. It shouldn't
// be written to.
errSignal chan struct{}

// err is the received error
err error

mu sync.Mutex
}

func (r *errSignaler) SignalError(err error) {
r.mu.Lock()
defer r.mu.Unlock()

if err == nil {
// non-error, ignore
log.Error(nil, "SignalError called without an (with a nil) error, which should never happen, ignoring")
return
}

if r.err != nil {
// we already have an error, don't try again
return
}

// save the error and report it
r.err = err
close(r.errSignal)
}

func (r *errSignaler) Error() error {
r.mu.Lock()
defer r.mu.Unlock()
// waitForRunnable is holding the number of runnables currently running so that
// we can wait for them to exit before quitting the manager
waitForRunnable sync.WaitGroup

return r.err
}

func (r *errSignaler) GotError() chan struct{} {
r.mu.Lock()
defer r.mu.Unlock()

return r.errSignal
// runnableTearDownTimeout is the duration given to runnables to stop
// before the manager actually returns on stop.
runnableTearDownTimeout time.Duration
}

// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.stopProcedureEngaged {
return fmt.Errorf("can't accept new runnable as stop procedure is already engaged")
}

// Set dependencies on the object
if err := cm.SetFields(r); err != nil {
@@ -222,11 +189,7 @@ func (cm *controllerManager) Add(r Runnable) error {

if shouldStart {
// If already started, start the controller
go func() {
if err := r.Start(cm.internalStop); err != nil {
cm.errSignal.SignalError(err)
}
}()
cm.startRunnable(r)
}

return nil
@@ -264,6 +227,9 @@ func (cm *controllerManager) SetFields(i interface{}) error {
func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.stopProcedureEngaged {
return fmt.Errorf("can't accept new healthCheck as stop procedure is already engaged")
}

if cm.healthzStarted {
return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
@@ -281,6 +247,9 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker)
func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error {
cm.mu.Lock()
defer cm.mu.Unlock()
if cm.stopProcedureEngaged {
return fmt.Errorf("can't accept new ready check as stop procedure is already engaged")
}

if cm.healthzStarted {
return fmt.Errorf("unable to add new checker because readyz endpoint has already been created")
@@ -355,15 +324,15 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
go func() {
log.Info("starting metrics server", "path", metricsPath)
if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
cm.errSignal.SignalError(err)
cm.errChan <- err
}
}()

// Shutdown the server when stop is closed
select {
case <-stop:
if err := server.Shutdown(context.Background()); err != nil {
cm.errSignal.SignalError(err)
cm.errChan <- err
}
}
}
@@ -385,7 +354,7 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
// Run server
go func() {
if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
cm.errSignal.SignalError(err)
cm.errChan <- err
}
}()
cm.healthzStarted = true
@@ -395,17 +364,18 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
select {
case <-stop:
if err := server.Shutdown(context.Background()); err != nil {
cm.errSignal.SignalError(err)
cm.errChan <- err
}
}
}

func (cm *controllerManager) Start(stop <-chan struct{}) error {
// join the passed-in stop channel as an upstream feeding into cm.internalStopper
defer close(cm.internalStopper)
// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
stopComplete := make(chan struct{})
defer close(stopComplete)

// initialize this here so that we reset the signal channel state on every start
cm.errSignal = &errSignaler{errSignal: make(chan struct{})}
cm.errChan = make(chan error)

// Metrics should be served whether the controller is leader or not.
// (If we don't serve metrics for non-leaders, prometheus will still scrape
@@ -424,6 +394,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
if cm.resourceLock != nil {
err := cm.startLeaderElection()
if err != nil {
if errStop := cm.engageStopProcedure(stopComplete); errStop != nil {
log.Error(errStop, "some runnables could not be stopped after error occurred in startLeaderElection.")
}
return err
}
} else {
@@ -433,10 +406,50 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
select {
case <-stop:
// We are done
return cm.engageStopProcedure(stopComplete)
case err := <-cm.errChan:
// Error starting or running a runnable
if errStop := cm.engageStopProcedure(stopComplete); errStop != nil {
log.Error(errStop, "some runnables could not be stopped after error occurred starting/running the manager.")
}
return err
}
}
func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) error {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.stopProcedureEngaged = true
close(cm.internalStopper)
go func() {
for {
select {
case err, ok := <-cm.errChan:
if ok {
log.Error(err, "error received after stop sequence was engaged")
}
case <-stopComplete:
return
}
}
}()
return cm.waitForRunnableToEnd()
}

func (cm *controllerManager) waitForRunnableToEnd() error {
Contributor: this whole section needs an overview comment of the stop procedure stuff
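
In response, a hedged sketch of what such an overview comment could cover, based only on what this diff does; the wording and placement are a suggestion, not part of the PR:

// Suggested (hypothetical) overview comment for the stop procedure:
//
// Stopping the manager proceeds in four steps:
//  1. engageStopProcedure takes the manager lock and sets stopProcedureEngaged,
//     so Add, AddHealthzCheck and AddReadyzCheck reject new work.
//  2. internalStopper is closed, signalling every runnable (plus the cache,
//     metrics server and health probe server) to shut down.
//  3. errChan is drained in a background goroutine so runnables that fail
//     while shutting down do not block on the unbuffered channel.
//  4. waitForRunnableToEnd waits up to runnableTearDownTimeout for the
//     waitForRunnable WaitGroup to drain, and returns an error if the
//     timeout elapses first.
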
runnableTearDownTimer := time.NewTimer(cm.runnableTearDownTimeout)
defer runnableTearDownTimer.Stop()
allStopped := make(chan struct{})

go func() {
cm.waitForRunnable.Wait()
Contributor: this'll leak through a timeout

Contributor: not sure there's a good way around it though
close(allStopped)
}()

select {
case <-allStopped:
return nil
case <-cm.errSignal.GotError():
// Error starting a controller
return cm.errSignal.Error()
case <-runnableTearDownTimer.C:
return fmt.Errorf("not all runnables have stopped within the proposed delay of %s", cm.runnableTearDownTimeout.String())
}
}
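
To make the leak the reviewers mention concrete, here is a minimal, self-contained sketch of the same pattern; it is not part of this PR. When the timer fires first, the function returns, but the goroutine blocked on Wait() stays alive until the slow runnable finally exits.

package main

import (
	"fmt"
	"sync"
	"time"
)

// waitWithTimeout mirrors waitForRunnableToEnd: it waits for a WaitGroup to
// drain, but gives up after a timeout.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
	allStopped := make(chan struct{})
	go func() {
		wg.Wait()         // blocks until every runnable has called Done()
		close(allStopped) // on timeout this goroutine is still parked on Wait()
	}()

	select {
	case <-allStopped:
		return nil
	case <-time.After(timeout):
		// We return, but the goroutine above keeps running ("leaks") until
		// the remaining runnables eventually stop on their own.
		return fmt.Errorf("gave up after %s; waiter goroutine still alive", timeout)
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(3 * time.Second) // a runnable that is slow to tear down
	}()
	fmt.Println(waitWithTimeout(&wg, time.Second)) // times out; waiter goroutine lingers
	time.Sleep(3 * time.Second)                    // demo only: let the slow runnable finish
}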

@@ -453,7 +466,7 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
ctrl := c
go func() {
if err := ctrl.Start(cm.internalStop); err != nil {
cm.errSignal.SignalError(err)
cm.errChan <- err
}
// we use %T here because we don't have a good stand-in for "name",
// and the full runnable might not serialize (mutexes, etc)
@@ -473,14 +486,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
if err := ctrl.Start(cm.internalStop); err != nil {
cm.errSignal.SignalError(err)
}
// we use %T here because we don't have a good stand-in for "name",
// and the full runnable might not serialize (mutexes, etc)
log.V(1).Info("leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl))
}()
cm.startRunnable(ctrl)
}

cm.startedLeader = true
@@ -497,7 +503,7 @@ func (cm *controllerManager) waitForCache() {
}
go func() {
if err := cm.startCache(cm.internalStop); err != nil {
cm.errSignal.SignalError(err)
cm.errChan <- err
Contributor: I kinda feel like we should never have anything writing to the error channel directly like this, and instead just wrap everything in a runnable to avoid accidentally forgetting to increment the runnable counter.
}
}()
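
One shape that suggestion could take, as a sketch rather than part of this PR: route the cache start through startRunnable so the waitForRunnable counter is always incremented, reusing the package's existing RunnableFunc adapter instead of writing to errChan by hand.

// Hypothetical alternative to the goroutine above (not in this diff):
cm.startRunnable(RunnableFunc(func(stop <-chan struct{}) error {
	// startCache blocks until the stop channel closes, so the runnable
	// counter stays incremented for as long as the cache is running.
	return cm.startCache(stop)
}))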

@@ -521,7 +527,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
// Most implementations of leader election log.Fatal() here.
// Since Start is wrapped in log.Fatal when called, we can just return
// an error here which will cause the program to exit.
cm.errSignal.SignalError(fmt.Errorf("leader election lost"))
cm.errChan <- fmt.Errorf("leader election lost")
},
},
})
@@ -542,3 +548,13 @@
go l.Run(ctx)
return nil
}

func (cm *controllerManager) startRunnable(r Runnable) {
cm.waitForRunnable.Add(1)
go func() {
defer cm.waitForRunnable.Done()
if err := r.Start(cm.internalStop); err != nil {
cm.errChan <- err
}
}()
}
52 changes: 31 additions & 21 deletions pkg/manager/manager.go
@@ -185,6 +185,10 @@ type Options struct {
// Use this to customize the event correlator and spam filter
EventBroadcaster record.EventBroadcaster

// RunnableTearDownTimeout is the duration given to runnables to stop
// before the manager actually returns on stop.
RunnableTearDownTimeout *time.Duration

// Dependency injection for testing
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error)
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
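
For context, a hedged usage sketch (not part of this diff) of how a caller might opt into a custom teardown timeout; the 30-second value and the kubeconfig lookup are arbitrary choices for illustration.

package main

import (
	"time"

	"sigs.k8s.io/controller-runtime/pkg/client/config"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	cfg, err := config.GetConfig() // assumes a reachable cluster/kubeconfig
	if err != nil {
		panic(err)
	}

	teardown := 30 * time.Second // how long runnables get to stop before Start returns
	mgr, err := manager.New(cfg, manager.Options{
		RunnableTearDownTimeout: &teardown,
	})
	if err != nil {
		panic(err)
	}
	_ = mgr // Add runnables and call mgr.Start(stopCh) as usual.
}
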
@@ -289,27 +293,28 @@ func New(config *rest.Config, options Options) (Manager, error) {
stop := make(chan struct{})

return &controllerManager{
config: config,
scheme: options.Scheme,
cache: cache,
fieldIndexes: cache,
client: writeObj,
apiReader: apiReader,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
mapper: mapper,
metricsListener: metricsListener,
internalStop: stop,
internalStopper: stop,
port: options.Port,
host: options.Host,
certDir: options.CertDir,
leaseDuration: *options.LeaseDuration,
renewDeadline: *options.RenewDeadline,
retryPeriod: *options.RetryPeriod,
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
config: config,
scheme: options.Scheme,
cache: cache,
fieldIndexes: cache,
client: writeObj,
apiReader: apiReader,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
mapper: mapper,
metricsListener: metricsListener,
internalStop: stop,
internalStopper: stop,
port: options.Port,
host: options.Host,
certDir: options.CertDir,
leaseDuration: *options.LeaseDuration,
renewDeadline: *options.RenewDeadline,
retryPeriod: *options.RetryPeriod,
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
runnableTearDownTimeout: *options.RunnableTearDownTimeout,
}, nil
}

@@ -409,5 +414,10 @@ func setOptionsDefaults(options Options) Options {
options.newHealthProbeListener = defaultHealthProbeListener
}

if options.RunnableTearDownTimeout == nil {
runnableTearDownTimeout := defaultRunnableTearDownTimeout
options.RunnableTearDownTimeout = &runnableTearDownTimeout
}

return options
}