Skip to content

Commit c316099

Browse files
committed
feat(manager): add prestart hook support
When implementing a controller that uses leader election, there maybe be work that needs to be done after winning the election but before processing enqueued requests. For example, a controller may need to build up an internal mapping of the current state of the cluster before it can begin reconciling. This changeset adds support for adding prestart hooks to controller-runtime's manager implementation. This hook runs after the manager has been elected leader, immediately before the leader election controllers are started. Related kubernetes-sigs#607
1 parent c10975e commit c316099

File tree

3 files changed

+140
-0
lines changed

3 files changed

+140
-0
lines changed

pkg/manager/internal.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ const (
5454
defaultRenewDeadline = 10 * time.Second
5555
defaultRetryPeriod = 2 * time.Second
5656
defaultGracefulShutdownPeriod = 30 * time.Second
57+
defaultHookPeriod = 15 * time.Second
5758

5859
defaultReadinessEndpoint = "/readyz"
5960
defaultLivenessEndpoint = "/healthz"
@@ -163,6 +164,13 @@ type controllerManager struct {
163164
// internalProceduresStop channel is used internally to the manager when coordinating
164165
// the proper shutdown of servers. This channel is also used for dependency injection.
165166
internalProceduresStop chan struct{}
167+
168+
// prestartHooks are functions that are run immediately before calling the Start functions
169+
// of the leader election runnables.
170+
prestartHooks []Runnable
171+
172+
// hookTimeout is the duration given to each hook to return successfully.
173+
hookTimeout time.Duration
166174
}
167175

168176
type hasCache interface {
@@ -241,6 +249,23 @@ func (cm *controllerManager) GetHTTPClient() *http.Client {
241249
return cm.cluster.GetHTTPClient()
242250
}
243251

252+
// AddHook allows you to add hooks.
253+
func (cm *controllerManager) AddHook(hook HookType, runnable Runnable) error {
254+
cm.Lock()
255+
defer cm.Unlock()
256+
257+
if cm.started {
258+
return fmt.Errorf("unable to add new hook because the manager has already been started")
259+
}
260+
261+
switch hook {
262+
case HookPrestartType:
263+
cm.prestartHooks = append(cm.prestartHooks, runnable)
264+
}
265+
266+
return nil
267+
}
268+
244269
func (cm *controllerManager) GetConfig() *rest.Config {
245270
return cm.cluster.GetConfig()
246271
}
@@ -580,6 +605,19 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
580605
}
581606

582607
func (cm *controllerManager) startLeaderElectionRunnables() error {
608+
cm.logger.Info("Starting prestart hooks")
609+
for _, hook := range cm.prestartHooks {
610+
ctx, cancel := context.WithTimeout(cm.internalCtx, cm.hookTimeout)
611+
if err := hook.Start(ctx); err != nil {
612+
cancel()
613+
return err
614+
}
615+
cancel()
616+
}
617+
618+
// All the prestart hooks have been run, clear the slice to free the underlying resources.
619+
cm.prestartHooks = nil
620+
583621
return cm.runnables.LeaderElection.Start(cm.internalCtx)
584622
}
585623

pkg/manager/manager.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ type Manager interface {
7777
// AddReadyzCheck allows you to add Readyz checker
7878
AddReadyzCheck(name string, check healthz.Checker) error
7979

80+
// AddHook allows to add Runnables as hooks to modify the behavior of the controller.
81+
AddHook(hook HookType, runnable Runnable) error
82+
8083
// Start starts all registered Controllers and blocks until the context is cancelled.
8184
// Returns an error if there is an error starting any controller.
8285
//
@@ -336,6 +339,10 @@ type Options struct {
336339
// +optional
337340
Controller config.Controller
338341

342+
// HookTimeout is the duration given to each hook to return successfully.
343+
// To use hooks without timeout, set to a negative duration, e.g. time.Duration(-1)
344+
HookTimeout *time.Duration
345+
339346
// makeBroadcaster allows deferring the creation of the broadcaster to
340347
// avoid leaking goroutines if we never call Start on this manager. It also
341348
// returns whether or not this is a "owned" broadcaster, and as such should be
@@ -349,6 +356,15 @@ type Options struct {
349356
newHealthProbeListener func(addr string) (net.Listener, error)
350357
}
351358

359+
// HookType defines hooks for use with AddHook.
360+
type HookType int
361+
362+
const (
363+
// HookPrestartType defines a hook that is run after leader election and immediately before
364+
// calling Start on the runnalbes that needed leader election.
365+
HookPrestartType HookType = iota
366+
)
367+
352368
// BaseContextFunc is a function used to provide a base Context to Runnables
353369
// managed by a Manager.
354370
type BaseContextFunc func() context.Context
@@ -482,6 +498,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
482498
readinessEndpointName: options.ReadinessEndpointName,
483499
livenessEndpointName: options.LivenessEndpointName,
484500
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
501+
hookTimeout: *options.HookTimeout,
485502
internalProceduresStop: make(chan struct{}),
486503
leaderElectionStopped: make(chan struct{}),
487504
leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
@@ -691,6 +708,11 @@ func setOptionsDefaults(options Options) Options {
691708
options.GracefulShutdownTimeout = &gracefulShutdownTimeout
692709
}
693710

711+
if options.HookTimeout == nil {
712+
hookTimeout := defaultHookPeriod
713+
options.HookTimeout = &hookTimeout
714+
}
715+
694716
if options.Logger.GetSink() == nil {
695717
options.Logger = log.Log
696718
}

pkg/manager/manager_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1134,6 +1134,86 @@ var _ = Describe("manger.Manager", func() {
11341134
Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond))
11351135
})
11361136

1137+
It("should run prestart hooks before calling Start on leader election runnables", func() {
1138+
m, err := New(cfg, options)
1139+
Expect(err).NotTo(HaveOccurred())
1140+
for _, cb := range callbacks {
1141+
cb(m)
1142+
}
1143+
1144+
runnableRan := make(chan struct{})
1145+
1146+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1147+
close(runnableRan)
1148+
return nil
1149+
})))
1150+
1151+
Expect(m.AddHook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1152+
Expect(m.Elected()).ShouldNot(BeClosed())
1153+
Consistently(runnableRan).ShouldNot(BeClosed())
1154+
return nil
1155+
})))
1156+
1157+
ctx, cancel := context.WithCancel(context.Background())
1158+
defer cancel()
1159+
go func() {
1160+
defer GinkgoRecover()
1161+
Expect(m.Elected()).ShouldNot(BeClosed())
1162+
Expect(m.Start(ctx)).NotTo(HaveOccurred())
1163+
}()
1164+
1165+
<-m.Elected()
1166+
})
1167+
1168+
It("should run prestart hooks with timeout", func() {
1169+
m, err := New(cfg, options)
1170+
Expect(err).NotTo(HaveOccurred())
1171+
for _, cb := range callbacks {
1172+
cb(m)
1173+
}
1174+
m.(*controllerManager).hookTimeout = 1 * time.Nanosecond
1175+
1176+
Expect(m.AddHook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1177+
select {
1178+
case <-ctx.Done():
1179+
return ctx.Err()
1180+
case <-time.After(1 * time.Second):
1181+
return errors.New("prestart hook timeout exceeded expected")
1182+
}
1183+
})))
1184+
1185+
ctx, cancel := context.WithCancel(context.Background())
1186+
defer cancel()
1187+
1188+
Expect(m.Start(ctx)).Should(MatchError(context.DeadlineExceeded))
1189+
})
1190+
1191+
It("should not run leader election runnables if prestart hooks fail", func() {
1192+
m, err := New(cfg, options)
1193+
Expect(err).NotTo(HaveOccurred())
1194+
for _, cb := range callbacks {
1195+
cb(m)
1196+
}
1197+
1198+
runnableRan := make(chan struct{})
1199+
1200+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1201+
close(runnableRan)
1202+
return nil
1203+
})))
1204+
1205+
Expect(m.AddHook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1206+
Expect(m.Elected()).ShouldNot(BeClosed())
1207+
Consistently(runnableRan).ShouldNot(BeClosed())
1208+
return errors.New("prestart hook failed")
1209+
})))
1210+
1211+
ctx, cancel := context.WithCancel(context.Background())
1212+
defer cancel()
1213+
1214+
Expect(m.Elected()).ShouldNot(BeClosed())
1215+
Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed")))
1216+
})
11371217
}
11381218

11391219
Context("with defaults", func() {

0 commit comments

Comments
 (0)