
Commit 20e791d

Wait for Runnables to end before returning in manager Start
1 parent 47744b5 commit 20e791d

File tree

3 files changed: +237 -117 lines changed


pkg/manager/internal.go

Lines changed: 90 additions & 74 deletions
@@ -48,6 +48,8 @@ const (
     defaultRenewDeadline = 10 * time.Second
     defaultRetryPeriod   = 2 * time.Second
 
+    defaultRunnableTearDownTimeout = 10 * time.Second
+
     defaultReadinessEndpoint = "/readyz"
     defaultLivenessEndpoint  = "/healthz"
 )
@@ -114,11 +116,7 @@ type controllerManager struct {
     started        bool
     startedLeader  bool
     healthzStarted bool
-
-    // NB(directxman12): we don't just use an error channel here to avoid the situation where the
-    // error channel is too small and we end up blocking some goroutines waiting to report their errors.
-    // errSignal lets us track when we should stop because an error occurred
-    errSignal *errSignaler
+    errChan chan error
 
     // internalStop is the stop channel *actually* used by everything involved
     // with the manager as a stop channel, so that we can pass a stop channel
@@ -130,6 +128,9 @@ type controllerManager struct {
     // It and `internalStop` should point to the same channel.
     internalStopper chan<- struct{}
 
+    // stop procedure engaged. In other words, we should not add anything else to the manager
+    stopProcedureEngaged bool
+
     startCache func(stop <-chan struct{}) error
 
     // port is the port that the webhook server serves at.
@@ -152,57 +153,23 @@ type controllerManager struct {
     // retryPeriod is the duration the LeaderElector clients should wait
     // between tries of actions.
     retryPeriod time.Duration
-}
-
-type errSignaler struct {
-    // errSignal indicates that an error occurred, when closed. It shouldn't
-    // be written to.
-    errSignal chan struct{}
-
-    // err is the received error
-    err error
-
-    mu sync.Mutex
-}
-
-func (r *errSignaler) SignalError(err error) {
-    r.mu.Lock()
-    defer r.mu.Unlock()
-
-    if err == nil {
-        // non-error, ignore
-        log.Error(nil, "SignalError called without an (with a nil) error, which should never happen, ignoring")
-        return
-    }
-
-    if r.err != nil {
-        // we already have an error, don't try again
-        return
-    }
-
-    // save the error and report it
-    r.err = err
-    close(r.errSignal)
-}
 
-func (r *errSignaler) Error() error {
-    r.mu.Lock()
-    defer r.mu.Unlock()
+    // waitForRunnable is holding the number of runnables currently running so that
+    // we can wait for them to exit before quitting the manager
+    waitForRunnable sync.WaitGroup
 
-    return r.err
-}
-
-func (r *errSignaler) GotError() chan struct{} {
-    r.mu.Lock()
-    defer r.mu.Unlock()
-
-    return r.errSignal
+    // runnableTearDownTimeout is the duration given to runnable to stop
+    // before the manager actually returns on stop.
+    runnableTearDownTimeout time.Duration
 }
 
 // Add sets dependencies on i, and adds it to the list of Runnables to start.
 func (cm *controllerManager) Add(r Runnable) error {
     cm.mu.Lock()
     defer cm.mu.Unlock()
+    if cm.stopProcedureEngaged {
+        return fmt.Errorf("can't accept new runnable as stop procedure is already engaged")
+    }
 
     // Set dependencies on the object
     if err := cm.SetFields(r); err != nil {
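
The stopProcedureEngaged flag added above is checked while holding cm.mu, so nothing new can be registered once shutdown has begun; the same guard is repeated in AddHealthzCheck and AddReadyzCheck further down. A minimal standalone sketch of that guard pattern, with illustrative names that are not part of this commit:

package sketch

import (
    "fmt"
    "sync"
)

type runner struct {
    mu       sync.Mutex
    stopping bool
    work     []func(stop <-chan struct{}) error
}

// Add refuses new work once stop has been engaged, instead of racing with teardown.
func (r *runner) Add(f func(stop <-chan struct{}) error) error {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.stopping {
        return fmt.Errorf("can't accept new runnable as stop procedure is already engaged")
    }
    r.work = append(r.work, f)
    return nil
}

// engageStop flips the flag under the same mutex that Add takes.
func (r *runner) engageStop() {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.stopping = true
}
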
@@ -222,11 +189,7 @@ func (cm *controllerManager) Add(r Runnable) error {
 
     if shouldStart {
         // If already started, start the controller
-        go func() {
-            if err := r.Start(cm.internalStop); err != nil {
-                cm.errSignal.SignalError(err)
-            }
-        }()
+        cm.startRunnable(r)
     }
 
     return nil
@@ -264,6 +227,9 @@ func (cm *controllerManager) SetFields(i interface{}) error {
 func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
     cm.mu.Lock()
     defer cm.mu.Unlock()
+    if cm.stopProcedureEngaged {
+        return fmt.Errorf("can't accept new healthCheck as stop procedure is already engaged")
+    }
 
     if cm.healthzStarted {
         return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
@@ -281,6 +247,9 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker)
 func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error {
     cm.mu.Lock()
     defer cm.mu.Unlock()
+    if cm.stopProcedureEngaged {
+        return fmt.Errorf("can't accept new ready check as stop procedure is already engaged")
+    }
 
     if cm.healthzStarted {
         return fmt.Errorf("unable to add new checker because readyz endpoint has already been created")
@@ -355,15 +324,15 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
     go func() {
         log.Info("starting metrics server", "path", metricsPath)
         if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
-            cm.errSignal.SignalError(err)
+            cm.errChan <- err
         }
     }()
 
     // Shutdown the server when stop is closed
     select {
     case <-stop:
         if err := server.Shutdown(context.Background()); err != nil {
-            cm.errSignal.SignalError(err)
+            cm.errChan <- err
         }
     }
 }
@@ -385,7 +354,7 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
     // Run server
     go func() {
         if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
-            cm.errSignal.SignalError(err)
+            cm.errChan <- err
         }
     }()
     cm.healthzStarted = true
@@ -395,17 +364,18 @@ func (cm *controllerManager) serveHealthProbes(stop <-chan struct{}) {
     select {
     case <-stop:
         if err := server.Shutdown(context.Background()); err != nil {
-            cm.errSignal.SignalError(err)
+            cm.errChan <- err
         }
     }
 }
 
 func (cm *controllerManager) Start(stop <-chan struct{}) error {
-    // join the passed-in stop channel as an upstream feeding into cm.internalStopper
-    defer close(cm.internalStopper)
+    // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
+    stopComplete := make(chan struct{})
+    defer close(stopComplete)
 
     // initialize this here so that we reset the signal channel state on every start
-    cm.errSignal = &errSignaler{errSignal: make(chan struct{})}
+    cm.errChan = make(chan error)
 
     // Metrics should be served whether the controller is leader or not.
     // (If we don't serve metrics for non-leaders, prometheus will still scrape
@@ -424,6 +394,9 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
     if cm.resourceLock != nil {
         err := cm.startLeaderElection()
         if err != nil {
+            if errStop := cm.engageStopProcedure(stopComplete); errStop != nil {
+                log.Error(errStop, "some runnables could not be stopped after error occurred in startLeaderElection.")
+            }
             return err
         }
     } else {
@@ -433,10 +406,50 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
     select {
     case <-stop:
         // We are done
+        return cm.engageStopProcedure(stopComplete)
+    case err := <-cm.errChan:
+        // Error starting or running a runnable
+        if errStop := cm.engageStopProcedure(stopComplete); errStop != nil {
+            log.Error(errStop, "some runnables could not be stopped after error occurred starting/running the manager.")
+        }
+        return err
+    }
+}
+func (cm *controllerManager) engageStopProcedure(stopComplete chan struct{}) error {
+    cm.mu.Lock()
+    defer cm.mu.Unlock()
+    cm.stopProcedureEngaged = true
+    close(cm.internalStopper)
+    go func() {
+        for {
+            select {
+            case err, ok := <-cm.errChan:
+                if ok {
+                    log.Error(err, "error received after stop sequence was engaged")
+                }
+            case <-stopComplete:
+                return
+            }
+        }
+    }()
+    return cm.waitForRunnableToEnd()
+}
+
+func (cm *controllerManager) waitForRunnableToEnd() error {
+    runnableTearDownTimer := time.NewTimer(cm.runnableTearDownTimeout)
+    defer runnableTearDownTimer.Stop()
+    allStopped := make(chan struct{})
+
+    go func() {
+        cm.waitForRunnable.Wait()
+        close(allStopped)
+    }()
+
+    select {
+    case <-allStopped:
         return nil
-    case <-cm.errSignal.GotError():
-        // Error starting a controller
-        return cm.errSignal.Error()
+    case <-runnableTearDownTimer.C:
+        return fmt.Errorf("not all runnables have stopped within the proposed delay of %s", cm.runnableTearDownTimeout.String())
     }
 }
 
@@ -453,7 +466,7 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
         ctrl := c
         go func() {
             if err := ctrl.Start(cm.internalStop); err != nil {
-                cm.errSignal.SignalError(err)
+                cm.errChan <- err
             }
             // we use %T here because we don't have a good stand-in for "name",
             // and the full runnable might not serialize (mutexes, etc)
@@ -473,14 +486,7 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
         // Controllers block, but we want to return an error if any have an error starting.
         // Write any Start errors to a channel so we can return them
         ctrl := c
-        go func() {
-            if err := ctrl.Start(cm.internalStop); err != nil {
-                cm.errSignal.SignalError(err)
-            }
-            // we use %T here because we don't have a good stand-in for "name",
-            // and the full runnable might not serialize (mutexes, etc)
-            log.V(1).Info("leader-election runnable finished", "runnable type", fmt.Sprintf("%T", ctrl))
-        }()
+        cm.startRunnable(ctrl)
     }
 
     cm.startedLeader = true
@@ -497,7 +503,7 @@ func (cm *controllerManager) waitForCache() {
     }
     go func() {
         if err := cm.startCache(cm.internalStop); err != nil {
-            cm.errSignal.SignalError(err)
+            cm.errChan <- err
         }
     }()
 
@@ -521,7 +527,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
                 // Most implementations of leader election log.Fatal() here.
                 // Since Start is wrapped in log.Fatal when called, we can just return
                 // an error here which will cause the program to exit.
-                cm.errSignal.SignalError(fmt.Errorf("leader election lost"))
+                cm.errChan <- fmt.Errorf("leader election lost")
             },
         },
     })
@@ -542,3 +548,13 @@ func (cm *controllerManager) startLeaderElection() (err error) {
     go l.Run(ctx)
     return nil
 }
+
+func (cm *controllerManager) startRunnable(r Runnable) {
+    cm.waitForRunnable.Add(1)
+    go func() {
+        defer cm.waitForRunnable.Done()
+        if err := r.Start(cm.internalStop); err != nil {
+            cm.errChan <- err
+        }
+    }()
+}
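
Taken together, the hunks above form the core of the change: every runnable launched through startRunnable is tracked in the waitForRunnable WaitGroup, and waitForRunnableToEnd waits on that group but gives up after runnableTearDownTimeout (10s by default). Because errChan is unbuffered, engageStopProcedure also keeps a goroutine draining it until stopComplete is closed; otherwise a runnable failing during teardown would block on its send and the WaitGroup could never drain. A minimal standalone sketch of the same wait-with-timeout idiom, with illustrative names that are not part of the commit:

package sketch

import (
    "fmt"
    "sync"
    "time"
)

// waitWithTimeout blocks until every goroutine tracked by wg has called Done,
// or returns an error once the timeout elapses, mirroring waitForRunnableToEnd.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) error {
    timer := time.NewTimer(timeout)
    defer timer.Stop()

    done := make(chan struct{})
    go func() {
        wg.Wait()   // returns once all tracked runnables have exited
        close(done) // signal the select below
    }()

    select {
    case <-done:
        return nil
    case <-timer.C:
        return fmt.Errorf("not all runnables have stopped within %s", timeout)
    }
}
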

pkg/manager/manager.go

Lines changed: 31 additions & 21 deletions
@@ -185,6 +185,10 @@ type Options struct {
     // Use this to customize the event correlator and spam filter
     EventBroadcaster record.EventBroadcaster
 
+    // RunnableTearDownTimeout is the duration given to runnable to stop
+    // before the manager actually returns on stop.
+    RunnableTearDownTimeout *time.Duration
+
     // Dependency injection for testing
     newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error)
     newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
@@ -289,27 +293,28 @@ func New(config *rest.Config, options Options) (Manager, error) {
     stop := make(chan struct{})
 
     return &controllerManager{
-        config:                config,
-        scheme:                options.Scheme,
-        cache:                 cache,
-        fieldIndexes:          cache,
-        client:                writeObj,
-        apiReader:             apiReader,
-        recorderProvider:      recorderProvider,
-        resourceLock:          resourceLock,
-        mapper:                mapper,
-        metricsListener:       metricsListener,
-        internalStop:          stop,
-        internalStopper:       stop,
-        port:                  options.Port,
-        host:                  options.Host,
-        certDir:               options.CertDir,
-        leaseDuration:         *options.LeaseDuration,
-        renewDeadline:         *options.RenewDeadline,
-        retryPeriod:           *options.RetryPeriod,
-        healthProbeListener:   healthProbeListener,
-        readinessEndpointName: options.ReadinessEndpointName,
-        livenessEndpointName:  options.LivenessEndpointName,
+        config:                  config,
+        scheme:                  options.Scheme,
+        cache:                   cache,
+        fieldIndexes:            cache,
+        client:                  writeObj,
+        apiReader:               apiReader,
+        recorderProvider:        recorderProvider,
+        resourceLock:            resourceLock,
+        mapper:                  mapper,
+        metricsListener:         metricsListener,
+        internalStop:            stop,
+        internalStopper:         stop,
+        port:                    options.Port,
+        host:                    options.Host,
+        certDir:                 options.CertDir,
+        leaseDuration:           *options.LeaseDuration,
+        renewDeadline:           *options.RenewDeadline,
+        retryPeriod:             *options.RetryPeriod,
+        healthProbeListener:     healthProbeListener,
+        readinessEndpointName:   options.ReadinessEndpointName,
+        livenessEndpointName:    options.LivenessEndpointName,
+        runnableTearDownTimeout: *options.RunnableTearDownTimeout,
     }, nil
 }

@@ -409,5 +414,10 @@ func setOptionsDefaults(options Options) Options {
         options.newHealthProbeListener = defaultHealthProbeListener
     }
 
+    if options.RunnableTearDownTimeout == nil {
+        runnableTearDownTimeout := defaultRunnableTearDownTimeout
+        options.RunnableTearDownTimeout = &runnableTearDownTimeout
+    }
+
     return options
 }
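
For callers, the new option is a pointer so that leaving it nil falls back to defaultRunnableTearDownTimeout (10s), as the defaulting hunk above shows. A hedged usage sketch, assuming this commit's field name and the usual controller-runtime import paths of this era:

package main

import (
    "time"

    "sigs.k8s.io/controller-runtime/pkg/client/config"
    "sigs.k8s.io/controller-runtime/pkg/manager"
    "sigs.k8s.io/controller-runtime/pkg/manager/signals"
)

func main() {
    // Give runnables up to 30s (instead of the default 10s) to return after stop.
    teardown := 30 * time.Second

    mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{
        RunnableTearDownTimeout: &teardown,
    })
    if err != nil {
        panic(err)
    }

    // With this commit, Start only returns once every runnable has returned,
    // or after RunnableTearDownTimeout has elapsed.
    if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
        panic(err)
    }
}
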
