@@ -5,6 +5,7 @@ package raft
55
66import (
77 "bytes"
8+ "context"
89 "fmt"
910 "os"
1011 "sync/atomic"
@@ -490,3 +491,98 @@ func TestRaft_RestartFollower_LongInitialHeartbeat(t *testing.T) {
490491 })
491492 }
492493}
494+
495+ // TestRaft_PreVote_LeaderSpam test that when a leader spam the followers
496+ // with pre-vote requests they can still transition to candidate.
497+ // The reason this test need to live in here is that we need the transport heartbeat fast-path
498+ // to use as a trick to avoid heartbeat keeping the cluster stable.
499+ // That fast-path only exists in the net transport.
500+ func TestRaft_PreVote_LeaderSpam (t * testing.T ) {
501+ CheckInteg (t )
502+ conf := DefaultConfig ()
503+ conf .LocalID = ServerID ("first" )
504+ conf .HeartbeatTimeout = 50 * time .Millisecond
505+ conf .ElectionTimeout = 50 * time .Millisecond
506+ conf .LeaderLeaseTimeout = 50 * time .Millisecond
507+ conf .CommitTimeout = 5 * time .Second
508+ conf .SnapshotThreshold = 100
509+ conf .TrailingLogs = 10
510+
511+ // Create a single node
512+ leader := MakeRaft (t , conf , true )
513+ NoErr (WaitFor (leader , Leader ), t )
514+
515+ // Join a few nodes!
516+ var followers []* RaftEnv
517+ for i := 0 ; i < 2 ; i ++ {
518+ conf .LocalID = ServerID (fmt .Sprintf ("next-batch-%d" , i ))
519+ env := MakeRaft (t , conf , false )
520+ addr := env .trans .LocalAddr ()
521+ NoErr (WaitFuture (leader .raft .AddVoter (conf .LocalID , addr , 0 , 0 )), t )
522+ followers = append (followers , env )
523+ }
524+
525+ // Wait for a leader
526+ _ , err := WaitForAny (Leader , append ([]* RaftEnv {leader }, followers ... ))
527+ NoErr (err , t )
528+
529+ CheckConsistent (append ([]* RaftEnv {leader }, followers ... ), t )
530+
531+ leaderT := leader .raft .trans
532+
533+ // spam all the followers with pre-vote requests from the leader
534+ // those requests should be granted as long as the leader haven't changed.
535+ ctx , cancel := context .WithCancel (context .Background ())
536+ defer cancel ()
537+ go func () {
538+ for {
539+ ticker := time .NewTicker (conf .HeartbeatTimeout / 2 )
540+ for _ , f := range followers {
541+ rsp := RequestPreVoteResponse {}
542+ reqPreVote := RequestPreVoteRequest {
543+ RPCHeader : leader .raft .getRPCHeader (),
544+ Term : leader .raft .getCurrentTerm () + 1 ,
545+ LastLogIndex : leader .raft .getLastIndex (),
546+ LastLogTerm : leader .raft .getCurrentTerm (),
547+ }
548+ // We don't need to check the error here because when leader change
549+ // it will start failing with "rejecting pre-vote request since we have a leader"
550+ _ = leaderT .(WithPreVote ).RequestPreVote (f .raft .localID , f .raft .localAddr , & reqPreVote , & rsp )
551+ }
552+ select {
553+ case <- ticker .C :
554+ case <- ctx .Done ():
555+ return
556+ }
557+ }
558+ }()
559+ time .Sleep (time .Second )
560+
561+ // for all followers ignore heartbeat from current leader, so we can transition to candidate state.
562+ // the purpose of this test is to verify that spamming nodes with pre-votes don't cause them to never
563+ // transition to Candidates.
564+ for _ , f := range followers {
565+ //copy f to avoid data race
566+ f1 := f
567+ f1 .trans .SetHeartbeatHandler (func (rpc RPC ) {
568+ if a , ok := rpc .Command .(* AppendEntriesRequest ); ok {
569+ if ServerID (a .GetRPCHeader ().ID ) == leader .raft .localID {
570+ resp := & AppendEntriesResponse {
571+ RPCHeader : f1 .raft .getRPCHeader (),
572+ Term : f1 .raft .getCurrentTerm (),
573+ LastLog : f1 .raft .getLastIndex (),
574+ Success : false ,
575+ NoRetryBackoff : false ,
576+ }
577+ rpc .Respond (resp , nil )
578+ } else {
579+ f .raft .processHeartbeat (rpc )
580+ }
581+ }
582+ })
583+ }
584+ time .Sleep (1 * time .Second )
585+ // New leader should be one of the former followers.
586+ _ , err = WaitForAny (Leader , followers )
587+ NoErr (err , t )
588+ }
0 commit comments