3 changes: 3 additions & 0 deletions .changelog/25911.txt
@@ -0,0 +1,3 @@
```release-note:bug
scaling: Set the scaling policies to disabled when a job is stopped
```
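
A minimal sketch of the behavior this entry describes, using the Nomad Go API client. The local agent address (via `api.DefaultConfig()`) and the job name "example" are assumptions, and error handling is pared down:

```go
package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Connect to a local Nomad agent (assumed to be running with a job
	// named "example" that defines scaling policies).
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// Stopping the job (deregister without purge) now also flips its
	// scaling policies to disabled, so the autoscaler stops acting on it.
	if _, _, err := client.Jobs().Deregister("example", false, nil); err != nil {
		log.Fatal(err)
	}

	// The policies are still listed, but show Enabled=false until the job
	// is started again.
	policies, _, err := client.Scaling().ListPolicies(nil)
	if err != nil {
		log.Fatal(err)
	}
	for _, p := range policies {
		fmt.Printf("%s enabled=%v\n", p.ID, p.Enabled)
	}
}
```

When the job is started again with `nomad job start`, the changes to `command/job_start.go` below restore each policy to the enabled/disabled state captured in the job's last submission.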
9 changes: 9 additions & 0 deletions api/jobs.go
@@ -1420,6 +1420,15 @@ func (j *Job) AddSpread(s *Spread) *Job {
return j
}

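// GetScalingPoliciesPerTaskGroup returns the scaling policy of each task group that defines one, keyed by task group name.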
func (j *Job) GetScalingPoliciesPerTaskGroup() map[string]*ScalingPolicy {
ret := map[string]*ScalingPolicy{}
for _, tg := range j.TaskGroups {
if tg.Scaling != nil {
ret[*tg.Name] = tg.Scaling
}
}

return ret
}

type WriteRequest struct {
// The target region for this write
Region string
6 changes: 6 additions & 0 deletions api/tasks.go
@@ -707,6 +707,12 @@ func (g *TaskGroup) AddSpread(s *Spread) *TaskGroup {
return g
}

// ScalingPolicy is used to set the scaling policy of a task group.
func (g *TaskGroup) ScalingPolicy(sp *ScalingPolicy) *TaskGroup {
g.Scaling = sp
return g
}

// LogConfig provides configuration for log rotation
type LogConfig struct {
MaxFiles *int `mapstructure:"max_files" hcl:"max_files,optional"`
53 changes: 53 additions & 0 deletions command/job_start.go
@@ -4,11 +4,14 @@
package command

import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/hashicorp/nomad/jobspec2"
"github.com/posener/complete"
)

@@ -132,6 +135,35 @@ func (c *JobStartCommand) Run(args []string) int {
// register the job in a not stopped state
*job.Stop = false

// When a job is stopped, all its scaling policies are disabled. Before
// starting the job again, set them back to the last user submitted state.
ps := job.GetScalingPoliciesPerTaskGroup()
if len(ps) > 0 {
sub, _, err := client.Jobs().Submission(*job.ID, int(*job.Version), &api.QueryOptions{
Region: *job.Region,
Namespace: *job.Namespace,
})
if err != nil {
if _, ok := err.(api.UnexpectedResponseError); !ok {
c.Ui.Error(fmt.Sprintf("%+T\n", err) + err.Error())
return 1
}
// If the job was submitted using the API, there are no submissions stored.
c.Ui.Warn("All scaling policies for this job were disabled when it was stopped, resubmit it to enable them again.")
} else {
lastJob, err := parseFromSubmission(sub)
if err != nil {
c.Ui.Error(err.Error())
return 1
}

sps := lastJob.GetScalingPoliciesPerTaskGroup()
for _, tg := range job.TaskGroups {
if tg.Scaling == nil {
continue
}
if sp, ok := sps[*tg.Name]; ok {
tg.Scaling.Enabled = sp.Enabled
}
}
}
}

resp, _, err := client.Jobs().Register(job, nil)

// Check if the job is periodic or is a parameterized job
@@ -162,3 +194,24 @@ func (c *JobStartCommand) Run(args []string) int {
mon := newMonitor(c.Ui, client, length)
return mon.monitor(resp.EvalID)
}

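// parseFromSubmission rebuilds the job from its stored submission source so
// the last user-submitted scaling policy state can be restored. HCL2 and JSON
// submissions are supported; any other format yields a nil job and no error.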
func parseFromSubmission(sub *api.JobSubmission) (*api.Job, error) {
var job *api.Job
var err error

switch sub.Format {
case "hcl2":
job, err = jobspec2.Parse("", strings.NewReader(sub.Source))
if err != nil {
return nil, fmt.Errorf("Unable to parse job submission to re-enable scaling policies: %w", err)
}

case "json":
err = json.Unmarshal([]byte(sub.Source), &job)
if err != nil {
return nil, fmt.Errorf("Unable to parse job submission to re-enable scaling policies: %w", err)
}
}

return job, nil
}
82 changes: 81 additions & 1 deletion command/job_start_test.go
@@ -4,12 +4,14 @@
package command

import (
"encoding/json"
"testing"

"github.com/hashicorp/cli"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/command/agent"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@@ -41,7 +43,79 @@ func TestStartCommand(t *testing.T) {
client, err := cmd.Meta.Client()
must.NoError(t, err)

_, _, err = client.Jobs().Register(job, nil)
jsonBytes, err := json.Marshal(job)
must.NoError(t, err)

_, _, err = client.Jobs().RegisterOpts(job, &api.RegisterOptions{
Submission: &api.JobSubmission{
Source: string(jsonBytes),
Format: "json",
},
}, nil)
must.NoError(t, err)

waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusRunning, "")

_, _, err = client.Jobs().Deregister(*job.ID, false, nil)
must.Nil(t, err)

waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusComplete, "")

res := cmd.Run([]string{"-address", addr, *job.ID})
must.Zero(t, res)

pol, _, err := client.Scaling().ListPolicies(nil)
must.NoError(t, err)
must.One(t, len(pol))
must.True(t, *job.TaskGroups[0].Scaling.Enabled)

})

t.Run("succeeds when starting a stopped job with disabled scaling policies and no submissions", func(t *testing.T) {
job := testJob(uuid.Generate())

client, err := cmd.Meta.Client()
must.NoError(t, err)

job.TaskGroups[0].Scaling.Enabled = pointer.Of(false)

_, _, err = client.Jobs().RegisterOpts(job, &api.RegisterOptions{}, nil)
must.NoError(t, err)

waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusRunning, "")

_, _, err = client.Jobs().Deregister(*job.ID, false, nil)
must.Nil(t, err)

waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusComplete, "")

res := cmd.Run([]string{"-address", addr, *job.ID})
must.Zero(t, res)

pol, _, err := client.Scaling().ListPolicies(nil)
must.NoError(t, err)
must.One(t, len(pol))
must.False(t, *job.TaskGroups[0].Scaling.Enabled)

})

t.Run("succeeds when starting a stopped job with enabled scaling policies", func(t *testing.T) {
job := testJob(uuid.Generate())

client, err := cmd.Meta.Client()
must.NoError(t, err)

job.TaskGroups[0].Scaling.Enabled = pointer.Of(true)

jsonBytes, err := json.Marshal(job)
must.NoError(t, err)

_, _, err = client.Jobs().RegisterOpts(job, &api.RegisterOptions{
Submission: &api.JobSubmission{
Source: string(jsonBytes),
Format: "json",
},
}, nil)
must.NoError(t, err)

waitForJobAllocsStatus(t, client, *job.ID, api.AllocClientStatusRunning, "")
@@ -53,6 +127,12 @@ func TestStartCommand(t *testing.T) {

res := cmd.Run([]string{"-address", addr, *job.ID})
must.Zero(t, res)

pol, _, err := client.Scaling().ListPolicies(nil)
must.NoError(t, err)
must.One(t, len(pol))
must.True(t, *job.TaskGroups[0].Scaling.Enabled)

})

t.Run("fails to start a job not previously stopped", func(t *testing.T) {
6 changes: 5 additions & 1 deletion command/testing_test.go
@@ -74,7 +74,11 @@ func testJob(jobID string) *api.Job {
AddTask(task).
RequireDisk(&api.EphemeralDisk{
SizeMB: pointer.Of(20),
})
}).ScalingPolicy(&api.ScalingPolicy{
Min: pointer.Of(int64(1)),
Max: pointer.Of(int64(5)),
Enabled: pointer.Of(true),
})

job := api.NewBatchJob(jobID, jobID, "global", 1).
AddDatacenter("dc1").
44 changes: 26 additions & 18 deletions nomad/fsm.go
@@ -800,6 +800,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by
// handleJobDeregister is used to deregister a job. Leaves error logging up to
// caller.
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, submitTime int64, noShutdownDelay bool, tx state.Txn) error {

// If it is periodic remove it from the dispatcher
if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil {
return fmt.Errorf("periodicDispatcher.Remove failed: %w", err)
@@ -833,27 +834,34 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu
// the job was updated to be non-periodic, thus checking if it is periodic
// doesn't ensure we clean it up properly.
n.state.DeletePeriodicLaunchTxn(index, namespace, jobID, tx)
} else {
// Get the current job and mark it as stopped and re-insert it.
ws := memdb.NewWatchSet()
current, err := n.state.JobByIDTxn(ws, namespace, jobID, tx)
if err != nil {
return fmt.Errorf("JobByID lookup failed: %w", err)
}
return nil
}

if current == nil {
return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", jobID, namespace)
}
// Get the current job and mark it as stopped and re-insert it.
ws := memdb.NewWatchSet()
current, err := n.state.JobByIDTxn(ws, namespace, jobID, tx)
if err != nil {
return fmt.Errorf("JobByID lookup failed: %w", err)
}

stopped := current.Copy()
stopped.Stop = true
if submitTime != 0 {
stopped.SubmitTime = submitTime
}
if current == nil {
return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", jobID, namespace)
}

if err := n.state.UpsertJobTxn(index, nil, stopped, tx); err != nil {
return fmt.Errorf("UpsertJob failed: %w", err)
}
stopped := current.Copy()
stopped.Stop = true
if submitTime != 0 {
stopped.SubmitTime = submitTime
}

// Disable scaling policies to avoid monitoring stopped jobs
scalingPolicies := stopped.GetScalingPolicies()
for _, policy := range scalingPolicies {
policy.Enabled = false
}

if err := n.state.UpsertJobTxn(index, nil, stopped, tx); err != nil {
return fmt.Errorf("UpsertJob failed: %w", err)
}

return nil
12 changes: 12 additions & 0 deletions nomad/fsm_test.go
@@ -977,12 +977,19 @@ func TestFSM_DeregisterJob_NoPurge(t *testing.T) {
fsm := testFSM(t)

job := mock.PeriodicJob()

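// Attach an enabled scaling policy so the test can verify it is disabled once the job is deregistered.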
job.TaskGroups[0].Scaling = &structs.ScalingPolicy{
ID: "mockID",
Enabled: true,
}

req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}

buf, err := structs.Encode(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
@@ -1040,6 +1047,11 @@ func TestFSM_DeregisterJob_NoPurge(t *testing.T) {
if launchOut == nil {
t.Fatalf("launch not found!")
}

// Verify the scaling policies were disabled
for _, policy := range jobOut.GetScalingPolicies() {
must.False(t, policy.Enabled)
}
}

func TestFSM_BatchDeregisterJob(t *testing.T) {
Expand Down