Skip to content

Commit 3ea050d

Browse files
committed
feat(manager): add prestart hook support
When implementing a controller that uses leader election, there maybe be work that needs to be done after winning the election but before processing enqueued requests. For example, a controller may need to build up an internal mapping of the current state of the cluster before it can begin reconciling. This changeset adds support for adding prestart hooks to controller-runtime's manager implementation. This hook runs after the manager has been elected leader, immediately before the leader election controllers are started. Related #607
1 parent 9bc967a commit 3ea050d

File tree

3 files changed

+183
-0
lines changed

3 files changed

+183
-0
lines changed

pkg/manager/internal.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ const (
5353
defaultRenewDeadline = 10 * time.Second
5454
defaultRetryPeriod = 2 * time.Second
5555
defaultGracefulShutdownPeriod = 30 * time.Second
56+
defaultHookPeriod = 15 * time.Second
5657

5758
defaultReadinessEndpoint = "/readyz"
5859
defaultLivenessEndpoint = "/healthz"
@@ -161,6 +162,13 @@ type controllerManager struct {
161162
// internalProceduresStop channel is used internally to the manager when coordinating
162163
// the proper shutdown of servers. This channel is also used for dependency injection.
163164
internalProceduresStop chan struct{}
165+
166+
// prestartHooks are functions that are run immediately before calling the Start functions
167+
// of the leader election runnables.
168+
prestartHooks []Runnable
169+
170+
// hookTimeout is the duration given to each hook to return successfully.
171+
hookTimeout time.Duration
164172
}
165173

166174
type hasCache interface {
@@ -217,6 +225,23 @@ func (cm *controllerManager) GetHTTPClient() *http.Client {
217225
return cm.cluster.GetHTTPClient()
218226
}
219227

228+
// Hook allows you to add hooks.
229+
func (cm *controllerManager) Hook(hook HookType, runnable Runnable) error {
230+
cm.Lock()
231+
defer cm.Unlock()
232+
233+
if cm.started {
234+
return fmt.Errorf("unable to add new hook because the manager has already been started")
235+
}
236+
237+
switch hook {
238+
case HookPrestartType:
239+
cm.prestartHooks = append(cm.prestartHooks, runnable)
240+
}
241+
242+
return nil
243+
}
244+
220245
func (cm *controllerManager) GetConfig() *rest.Config {
221246
return cm.cluster.GetConfig()
222247
}
@@ -547,6 +572,27 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
547572
}
548573

549574
func (cm *controllerManager) startLeaderElectionRunnables() error {
575+
cm.logger.Info("Running prestart hooks")
576+
for _, hook := range cm.prestartHooks {
577+
var ctx context.Context
578+
var cancel context.CancelFunc
579+
580+
if cm.hookTimeout < 0 {
581+
ctx, cancel = context.WithCancel(cm.internalCtx)
582+
} else {
583+
ctx, cancel = context.WithTimeout(cm.internalCtx, cm.hookTimeout)
584+
}
585+
586+
if err := hook.Start(ctx); err != nil {
587+
cancel()
588+
return err
589+
}
590+
cancel()
591+
}
592+
593+
// All the prestart hooks have been run, clear the slice to free the underlying resources.
594+
cm.prestartHooks = nil
595+
550596
return cm.runnables.LeaderElection.Start(cm.internalCtx)
551597
}
552598

pkg/manager/manager.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,9 @@ type Manager interface {
7070
// AddReadyzCheck allows you to add Readyz checker
7171
AddReadyzCheck(name string, check healthz.Checker) error
7272

73+
// Hook allows to add Runnables as hooks to modify the behavior.
74+
Hook(hook HookType, runnable Runnable) error
75+
7376
// Start starts all registered Controllers and blocks until the context is cancelled.
7477
// Returns an error if there is an error starting any controller.
7578
//
@@ -260,6 +263,10 @@ type Options struct {
260263
// +optional
261264
Controller config.Controller
262265

266+
// HookTimeout is the duration given to each hook to return successfully.
267+
// To use hooks without timeout, set to a negative duration, e.g. time.Duration(-1)
268+
HookTimeout *time.Duration
269+
263270
// makeBroadcaster allows deferring the creation of the broadcaster to
264271
// avoid leaking goroutines if we never call Start on this manager. It also
265272
// returns whether or not this is a "owned" broadcaster, and as such should be
@@ -274,6 +281,15 @@ type Options struct {
274281
newPprofListener func(addr string) (net.Listener, error)
275282
}
276283

284+
// HookType defines hooks for use with AddHook.
285+
type HookType int
286+
287+
const (
288+
// HookPrestartType defines a hook that is run after leader election and immediately before
289+
// calling Start on the runnables that needed leader election.
290+
HookPrestartType HookType = iota
291+
)
292+
277293
// BaseContextFunc is a function used to provide a base Context to Runnables
278294
// managed by a Manager.
279295
type BaseContextFunc func() context.Context
@@ -429,6 +445,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
429445
livenessEndpointName: options.LivenessEndpointName,
430446
pprofListener: pprofListener,
431447
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
448+
hookTimeout: *options.HookTimeout,
432449
internalProceduresStop: make(chan struct{}),
433450
leaderElectionStopped: make(chan struct{}),
434451
leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
@@ -530,6 +547,11 @@ func setOptionsDefaults(options Options) Options {
530547
options.GracefulShutdownTimeout = &gracefulShutdownTimeout
531548
}
532549

550+
if options.HookTimeout == nil {
551+
hookTimeout := defaultHookPeriod
552+
options.HookTimeout = &hookTimeout
553+
}
554+
533555
if options.Logger.GetSink() == nil {
534556
options.Logger = log.Log
535557
}

pkg/manager/manager_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,6 +1145,121 @@ var _ = Describe("manger.Manager", func() {
11451145
Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond))
11461146
})
11471147

1148+
It("should run prestart hooks before calling Start on leader election runnables", func() {
1149+
m, err := New(cfg, options)
1150+
Expect(err).NotTo(HaveOccurred())
1151+
for _, cb := range callbacks {
1152+
cb(m)
1153+
}
1154+
1155+
runnableRan := make(chan struct{})
1156+
1157+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1158+
close(runnableRan)
1159+
return nil
1160+
}))).ToNot(HaveOccurred())
1161+
1162+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1163+
Expect(m.Elected()).ShouldNot(BeClosed())
1164+
Consistently(runnableRan).ShouldNot(BeClosed())
1165+
return nil
1166+
}))).ToNot(HaveOccurred())
1167+
1168+
ctx, cancel := context.WithCancel(context.Background())
1169+
defer cancel()
1170+
go func() {
1171+
defer GinkgoRecover()
1172+
Expect(m.Elected()).ShouldNot(BeClosed())
1173+
Expect(m.Start(ctx)).NotTo(HaveOccurred())
1174+
}()
1175+
1176+
<-m.Elected()
1177+
})
1178+
1179+
It("should run prestart hooks with timeout", func() {
1180+
m, err := New(cfg, options)
1181+
Expect(err).NotTo(HaveOccurred())
1182+
for _, cb := range callbacks {
1183+
cb(m)
1184+
}
1185+
m.(*controllerManager).hookTimeout = 1 * time.Nanosecond
1186+
1187+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1188+
select {
1189+
case <-ctx.Done():
1190+
return ctx.Err()
1191+
case <-time.After(1 * time.Second):
1192+
return errors.New("prestart hook timeout exceeded expected")
1193+
}
1194+
}))).ToNot(HaveOccurred())
1195+
1196+
ctx, cancel := context.WithCancel(context.Background())
1197+
defer cancel()
1198+
1199+
Expect(m.Start(ctx)).Should(MatchError(context.DeadlineExceeded))
1200+
})
1201+
1202+
It("should run prestart hooks without timeout", func() {
1203+
m, err := New(cfg, options)
1204+
Expect(err).NotTo(HaveOccurred())
1205+
for _, cb := range callbacks {
1206+
cb(m)
1207+
}
1208+
m.(*controllerManager).hookTimeout = -1 * time.Second
1209+
1210+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1211+
fmt.Println("runnable returning")
1212+
return nil
1213+
}))).ToNot(HaveOccurred())
1214+
1215+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1216+
select {
1217+
case <-ctx.Done():
1218+
return ctx.Err()
1219+
case <-time.After(1 * time.Second):
1220+
fmt.Println("prestart hook returning")
1221+
return nil
1222+
}
1223+
}))).ToNot(HaveOccurred())
1224+
1225+
ctx, cancel := context.WithCancel(context.Background())
1226+
defer cancel()
1227+
1228+
go func() {
1229+
defer GinkgoRecover()
1230+
Expect(m.Elected()).ShouldNot(BeClosed())
1231+
Expect(m.Start(ctx)).NotTo(HaveOccurred())
1232+
}()
1233+
1234+
<-m.Elected()
1235+
})
1236+
1237+
It("should not run leader election runnables if prestart hooks fail", func() {
1238+
m, err := New(cfg, options)
1239+
Expect(err).NotTo(HaveOccurred())
1240+
for _, cb := range callbacks {
1241+
cb(m)
1242+
}
1243+
1244+
runnableRan := make(chan struct{})
1245+
1246+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1247+
close(runnableRan)
1248+
return nil
1249+
}))).ToNot(HaveOccurred())
1250+
1251+
Expect(m.Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1252+
Expect(m.Elected()).ShouldNot(BeClosed())
1253+
Consistently(runnableRan).ShouldNot(BeClosed())
1254+
return errors.New("prestart hook failed")
1255+
}))).ToNot(HaveOccurred())
1256+
1257+
ctx, cancel := context.WithCancel(context.Background())
1258+
defer cancel()
1259+
1260+
Expect(m.Elected()).ShouldNot(BeClosed())
1261+
Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed")))
1262+
})
11481263
}
11491264

11501265
Context("with defaults", func() {

0 commit comments

Comments
 (0)