Skip to content

Commit 3c057ad

Browse files
committed
feat(manager): add prestart hook support
When implementing a controller that uses leader election, there maybe be work that needs to be done after winning the election but before processing enqueued requests. For example, a controller may need to build up an internal mapping of the current state of the cluster before it can begin reconciling. This changeset adds support for adding prestart hooks to controller-runtime's manager implementation. This hook runs after the manager has been elected leader, immediately before the leader election controllers are started. Related kubernetes-sigs#607
1 parent a24b949 commit 3c057ad

File tree

3 files changed

+140
-0
lines changed

3 files changed

+140
-0
lines changed

pkg/manager/internal.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ const (
5555
defaultRenewDeadline = 10 * time.Second
5656
defaultRetryPeriod = 2 * time.Second
5757
defaultGracefulShutdownPeriod = 30 * time.Second
58+
defaultHookPeriod = 15 * time.Second
5859

5960
defaultReadinessEndpoint = "/readyz"
6061
defaultLivenessEndpoint = "/healthz"
@@ -167,6 +168,13 @@ type controllerManager struct {
167168
// internalProceduresStop channel is used internally to the manager when coordinating
168169
// the proper shutdown of servers. This channel is also used for dependency injection.
169170
internalProceduresStop chan struct{}
171+
172+
// prestartHooks are functions that are run immediately before calling the Start functions
173+
// of the leader election runnables.
174+
prestartHooks []Runnable
175+
176+
// hookTimeout is the duration given to each hook to return successfully.
177+
hookTimeout time.Duration
170178
}
171179

172180
type hasCache interface {
@@ -245,6 +253,23 @@ func (cm *controllerManager) GetHTTPClient() *http.Client {
245253
return cm.cluster.GetHTTPClient()
246254
}
247255

256+
// Hook allows you to add hooks.
257+
func (cm *controllerManager) Hook(hook HookType, runnable Runnable) error {
258+
cm.Lock()
259+
defer cm.Unlock()
260+
261+
if cm.started {
262+
return fmt.Errorf("unable to add new hook because the manager has already been started")
263+
}
264+
265+
switch hook {
266+
case HookPrestartType:
267+
cm.prestartHooks = append(cm.prestartHooks, runnable)
268+
}
269+
270+
return nil
271+
}
272+
248273
func (cm *controllerManager) GetConfig() *rest.Config {
249274
return cm.cluster.GetConfig()
250275
}
@@ -609,6 +634,19 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
609634
}
610635

611636
func (cm *controllerManager) startLeaderElectionRunnables() error {
637+
cm.logger.Info("Starting prestart hooks")
638+
for _, hook := range cm.prestartHooks {
639+
ctx, cancel := context.WithTimeout(cm.internalCtx, cm.hookTimeout)
640+
if err := hook.Start(ctx); err != nil {
641+
cancel()
642+
return err
643+
}
644+
cancel()
645+
}
646+
647+
// All the prestart hooks have been run, clear the slice to free the underlying resources.
648+
cm.prestartHooks = nil
649+
612650
return cm.runnables.LeaderElection.Start(cm.internalCtx)
613651
}
614652

pkg/manager/manager.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ type Manager interface {
7777
// AddReadyzCheck allows you to add Readyz checker
7878
AddReadyzCheck(name string, check healthz.Checker) error
7979

80+
// Hook allows to add Runnables as hooks to modify the behavior of the controller.
81+
Hook(hook HookType, runnable Runnable) error
82+
8083
// Start starts all registered Controllers and blocks until the context is cancelled.
8184
// Returns an error if there is an error starting any controller.
8285
//
@@ -343,6 +346,10 @@ type Options struct {
343346
// +optional
344347
Controller config.Controller
345348

349+
// HookTimeout is the duration given to each hook to return successfully.
350+
// To use hooks without timeout, set to a negative duration, e.g. time.Duration(-1)
351+
HookTimeout *time.Duration
352+
346353
// makeBroadcaster allows deferring the creation of the broadcaster to
347354
// avoid leaking goroutines if we never call Start on this manager. It also
348355
// returns whether or not this is a "owned" broadcaster, and as such should be
@@ -357,6 +364,15 @@ type Options struct {
357364
newPprofListener func(addr string) (net.Listener, error)
358365
}
359366

367+
// HookType defines hooks for use with AddHook.
368+
type HookType int
369+
370+
const (
371+
// HookPrestartType defines a hook that is run after leader election and immediately before
372+
// calling Start on the runnalbes that needed leader election.
373+
HookPrestartType HookType = iota
374+
)
375+
360376
// BaseContextFunc is a function used to provide a base Context to Runnables
361377
// managed by a Manager.
362378
type BaseContextFunc func() context.Context
@@ -498,6 +514,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
498514
livenessEndpointName: options.LivenessEndpointName,
499515
pprofListener: pprofListener,
500516
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
517+
hookTimeout: *options.HookTimeout,
501518
internalProceduresStop: make(chan struct{}),
502519
leaderElectionStopped: make(chan struct{}),
503520
leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
@@ -724,6 +741,11 @@ func setOptionsDefaults(options Options) Options {
724741
options.GracefulShutdownTimeout = &gracefulShutdownTimeout
725742
}
726743

744+
if options.HookTimeout == nil {
745+
hookTimeout := defaultHookPeriod
746+
options.HookTimeout = &hookTimeout
747+
}
748+
727749
if options.Logger.GetSink() == nil {
728750
options.Logger = log.Log
729751
}

pkg/manager/manager_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,6 +1138,86 @@ var _ = Describe("manger.Manager", func() {
11381138
Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond))
11391139
})
11401140

1141+
It("should run prestart hooks before calling Start on leader election runnables", func() {
1142+
m, err := New(cfg, options)
1143+
Expect(err).NotTo(HaveOccurred())
1144+
for _, cb := range callbacks {
1145+
cb(m)
1146+
}
1147+
1148+
runnableRan := make(chan struct{})
1149+
1150+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1151+
close(runnableRan)
1152+
return nil
1153+
})))
1154+
1155+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1156+
Expect(m.Elected()).ShouldNot(BeClosed())
1157+
Consistently(runnableRan).ShouldNot(BeClosed())
1158+
return nil
1159+
})))
1160+
1161+
ctx, cancel := context.WithCancel(context.Background())
1162+
defer cancel()
1163+
go func() {
1164+
defer GinkgoRecover()
1165+
Expect(m.Elected()).ShouldNot(BeClosed())
1166+
Expect(m.Start(ctx)).NotTo(HaveOccurred())
1167+
}()
1168+
1169+
<-m.Elected()
1170+
})
1171+
1172+
It("should run prestart hooks with timeout", func() {
1173+
m, err := New(cfg, options)
1174+
Expect(err).NotTo(HaveOccurred())
1175+
for _, cb := range callbacks {
1176+
cb(m)
1177+
}
1178+
m.(*controllerManager).hookTimeout = 1 * time.Nanosecond
1179+
1180+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1181+
select {
1182+
case <-ctx.Done():
1183+
return ctx.Err()
1184+
case <-time.After(1 * time.Second):
1185+
return errors.New("prestart hook timeout exceeded expected")
1186+
}
1187+
})))
1188+
1189+
ctx, cancel := context.WithCancel(context.Background())
1190+
defer cancel()
1191+
1192+
Expect(m.Start(ctx)).Should(MatchError(context.DeadlineExceeded))
1193+
})
1194+
1195+
It("should not run leader election runnables if prestart hooks fail", func() {
1196+
m, err := New(cfg, options)
1197+
Expect(err).NotTo(HaveOccurred())
1198+
for _, cb := range callbacks {
1199+
cb(m)
1200+
}
1201+
1202+
runnableRan := make(chan struct{})
1203+
1204+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1205+
close(runnableRan)
1206+
return nil
1207+
})))
1208+
1209+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1210+
Expect(m.Elected()).ShouldNot(BeClosed())
1211+
Consistently(runnableRan).ShouldNot(BeClosed())
1212+
return errors.New("prestart hook failed")
1213+
})))
1214+
1215+
ctx, cancel := context.WithCancel(context.Background())
1216+
defer cancel()
1217+
1218+
Expect(m.Elected()).ShouldNot(BeClosed())
1219+
Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed")))
1220+
})
11411221
}
11421222

11431223
Context("with defaults", func() {

0 commit comments

Comments
 (0)