Commit 74bc3ce
Backport of: wait for service registration cleanup until allocs marked lost (#26424) (#26448)
When a node misses a heartbeat and is marked down, Nomad deletes the service registration instances for that node. But if the node then successfully heartbeats before its allocations are marked lost, the services are never restored: the node is unaware that it missed a heartbeat, and there is no anti-entropy on the node in any case.

We already delete services when the plan applier marks allocations as stopped, so deleting services when the node goes down is only an optimization to divert service traffic more quickly. But because the state after a plan apply is the "canonical" view of allocation health, this optimization breaks correctness.

Remove the code path that deletes services from nodes when nodes go down. Retain the state store code that deletes services when allocs are marked terminal by the plan applier. Also add a path in the state store to delete services when allocs are marked terminal by the client. This recovers some of the optimization but avoids the correctness bug, because marking an allocation client-terminal is a one-way operation.

Fixes: #16983

Co-authored-by: Tim Gross <[email protected]>
1 parent 218c6ca commit 74bc3ce
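
The fix leans on the fact that an allocation's client status, unlike node status, only moves one way into a terminal state. For reference, the client-terminal check in nomad/structs looks roughly like the sketch below (reconstructed from memory of the Nomad source, not part of this diff):

```go
// ClientTerminalStatus returns true once the client has reported the
// allocation in a state it will never leave: complete, failed, or lost.
// A node flapping between down and ready cannot un-set any of these,
// which is why keying service cleanup off this check is safe.
func (a *Allocation) ClientTerminalStatus() bool {
	switch a.ClientStatus {
	case AllocClientStatusComplete, AllocClientStatusFailed, AllocClientStatusLost:
		return true
	default:
		return false
	}
}
```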

File tree: 5 files changed, +89 -64 lines changed

.changelog/26424.txt

Lines changed: 3 additions & 0 deletions

@@ -0,0 +1,3 @@
+```release-note:bug
+services: Fixed a bug where Nomad services were deleted if a node missed heartbeats and recovered before allocs were migrated
+```

nomad/node_endpoint_test.go

Lines changed: 0 additions & 55 deletions

@@ -1249,61 +1249,6 @@ func TestClientEndpoint_UpdateStatus_HeartbeatOnly_Advertise(t *testing.T) {
     require.Equal(resp.Servers[0].RPCAdvertiseAddr, advAddr)
 }
 
-func TestNode_UpdateStatus_ServiceRegistrations(t *testing.T) {
-    ci.Parallel(t)
-
-    testServer, serverCleanup := TestServer(t, nil)
-    defer serverCleanup()
-    testutil.WaitForLeader(t, testServer.RPC)
-
-    // Create a node and upsert this into state.
-    node := mock.Node()
-    must.NoError(t, testServer.State().UpsertNode(structs.MsgTypeTestSetup, 10, node))
-
-    // Generate service registrations, ensuring the nodeID is set to the
-    // generated node from above.
-    services := mock.ServiceRegistrations()
-
-    for _, s := range services {
-        s.NodeID = node.ID
-    }
-
-    // Upsert the service registrations into state.
-    must.NoError(t, testServer.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services))
-
-    // Check the service registrations are in state as we expect, so we can
-    // have confidence in the rest of the test.
-    ws := memdb.NewWatchSet()
-    nodeRegs, err := testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID)
-    must.NoError(t, err)
-    must.Len(t, 2, nodeRegs)
-    must.Eq(t, nodeRegs[0].NodeID, node.ID)
-    must.Eq(t, nodeRegs[1].NodeID, node.ID)
-
-    // Generate and trigger a node down status update. This mimics what happens
-    // when the node fails its heart-beating.
-    args := structs.NodeUpdateStatusRequest{
-        NodeID:       node.ID,
-        Status:       structs.NodeStatusDown,
-        WriteRequest: structs.WriteRequest{Region: "global", AuthToken: node.SecretID},
-    }
-
-    var reply structs.NodeUpdateResponse
-
-    nodeEndpoint := NewNodeEndpoint(testServer, nil)
-    must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
-
-    // Query our state, to ensure the node service registrations have been
-    // removed.
-    nodeRegs, err = testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID)
-    must.NoError(t, err)
-    must.Len(t, 0, nodeRegs)
-
-    // Re-send the status update, to ensure we get no error if service
-    // registrations have already been removed.
-    must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
-}
-
 // TestClientEndpoint_UpdateDrain asserts the ability to initiate drain
 // against a node and cancel that drain. It also asserts:
 // * an evaluation is created when the node becomes eligible

nomad/state/state_store.go

Lines changed: 6 additions & 9 deletions

@@ -1150,11 +1150,6 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update
         return fmt.Errorf("index update failed: %v", err)
     }
 
-    // Deregister any services on the node in the same transaction
-    if copyNode.Status == structs.NodeStatusDown {
-        s.deleteServiceRegistrationByNodeIDTxn(txn, txn.Index, copyNode.ID)
-    }
-
     return nil
 }
 
@@ -4064,10 +4059,8 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *
         return fmt.Errorf("setting job status failed: %v", err)
     }
 
-    if copyAlloc.ClientTerminalStatus() {
-        if err := s.deleteServiceRegistrationByAllocIDTxn(txn, index, copyAlloc.ID); err != nil {
-            return err
-        }
+    if err := s.deregisterServicesForTerminalAllocs(txn, index, copyAlloc); err != nil {
+        return err
     }
 
     return nil

@@ -4283,6 +4276,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
             return err
         }
 
+        if err := s.deregisterServicesForTerminalAllocs(txn, index, alloc); err != nil {
+            return err
+        }
+
         if err := txn.Insert("allocs", alloc); err != nil {
             return fmt.Errorf("alloc insert failed: %v", err)
         }

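Taken together, the state_store.go changes make allocation state the single driver of service cleanup: nestedUpdateAllocFromClient covers allocs the client reports terminal, upsertAllocsImpl covers allocs the plan applier marks terminal, and updateNodeStatusTxn no longer touches service registrations at all.
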
nomad/state/state_store_service_registration.go

Lines changed: 14 additions & 0 deletions

@@ -185,6 +185,20 @@ func (s *StateStore) deleteServiceRegistrationByAllocIDTxn(
     return nil
 }
 
+// deregisterServicesForTerminalAllocs deletes service registration instances
+// for allocations that have been marked client-terminal. This allows us to
+// remove services for an alloc even if the client hooks fail or the node goes
+// down.
+func (s *StateStore) deregisterServicesForTerminalAllocs(
+    txn *txn, index uint64, alloc *structs.Allocation) error {
+
+    if !alloc.ClientTerminalStatus() {
+        return nil
+    }
+
+    return s.deleteServiceRegistrationByAllocIDTxn(txn, index, alloc.ID)
+}
+
 // GetServiceRegistrations returns an iterator that contains all service
 // registrations stored within state. This is primarily useful when performing
 // listings which use the namespace wildcard operator. The caller is
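
Two design notes on this helper: the terminal-status filter lives inside it rather than at the call sites, so both write paths can pass every allocation through unconditionally and non-terminal allocs are a no-op; and, as far as one can tell from the callers here, deleteServiceRegistrationByAllocIDTxn tolerates being run when no services remain, which keeps replayed client updates idempotent.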

nomad/state/state_store_service_registration_test.go

Lines changed: 66 additions & 0 deletions

@@ -639,3 +639,69 @@ func TestStateStore_GetServiceRegistrationsByNodeID(t *testing.T) {
     must.NoError(t, err)
     must.Len(t, 1, serviceRegs)
 }
+
+func TestAlloc_ServiceRegistrationLifecycle(t *testing.T) {
+    ci.Parallel(t)
+    store := testStateStore(t)
+    index, _ := store.LatestIndex()
+
+    alloc0 := mock.Alloc()
+    alloc1 := mock.Alloc()
+
+    services := mock.ServiceRegistrations()
+    services[0].AllocID = alloc0.ID
+    services[1].AllocID = alloc1.ID
+
+    node := mock.Node()
+    node.ID = services[0].NodeID
+    alloc0.NodeID = node.ID
+
+    index++
+    must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node))
+
+    index++
+    must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index,
+        []*structs.Allocation{alloc0, alloc1}))
+
+    index++
+    must.NoError(t, store.UpsertServiceRegistrations(
+        structs.MsgTypeTestSetup, index, services))
+
+    // node gets marked lost, but this doesn't delete services
+    node = node.Copy()
+    node.Status = structs.NodeStatusDown
+    services, err := store.GetServiceRegistrationsByNodeID(nil, node.ID)
+    must.NoError(t, err)
+    must.Len(t, 1, services)
+
+    // client marks alloc complete, so we clear the service
+    alloc0 = alloc0.Copy()
+    alloc0.ClientStatus = structs.AllocClientStatusComplete
+    index++
+    must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index,
+        []*structs.Allocation{alloc0}))
+
+    iter, err := store.GetServiceRegistrationsByAllocID(nil, alloc0.ID)
+    must.NoError(t, err)
+    must.Nil(t, iter.Next())
+
+    // scheduler/plan marks alloc lost, so we clear the service
+    planAlloc := new(structs.Allocation)
+    *planAlloc = *alloc1
+    planAlloc.DesiredStatus = structs.AllocDesiredStatusStop
+    planAlloc.ClientStatus = structs.AllocClientStatusLost
+    diff := structs.AllocationDiff(*planAlloc)
+
+    index++
+    must.NoError(t, store.UpsertPlanResults(structs.MsgTypeTestSetup, index,
+        &structs.ApplyPlanResultsRequest{
+            AllocUpdateRequest: structs.AllocUpdateRequest{
+                AllocsStopped: []*structs.AllocationDiff{&diff},
+            },
+        }))
+
+    iter, err = store.GetServiceRegistrationsByAllocID(nil, alloc1.ID)
+    must.NoError(t, err)
+    must.Nil(t, iter.Next())
+}
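
The new test pins down the full lifecycle in one place: marking the node down leaves the services registered, a client-reported complete alloc clears its service, and a plan-applied lost alloc clears its service.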
