@@ -88,13 +88,20 @@ type Interface interface {
8888 AddClusterEventHandler (handler ClusterNodeEventHandler )
8989}
9090
91+ type Memberlist interface {
92+ Join (existing []string ) (int , error )
93+ Members () []* memberlist.Node
94+ Leave (timeout time.Duration ) error
95+ Shutdown () error
96+ }
97+
9198// Cluster implements ClusterInterface.
9299type Cluster struct {
93100 bindPort int
94101 // Name of local Node. Node name must be unique in the cluster.
95102 nodeName string
96103
97- mList * memberlist. Memberlist
104+ mList Memberlist
98105 // consistentHash hold the consistentHashMap, when a Node join cluster, use method Add() to add a key to the hash.
99106 // when a Node leave the cluster, the consistentHashMap should be update.
100107 consistentHashMap map [string ]* consistenthash.Map
@@ -129,14 +136,15 @@ func NewCluster(
129136 nodeName string ,
130137 nodeInformer coreinformers.NodeInformer ,
131138 externalIPPoolInformer crdinformers.ExternalIPPoolInformer ,
132- transport memberlist. Transport , // Parameterized for testing, could be left nil for production code.
139+ ml Memberlist , // Parameterized for testing, could be left nil for production code.
133140) (* Cluster , error ) {
134141 // The Node join/leave events will be notified via it.
135142 nodeEventCh := make (chan memberlist.NodeEvent , 1024 )
136143 c := & Cluster {
137144 bindPort : clusterBindPort ,
138145 nodeName : nodeName ,
139146 consistentHashMap : make (map [string ]* consistenthash.Map ),
147+ mList : ml ,
140148 nodeEventsCh : nodeEventCh ,
141149 nodeInformer : nodeInformer ,
142150 nodeLister : nodeInformer .Lister (),
@@ -147,21 +155,24 @@ func NewCluster(
147155 queue : workqueue .NewNamedRateLimitingQueue (workqueue .NewItemExponentialFailureRateLimiter (minRetryDelay , maxRetryDelay ), "externalIPPool" ),
148156 }
149157
150- conf := memberlist .DefaultLocalConfig ()
151- conf .Name = c .nodeName
152- conf .Transport = transport
153- conf .BindPort = c .bindPort
154- conf .AdvertisePort = c .bindPort
155- conf .AdvertiseAddr = nodeIP .String ()
156- conf .Events = & memberlist.ChannelEventDelegate {Ch : nodeEventCh }
157- conf .LogOutput = io .Discard
158- klog .V (1 ).InfoS ("New memberlist cluster" , "config" , conf )
159-
160- mList , err := memberlist .Create (conf )
161- if err != nil {
162- return nil , fmt .Errorf ("failed to create memberlist cluster: %v" , err )
158+ if ml == nil {
159+ conf := memberlist .DefaultLocalConfig ()
160+ conf .Name = c .nodeName
161+ conf .BindPort = c .bindPort
162+ conf .AdvertisePort = c .bindPort
163+ conf .AdvertiseAddr = nodeIP .String ()
164+ // Setting it to a non-zero value to allow reclaiming Nodes with different addresses for Node IP update case.
165+ conf .DeadNodeReclaimTime = 10 * time .Millisecond
166+ conf .Events = & memberlist.ChannelEventDelegate {Ch : nodeEventCh }
167+ conf .LogOutput = io .Discard
168+ klog .V (1 ).InfoS ("New memberlist cluster" , "config" , conf )
169+
170+ mList , err := memberlist .Create (conf )
171+ if err != nil {
172+ return nil , fmt .Errorf ("failed to create memberlist cluster: %v" , err )
173+ }
174+ c .mList = mList
163175 }
164- c .mList = mList
165176
166177 nodeInformer .Informer ().AddEventHandlerWithResyncPeriod (
167178 cache.ResourceEventHandlerFuncs {
@@ -190,13 +201,16 @@ func NewCluster(
190201
191202func (c * Cluster ) handleCreateNode (obj interface {}) {
192203 node := obj .(* corev1.Node )
193- if member , err := c .newClusterMember (node ); err == nil {
194- _ , err := c .mList .Join ([]string {member })
195- if err != nil {
196- klog .ErrorS (err , "Processing Node CREATE event error, join cluster failed" , "member" , member )
204+ // Ignore the Node itself.
205+ if node .Name != c .nodeName {
206+ if member , err := c .newClusterMember (node ); err == nil {
207+ _ , err := c .mList .Join ([]string {member })
208+ if err != nil {
209+ klog .ErrorS (err , "Processing Node CREATE event error, join cluster failed" , "member" , member )
210+ }
211+ } else {
212+ klog .ErrorS (err , "Processing Node CREATE event error" , "nodeName" , node .Name )
197213 }
198- } else {
199- klog .ErrorS (err , "Processing Node CREATE event error" , "nodeName" , node .Name )
200214 }
201215
202216 affectedEIPs := c .filterEIPsFromNodeLabels (node )
@@ -263,8 +277,7 @@ func (c *Cluster) enqueueExternalIPPool(obj interface{}) {
263277 c .queue .Add (eip .Name )
264278}
265279
266- // newClusterMember gets the Node's IP and returns a cluster member "<IP>:<clusterMemberlistPort>"
267- // representing that Node in the memberlist cluster.
280+ // newClusterMember gets the Node's IP and returns it as a cluster member for memberlist cluster to join.
268281func (c * Cluster ) newClusterMember (node * corev1.Node ) (string , error ) {
269282 nodeAddrs , err := k8s .GetNodeAddrs (node )
270283 if err != nil {
@@ -279,11 +292,7 @@ func (c *Cluster) newClusterMember(node *corev1.Node) (string, error) {
279292
280293func (c * Cluster ) filterEIPsFromNodeLabels (node * corev1.Node ) sets.String {
281294 pools := sets .NewString ()
282- eips , err := c .externalIPPoolLister .List (labels .Everything ())
283- if err != nil {
284- klog .ErrorS (err , "Filter ExternalIPPools from nodeLabels failed" )
285- return pools
286- }
295+ eips , _ := c .externalIPPoolLister .List (labels .Everything ())
287296 for _ , eip := range eips {
288297 nodeSelector , _ := metav1 .LabelSelectorAsSelector (& eip .Spec .NodeSelector )
289298 if nodeSelector .Matches (labels .Set (node .GetLabels ())) {
@@ -314,14 +323,64 @@ func (c *Cluster) Run(stopCh <-chan struct{}) {
314323 go wait .Until (c .worker , time .Second , stopCh )
315324 }
316325
317- for {
318- select {
319- case <- stopCh :
320- return
321- case nodeEvent := <- c .nodeEventsCh :
322- c .handleClusterNodeEvents (& nodeEvent )
326+ go func () {
327+ for {
328+ select {
329+ case <- stopCh :
330+ return
331+ case nodeEvent := <- c .nodeEventsCh :
332+ c .handleClusterNodeEvents (& nodeEvent )
333+ }
334+ }
335+ }()
336+
337+ // Rejoin Nodes periodically in case some Nodes are removed from the member list because of long downtime.
338+ go func () {
339+ ticker := time .NewTicker (1 * time .Minute )
340+ for {
341+ select {
342+ case <- stopCh :
343+ return
344+ case <- ticker .C :
345+ c .RejoinNodes ()
346+ }
347+ }
348+ }()
349+
350+ <- stopCh
351+ }
352+
353+ // RejoinNodes rejoins Nodes that were removed from the member list by memberlist because they were unreachable for more
354+ // than 15 seconds (the GossipToTheDeadTime we are using). Without it, once there is a network downtime lasting more
355+ // than 15 seconds, the agent wouldn't try to reach any other Node and would think it's the only alive Node until it's
356+ // restarted.
357+ func (c * Cluster ) RejoinNodes () {
358+ nodes , _ := c .nodeLister .List (labels .Everything ())
359+ aliveNodes := c .AliveNodes ()
360+ var membersToJoin []string
361+ for _ , node := range nodes {
362+ if ! aliveNodes .Has (node .Name ) {
363+ member , err := c .newClusterMember (node )
364+ if err != nil {
365+ klog .ErrorS (err , "Failed to generate cluster member to join" , "Node" , node .Name )
366+ continue
367+ }
368+ membersToJoin = append (membersToJoin , member )
323369 }
324370 }
371+ // Every known Node is alive, do nothing.
372+ if len (membersToJoin ) == 0 {
373+ return
374+ }
375+ // The Join method returns an error only when none could be reached.
376+ numSuccess , err := c .mList .Join (membersToJoin )
377+ if err != nil {
378+ klog .ErrorS (err , "Failed to rejoin any members" , "members" , membersToJoin )
379+ } else if numSuccess != len (membersToJoin ) {
380+ klog .ErrorS (err , "Failed to rejoin some members" , "members" , membersToJoin , "numSuccess" , numSuccess )
381+ } else {
382+ klog .InfoS ("Rejoined all members" , "members" , membersToJoin )
383+ }
325384}
326385
327386func (c * Cluster ) worker () {
@@ -421,12 +480,9 @@ func (c *Cluster) handleClusterNodeEvents(nodeEvent *memberlist.NodeEvent) {
421480 // if the Node has failed, ExternalIPPools consistentHash maybe changed, and affected ExternalIPPool should be enqueued.
422481 coreNode , err := c .nodeLister .Get (node .Name )
423482 if err != nil {
424- if apierrors .IsNotFound (err ) {
425- // Node has been deleted, and deleteNode handler has been executed.
426- klog .ErrorS (err , "Processing Node event, not found" , "eventType" , event )
427- return
428- }
429- klog .ErrorS (err , "Processing Node event, get Node failed" , "eventType" , event )
483+ // It means the Node has been deleted, no further processing is needed as handleDeleteNode has enqueued
484+ // related ExternalIPPools.
485+ klog .InfoS ("Received a Node event but did not find the Node object" , "eventType" , mapNodeEventType [event ], "nodeName" , node .Name )
430486 return
431487 }
432488 affectedEIPs := c .filterEIPsFromNodeLabels (coreNode )
0 commit comments