3 changes: 3 additions & 0 deletions .changelog/25726.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a bug where draining a node with canaries could result in a stuck deployment
```
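Context for the fix: when a node running canary allocations is drained, the drainer stops the canaries and marks them with `DesiredTransition.Migrate`. Before this change, `filterByTainted` classified those terminal canaries as untainted, so the deployment kept counting them as running canaries and could never progress. The change in `scheduler/reconcile_util.go` below routes them into the migrate set instead, and the new `TestServiceSched_NodeDrain_Canaries` test reproduces the scenario end to end.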
127 changes: 119 additions & 8 deletions scheduler/generic_sched_test.go
@@ -2024,7 +2024,7 @@ func TestServiceSched_JobModify(t *testing.T) {
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))

// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -2417,7 +2417,7 @@ func TestServiceSched_JobModify_Datacenters(t *testing.T) {
job2.Datacenters = []string{"dc1", "dc2"}
require.NoError(h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))

// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -2490,7 +2490,7 @@ func TestServiceSched_JobModify_IncrCount_NodeLimit(t *testing.T) {
job2.TaskGroups[0].Count = 3
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))

// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -2601,7 +2601,7 @@ func TestServiceSched_JobModify_CountZero(t *testing.T) {
job2.TaskGroups[0].Count = 0
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))

// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -2699,7 +2699,7 @@ func TestServiceSched_JobModify_Rolling(t *testing.T) {
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))

// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -2930,7 +2930,7 @@ func TestServiceSched_JobModify_Canaries(t *testing.T) {
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))

// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -3073,7 +3073,7 @@ func TestServiceSched_JobModify_InPlace(t *testing.T) {
}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))

// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -3714,7 +3714,7 @@ func TestServiceSched_NodeDown(t *testing.T) {
allocs := []*structs.Allocation{alloc}
must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))

// Create a mock evaluation to deal with drain
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
@@ -4173,6 +4173,117 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestServiceSched_NodeDrain_Canaries(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)

n1 := mock.Node()
n2 := mock.DrainNode()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), n1))
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), n2))

job := mock.Job()
job.TaskGroups[0].Count = 2
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))

// previous version allocations
var allocs []*structs.Allocation
for i := 0; i < 2; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = n1.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
t.Logf("prev alloc=%q", alloc.ID)
}

// canaries on draining node
job = job.Copy()
job.Meta["owner"] = "changed"
job.Version++
var canaries []string
for i := 0; i < 2; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = n2.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.ClientStatus = structs.AllocClientStatusComplete
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: pointer.Of(false),
Canary: true,
}
alloc.DesiredTransition = structs.DesiredTransition{
Migrate: pointer.Of(true),
}
allocs = append(allocs, alloc)
canaries = append(canaries, alloc.ID)
t.Logf("stopped canary alloc=%q", alloc.ID)
}

// first canary placed from previous drainer eval
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = n2.ID
alloc.Name = "my-job.web[0]"
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.PreviousAllocation = canaries[0]
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: pointer.Of(false),
Canary: true,
}
allocs = append(allocs, alloc)
canaries = append(canaries, alloc.ID)
t.Logf("new canary alloc=%q", alloc.ID)

must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))

deployment := mock.Deployment()
deployment.JobID = job.ID
deployment.JobVersion = job.Version
deployment.JobCreateIndex = job.CreateIndex
deployment.JobSpecModifyIndex = job.JobModifyIndex
deployment.TaskGroups["web"] = &structs.DeploymentState{
AutoRevert: false,
AutoPromote: false,
Promoted: false,
PlacedCanaries: canaries,
DesiredCanaries: 2,
DesiredTotal: 2,
PlacedAllocs: 3,
HealthyAllocs: 0,
UnhealthyAllocs: 0,
}
must.NoError(t, h.State.UpsertDeployment(h.NextIndex(), deployment))

eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: n2.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval}))

must.NoError(t, h.Process(NewServiceScheduler, eval))
must.Len(t, 1, h.Plans)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
must.MapLen(t, 0, h.Plans[0].NodeAllocation)
must.MapLen(t, 1, h.Plans[0].NodeUpdate)
must.Len(t, 2, h.Plans[0].NodeUpdate[n2.ID])

for _, alloc := range h.Plans[0].NodeUpdate[n2.ID] {
must.SliceContains(t, canaries, alloc.ID)
}
}

func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) {
ci.Parallel(t)

21 changes: 12 additions & 9 deletions scheduler/reconcile_test.go
@@ -3226,14 +3226,16 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
allocs = append(allocs, alloc)
}

n := mock.DrainNode()

// Create two canaries for the new job
handled := make(map[string]allocUpdateType)
for i := 0; i < 2; i++ {
// Create one canary
canary := mock.Alloc()
canary.Job = job
canary.JobID = job.ID
canary.NodeID = uuid.Generate()
canary.NodeID = n.ID
canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
canary.TaskGroup = job.TaskGroups[0].Name
canary.DeploymentID = d.ID
@@ -3244,8 +3246,9 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {

// Build a map of tainted nodes that contains the last canary
tainted := make(map[string]*structs.Node, 1)
n := mock.DrainNode()
n.ID = allocs[11].NodeID

// This is what the drainer sets for draining allocations
allocs[10].DesiredTransition.Migrate = pointer.Of(true)
allocs[11].DesiredTransition.Migrate = pointer.Of(true)
tainted[n.ID] = n

@@ -3258,18 +3261,18 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 1,
place: 2,
inplace: 0,
stop: 1,
stop: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Canary: 1,
Ignore: 11,
Canary: 2,
Ignore: 10,
},
},
})
assertNamesHaveIndexes(t, intRange(1, 1), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(1, 1), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(0, 1), placeResultsToNames(r.place))
}
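The expectations change because both canaries are now placed on the draining node rather than on random node IDs: the reconciler must stop and replace both of them, so the stop and place counts double, `Canary` rises from 1 to 2, and only the 10 non-canary allocations remain ignored. The asserted name indexes widen from index 1 alone to indexes 0 through 1 for the same reason.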

// Tests the reconciler handles migrating a canary correctly on a lost node
8 changes: 8 additions & 0 deletions scheduler/reconcile_util.go
@@ -307,6 +307,14 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
continue
}

// Terminal canaries that have been marked for migration need to be
// migrated; otherwise we block deployments from progressing by
// counting them as running canaries.
if alloc.DeploymentStatus.IsCanary() && alloc.DesiredTransition.ShouldMigrate() {
migrate[alloc.ID] = alloc
continue
}

// Terminal allocs, if not reconnect, are always untainted as they
// should never be migrated.
untainted[alloc.ID] = alloc
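To make the effect of the new branch concrete, here is a minimal, self-contained sketch of the classification it performs. The types and predicate bodies below are simplified stand-ins for the real `nomad/structs` definitions (only the fields the diff touches are reproduced), and `ptr` stands in for `pointer.Of`; this is an illustration under those assumptions, not Nomad's actual implementation:

```go
package main

import "fmt"

// Simplified stand-in for structs.AllocDeploymentStatus.
type AllocDeploymentStatus struct {
	Healthy *bool
	Canary  bool
}

// IsCanary mirrors the nil-safe accessor the scheduler relies on.
func (a *AllocDeploymentStatus) IsCanary() bool {
	return a != nil && a.Canary
}

// Simplified stand-in for structs.DesiredTransition.
type DesiredTransition struct {
	Migrate *bool
}

// ShouldMigrate reports whether the drainer has marked the alloc for migration.
func (d DesiredTransition) ShouldMigrate() bool {
	return d.Migrate != nil && *d.Migrate
}

type Allocation struct {
	ID                string
	DeploymentStatus  *AllocDeploymentStatus
	DesiredTransition DesiredTransition
}

func main() {
	ptr := func(b bool) *bool { return &b }

	// A terminal canary on a draining node, as set up in the new test:
	// stopped, unhealthy, and marked for migration by the drainer.
	alloc := &Allocation{
		ID: "stopped-canary",
		DeploymentStatus: &AllocDeploymentStatus{
			Healthy: ptr(false),
			Canary:  true,
		},
		DesiredTransition: DesiredTransition{Migrate: ptr(true)},
	}

	migrateSet := map[string]*Allocation{}
	untaintedSet := map[string]*Allocation{}

	// The new branch in filterByTainted: route terminal canaries that
	// should migrate into the migrate set instead of letting them fall
	// through to the untainted set.
	if alloc.DeploymentStatus.IsCanary() && alloc.DesiredTransition.ShouldMigrate() {
		migrateSet[alloc.ID] = alloc
	} else {
		untaintedSet[alloc.ID] = alloc
	}

	fmt.Printf("migrate=%d untainted=%d\n", len(migrateSet), len(untaintedSet))
	// Output: migrate=1 untainted=0
}
```

Without the new branch, the same allocation lands in the untainted set, so the deployment keeps counting it among its placed canaries and never progresses; that is the stuck deployment the changelog entry describes.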