3 changes: 3 additions & 0 deletions .changelog/26662.txt
@@ -0,0 +1,3 @@
```release-note:bug
scheduler: fixed a bug selecting nodes for updated jobs with ephemeral disks when the job's node pool changes
```
6 changes: 6 additions & 0 deletions scheduler/generic_sched.go
@@ -860,6 +860,12 @@ func (s *GenericScheduler) findPreferredNode(place reconciler.PlacementResult) (
if prev == nil {
return nil, nil
}

// If the job's node pool was updated, don't set a preferred node: the previous
// allocation's node may no longer belong to the job's new pool.
if prev.Job != nil && prev.Job.NodePool != s.job.NodePool {
return nil, nil
}

if place.TaskGroup().EphemeralDisk.Sticky || place.TaskGroup().EphemeralDisk.Migrate {
var preferredNode *structs.Node
ws := memdb.NewWatchSet()
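For readers skimming the hunk above: the new guard short-circuits `findPreferredNode` when the updated job targets a different node pool than the previous allocation's job, so a sticky or migrating ephemeral disk no longer pins the placement to a node outside the new pool. A minimal, self-contained sketch of that decision follows; the types and the `findPreferredNodeID` helper are simplified stand-ins for illustration, not the real Nomad structs or the actual `findPreferredNode` signature.

```go
package main

import "fmt"

// Simplified stand-ins for the scheduler types involved. The names mirror the
// diff above, but these are illustrative, not the real Nomad structs.
type Job struct {
	NodePool string
}

type Allocation struct {
	NodeID string
	Job    *Job
}

type EphemeralDisk struct {
	Sticky  bool
	Migrate bool
}

// findPreferredNodeID models the decision made in findPreferredNode: prefer
// the previous allocation's node for sticky/migrating ephemeral disks, unless
// the job's node pool changed (the guard added by this PR).
func findPreferredNodeID(prev *Allocation, job *Job, disk EphemeralDisk) string {
	if prev == nil {
		return ""
	}
	// Guard from this PR: a node pool change invalidates the preference for
	// the old node, which may not exist in the new pool.
	if prev.Job != nil && prev.Job.NodePool != job.NodePool {
		return ""
	}
	if disk.Sticky || disk.Migrate {
		return prev.NodeID
	}
	return ""
}

func main() {
	prev := &Allocation{NodeID: "node-1", Job: &Job{NodePool: "default"}}
	disk := EphemeralDisk{Sticky: true}

	// Same node pool: the previous node is still preferred.
	fmt.Println(findPreferredNodeID(prev, &Job{NodePool: "default"}, disk)) // node-1

	// Node pool changed: no preference, so the scheduler is free to pick any
	// feasible node in the new pool.
	fmt.Println(findPreferredNodeID(prev, &Job{NodePool: "test"}, disk)) // (empty)
}
```

Before this change, the sticky preference was applied even when the job moved pools, which is the behavior exercised by the updated test below.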
248 changes: 173 additions & 75 deletions scheduler/generic_sched_test.go
@@ -127,98 +127,196 @@ func TestServiceSched_JobRegister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
func TestServiceSched_JobRegister_EphemeralDisk(t *testing.T) {
ci.Parallel(t)

h := tests.NewHarness(t)
t.Run("sticky ephemeral allocs in same node pool", func(t *testing.T) {

// Create some nodes
for i := 0; i < 10; i++ {
h := tests.NewHarness(t)

// Create some nodes
for range 10 {
node := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}

// Create a job
job := mock.Job()
job.TaskGroups[0].EphemeralDisk.Sticky = true
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))

// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))

// Process the evaluation
must.NoError(t, h.Process(NewServiceScheduler, eval))

// Ensure the plan allocated
plan := h.Plans[0]
planned := make(map[string]*structs.Allocation)
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
planned[alloc.ID] = alloc
}
}
if len(planned) != 10 {
t.Fatalf("bad: %#v", plan)
}

// Update the job to force a rolling upgrade
updated := job.Copy()
updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))

// Create a mock evaluation to handle the update
eval = &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
h1 := tests.NewHarnessWithState(t, h.State)
must.NoError(t, h1.Process(NewServiceScheduler, eval))

// Ensure we have created only one new allocation
// Ensure a single plan
if len(h1.Plans) != 1 {
t.Fatalf("bad: %#v", h1.Plans)
}
plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
if len(newPlanned) != 10 {
t.Fatalf("bad plan: %#v", plan)
}
// Ensure that the new allocations were placed on the same node as the older
// ones
for _, new := range newPlanned {
// new alloc should have a previous allocation
must.NotEq(t, new.PreviousAllocation, "")

// the new alloc's PreviousAllocation must be a valid previously placed alloc
old, ok := planned[new.PreviousAllocation]
must.True(t, ok)

// new alloc should be placed in the same node
must.Eq(t, new.NodeID, old.NodeID)
}
})

t.Run("ephemeral alloc should change node if node pool changes", func(t *testing.T) {
h := tests.NewHarness(t)

// Create some nodes
for range 5 {
node := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}

testNodePool := "test"
node := mock.Node()
node.NodePool = testNodePool
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
}

// Create a job
job := mock.Job()
job.TaskGroups[0].EphemeralDisk.Sticky = true
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a test node pool.
testPool := mock.NodePool()
testPool.Name = "test"

// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
nodePools := []*structs.NodePool{
testPool,
}
must.NoError(t, h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools))

// Process the evaluation
if err := h.Process(NewServiceScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Create a job
job := mock.Job()
job.TaskGroups[0].EphemeralDisk.Sticky = true
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))

// Ensure the plan allocated
plan := h.Plans[0]
planned := make(map[string]*structs.Allocation)
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
planned[alloc.ID] = alloc
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
}
if len(planned) != 10 {
t.Fatalf("bad: %#v", plan)
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))

// Update the job to force a rolling upgrade
updated := job.Copy()
updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))
// Process the evaluation
must.NoError(t, h.Process(NewServiceScheduler, eval))

// Create a mock evaluation to handle the update
eval = &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
h1 := tests.NewHarnessWithState(t, h.State)
if err := h1.Process(NewServiceScheduler, eval); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure the plan allocated
plan := h.Plans[0]
planned := make(map[string]*structs.Allocation)
for _, allocList := range plan.NodeAllocation {
for _, alloc := range allocList {
planned[alloc.ID] = alloc
}
}
if len(planned) != 10 {
t.Fatalf("bad: %#v", plan)
}

// Ensure we have created only one new allocation
// Ensure a single plan
if len(h1.Plans) != 1 {
t.Fatalf("bad: %#v", h1.Plans)
}
plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
if len(newPlanned) != 10 {
t.Fatalf("bad plan: %#v", plan)
}
// Ensure that the new allocations were placed on the same node as the older
// ones
for _, new := range newPlanned {
if new.PreviousAllocation == "" {
t.Fatalf("new alloc %q doesn't have a previous allocation", new.ID)
// Update the job to force a rolling upgrade
updated := job.Copy()
updated.TaskGroups[0].Tasks[0].Resources.CPU += 10
updated.NodePool = "test"
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, updated))

// Create a mock evaluation to handle the update
eval = &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
h1 := tests.NewHarnessWithState(t, h.State)
must.NoError(t, h1.Process(NewServiceScheduler, eval))

old, ok := planned[new.PreviousAllocation]
if !ok {
t.Fatalf("new alloc %q previous allocation doesn't match any prior placed alloc (%q)", new.ID, new.PreviousAllocation)
// Ensure we have created only one new allocation
// Ensure a single plan
if len(h1.Plans) != 1 {
t.Fatalf("bad: %#v", h1.Plans)
}
if new.NodeID != old.NodeID {
t.Fatalf("new alloc and old alloc node doesn't match; got %q; want %q", new.NodeID, old.NodeID)
plan = h1.Plans[0]
var newPlanned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
newPlanned = append(newPlanned, allocList...)
}
}
if len(newPlanned) != 10 {
t.Fatalf("bad plan: %#v", plan)
}

// Ensure the new allocations have the expected fields
for _, new := range newPlanned {
// new alloc should have a previous allocation
must.NotEq(t, new.PreviousAllocation, "")

// the new alloc's PreviousAllocation must be a valid previously placed alloc
_, ok := planned[new.PreviousAllocation]
must.True(t, ok)

// new alloc should be placed in the correct node pool
must.Eq(t, new.Job.NodePool, testNodePool)
}
})
}

func TestServiceSched_JobRegister_StickyHostVolumes(t *testing.T) {
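The guard in `generic_sched.go` applies to both `Sticky` and `Migrate` ephemeral disks. As a quick illustration only, reusing the simplified `findPreferredNodeID` sketch from the earlier note (same hypothetical package; not code from this PR), a table-driven check over both flags could look like:

```go
package main

import "testing"

// Illustrative only: exercises the simplified findPreferredNodeID sketch from
// the note above; it is not a test in this PR.
func TestFindPreferredNodeID_NodePoolChange(t *testing.T) {
	prev := &Allocation{NodeID: "node-1", Job: &Job{NodePool: "default"}}

	cases := []struct {
		name string
		disk EphemeralDisk
		pool string
		want string
	}{
		{"sticky, same pool", EphemeralDisk{Sticky: true}, "default", "node-1"},
		{"sticky, pool changed", EphemeralDisk{Sticky: true}, "test", ""},
		{"migrate, same pool", EphemeralDisk{Migrate: true}, "default", "node-1"},
		{"migrate, pool changed", EphemeralDisk{Migrate: true}, "test", ""},
		{"neither sticky nor migrate", EphemeralDisk{}, "default", ""},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := findPreferredNodeID(prev, &Job{NodePool: tc.pool}, tc.disk)
			if got != tc.want {
				t.Fatalf("got %q, want %q", got, tc.want)
			}
		})
	}
}
```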