Skip to content

Commit 1f992f5

Browse files
committed
NRG: Decouple Raft transport layer
The Raft implementation was tightly coupled to the server's internal client and send queue for Raft RPC communication. This made it difficult to deterministically test scenarios such as network partitions. Introduce a raftTransport abstraction and keep the existing internal client/send queue behavior as the default transport. Tests can now inject a mock transport to simulate partitions and observe message delivery.

Signed-off-by: Daniele Sciascia <daniele@nats.io>
1 parent b86a885 commit 1f992f5

7 files changed

Lines changed: 397 additions & 72 deletions

File tree

server/jetstream_jwt_test.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1810,7 +1810,7 @@ func TestJetStreamJWTClusterAccountNRG(t *testing.T) {
18101810
// in-account or not.
18111811
for _, rg := range raftNodes {
18121812
rg.Lock()
1813-
rgAcc := rg.acc
1813+
rgAcc := rg.t.Account()
18141814
rg.Unlock()
18151815
switch state {
18161816
case "system":
@@ -1911,7 +1911,7 @@ func TestJetStreamJWTClusterAccountNRGPersistsAfterRestart(t *testing.T) {
19111911

19121912
for _, rg := range raftNodes {
19131913
rg.Lock()
1914-
rgAcc := rg.acc
1914+
rgAcc := rg.t.Account()
19151915
rg.Unlock()
19161916
require_Equal(t, rgAcc.Name, aExpPub)
19171917
require_Equal(t, rza[rg.group].SystemAcc, false)

server/monitor.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -4257,7 +4257,7 @@ func (s *Server) Raftz(opts *RaftzOptions) *RaftzStatus {
42574257
PTerm: n.pterm,
42584258
PIndex: n.pindex,
42594259
SystemAcc: n.IsSystemAccount(),
4260-
TrafficAcc: n.acc.GetName(),
4260+
TrafficAcc: n.t.Account().GetName(),
42614261
IPQPropLen: n.prop.len(),
42624262
IPQEntryLen: n.entry.len(),
42634263
IPQRespLen: n.resp.len(),

server/raft.go

Lines changed: 21 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -155,7 +155,6 @@ type raft struct {
155155

156156
created time.Time // Time that the group was created
157157
accName string // Account name of the asset this raft group is for
158-
acc *Account // Account that NRG traffic will be sent/received in
159158
group string // Raft group
160159
sd string // Store directory
161160
id string // Node ID
@@ -202,7 +201,6 @@ type raft struct {
202201
vote string // Our current vote state
203202

204203
s *Server // Reference to top-level server
205-
c *client // Internal client for subscriptions
206204
js *jetStream // JetStream, if running, to see if we are out of resources
207205

208206
hasleader atomic.Bool // Is there a group leader right now?
@@ -221,7 +219,7 @@ type raft struct {
221219
asubj string // Append entries subject
222220
areply string // Append entries responses subject
223221

224-
sq *sendq // Send queue for outbound RPC messages
222+
t raftTransport // Transport that handles Raft messaging
225223
aesub *subscription // Subscription for handleAppendEntry callbacks
226224

227225
wtv []byte // Term and vote to be written
@@ -319,6 +317,8 @@ type RaftConfig struct {
319317
// We need to protect against losing state due to the new peers starting with an empty log.
320318
// Therefore, these empty servers can't try to become leader until they at least have _some_ state.
321319
ScaleUp bool
320+
321+
Transport raftTransportFactory
322322
}
323323

324324
var (
@@ -464,6 +464,12 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
464464
observer: cfg.Observer,
465465
}
466466

467+
factory := cfg.Transport
468+
if factory == nil {
469+
factory = defaultRaftTransport
470+
}
471+
n.t = factory(s, n)
472+
467473
// Setup our internal subscriptions for proposals, votes and append entries.
468474
// If we fail to do this for some reason then this is fatal — we cannot
469475
// continue setting up or the Raft node may be partially/totally isolated.
@@ -658,7 +664,10 @@ func (n *raft) IsSystemAccount() bool {
658664
func (n *raft) GetTrafficAccountName() string {
659665
n.RLock()
660666
defer n.RUnlock()
661-
return n.acc.GetName()
667+
if n.t == nil {
668+
return (*Account)(nil).GetName()
669+
}
670+
return n.t.Account().GetName()
662671
}
663672

664673
func (n *raft) RecreateInternalSubs() error {
@@ -708,7 +717,7 @@ func (n *raft) recreateInternalSubsLocked() error {
708717
}
709718
}
710719
}
711-
if n.aesub != nil && n.acc == nrgAcc {
720+
if n.aesub != nil && n.t.Account() == nrgAcc {
712721
// Subscriptions already exist and the account NRG state
713722
// hasn't changed.
714723
return nil
@@ -719,33 +728,11 @@ func (n *raft) recreateInternalSubsLocked() error {
719728
// the next step...
720729
n.cancelCatchup()
721730

722-
// If we have an existing client then tear down any existing
723-
// subscriptions and close the internal client.
724-
if c := n.c; c != nil {
725-
c.mu.Lock()
726-
subs := make([]*subscription, 0, len(c.subs))
727-
for _, sub := range c.subs {
728-
subs = append(subs, sub)
729-
}
730-
c.mu.Unlock()
731-
for _, sub := range subs {
732-
n.unsubscribe(sub)
733-
}
734-
c.closeConnection(InternalClient)
735-
}
736-
737-
if n.acc != nrgAcc {
731+
if n.t.Account() != nrgAcc {
738732
n.debug("Subscribing in '%s'", nrgAcc.GetName())
739733
}
740734

741-
c := n.s.createInternalSystemClient()
742-
c.registerWithAccount(nrgAcc)
743-
if nrgAcc.sq == nil {
744-
nrgAcc.sq = n.s.newSendQ(nrgAcc)
745-
}
746-
n.c = c
747-
n.sq = nrgAcc.sq
748-
n.acc = nrgAcc
735+
n.t.Reset(nrgAcc)
749736

750737
// Recreate any internal subscriptions for voting, append
751738
// entries etc in the new account.
@@ -2325,17 +2312,12 @@ func (n *raft) newInbox() string {
23252312
// Our internal subscribe.
23262313
// Lock should be held.
23272314
func (n *raft) subscribe(subject string, cb msgHandler) (*subscription, error) {
2328-
if n.c == nil {
2329-
return nil, errNoInternalClient
2330-
}
2331-
return n.s.systemSubscribe(subject, _EMPTY_, false, n.c, cb)
2315+
return n.t.Subscribe(subject, cb)
23322316
}
23332317

23342318
// Lock should be held.
23352319
func (n *raft) unsubscribe(sub *subscription) {
2336-
if n.c != nil && sub != nil {
2337-
n.c.processUnsub(sub.sid)
2338-
}
2320+
n.t.Unsubscribe(sub)
23392321
}
23402322

23412323
// Lock should be held.
@@ -2460,19 +2442,7 @@ runner:
24602442
n.Lock()
24612443
defer n.Unlock()
24622444

2463-
if c := n.c; c != nil {
2464-
var subs []*subscription
2465-
c.mu.Lock()
2466-
for _, sub := range c.subs {
2467-
subs = append(subs, sub)
2468-
}
2469-
c.mu.Unlock()
2470-
for _, sub := range subs {
2471-
n.unsubscribe(sub)
2472-
}
2473-
c.closeConnection(InternalClient)
2474-
n.c = nil
2475-
}
2445+
n.t.Close()
24762446

24772447
// Unregistering ipQueues do not prevent them from push/pop
24782448
// just will remove them from the central monitoring map
@@ -5131,15 +5101,11 @@ func (n *raft) requestVote() {
51315101
}
51325102

51335103
func (n *raft) sendRPC(subject, reply string, msg []byte) {
5134-
if n.sq != nil {
5135-
n.sq.send(subject, reply, nil, msg)
5136-
}
5104+
n.t.Publish(subject, reply, msg)
51375105
}
51385106

51395107
func (n *raft) sendReply(subject string, msg []byte) {
5140-
if n.sq != nil {
5141-
n.sq.send(subject, _EMPTY_, nil, msg)
5142-
}
5108+
n.t.Publish(subject, _EMPTY_, msg)
51435109
}
51445110

51455111
func (n *raft) wonElection(votes int) bool {

server/raft_helpers_test.go

Lines changed: 25 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -126,21 +126,26 @@ func (sg smGroup) lockFollowers() []stateMachine {
126126
// Create a raft group and place on numMembers servers at random.
127127
// Filestore based.
128128
func (c *cluster) createRaftGroup(name string, numMembers int, smf smFactory) smGroup {
129-
return c.createRaftGroupEx(name, numMembers, smf, FileStorage)
129+
return c.createRaftGroupEx(name, numMembers, smf, defaultRaftTransport, FileStorage)
130130
}
131131

132132
func (c *cluster) createMemRaftGroup(name string, numMembers int, smf smFactory) smGroup {
133-
return c.createRaftGroupEx(name, numMembers, smf, MemoryStorage)
133+
return c.createRaftGroupEx(name, numMembers, smf, defaultRaftTransport, MemoryStorage)
134134
}
135135

136-
func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory, st StorageType) smGroup {
136+
func (c *cluster) createMockMemRaftGroup(name string, members int, smf smFactory) (*raftTransportHub, raftTransportFactory, smGroup) {
137+
hub, rtf := mockTransportFactory()
138+
return hub, rtf, c.createRaftGroupEx(name, members, smf, rtf, MemoryStorage)
139+
}
140+
141+
func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory, rtf raftTransportFactory, st StorageType) smGroup {
137142
c.t.Helper()
138143
if numMembers > len(c.servers) {
139144
c.t.Fatalf("Members > Peers: %d vs %d", numMembers, len(c.servers))
140145
}
141146
servers := append([]*Server{}, c.servers...)
142147
rand.Shuffle(len(servers), func(i, j int) { servers[i], servers[j] = servers[j], servers[i] })
143-
return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, st)
148+
return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, rtf, st)
144149
}
145150

146151
func (c *cluster) createWAL(name string, st StorageType) WAL {
@@ -189,42 +194,48 @@ func (c *cluster) createStateMachine(s *Server, cfg *RaftConfig, peers []string,
189194
return sm
190195
}
191196

192-
func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup {
197+
func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, rtf raftTransportFactory, st StorageType) smGroup {
193198
c.t.Helper()
194199

195200
var sg smGroup
196201
peers := serverPeerNames(servers)
197202

198203
for _, s := range servers {
199204
cfg := &RaftConfig{
200-
Name: name,
201-
Store: c.t.TempDir(),
202-
Log: c.createWAL(name, st)}
205+
Name: name,
206+
Store: c.t.TempDir(),
207+
Log: c.createWAL(name, st),
208+
Transport: rtf}
203209
sg = append(sg, c.createStateMachine(s, cfg, peers, smf))
204210
}
205211
return sg
206212
}
207213

208-
func (c *cluster) addNodeEx(name string, smf smFactory, st StorageType) stateMachine {
214+
func (c *cluster) addNodeEx(name string, smf smFactory, rtf raftTransportFactory, st StorageType) stateMachine {
209215
c.t.Helper()
210216

211217
server := c.addInNewServer()
212218

213219
cfg := &RaftConfig{
214-
Name: name,
215-
Store: c.t.TempDir(),
216-
Log: c.createWAL(name, st)}
220+
Name: name,
221+
Store: c.t.TempDir(),
222+
Log: c.createWAL(name, st),
223+
Transport: rtf}
217224

218225
peers := serverPeerNames(c.servers)
219226
return c.createStateMachine(server, cfg, peers, smf)
220227
}
221228

222229
func (c *cluster) addRaftNode(name string, smf smFactory) stateMachine {
223-
return c.addNodeEx(name, smf, FileStorage)
230+
return c.addNodeEx(name, smf, defaultRaftTransport, FileStorage)
224231
}
225232

226233
func (c *cluster) addMemRaftNode(name string, smf smFactory) stateMachine {
227-
return c.addNodeEx(name, smf, MemoryStorage)
234+
return c.addNodeEx(name, smf, defaultRaftTransport, MemoryStorage)
235+
}
236+
237+
func (c *cluster) addMockMemRaftNode(name string, rtf raftTransportFactory, smf smFactory) stateMachine {
238+
return c.addNodeEx(name, smf, rtf, MemoryStorage)
228239
}
229240

230241
// Driver program for the state machine.

server/raft_test.go

Lines changed: 60 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@ import (
2626
"reflect"
2727
"strings"
2828
"sync"
29+
"sync/atomic"
2930
"testing"
3031
"time"
3132

@@ -6221,3 +6222,62 @@ func TestNRGReset(t *testing.T) {
62216222
// Append entry cache cleared.
62226223
require_Equal(t, len(n.pae), 0)
62236224
}
6225+
6226+
func TestNRGPartitionedPeerRemove(t *testing.T) {
6227+
c := createJetStreamClusterExplicit(t, "R2S", 2)
6228+
defer c.shutdown()
6229+
6230+
hub, _, rg := c.createMockMemRaftGroup("MOCK", 2, newStateAdder)
6231+
defer hub.healPartitions()
6232+
6233+
leader := rg.waitOnLeader().node()
6234+
require_Equal(t, leader.ClusterSize(), 2)
6235+
require_Equal(t, len(rg.followers()), 1)
6236+
follower := rg.followers()[0].node()
6237+
6238+
var hookCalls atomic.Int64
6239+
hub.setAfterMsgHook(func(subject, reply string, msg []byte) {
6240+
hookCalls.Add(1)
6241+
})
6242+
6243+
// Remove the follower while the leader is partitioned away
6244+
hub.partition(leader.ID(), 1)
6245+
leader.ProposeRemovePeer(follower.ID())
6246+
6247+
// Follower can't get elected as leader, but let's try anyway
6248+
follower.CampaignImmediately()
6249+
6250+
// Expect progress on the leader side
6251+
checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
6252+
if leader.ClusterSize() != 1 {
6253+
return errors.New("node removal still in progress")
6254+
}
6255+
return nil
6256+
})
6257+
6258+
checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
6259+
if leader.MembershipChangeInProgress() {
6260+
return errors.New("membership still in progress")
6261+
}
6262+
return nil
6263+
})
6264+
6265+
require_Equal(t, leader.ClusterSize(), 1)
6266+
require_False(t, leader.MembershipChangeInProgress())
6267+
require_True(t, hookCalls.Load() > 0)
6268+
6269+
// Follower has not changed
6270+
require_Equal(t, follower.State(), Follower)
6271+
require_Equal(t, follower.ClusterSize(), 2)
6272+
require_False(t, follower.MembershipChangeInProgress())
6273+
6274+
// Heal the partition, and expect the follower to get the bad news...
6275+
hub.heal(leader.ID())
6276+
6277+
checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
6278+
if follower.ClusterSize() != 1 {
6279+
return errors.New("node removal still in progress")
6280+
}
6281+
return nil
6282+
})
6283+
}

0 commit comments

Comments
 (0)