@@ -18,6 +18,7 @@ package manager
1818
1919import (
2020 "context"
21+ "errors"
2122 "fmt"
2223 "net"
2324 "net/http"
@@ -34,6 +35,7 @@ import (
3435 "sigs.k8s.io/controller-runtime/pkg/cache"
3536 "sigs.k8s.io/controller-runtime/pkg/client"
3637 logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
38+ crleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection"
3739 "sigs.k8s.io/controller-runtime/pkg/metrics"
3840 "sigs.k8s.io/controller-runtime/pkg/recorder"
3941 "sigs.k8s.io/controller-runtime/pkg/runtime/inject"
@@ -82,8 +84,16 @@ type controllerManager struct {
8284 // (and EventHandlers, Sources and Predicates).
8385 recorderProvider recorder.Provider
8486
85- // resourceLock forms the basis for leader election
86- resourceLock resourcelock.Interface
87+ // leaderElectionNamespace determines the namespace in which the leader
88+ // election configmap will be created.
89+ leaderElectionNamespace string
90+
91+ // leaderElectionID determines the name of the configmap that leader election
92+ // will use for holding the leader lock.
93+ leaderElectionID string
94+
95+ // resourceLocksMap maps leader election ID to resource lock
96+ resourceLocksMap map [string ]resourcelock.Interface
8797
8898 // mapper is used to map resources to kind, and map kind and version.
8999 mapper meta.RESTMapper
@@ -123,6 +133,9 @@ type controllerManager struct {
123133 // retryPeriod is the duration the LeaderElector clients should wait
124134 // between tries of actions.
125135 retryPeriod time.Duration
136+
137+ // Dependency injection for testing
138+ newResourceLock func (config * rest.Config , recorderProvider recorder.Provider , options crleaderelection.Options ) (resourcelock.Interface , error )
126139}
127140
128141// Add sets dependencies on i, and adds it to the list of Runnables to start.
@@ -265,8 +278,18 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
265278
266279 go cm .startNonLeaderElectionRunnables ()
267280
268- if cm .resourceLock != nil {
269- err := cm .startLeaderElection ()
281+ if resourceLock := cm .resourceLocksMap [cm .leaderElectionID ]; resourceLock != nil {
282+ err := cm .startLeaderElection (cm .leaderElectionID , leaderelection.LeaderCallbacks {
283+ OnStartedLeading : func (_ context.Context ) {
284+ cm .startLeaderElectionRunnables ()
285+ },
286+ OnStoppedLeading : func () {
287+ // Most implementations of leader election log.Fatal() here.
288+ // Since Start is wrapped in log.Fatal when called, we can just return
289+ // an error here which will cause the program to exit.
290+ cm .errChan <- fmt .Errorf ("leader election lost" )
291+ },
292+ })
270293 if err != nil {
271294 return err
272295 }
@@ -313,7 +336,46 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
313336 // Write any Start errors to a channel so we can return them
314337 ctrl := c
315338 go func () {
316- cm .errChan <- ctrl .Start (cm .internalStop )
339+ if leRunnable , ok := ctrl .(LeaderElectionRunnable ); ok {
340+ leID := leRunnable .GetID ()
341+
342+ // Check that leader election ID is defined
343+ if leID == "" {
344+ cm .errChan <- errors .New ("LeaderElectionID must be configured" )
345+ return
346+ }
347+
348+ // Check that leader election ID is unique
349+ if _ , exists := cm .resourceLocksMap [leID ]; exists {
350+ cm .errChan <- errors .New ("LeaderElectionID must be unique" )
351+ return
352+ }
353+
354+ // Create resource lock
355+ resourceLock , err := cm .newResourceLock (cm .config , cm .recorderProvider , crleaderelection.Options {
356+ LeaderElection : true ,
357+ LeaderElectionID : leID ,
358+ LeaderElectionNamespace : cm .leaderElectionNamespace ,
359+ })
360+ if err != nil {
361+ cm .errChan <- err
362+ }
363+
364+ cm .resourceLocksMap [leID ] = resourceLock
365+ err = cm .startLeaderElection (leID , leaderelection.LeaderCallbacks {
366+ OnStartedLeading : func (_ context.Context ) {
367+ cm .errChan <- ctrl .Start (cm .internalStop )
368+ },
369+ OnStoppedLeading : func () {
370+ cm .errChan <- fmt .Errorf ("runnable leader election lost" )
371+ },
372+ })
373+ if err != nil {
374+ cm .errChan <- err
375+ }
376+ } else {
377+ cm .errChan <- ctrl .Start (cm .internalStop )
378+ }
317379 }()
318380 }
319381}
@@ -339,23 +401,13 @@ func (cm *controllerManager) waitForCache() {
339401 cm .started = true
340402}
341403
342- func (cm * controllerManager ) startLeaderElection () (err error ) {
404+ func (cm * controllerManager ) startLeaderElection (leaderElectionID string , callbacks leaderelection. LeaderCallbacks ) (err error ) {
343405 l , err := leaderelection .NewLeaderElector (leaderelection.LeaderElectionConfig {
344- Lock : cm .resourceLock ,
406+ Lock : cm .resourceLocksMap [ leaderElectionID ] ,
345407 LeaseDuration : cm .leaseDuration ,
346408 RenewDeadline : cm .renewDeadline ,
347409 RetryPeriod : cm .retryPeriod ,
348- Callbacks : leaderelection.LeaderCallbacks {
349- OnStartedLeading : func (_ context.Context ) {
350- cm .startLeaderElectionRunnables ()
351- },
352- OnStoppedLeading : func () {
353- // Most implementations of leader election log.Fatal() here.
354- // Since Start is wrapped in log.Fatal when called, we can just return
355- // an error here which will cause the program to exit.
356- cm .errChan <- fmt .Errorf ("leader election lost" )
357- },
358- },
410+ Callbacks : callbacks ,
359411 })
360412 if err != nil {
361413 return err
0 commit comments