Commit 6af511c

Only error on constraints if no allocs are running
When running `nomad job run <JOB>` multiple times with constraints defined, there should be no error as a result of filtering out nodes that do not (and never did) satisfy the constraints. Previously, running a system job with a constraint any time after the initial startup returned exit(2) and a warning about allocations unplaced due to constraints, an error that did not occur on the initial run even though the constraint stayed the same.

This happened because the node that satisfies the constraint is already running the allocation, so that placement is ignored. Another placement is then attempted, but the only node(s) left are the ones that do not satisfy the constraint. Nomad viewed this case (none of the attempted placements could be placed successfully) as an error and reported it as such. In reality, no allocations should be placed or updated in this case, and it should not be treated as an error.

This change uses the `ignored` placements from diffSystemAllocs to determine whether the case is an error: no ignored placements means nothing is already running, which is an error; an ignored placement means the task is already running somewhere on a node, which is not. The check is made at the point where `failedTGAllocs` is populated, so placement functionality is unchanged; only the field that reports the error is affected.

The functionality that (correctly) notifies a user when a job cannot run on any node because the constraints filter out all available nodes is preserved and should still behave as expected.
1 parent 79d35f0 commit 6af511c
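
To make the gating rule concrete, here is a minimal, self-contained Go sketch of the condition described above. The simplified types and the `shouldReportFailure` helper are hypothetical names invented for this illustration; the real scheduler works with `allocTuple` values produced by diffSystemAllocs, as shown in the scheduler/scheduler_system.go diff below.

package main

import "fmt"

// Simplified stand-ins for the scheduler's types; the real allocTuple
// carries far more state than the task group name.
type taskGroup struct{ Name string }
type allocTuple struct{ TaskGroup *taskGroup }

// shouldReportFailure mirrors the new condition: a task group is marked
// failed only when nothing could be queued for placement AND no existing
// (ignored) allocation already covers the group.
func shouldReportFailure(tgName string, queued int, ignored []allocTuple) bool {
	ignoredByTaskGroup := make(map[string]int, len(ignored))
	for _, ig := range ignored {
		ignoredByTaskGroup[ig.TaskGroup.Name]++
	}
	return queued <= 0 && ignoredByTaskGroup[tgName] == 0
}

func main() {
	web := &taskGroup{Name: "web"}

	// First run: nothing placed and nothing already running -> genuine failure.
	fmt.Println(shouldReportFailure("web", 0, nil)) // true

	// Re-run: nothing new placed, but an alloc is already running -> no error.
	fmt.Println(shouldReportFailure("web", 0, []allocTuple{{TaskGroup: web}})) // false
}

The `*_RunMultiple` and `*_AllFiltered` tests added below exercise exactly these two branches.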

File tree

3 files changed: +237 −13 lines changed

scheduler/scheduler_sysbatch_test.go
scheduler/scheduler_system.go
scheduler/scheduler_system_test.go


scheduler/scheduler_sysbatch_test.go

Lines changed: 114 additions & 5 deletions
@@ -1013,11 +1013,6 @@ func TestSysBatch_JobConstraint_AddNode(t *testing.T) {
 	require.Nil(t, h.Process(NewSysBatchScheduler, eval3))
 	require.Equal(t, "complete", h.Evals[2].Status)
 
-	// Ensure `groupA` fails to be placed due to its constraint, but `groupB` doesn't
-	require.Len(t, h.Evals[2].FailedTGAllocs, 1)
-	require.Contains(t, h.Evals[2].FailedTGAllocs, "groupA")
-	require.NotContains(t, h.Evals[2].FailedTGAllocs, "groupB")
-
 	require.Len(t, h.Plans, 2)
 	require.Len(t, h.Plans[1].NodeAllocation, 1)
 	// Ensure all NodeAllocations are from first Eval
@@ -1041,6 +1036,120 @@ func TestSysBatch_JobConstraint_AddNode(t *testing.T) {
 	require.Len(t, allocsNodeThree, 1)
 }
 
+func TestSysBatch_JobConstraint_AllFiltered(t *testing.T) {
+	ci.Parallel(t)
+	h := NewHarness(t)
+
+	// Create two nodes
+	node := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+	node2 := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
+
+	// Create a job with a constraint that no node can satisfy
+	job := mock.SystemBatchJob()
+	job.Priority = structs.JobDefaultPriority
+	fooConstraint := &structs.Constraint{
+		LTarget: "${node.unique.name}",
+		RTarget: "something-else",
+		Operand: "==",
+	}
+	job.Constraints = []*structs.Constraint{fooConstraint}
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+	// Create a mock evaluation to start the job
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewSysBatchScheduler, eval)
+	must.NoError(t, err)
+
+	// Ensure a single eval
+	must.Len(t, 1, h.Evals)
+	eval = h.Evals[0]
+
+	// Ensure that the eval reports a failed allocation
+	must.Eq(t, 1, len(eval.FailedTGAllocs))
+	// Ensure that the failed allocation is due to the constraint on both nodes
+	must.Eq(t, 2, eval.FailedTGAllocs[job.TaskGroups[0].Name].ConstraintFiltered[fooConstraint.String()])
+}
+
+func TestSysBatch_JobConstraint_RunMultiple(t *testing.T) {
+	ci.Parallel(t)
+	h := NewHarness(t)
+
+	// Create two nodes, naming one of them foo so the constraint can match it
+	fooNode := mock.Node()
+	fooNode.Name = "foo"
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), fooNode))
+
+	barNode := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), barNode))
+
+	// Create a job with a constraint that only fooNode satisfies
+	job := mock.SystemBatchJob()
+	job.Priority = structs.JobDefaultPriority
+	fooConstraint := &structs.Constraint{
+		LTarget: "${node.unique.name}",
+		RTarget: fooNode.Name,
+		Operand: "==",
+	}
+	job.Constraints = []*structs.Constraint{fooConstraint}
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+	// Create a mock evaluation to start the job, which will run on the foo node
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewSysBatchScheduler, eval)
+	must.NoError(t, err)
+
+	// Create a mock evaluation to run the job again, which will not place any
+	// new allocations (fooNode is already running, barNode is constrained), but
+	// should not report failed allocations either
+	eval2 := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval2}))
+
+	err = h.Process(NewSysBatchScheduler, eval2)
+	must.NoError(t, err)
+
+	// Ensure a single plan
+	must.Len(t, 1, h.Plans)
+
+	// Ensure that no evals report a failed allocation
+	for _, eval := range h.Evals {
+		must.Eq(t, 0, len(eval.FailedTGAllocs))
+	}
+
+	// Ensure that the plan includes an allocation running on fooNode
+	must.Len(t, 1, h.Plans[0].NodeAllocation[fooNode.ID])
+	// Ensure that the plan does not include an allocation on barNode
+	must.Len(t, 0, h.Plans[0].NodeAllocation[barNode.ID])
+}
+
 // No errors reported when no available nodes prevent placement
 func TestSysBatch_ExistingAllocNoNodes(t *testing.T) {
 	ci.Parallel(t)

scheduler/scheduler_system.go

Lines changed: 10 additions & 3 deletions
@@ -319,7 +319,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
 	}
 
 	// Compute the placements
-	return s.computePlacements(diff.place)
+	return s.computePlacements(diff.place, diff.ignore)
 }
 
 func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric {
@@ -347,12 +347,17 @@ func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric {
 }
 
 // computePlacements computes placements for allocations
-func (s *SystemScheduler) computePlacements(place []allocTuple) error {
+func (s *SystemScheduler) computePlacements(place, ignored []allocTuple) error {
 	nodeByID := make(map[string]*structs.Node, len(s.nodes))
 	for _, node := range s.nodes {
 		nodeByID[node.ID] = node
 	}
 
+	ignoredByTaskGroup := make(map[string][]allocTuple, len(ignored))
+	for _, ignoredAlloc := range ignored {
+		ignoredByTaskGroup[ignoredAlloc.TaskGroup.Name] = append(ignoredByTaskGroup[ignoredAlloc.TaskGroup.Name], ignoredAlloc)
+	}
+
 	// track node filtering, to only report an error if all nodes have been filtered
 	var filteredMetrics map[string]*structs.AllocMetric
 
@@ -389,7 +394,9 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 		}
 		filteredMetrics[tgName] = mergeNodeFiltered(filteredMetrics[tgName], s.ctx.Metrics())
 
-		if queued <= 0 {
+		// If no tasks have been placed and there aren't any previously running
+		// (ignored) tasks for this task group, mark the alloc as failed to be placed
+		if queued <= 0 && len(ignoredByTaskGroup[tgName]) == 0 {
 			if s.failedTGAllocs == nil {
 				s.failedTGAllocs = make(map[string]*structs.AllocMetric)
 			}

scheduler/scheduler_system_test.go

Lines changed: 113 additions & 5 deletions
@@ -1437,11 +1437,6 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
 	require.Nil(t, h.Process(NewSystemScheduler, eval3))
 	require.Equal(t, "complete", h.Evals[2].Status)
 
-	// Ensure `groupA` fails to be placed due to its constraint, but `groupB` doesn't
-	require.Len(t, h.Evals[2].FailedTGAllocs, 1)
-	require.Contains(t, h.Evals[2].FailedTGAllocs, "groupA")
-	require.NotContains(t, h.Evals[2].FailedTGAllocs, "groupB")
-
 	require.Len(t, h.Plans, 2)
 	require.Len(t, h.Plans[1].NodeAllocation, 1)
 	// Ensure all NodeAllocations are from first Eval
@@ -1465,6 +1460,119 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
 	require.Len(t, allocsNodeThree, 1)
 }
 
+func TestSystemSched_JobConstraint_AllFiltered(t *testing.T) {
+	ci.Parallel(t)
+	h := NewHarness(t)
+
+	// Create two nodes
+	node := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+	node2 := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
+
+	// Create a job with a constraint that no node can satisfy
+	job := mock.SystemJob()
+	job.Priority = structs.JobDefaultPriority
+	fooConstraint := &structs.Constraint{
+		LTarget: "${node.unique.name}",
+		RTarget: "something-else",
+		Operand: "==",
+	}
+	job.Constraints = []*structs.Constraint{fooConstraint}
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+	// Create a mock evaluation to start the job
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewSystemScheduler, eval)
+	must.NoError(t, err)
+
+	// Ensure a single eval
+	must.Len(t, 1, h.Evals)
+	eval = h.Evals[0]
+
+	// Ensure that the eval reports a failed allocation due to the constraint on both nodes
+	must.Eq(t, 1, len(eval.FailedTGAllocs))
+	must.Eq(t, 2, eval.FailedTGAllocs[job.TaskGroups[0].Name].ConstraintFiltered[fooConstraint.String()])
+}
+
+func TestSystemSched_JobConstraint_RunMultiple(t *testing.T) {
+	ci.Parallel(t)
+	h := NewHarness(t)
+
+	// Create two nodes, naming one of them foo so the constraint can match it
+	fooNode := mock.Node()
+	fooNode.Name = "foo"
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), fooNode))
+
+	barNode := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), barNode))
+
+	// Create a job with a constraint that only fooNode satisfies
+	job := mock.SystemJob()
+	job.Priority = structs.JobDefaultPriority
+	fooConstraint := &structs.Constraint{
+		LTarget: "${node.unique.name}",
+		RTarget: fooNode.Name,
+		Operand: "==",
+	}
+	job.Constraints = []*structs.Constraint{fooConstraint}
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+	// Create a mock evaluation to start the job, which will run on the foo node
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewSystemScheduler, eval)
+	must.NoError(t, err)
+
+	// Create a mock evaluation to run the job again, which will not place any
+	// new allocations (fooNode is already running, barNode is constrained), but
+	// should not report failed allocations either
+	eval2 := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval2}))
+
+	err = h.Process(NewSystemScheduler, eval2)
+	must.NoError(t, err)
+
+	// Ensure a single plan
+	must.Len(t, 1, h.Plans)
+
+	// Ensure that no evals report a failed allocation
+	for _, eval := range h.Evals {
+		must.Eq(t, 0, len(eval.FailedTGAllocs))
+	}
+
+	// Ensure that the plan includes an allocation running on fooNode
+	must.Len(t, 1, h.Plans[0].NodeAllocation[fooNode.ID])
+	// Ensure that the plan does not include an allocation on barNode
+	must.Len(t, 0, h.Plans[0].NodeAllocation[barNode.ID])
+}
+
 // No errors reported when no available nodes prevent placement
 func TestSystemSched_ExistingAllocNoNodes(t *testing.T) {
 	ci.Parallel(t)
