7 changes: 7 additions & 0 deletions .changelog/26292.txt
@@ -0,0 +1,7 @@
```release-note:improvement
scheduler: For service and batch jobs, the scheduler treats a group.count=0 identically to removing the task group from the job, and will stop all non-terminal allocations.
```
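
Below is a minimal, self-contained Go sketch of the behavior this entry describes, using hypothetical types rather than Nomad's real scheduler structs: a task group scaled to `count = 0` is handled the same way as a group that was removed from the job, and only allocations the server does not already consider terminal receive a stop.

```go
package main

import "fmt"

// taskGroup and alloc are stand-ins for illustration only.
type taskGroup struct {
	Name  string
	Count int
}

type alloc struct {
	ID             string
	ServerTerminal bool // the server has already decided this alloc should not run
}

// stopNonTerminal returns the IDs of allocations that still need a stop.
// A nil group (removed from the job) and a zero-count group are treated
// identically; allocations that are already server-terminal are skipped.
func stopNonTerminal(tg *taskGroup, allocs []alloc) []string {
	if tg != nil && tg.Count > 0 {
		return nil // the group still wants instances; nothing to stop here
	}
	var stops []string
	for _, a := range allocs {
		if !a.ServerTerminal {
			stops = append(stops, a.ID)
		}
	}
	return stops
}

func main() {
	allocs := []alloc{{ID: "a1"}, {ID: "a2", ServerTerminal: true}}
	fmt.Println(stopNonTerminal(&taskGroup{Name: "web", Count: 0}, allocs)) // [a1]
	fmt.Println(stopNonTerminal(nil, allocs))                               // [a1]
}
```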

```release-note:improvement
scheduler: For service and batch jobs, the scheduler no longer includes stops for already-stopped canaries in plans it submits.
```
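
And a similarly simplified sketch for the second entry (again with stand-in types, not Nomad's `structs.Allocation`): canaries the server has already marked stopped are dropped before the plan's stop set is built, so the submitted plan no longer re-stops them.

```go
package main

import "fmt"

// allocation is a hypothetical, simplified stand-in for a scheduler allocation.
type allocation struct {
	ID            string
	DesiredStatus string // "run", "stop", "evict"
	ClientStatus  string // "running", "complete", ...
}

// serverTerminal mirrors the idea behind Allocation.ServerTerminalStatus:
// the server has already marked this allocation for stop or eviction.
func serverTerminal(a allocation) bool {
	return a.DesiredStatus == "stop" || a.DesiredStatus == "evict"
}

// planStops returns the canaries that still need a stop in the next plan;
// already-stopped canaries are filtered out rather than re-stopped.
func planStops(canaries []allocation) []string {
	var stops []string
	for _, c := range canaries {
		if serverTerminal(c) {
			continue // already stopped; re-stopping would only bloat the plan
		}
		stops = append(stops, c.ID)
	}
	return stops
}

func main() {
	canaries := []allocation{
		{ID: "canary-1", DesiredStatus: "stop", ClientStatus: "complete"},
		{ID: "canary-2", DesiredStatus: "run", ClientStatus: "running"},
	}
	fmt.Println(planStops(canaries)) // [canary-2]
}
```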
129 changes: 61 additions & 68 deletions scheduler/generic_sched_test.go
@@ -2554,32 +2554,34 @@ func TestServiceSched_JobModify_CountZero(t *testing.T) {

h := tests.NewHarness(t)

// Create some nodes
var nodes []*structs.Node
for i := 0; i < 10; i++ {
for range 10 {
node := mock.Node()
nodes = append(nodes, node)
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}

// Generate a fake job with allocations
job := mock.Job()
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))

// allocations w/ DesiredStatus=run that we expect to stop
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
for i := range 10 {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = structs.AllocName(alloc.JobID, alloc.TaskGroup, uint(i))
if i%2 == 0 {
alloc.ClientStatus = structs.AllocClientStatusFailed
}
allocs = append(allocs, alloc)
}
must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))

// Add a few terminal status allocations, these should be ignored
// Add a few server-terminal status allocations, these should be ignored
var terminal []*structs.Allocation
for i := 0; i < 5; i++ {
for i := range 5 {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
@@ -2609,44 +2611,31 @@ func TestServiceSched_JobModify_CountZero(t *testing.T) {

// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
must.NoError(t, err)

// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
must.Len(t, 1, h.Plans)
plan := h.Plans[0]

// Ensure the plan evicted all allocs
// Ensure the plan evicted all non-server-terminal allocs
var update []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
update = append(update, updateList...)
}
if len(update) != len(allocs) {
t.Fatalf("bad: %#v", plan)
}
must.Eq(t, len(allocs), len(update), must.Sprintf("expected all stopped: %#v", plan))

// Ensure the plan didn't allocated
// Ensure the plan didn't place any allocations
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 0 {
t.Fatalf("bad: %#v", plan)
}
must.Len(t, 0, planned, must.Sprintf("expected no placements: %#v", plan))

// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
must.NoError(t, err)

// Ensure all allocations placed
out, _ = structs.FilterTerminalAllocs(out)
if len(out) != 0 {
t.Fatalf("bad: %#v", out)
}
must.Len(t, 0, out, must.Sprintf("expected no non-terminal allocs: %#v", out))

h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
@@ -4164,22 +4153,21 @@ func TestServiceSched_NodeDrain_Canaries(t *testing.T) {
ci.Parallel(t)
h := tests.NewHarness(t)

n1 := mock.Node()
n2 := mock.DrainNode()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), n1))
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), n2))
node := mock.Node()
drainedNode := mock.DrainNode()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), drainedNode))

job := mock.Job()
job.TaskGroups[0].Count = 2
job.TaskGroups[0].Update = &structs.UpdateStrategy{Canary: 2}
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))

// previous version allocations
var allocs []*structs.Allocation
for i := 0; i < 2; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = n1.ID
for i := range 2 {
alloc := mock.MinAllocForJob(job)
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
t.Logf("prev alloc=%q", alloc.ID)
@@ -4190,14 +4178,11 @@ func TestServiceSched_NodeDrain_Canaries(t *testing.T) {
job.Meta["owner"] = "changed"
job.Version++
var canaries []string
for i := 0; i < 2; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = n2.ID

for i := range 2 {
alloc := mock.MinAllocForJob(job)
alloc.NodeID = drainedNode.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.ClientStatus = structs.AllocClientStatusComplete
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: pointer.Of(false),
Canary: true,
@@ -4207,24 +4192,30 @@ func TestServiceSched_NodeDrain_Canaries(t *testing.T) {
}
allocs = append(allocs, alloc)
canaries = append(canaries, alloc.ID)
t.Logf("stopped canary alloc=%q", alloc.ID)
t.Logf("canary on draining node=%q", alloc.ID)
}

// first canary placed from previous drainer eval
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = n2.ID
alloc.Name = fmt.Sprintf("my-job.web[0]")
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.PreviousAllocation = canaries[0]
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
deadCanary := allocs[2]
deadCanary.DesiredStatus = structs.AllocDesiredStatusStop
deadCanary.ClientStatus = structs.AllocClientStatusComplete

canaryToDrain := allocs[3]
canaryToDrain.DesiredStatus = structs.AllocDesiredStatusRun
canaryToDrain.ClientStatus = structs.AllocClientStatusRunning

// replacement canary placed from previous eval
replacement := mock.MinAllocForJob(job)
replacement.NodeID = node.ID
replacement.Name = fmt.Sprintf("my-job.web[0]")
replacement.ClientStatus = structs.AllocClientStatusRunning
replacement.PreviousAllocation = canaries[0]
replacement.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: pointer.Of(false),
Canary: true,
}
allocs = append(allocs, alloc)
canaries = append(canaries, alloc.ID)
t.Logf("new canary alloc=%q", alloc.ID)
allocs = append(allocs, replacement)
canaries = append(canaries, replacement.ID)
t.Logf("replacement canary alloc=%q", replacement.ID)

must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
Expand All @@ -4248,27 +4239,29 @@ func TestServiceSched_NodeDrain_Canaries(t *testing.T) {
must.NoError(t, h.State.UpsertDeployment(h.NextIndex(), deployment))

eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: n2.ID,
Status: structs.EvalStatusPending,
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: drainedNode.ID,
Status: structs.EvalStatusPending,
AnnotatePlan: true,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
h.NextIndex(), []*structs.Evaluation{eval}))

must.NoError(t, h.Process(NewServiceScheduler, eval))
must.Len(t, 1, h.Plans)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
must.MapLen(t, 0, h.Plans[0].NodeAllocation)
must.MapLen(t, 1, h.Plans[0].NodeUpdate)
must.Len(t, 2, h.Plans[0].NodeUpdate[n2.ID])

for _, alloc := range h.Plans[0].NodeUpdate[n2.ID] {
must.SliceContains(t, canaries, alloc.ID)
}
must.MapLen(t, 1, h.Plans[0].NodeAllocation)
must.Len(t, 1, h.Plans[0].NodeAllocation[node.ID])
must.Eq(t, 1, h.Plans[0].Annotations.DesiredTGUpdates["web"].Canary)

must.MapLen(t, 1, h.Plans[0].NodeUpdate)
must.Len(t, 1, h.Plans[0].NodeUpdate[drainedNode.ID])
must.Eq(t, canaryToDrain.ID, h.Plans[0].NodeUpdate[drainedNode.ID][0].ID)
}

func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) {
11 changes: 11 additions & 0 deletions scheduler/reconciler/filters.go
@@ -27,6 +27,16 @@ func filterAndStopAll(set allocSet, cs ClusterState) (uint64, []AllocStopResult)
return uint64(len(set)), allocsToStop
}

// filterServerTerminalAllocs returns only the allocations that are not
// server-terminal, dropping any the server has already marked for stop or
// eviction.
func filterServerTerminalAllocs(all allocSet) (remaining allocSet) {
remaining = make(map[string]*structs.Allocation)
for id, alloc := range all {
if !alloc.ServerTerminalStatus() {
remaining[id] = alloc
}
}
return
}

// filterByTerminal filters out terminal allocs
func filterByTerminal(untainted allocSet) (nonTerminal allocSet) {
nonTerminal = make(map[string]*structs.Allocation)
@@ -83,6 +93,7 @@ func filterOldTerminalAllocs(a ReconcilerState, all allocSet) (filtered, ignore
// 4. Those that are on nodes that are disconnected, but have not had their ClientState set to unknown
// 5. Those that are on a node that has reconnected.
// 6. Those that are in a state that results in a noop.
// 7. Those that are disconnected and need to be marked lost (and possibly replaced)
func filterByTainted(a allocSet, state ClusterState) (untainted, migrate, lost, disconnecting, reconnecting, ignore, expiring allocSet) {
untainted = make(map[string]*structs.Allocation)
migrate = make(map[string]*structs.Allocation)
19 changes: 12 additions & 7 deletions scheduler/reconciler/reconcile_cluster.go
@@ -446,13 +446,16 @@ func (a *AllocReconciler) computeGroup(group string, all allocSet) (*ReconcileRe
// that the task group no longer exists
tg := a.jobState.Job.LookupTaskGroup(group)

// If the task group is nil, then the task group has been removed so all we
// need to do is stop everything
if tg == nil {
// If the task group is nil or scaled-to-zero, then the task group has been
// removed so all we need to do is stop everything
if tg == nil || tg.Count == 0 {
all = filterServerTerminalAllocs(all)
result.DesiredTGUpdates[group].Stop, result.Stop = filterAndStopAll(all, a.clusterState)
return result, true
}

all = filterServerTerminalAllocs(all)

dstate, existingDeployment := a.initializeDeploymentState(group, tg)

// Filter allocations that do not need to be considered because they are
@@ -525,8 +528,9 @@ func (a *AllocReconciler) computeGroup(group string, all allocSet) (*ReconcileRe
untainted = untainted.union(untaintedDisconnecting)
rescheduleLater = append(rescheduleLater, laterDisconnecting...)

// Find delays for any disconnecting allocs that have max_client_disconnect,
// create followup evals, and update the ClientStatus to unknown.
// Find delays for any disconnecting allocs that have
// disconnect.lost_after, create followup evals, and update the
// ClientStatus to unknown.
var followupEvals []*structs.Evaluation
timeoutLaterEvals, followupEvals = a.createTimeoutLaterEvals(disconnecting, tg.Name)
result.DesiredFollowupEvals[tg.Name] = append(result.DesiredFollowupEvals[tg.Name], followupEvals...)
Expand All @@ -538,7 +542,7 @@ func (a *AllocReconciler) computeGroup(group string, all allocSet) (*ReconcileRe
result.DesiredTGUpdates[tg.Name].RescheduleNow = uint64(len(rescheduleNow))
}

// Find delays for any lost allocs that have stop_after_client_disconnect
// Find delays for any lost allocs that have disconnect.stop_on_client_after
lostLaterEvals := map[string]string{}
lostLater := []*delayedRescheduleInfo{}

@@ -549,7 +553,7 @@ func (a *AllocReconciler) computeGroup(group string, all allocSet) (*ReconcileRe
result.DesiredFollowupEvals[tg.Name] = append(result.DesiredFollowupEvals[tg.Name], followupEvals...)
}

// Merge disconnecting with the stop_after_client_disconnect set into the
// Merge disconnecting with the disconnect.stop_on_client_after set into the
// lostLaterEvals so that computeStop can add them to the stop set.
lostLaterEvals = helper.MergeMapStringString(lostLaterEvals, timeoutLaterEvals)

@@ -805,6 +809,7 @@ func (a *AllocReconciler) cancelUnneededCanaries(original allocSet, desiredChang

canaries = all.fromKeys(canaryIDs)
untainted, migrate, lost, _, _, _, _ := filterByTainted(canaries, a.clusterState)

// We don't add these stops to desiredChanges because the deployment is
// still active. DesiredChanges is used to report deployment progress/final
// state. These transient failures aren't meaningful.