Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/jetstream_jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1810,7 +1810,7 @@ func TestJetStreamJWTClusterAccountNRG(t *testing.T) {
// in-account or not.
for _, rg := range raftNodes {
rg.Lock()
rgAcc := rg.acc
rgAcc := rg.t.Account()
rg.Unlock()
switch state {
case "system":
Expand Down Expand Up @@ -1911,7 +1911,7 @@ func TestJetStreamJWTClusterAccountNRGPersistsAfterRestart(t *testing.T) {

for _, rg := range raftNodes {
rg.Lock()
rgAcc := rg.acc
rgAcc := rg.t.Account()
rg.Unlock()
require_Equal(t, rgAcc.Name, aExpPub)
require_Equal(t, rza[rg.group].SystemAcc, false)
Expand Down
2 changes: 1 addition & 1 deletion server/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4257,7 +4257,7 @@ func (s *Server) Raftz(opts *RaftzOptions) *RaftzStatus {
PTerm: n.pterm,
PIndex: n.pindex,
SystemAcc: n.IsSystemAccount(),
TrafficAcc: n.acc.GetName(),
TrafficAcc: n.t.Account().GetName(),
IPQPropLen: n.prop.len(),
IPQEntryLen: n.entry.len(),
IPQRespLen: n.resp.len(),
Expand Down
76 changes: 21 additions & 55 deletions server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ type raft struct {

created time.Time // Time that the group was created
accName string // Account name of the asset this raft group is for
acc *Account // Account that NRG traffic will be sent/received in
group string // Raft group
sd string // Store directory
id string // Node ID
Expand Down Expand Up @@ -202,7 +201,6 @@ type raft struct {
vote string // Our current vote state

s *Server // Reference to top-level server
c *client // Internal client for subscriptions
js *jetStream // JetStream, if running, to see if we are out of resources

hasleader atomic.Bool // Is there a group leader right now?
Expand All @@ -221,7 +219,7 @@ type raft struct {
asubj string // Append entries subject
areply string // Append entries responses subject

sq *sendq // Send queue for outbound RPC messages
t raftTransport // Transport that handles Raft messaging
aesub *subscription // Subscription for handleAppendEntry callbacks

wtv []byte // Term and vote to be written
Expand Down Expand Up @@ -319,6 +317,8 @@ type RaftConfig struct {
// We need to protect against losing state due to the new peers starting with an empty log.
// Therefore, these empty servers can't try to become leader until they at least have _some_ state.
ScaleUp bool

Transport raftTransportFactory
}

var (
Expand Down Expand Up @@ -464,6 +464,12 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
observer: cfg.Observer,
}

factory := cfg.Transport
if factory == nil {
factory = defaultRaftTransport
}
n.t = factory(s, n)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject nil raft transport from factory

After assigning n.t = factory(s, n), initialization proceeds into RecreateInternalSubs, which calls methods on n.t (for example Account, Reset, and later Publish) without a nil guard. If a custom RaftConfig.Transport factory returns nil (a realistic mistake when injecting test transports), startup will panic instead of returning a regular init error, making failures harder to diagnose and recover from.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Providing a custom factory is for testing only, so there's not much risk in that. Should a factory method return nil, it means that it needs to be fixed.


// Setup our internal subscriptions for proposals, votes and append entries.
// If we fail to do this for some reason then this is fatal — we cannot
// continue setting up or the Raft node may be partially/totally isolated.
Expand Down Expand Up @@ -658,7 +664,10 @@ func (n *raft) IsSystemAccount() bool {
func (n *raft) GetTrafficAccountName() string {
n.RLock()
defer n.RUnlock()
return n.acc.GetName()
if n.t == nil {
return (*Account)(nil).GetName()
}
return n.t.Account().GetName()
}

func (n *raft) RecreateInternalSubs() error {
Expand Down Expand Up @@ -708,7 +717,7 @@ func (n *raft) recreateInternalSubsLocked() error {
}
}
}
if n.aesub != nil && n.acc == nrgAcc {
if n.aesub != nil && n.t.Account() == nrgAcc {
// Subscriptions already exist and the account NRG state
// hasn't changed.
return nil
Expand All @@ -719,33 +728,11 @@ func (n *raft) recreateInternalSubsLocked() error {
// the next step...
n.cancelCatchup()

// If we have an existing client then tear down any existing
// subscriptions and close the internal client.
if c := n.c; c != nil {
c.mu.Lock()
subs := make([]*subscription, 0, len(c.subs))
for _, sub := range c.subs {
subs = append(subs, sub)
}
c.mu.Unlock()
for _, sub := range subs {
n.unsubscribe(sub)
}
c.closeConnection(InternalClient)
}

if n.acc != nrgAcc {
if n.t.Account() != nrgAcc {
n.debug("Subscribing in '%s'", nrgAcc.GetName())
}

c := n.s.createInternalSystemClient()
c.registerWithAccount(nrgAcc)
if nrgAcc.sq == nil {
nrgAcc.sq = n.s.newSendQ(nrgAcc)
}
n.c = c
n.sq = nrgAcc.sq
n.acc = nrgAcc
n.t.Reset(nrgAcc)

// Recreate any internal subscriptions for voting, append
// entries etc in the new account.
Expand Down Expand Up @@ -2325,17 +2312,12 @@ func (n *raft) newInbox() string {
// Our internal subscribe.
// Lock should be held.
func (n *raft) subscribe(subject string, cb msgHandler) (*subscription, error) {
if n.c == nil {
return nil, errNoInternalClient
}
return n.s.systemSubscribe(subject, _EMPTY_, false, n.c, cb)
return n.t.Subscribe(subject, cb)
}

// Lock should be held.
func (n *raft) unsubscribe(sub *subscription) {
if n.c != nil && sub != nil {
n.c.processUnsub(sub.sid)
}
n.t.Unsubscribe(sub)
}

// Lock should be held.
Expand Down Expand Up @@ -2460,19 +2442,7 @@ runner:
n.Lock()
defer n.Unlock()

if c := n.c; c != nil {
var subs []*subscription
c.mu.Lock()
for _, sub := range c.subs {
subs = append(subs, sub)
}
c.mu.Unlock()
for _, sub := range subs {
n.unsubscribe(sub)
}
c.closeConnection(InternalClient)
n.c = nil
}
n.t.Close()

// Unregistering ipQueues do not prevent them from push/pop
// just will remove them from the central monitoring map
Expand Down Expand Up @@ -5131,15 +5101,11 @@ func (n *raft) requestVote() {
}

func (n *raft) sendRPC(subject, reply string, msg []byte) {
if n.sq != nil {
n.sq.send(subject, reply, nil, msg)
}
n.t.Publish(subject, reply, msg)
}

func (n *raft) sendReply(subject string, msg []byte) {
if n.sq != nil {
n.sq.send(subject, _EMPTY_, nil, msg)
}
n.t.Publish(subject, _EMPTY_, msg)
}

func (n *raft) wonElection(votes int) bool {
Expand Down
39 changes: 25 additions & 14 deletions server/raft_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,26 @@ func (sg smGroup) lockFollowers() []stateMachine {
// Create a raft group and place on numMembers servers at random.
// Filestore based.
func (c *cluster) createRaftGroup(name string, numMembers int, smf smFactory) smGroup {
return c.createRaftGroupEx(name, numMembers, smf, FileStorage)
return c.createRaftGroupEx(name, numMembers, smf, defaultRaftTransport, FileStorage)
}

func (c *cluster) createMemRaftGroup(name string, numMembers int, smf smFactory) smGroup {
return c.createRaftGroupEx(name, numMembers, smf, MemoryStorage)
return c.createRaftGroupEx(name, numMembers, smf, defaultRaftTransport, MemoryStorage)
}

func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory, st StorageType) smGroup {
// createMockMemRaftGroup creates a raft group with memory storage whose
// members use a transport produced by a new mock transport factory. It
// returns the transport hub, the factory itself, and the resulting group.
func (c *cluster) createMockMemRaftGroup(name string, members int, smf smFactory) (*raftTransportHub, raftTransportFactory, smGroup) {
	transportHub, factory := mockTransportFactory()
	group := c.createRaftGroupEx(name, members, smf, factory, MemoryStorage)
	return transportHub, factory, group
}

func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory, rtf raftTransportFactory, st StorageType) smGroup {
c.t.Helper()
if numMembers > len(c.servers) {
c.t.Fatalf("Members > Peers: %d vs %d", numMembers, len(c.servers))
}
servers := append([]*Server{}, c.servers...)
rand.Shuffle(len(servers), func(i, j int) { servers[i], servers[j] = servers[j], servers[i] })
return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, st)
return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, rtf, st)
}

func (c *cluster) createWAL(name string, st StorageType) WAL {
Expand Down Expand Up @@ -189,42 +194,48 @@ func (c *cluster) createStateMachine(s *Server, cfg *RaftConfig, peers []string,
return sm
}

func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup {
func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, rtf raftTransportFactory, st StorageType) smGroup {
c.t.Helper()

var sg smGroup
peers := serverPeerNames(servers)

for _, s := range servers {
cfg := &RaftConfig{
Name: name,
Store: c.t.TempDir(),
Log: c.createWAL(name, st)}
Name: name,
Store: c.t.TempDir(),
Log: c.createWAL(name, st),
Transport: rtf}
sg = append(sg, c.createStateMachine(s, cfg, peers, smf))
}
return sg
}

func (c *cluster) addNodeEx(name string, smf smFactory, st StorageType) stateMachine {
func (c *cluster) addNodeEx(name string, smf smFactory, rtf raftTransportFactory, st StorageType) stateMachine {
c.t.Helper()

server := c.addInNewServer()

cfg := &RaftConfig{
Name: name,
Store: c.t.TempDir(),
Log: c.createWAL(name, st)}
Name: name,
Store: c.t.TempDir(),
Log: c.createWAL(name, st),
Transport: rtf}

peers := serverPeerNames(c.servers)
return c.createStateMachine(server, cfg, peers, smf)
}

func (c *cluster) addRaftNode(name string, smf smFactory) stateMachine {
return c.addNodeEx(name, smf, FileStorage)
return c.addNodeEx(name, smf, defaultRaftTransport, FileStorage)
}

func (c *cluster) addMemRaftNode(name string, smf smFactory) stateMachine {
return c.addNodeEx(name, smf, MemoryStorage)
return c.addNodeEx(name, smf, defaultRaftTransport, MemoryStorage)
}

// addMockMemRaftNode adds a raft node with memory storage that uses the
// supplied transport factory rather than the default transport.
func (c *cluster) addMockMemRaftNode(name string, rtf raftTransportFactory, smf smFactory) stateMachine {
	node := c.addNodeEx(name, smf, rtf, MemoryStorage)
	return node
}

// Driver program for the state machine.
Expand Down
60 changes: 60 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -6221,3 +6222,62 @@ func TestNRGReset(t *testing.T) {
// Append entry cache cleared.
require_Equal(t, len(n.pae), 0)
}

// TestNRGPartitionedPeerRemove proposes the removal of the only follower of a
// two-node group while the leader is partitioned, checks that the leader
// completes the membership change on its own, and then verifies that the
// follower learns about the removal once the partition is healed.
func TestNRGPartitionedPeerRemove(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R2S", 2)
	defer c.shutdown()

	transportHub, _, group := c.createMockMemRaftGroup("MOCK", 2, newStateAdder)
	defer transportHub.healPartitions()

	ldr := group.waitOnLeader().node()
	require_Equal(t, ldr.ClusterSize(), 2)
	require_Equal(t, len(group.followers()), 1)
	flw := group.followers()[0].node()

	// Count every invocation of the hub's after-message hook so we can
	// later confirm that the hook was exercised at all.
	var hookInvocations atomic.Int64
	transportHub.setAfterMsgHook(func(subject, reply string, msg []byte) {
		hookInvocations.Add(1)
	})

	// Partition the leader first, then ask it to remove the follower.
	transportHub.partition(ldr.ID(), 1)
	ldr.ProposeRemovePeer(flw.ID())

	// The follower is not expected to win an election, but have it
	// campaign regardless.
	flw.CampaignImmediately()

	// The leader must make progress on its own.
	checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
		if ldr.ClusterSize() == 1 {
			return nil
		}
		return errors.New("node removal still in progress")
	})

	checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
		if !ldr.MembershipChangeInProgress() {
			return nil
		}
		return errors.New("membership still in progress")
	})

	require_Equal(t, ldr.ClusterSize(), 1)
	require_False(t, ldr.MembershipChangeInProgress())
	require_True(t, hookInvocations.Load() > 0)

	// Nothing should have changed from the follower's point of view.
	require_Equal(t, flw.State(), Follower)
	require_Equal(t, flw.ClusterSize(), 2)
	require_False(t, flw.MembershipChangeInProgress())

	// Once the partition is healed the follower should observe the removal.
	transportHub.heal(ldr.ID())

	checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
		if flw.ClusterSize() == 1 {
			return nil
		}
		return errors.New("node removal still in progress")
	})
}
Loading
Loading