diff --git a/.changelog/26424.txt b/.changelog/26424.txt
new file mode 100644
index 00000000000..702b2d114f0
--- /dev/null
+++ b/.changelog/26424.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+services: Fixed a bug where Nomad services were deleted if a node missed heartbeats and recovered before allocs were migrated
+```
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index 92ad8b6a623..51209395406 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -1524,62 +1524,6 @@ func TestClientEndpoint_UpdateStatus_HeartbeatOnly_Advertise(t *testing.T) {
 	require.Equal(resp.Servers[0].RPCAdvertiseAddr, advAddr)
 }
 
-func TestNode_UpdateStatus_ServiceRegistrations(t *testing.T) {
-	ci.Parallel(t)
-
-	testServer, serverCleanup := TestServer(t, nil)
-	defer serverCleanup()
-	testutil.WaitForLeader(t, testServer.RPC)
-	testutil.WaitForKeyring(t, testServer.RPC, testServer.config.Region)
-
-	// Create a node and upsert this into state.
-	node := mock.Node()
-	must.NoError(t, testServer.State().UpsertNode(structs.MsgTypeTestSetup, 10, node))
-
-	// Generate service registrations, ensuring the nodeID is set to the
-	// generated node from above.
-	services := mock.ServiceRegistrations()
-
-	for _, s := range services {
-		s.NodeID = node.ID
-	}
-
-	// Upsert the service registrations into state.
-	must.NoError(t, testServer.State().UpsertServiceRegistrations(structs.MsgTypeTestSetup, 20, services))
-
-	// Check the service registrations are in state as we expect, so we can
-	// have confidence in the rest of the test.
-	ws := memdb.NewWatchSet()
-	nodeRegs, err := testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID)
-	must.NoError(t, err)
-	must.Len(t, 2, nodeRegs)
-	must.Eq(t, nodeRegs[0].NodeID, node.ID)
-	must.Eq(t, nodeRegs[1].NodeID, node.ID)
-
-	// Generate and trigger a node down status update. This mimics what happens
-	// when the node fails its heart-beating.
-	args := structs.NodeUpdateStatusRequest{
-		NodeID:       node.ID,
-		Status:       structs.NodeStatusDown,
-		WriteRequest: structs.WriteRequest{Region: "global", AuthToken: node.SecretID},
-	}
-
-	var reply structs.NodeUpdateResponse
-
-	nodeEndpoint := NewNodeEndpoint(testServer, nil)
-	must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
-
-	// Query our state, to ensure the node service registrations have been
-	// removed.
-	nodeRegs, err = testServer.State().GetServiceRegistrationsByNodeID(ws, node.ID)
-	must.NoError(t, err)
-	must.Len(t, 0, nodeRegs)
-
-	// Re-send the status update, to ensure we get no error if service
-	// registrations have already been removed
-	must.NoError(t, nodeEndpoint.UpdateStatus(&args, &reply))
-}
-
 func TestNode_UpdateStatus_Identity(t *testing.T) {
 	ci.Parallel(t)
 
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index e51d29a6871..1b3d158dd48 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -1163,11 +1163,6 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, req *structs.NodeUpdateStatus
 		return fmt.Errorf("index update failed: %v", err)
 	}
 
-	// Deregister any services on the node in the same transaction
-	if copyNode.Status == structs.NodeStatusDown {
-		s.deleteServiceRegistrationByNodeIDTxn(txn, txn.Index, copyNode.ID)
-	}
-
 	return nil
 }
 
@@ -4077,10 +4072,8 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *txn, index uint64, alloc *
 		return fmt.Errorf("setting job status failed: %v", err)
 	}
 
-	if copyAlloc.ClientTerminalStatus() {
-		if err := s.deleteServiceRegistrationByAllocIDTxn(txn, index, copyAlloc.ID); err != nil {
-			return err
-		}
+	if err := s.deregisterServicesForTerminalAllocs(txn, index, copyAlloc); err != nil {
+		return err
 	}
 
 	return nil
@@ -4296,6 +4289,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation
 			return err
 		}
 
+		if err := s.deregisterServicesForTerminalAllocs(txn, index, alloc); err != nil {
+			return err
+		}
+
 		if err := txn.Insert("allocs", alloc); err != nil {
 			return fmt.Errorf("alloc insert failed: %v", err)
 		}
diff --git a/nomad/state/state_store_service_registration.go b/nomad/state/state_store_service_registration.go
index 72390e42038..1b53eb84a6c 100644
--- a/nomad/state/state_store_service_registration.go
+++ b/nomad/state/state_store_service_registration.go
@@ -185,6 +185,20 @@ func (s *StateStore) deleteServiceRegistrationByAllocIDTxn(
 	return nil
 }
 
+// deregisterServicesForTerminalAllocs deletes service registration instances
+// for allocations that have been marked client-terminal. This allows us to
+// remove services for an alloc even if the client hooks fail or the node goes
+// down.
+func (s *StateStore) deregisterServicesForTerminalAllocs(
+	txn *txn, index uint64, alloc *structs.Allocation) error {
+
+	if !alloc.ClientTerminalStatus() {
+		return nil
+	}
+
+	return s.deleteServiceRegistrationByAllocIDTxn(txn, index, alloc.ID)
+}
+
 // GetServiceRegistrations returns an iterator that contains all service
 // registrations stored within state. This is primarily useful when performing
 // listings which use the namespace wildcard operator. The caller is
diff --git a/nomad/state/state_store_service_registration_test.go b/nomad/state/state_store_service_registration_test.go
index 36c3793435b..e495d660254 100644
--- a/nomad/state/state_store_service_registration_test.go
+++ b/nomad/state/state_store_service_registration_test.go
@@ -639,3 +639,76 @@ func TestStateStore_GetServiceRegistrationsByNodeID(t *testing.T) {
 	must.NoError(t, err)
 	must.Len(t, 1, serviceRegs)
 }
+
+// TestAlloc_ServiceRegistrationLifecycle verifies that service registrations
+// survive a node being marked down and are only removed once their allocation
+// becomes client-terminal, whether via a client update or a plan result.
+func TestAlloc_ServiceRegistrationLifecycle(t *testing.T) {
+	ci.Parallel(t)
+	store := testStateStore(t)
+	index, _ := store.LatestIndex()
+
+	alloc0 := mock.Alloc()
+	alloc1 := mock.Alloc()
+
+	services := mock.ServiceRegistrations()
+	services[0].AllocID = alloc0.ID
+	services[1].AllocID = alloc1.ID
+
+	node := mock.Node()
+	node.ID = services[0].NodeID
+	alloc0.NodeID = node.ID
+
+	index++
+	must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node))
+
+	index++
+	must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index,
+		[]*structs.Allocation{alloc0, alloc1}))
+
+	index++
+	must.NoError(t, store.UpsertServiceRegistrations(
+		structs.MsgTypeTestSetup, index, services))
+
+	// node gets marked down in state, but this doesn't delete services
+	index++
+	must.NoError(t, store.UpdateNodeStatus(structs.MsgTypeTestSetup, index,
+		&structs.NodeUpdateStatusRequest{
+			NodeID: node.ID,
+			Status: structs.NodeStatusDown,
+		}))
+
+	services, err := store.GetServiceRegistrationsByNodeID(nil, node.ID)
+	must.NoError(t, err)
+	must.Len(t, 1, services)
+
+	// client marks alloc complete, so we clear the service
+	alloc0 = alloc0.Copy()
+	alloc0.ClientStatus = structs.AllocClientStatusComplete
+	index++
+	must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index,
+		[]*structs.Allocation{alloc0}))
+
+	iter, err := store.GetServiceRegistrationsByAllocID(nil, alloc0.ID)
+	must.NoError(t, err)
+	must.Nil(t, iter.Next())
+
+	// scheduler/plan marks alloc lost, so we clear the service
+	planAlloc := new(structs.Allocation)
+	*planAlloc = *alloc1
+	planAlloc.DesiredStatus = structs.AllocDesiredStatusStop
+	planAlloc.ClientStatus = structs.AllocClientStatusLost
+	diff := structs.AllocationDiff(*planAlloc)
+
+	index++
+	must.NoError(t, store.UpsertPlanResults(structs.MsgTypeTestSetup, index,
+		&structs.ApplyPlanResultsRequest{
+			AllocUpdateRequest: structs.AllocUpdateRequest{
+				AllocsStopped: []*structs.AllocationDiff{&diff},
+			},
+		}))
+
+	iter, err = store.GetServiceRegistrationsByAllocID(nil, alloc1.ID)
+	must.NoError(t, err)
+	must.Nil(t, iter.Next())
+}