Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/26424.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
services: Fixed a bug where Nomad services were deleted if a node missed heartbeats and recovered before allocs were migrated
```
56 changes: 0 additions & 56 deletions nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1524,62 +1524,6 @@ func TestClientEndpoint_UpdateStatus_HeartbeatOnly_Advertise(t *testing.T) {
require.Equal(resp.Servers[0].RPCAdvertiseAddr, advAddr)
}

func TestNode_UpdateStatus_ServiceRegistrations(t *testing.T) {
ci.Parallel(t)

testServer, serverCleanup := TestServer(t, nil)
defer serverCleanup()
testutil.WaitForLeader(t, testServer.RPC)
testutil.WaitForKeyring(t, testServer.RPC, testServer.config.Region)

// Create a node and upsert this into state.
node := mock.Node()
must.NoError(t, testServer.State().UpsertNode(structs.MsgTypeTestSetup, 10, node))

// Generate service registrations, ensuring the nodeID is set to the
// generated node from above.
services := mock.ServiceRegistrations()

for _, s := range services {
s.NodeID = node.ID
}

// Upsert the service registrations into state.
must.NoError(t, testServer.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services))

// Check the service registrations are in state as we expect, so we can
// have confidence in the rest of the test.
ws := memdb.NewWatchSet()
nodeRegs, err := testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID)
must.NoError(t, err)
must.Len(t, 2, nodeRegs)
must.Eq(t, nodeRegs[0].NodeID, node.ID)
must.Eq(t, nodeRegs[1].NodeID, node.ID)

// Generate and trigger a node down status update. This mimics what happens
// when the node fails its heart-beating.
args := structs.NodeUpdateStatusRequest{
NodeID: node.ID,
Status: structs.NodeStatusDown,
WriteRequest: structs.WriteRequest{Region: "global", AuthToken: node.SecretID},
}

var reply structs.NodeUpdateResponse

nodeEndpoint := NewNodeEndpoint(testServer, nil)
must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))

// Query our state, to ensure the node service registrations have been
// removed.
nodeRegs, err = testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID)
must.NoError(t, err)
must.Len(t, 0, nodeRegs)

// Re-send the status update, to ensure we get no error if service
// registrations have already been removed
must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
}

func TestNode_UpdateStatus_Identity(t *testing.T) {
ci.Parallel(t)

Expand Down
15 changes: 6 additions & 9 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1163,11 +1163,6 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, req *structs.NodeUpdateStatus
return fmt.Errorf("index update failed: %v", err)
}

// Deregister any services on the node in the same transaction
if copyNode.Status == structs.NodeStatusDown {
s.deleteServiceRegistrationByNodeIDTxn(txn, txn.Index, copyNode.ID)
}

return nil
}

Expand Down Expand Up @@ -4077,10 +4072,8 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *
return fmt.Errorf("setting job status failed: %v", err)
}

if copyAlloc.ClientTerminalStatus() {
if err := s.deleteServiceRegistrationByAllocIDTxn(txn, index, copyAlloc.ID); err != nil {
return err
}
if err := s.deregisterServicesForTerminalAllocs(txn, index, copyAlloc); err != nil {
return err
}

return nil
Expand Down Expand Up @@ -4296,6 +4289,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
return err
}

if err := s.deregisterServicesForTerminalAllocs(txn, index, alloc); err != nil {
return err
}

if err := txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
Expand Down
14 changes: 14 additions & 0 deletions nomad/state/state_store_service_registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,20 @@ func (s *StateStore) deleteServiceRegistrationByAllocIDTxn(
return nil
}

// deregisterServicesForTerminalAllocs deletes service registration instances
// for allocations that have been marked client-terminal. This allows us to
// remove services for an alloc even if the client hooks fail or the node goes
// down.
func (s *StateStore) deregisterServicesForTerminalAllocs(
txn *txn, index uint64, alloc *structs.Allocation) error {

if !alloc.ClientTerminalStatus() {
return nil
}

return s.deleteServiceRegistrationByAllocIDTxn(txn, index, alloc.ID)
}

// GetServiceRegistrations returns an iterator that contains all service
// registrations stored within state. This is primarily useful when performing
// listings which use the namespace wildcard operator. The caller is
Expand Down
66 changes: 66 additions & 0 deletions nomad/state/state_store_service_registration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,3 +639,69 @@ func TestStateStore_GetServiceRegistrationsByNodeID(t *testing.T) {
must.NoError(t, err)
must.Len(t, 1, serviceRegs)
}

func TestAlloc_ServiceRegistrationLifecycle(t *testing.T) {
ci.Parallel(t)
store := testStateStore(t)
index, _ := store.LatestIndex()

alloc0 := mock.Alloc()
alloc1 := mock.Alloc()

services := mock.ServiceRegistrations()
services[0].AllocID = alloc0.ID
services[1].AllocID = alloc1.ID

node := mock.Node()
node.ID = services[0].NodeID
alloc0.NodeID = node.ID

index++
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node))

index++
must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index,
[]*structs.Allocation{alloc0, alloc1}))

index++
must.NoError(t, store.UpsertServiceRegistrations(
structs.MsgTypeTestSetup, index, services))

// node gets marked lost, but this doesn't delete services
node = node.Copy()
node.Status = structs.NodeStatusDown
services, err := store.GetServiceRegistrationsByNodeID(nil, node.ID)
must.NoError(t, err)
must.Len(t, 1, services)

// client marks alloc complete, so we clear the service
alloc0 = alloc0.Copy()
alloc0.ClientStatus = structs.AllocClientStatusComplete
index++
must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index,
[]*structs.Allocation{alloc0}))

iter, err := store.GetServiceRegistrationsByAllocID(nil, alloc0.ID)
must.NoError(t, err)
must.Nil(t, iter.Next())

// scheduler/plan marks alloc lost, so we clear the service
planAlloc := new(structs.Allocation)
*planAlloc = *alloc1
planAlloc.DesiredStatus = structs.AllocDesiredStatusStop
planAlloc.ClientStatus = structs.AllocClientStatusLost
diff := structs.AllocationDiff(*planAlloc)

index++
must.NoError(t, store.UpsertPlanResults(structs.MsgTypeTestSetup, index,
&structs.ApplyPlanResultsRequest{
AllocUpdateRequest: structs.AllocUpdateRequest{
AllocsStopped: []*structs.AllocationDiff{&diff},
},
}))

iter, err = store.GetServiceRegistrationsByAllocID(nil, alloc1.ID)
must.NoError(t, err)
must.Nil(t, iter.Next())

}