Skip to content

Commit 6c35525

Browse files
committed
feat(manager): add prestart hook support
When implementing a controller that uses leader election, there maybe be work that needs to be done after winning the election but before processing enqueued requests. For example, a controller may need to build up an internal mapping of the current state of the cluster before it can begin reconciling. This changeset adds support for adding prestart hooks to controller-runtime's manager implementation. This hook runs after the manager has been elected leader, immediately before the leader election controllers are started. Related kubernetes-sigs#607
1 parent 6747c42 commit 6c35525

File tree

3 files changed

+144
-0
lines changed

3 files changed

+144
-0
lines changed

pkg/manager/internal.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ const (
5353
defaultRenewDeadline = 10 * time.Second
5454
defaultRetryPeriod = 2 * time.Second
5555
defaultGracefulShutdownPeriod = 30 * time.Second
56+
defaultHookPeriod = 15 * time.Second
5657

5758
defaultReadinessEndpoint = "/readyz"
5859
defaultLivenessEndpoint = "/healthz"
@@ -161,6 +162,13 @@ type controllerManager struct {
161162
// internalProceduresStop channel is used internally to the manager when coordinating
162163
// the proper shutdown of servers. This channel is also used for dependency injection.
163164
internalProceduresStop chan struct{}
165+
166+
// prestartHooks are functions that are run immediately before calling the Start functions
167+
// of the leader election runnables.
168+
prestartHooks []Runnable
169+
170+
// hookTimeout is the duration given to each hook to return successfully.
171+
hookTimeout time.Duration
164172
}
165173

166174
type hasCache interface {
@@ -217,6 +225,23 @@ func (cm *controllerManager) GetHTTPClient() *http.Client {
217225
return cm.cluster.GetHTTPClient()
218226
}
219227

228+
// Hook allows you to add hooks.
229+
func (cm *controllerManager) Hook(hook HookType, runnable Runnable) error {
230+
cm.Lock()
231+
defer cm.Unlock()
232+
233+
if cm.started {
234+
return fmt.Errorf("unable to add new hook because the manager has already been started")
235+
}
236+
237+
switch hook {
238+
case HookPrestartType:
239+
cm.prestartHooks = append(cm.prestartHooks, runnable)
240+
}
241+
242+
return nil
243+
}
244+
220245
func (cm *controllerManager) GetConfig() *rest.Config {
221246
return cm.cluster.GetConfig()
222247
}
@@ -554,6 +579,19 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
554579
}
555580

556581
func (cm *controllerManager) startLeaderElectionRunnables() error {
582+
cm.logger.Info("Starting prestart hooks")
583+
for _, hook := range cm.prestartHooks {
584+
ctx, cancel := context.WithTimeout(cm.internalCtx, cm.hookTimeout)
585+
if err := hook.Start(ctx); err != nil {
586+
cancel()
587+
return err
588+
}
589+
cancel()
590+
}
591+
592+
// All the prestart hooks have been run, clear the slice to free the underlying resources.
593+
cm.prestartHooks = nil
594+
557595
return cm.runnables.LeaderElection.Start(cm.internalCtx)
558596
}
559597

pkg/manager/manager.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,10 @@ type Options struct {
263263
// +optional
264264
Controller config.Controller
265265

266+
// HookTimeout is the duration given to each hook to return successfully.
267+
// To use hooks without timeout, set to a negative duration, e.g. time.Duration(-1)
268+
HookTimeout *time.Duration
269+
266270
// makeBroadcaster allows deferring the creation of the broadcaster to
267271
// avoid leaking goroutines if we never call Start on this manager. It also
268272
// returns whether or not this is a "owned" broadcaster, and as such should be
@@ -277,6 +281,22 @@ type Options struct {
277281
newPprofListener func(addr string) (net.Listener, error)
278282
}
279283

284+
// Hookable is implemented by Managers that support running hooks during
285+
// controller lifecycle phases.
286+
type Hookable interface {
287+
// Hook allows to add Runnables as hooks to modify the behavior.
288+
Hook(hook HookType, runnable Runnable) error
289+
}
290+
291+
// HookType defines hooks for use with AddHook.
292+
type HookType int
293+
294+
const (
295+
// HookPrestartType defines a hook that is run after leader election and immediately before
296+
// calling Start on the runnalbes that needed leader election.
297+
HookPrestartType HookType = iota
298+
)
299+
280300
// BaseContextFunc is a function used to provide a base Context to Runnables
281301
// managed by a Manager.
282302
type BaseContextFunc func() context.Context
@@ -432,6 +452,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
432452
livenessEndpointName: options.LivenessEndpointName,
433453
pprofListener: pprofListener,
434454
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
455+
hookTimeout: *options.HookTimeout,
435456
internalProceduresStop: make(chan struct{}),
436457
leaderElectionStopped: make(chan struct{}),
437458
leaderElectionReleaseOnCancel: options.LeaderElectionReleaseOnCancel,
@@ -653,6 +674,11 @@ func setOptionsDefaults(options Options) Options {
653674
options.GracefulShutdownTimeout = &gracefulShutdownTimeout
654675
}
655676

677+
if options.HookTimeout == nil {
678+
hookTimeout := defaultHookPeriod
679+
options.HookTimeout = &hookTimeout
680+
}
681+
656682
if options.Logger.GetSink() == nil {
657683
options.Logger = log.Log
658684
}

pkg/manager/manager_test.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1207,6 +1207,86 @@ var _ = Describe("manger.Manager", func() {
12071207
Expect(time.Since(beforeDone)).To(BeNumerically(">=", 1500*time.Millisecond))
12081208
})
12091209

1210+
It("should run prestart hooks before calling Start on leader election runnables", func() {
1211+
m, err := New(cfg, options)
1212+
Expect(err).NotTo(HaveOccurred())
1213+
for _, cb := range callbacks {
1214+
cb(m)
1215+
}
1216+
1217+
runnableRan := make(chan struct{})
1218+
1219+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1220+
close(runnableRan)
1221+
return nil
1222+
}))).ToNot(HaveOccurred())
1223+
1224+
Expect(m.(Hookable).Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1225+
Expect(m.Elected()).ShouldNot(BeClosed())
1226+
Consistently(runnableRan).ShouldNot(BeClosed())
1227+
return nil
1228+
}))).ToNot(HaveOccurred())
1229+
1230+
ctx, cancel := context.WithCancel(context.Background())
1231+
defer cancel()
1232+
go func() {
1233+
defer GinkgoRecover()
1234+
Expect(m.Elected()).ShouldNot(BeClosed())
1235+
Expect(m.Start(ctx)).NotTo(HaveOccurred())
1236+
}()
1237+
1238+
<-m.Elected()
1239+
})
1240+
1241+
It("should run prestart hooks with timeout", func() {
1242+
m, err := New(cfg, options)
1243+
Expect(err).NotTo(HaveOccurred())
1244+
for _, cb := range callbacks {
1245+
cb(m)
1246+
}
1247+
m.(*controllerManager).hookTimeout = 1 * time.Nanosecond
1248+
1249+
Expect(m.(Hookable).Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1250+
select {
1251+
case <-ctx.Done():
1252+
return ctx.Err()
1253+
case <-time.After(1 * time.Second):
1254+
return errors.New("prestart hook timeout exceeded expected")
1255+
}
1256+
}))).ToNot(HaveOccurred())
1257+
1258+
ctx, cancel := context.WithCancel(context.Background())
1259+
defer cancel()
1260+
1261+
Expect(m.Start(ctx)).Should(MatchError(context.DeadlineExceeded))
1262+
})
1263+
1264+
It("should not run leader election runnables if prestart hooks fail", func() {
1265+
m, err := New(cfg, options)
1266+
Expect(err).NotTo(HaveOccurred())
1267+
for _, cb := range callbacks {
1268+
cb(m)
1269+
}
1270+
1271+
runnableRan := make(chan struct{})
1272+
1273+
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
1274+
close(runnableRan)
1275+
return nil
1276+
}))).ToNot(HaveOccurred())
1277+
1278+
Expect(m.(Hookable).Hook(HookPrestartType, RunnableFunc(func(ctx context.Context) error {
1279+
Expect(m.Elected()).ShouldNot(BeClosed())
1280+
Consistently(runnableRan).ShouldNot(BeClosed())
1281+
return errors.New("prestart hook failed")
1282+
}))).ToNot(HaveOccurred())
1283+
1284+
ctx, cancel := context.WithCancel(context.Background())
1285+
defer cancel()
1286+
1287+
Expect(m.Elected()).ShouldNot(BeClosed())
1288+
Expect(m.Start(ctx)).Should(MatchError(ContainSubstring("prestart hook failed")))
1289+
})
12101290
}
12111291

12121292
Context("with defaults", func() {

0 commit comments

Comments
 (0)