@@ -18,6 +18,7 @@ package manager
1818
1919import (
2020 "context"
21+ "errors"
2122 "fmt"
2223 "net"
2324 "net/http"
@@ -34,6 +35,7 @@ import (
3435 "sigs.k8s.io/controller-runtime/pkg/cache"
3536 "sigs.k8s.io/controller-runtime/pkg/client"
3637 logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
38+ crleaderelection "sigs.k8s.io/controller-runtime/pkg/leaderelection"
3739 "sigs.k8s.io/controller-runtime/pkg/metrics"
3840 "sigs.k8s.io/controller-runtime/pkg/recorder"
3941 "sigs.k8s.io/controller-runtime/pkg/runtime/inject"
@@ -58,9 +60,9 @@ type controllerManager struct {
5860 // to scheme.scheme.
5961 scheme * runtime.Scheme
6062
61- // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts .
63+ // leaderElectionRunnables is the map that groups runnables that use same leader election ID .
6264 // These Runnables are managed by lead election.
63- leaderElectionRunnables []Runnable
65+ leaderElectionRunnables map [ string ] []Runnable
6466 // nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts.
6567 // These Runnables will not be blocked by lead election.
6668 nonLeaderElectionRunnables []Runnable
@@ -82,8 +84,9 @@ type controllerManager struct {
8284 // (and EventHandlers, Sources and Predicates).
8385 recorderProvider recorder.Provider
8486
85- // resourceLock forms the basis for leader election
86- resourceLock resourcelock.Interface
87+ // leaderElectionNamespace determines the namespace in which the leader
88+ // election configmaps will be created.
89+ leaderElectionNamespace string
8790
8891 // mapper is used to map resources to kind, and map kind and version.
8992 mapper meta.RESTMapper
@@ -123,6 +126,9 @@ type controllerManager struct {
123126 // retryPeriod is the duration the LeaderElector clients should wait
124127 // between tries of actions.
125128 retryPeriod time.Duration
129+
130+ // Dependency injection for testing
131+ newResourceLock func (config * rest.Config , recorderProvider recorder.Provider , options crleaderelection.Options ) (resourcelock.Interface , error )
126132}
127133
128134// Add sets dependencies on i, and adds it to the list of Runnables to start.
@@ -135,11 +141,27 @@ func (cm *controllerManager) Add(r Runnable) error {
135141 return err
136142 }
137143
144+ if cm .leaderElectionRunnables == nil {
145+ cm .leaderElectionRunnables = make (map [string ][]Runnable )
146+ }
147+
138148 // Add the runnable to the leader election or the non-leaderelection list
139- if leRunnable , ok := r .(LeaderElectionRunnable ); ok && ! leRunnable .NeedLeaderElection () {
140- cm .nonLeaderElectionRunnables = append (cm .nonLeaderElectionRunnables , r )
149+ if leRunnable , ok := r .(LeaderElectionRunnable ); ok && leRunnable .NeedLeaderElection () {
150+ runnables := []Runnable {r }
151+ leID := leRunnable .GetID ()
152+
153+ // Check that leader election ID is defined
154+ if leID == "" {
155+ return errors .New ("LeaderElectionID must be configured" )
156+ }
157+
158+ if rs , exists := cm .leaderElectionRunnables [leID ]; exists {
159+ runnables = append (runnables , rs ... )
160+ }
161+
162+ cm .leaderElectionRunnables [leID ] = runnables
141163 } else {
142- cm .leaderElectionRunnables = append (cm .leaderElectionRunnables , r )
164+ cm .nonLeaderElectionRunnables = append (cm .nonLeaderElectionRunnables , r )
143165 }
144166
145167 if cm .started {
@@ -265,14 +287,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
265287
266288 go cm .startNonLeaderElectionRunnables ()
267289
268- if cm .resourceLock != nil {
269- err := cm .startLeaderElection ()
270- if err != nil {
271- return err
272- }
273- } else {
274- go cm .startLeaderElectionRunnables ()
275- }
290+ go cm .startLeaderElectionRunnables ()
276291
277292 select {
278293 case <- stop :
@@ -308,12 +323,38 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
308323 cm .waitForCache ()
309324
310325 // Start the leader election Runnables after the cache has synced
311- for _ , c := range cm .leaderElectionRunnables {
326+ for leID , rs := range cm .leaderElectionRunnables {
312327 // Controllers block, but we want to return an error if any have an error starting.
313328 // Write any Start errors to a channel so we can return them
314- ctrl := c
329+ leaderElectionID := leID
330+ runnables := rs
315331 go func () {
316- cm .errChan <- ctrl .Start (cm .internalStop )
332+ // Create resource lock
333+ resourceLock , err := cm .newResourceLock (cm .config , cm .recorderProvider , crleaderelection.Options {
334+ LeaderElection : true ,
335+ LeaderElectionID : leaderElectionID ,
336+ LeaderElectionNamespace : cm .leaderElectionNamespace ,
337+ })
338+ if err != nil {
339+ cm .errChan <- err
340+ }
341+
342+ err = cm .startLeaderElection (resourceLock , leaderelection.LeaderCallbacks {
343+ OnStartedLeading : func (_ context.Context ) {
344+ for _ , r := range runnables {
345+ runnable := r
346+ go func () {
347+ cm .errChan <- runnable .Start (cm .internalStop )
348+ }()
349+ }
350+ },
351+ OnStoppedLeading : func () {
352+ cm .errChan <- fmt .Errorf ("runnable leader election lost" )
353+ },
354+ })
355+ if err != nil {
356+ cm .errChan <- err
357+ }
317358 }()
318359 }
319360}
@@ -339,23 +380,13 @@ func (cm *controllerManager) waitForCache() {
339380 cm .started = true
340381}
341382
342- func (cm * controllerManager ) startLeaderElection () (err error ) {
383+ func (cm * controllerManager ) startLeaderElection (resourceLock resourcelock. Interface , callbacks leaderelection. LeaderCallbacks ) (err error ) {
343384 l , err := leaderelection .NewLeaderElector (leaderelection.LeaderElectionConfig {
344- Lock : cm . resourceLock ,
385+ Lock : resourceLock ,
345386 LeaseDuration : cm .leaseDuration ,
346387 RenewDeadline : cm .renewDeadline ,
347388 RetryPeriod : cm .retryPeriod ,
348- Callbacks : leaderelection.LeaderCallbacks {
349- OnStartedLeading : func (_ context.Context ) {
350- cm .startLeaderElectionRunnables ()
351- },
352- OnStoppedLeading : func () {
353- // Most implementations of leader election log.Fatal() here.
354- // Since Start is wrapped in log.Fatal when called, we can just return
355- // an error here which will cause the program to exit.
356- cm .errChan <- fmt .Errorf ("leader election lost" )
357- },
358- },
389+ Callbacks : callbacks ,
359390 })
360391 if err != nil {
361392 return err
0 commit comments