Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/26392.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
resource: Added support for disk throttle limits
```
9 changes: 5 additions & 4 deletions api/allocations.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,10 +446,11 @@ type AllocatedResources struct {
}

type AllocatedTaskResources struct {
Cpu AllocatedCpuResources
Memory AllocatedMemoryResources
Networks []*NetworkResource
Devices []*AllocatedDeviceResource
Cpu AllocatedCpuResources
Memory AllocatedMemoryResources
DiskThrottles []*DiskThrottle
Networks []*NetworkResource
Devices []*AllocatedDeviceResource
}

type AllocatedSharedResources struct {
Expand Down
32 changes: 23 additions & 9 deletions api/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@ import (
// Resources encapsulates the required resources of
// a given task or task group.
type Resources struct {
CPU *int `hcl:"cpu,optional"`
Cores *int `hcl:"cores,optional"`
MemoryMB *int `mapstructure:"memory" hcl:"memory,optional"`
MemoryMaxMB *int `mapstructure:"memory_max" hcl:"memory_max,optional"`
DiskMB *int `mapstructure:"disk" hcl:"disk,optional"`
Networks []*NetworkResource `hcl:"network,block"`
Devices []*RequestedDevice `hcl:"device,block"`
NUMA *NUMAResource `hcl:"numa,block"`
SecretsMB *int `mapstructure:"secrets" hcl:"secrets,optional"`
CPU *int `hcl:"cpu,optional"`
Cores *int `hcl:"cores,optional"`
MemoryMB *int `mapstructure:"memory" hcl:"memory,optional"`
MemoryMaxMB *int `mapstructure:"memory_max" hcl:"memory_max,optional"`
DiskMB *int `mapstructure:"disk" hcl:"disk,optional"`
DiskThrottles []*DiskThrottle `hcl:"disk_throttle,block"`
Networks []*NetworkResource `hcl:"network,block"`
Devices []*RequestedDevice `hcl:"device,block"`
NUMA *NUMAResource `hcl:"numa,block"`
SecretsMB *int `mapstructure:"secrets" hcl:"secrets,optional"`

// COMPAT(0.10)
// XXX Deprecated. Please do not use. The field will be removed in Nomad
Expand Down Expand Up @@ -161,6 +162,19 @@ type CNIConfig struct {
Args map[string]string `hcl:"args,optional"`
}

// DiskThrottle is used to describe disk throttling
type DiskThrottle struct {
Major *int `hcl:"major"`
Minor *int `hcl:"minor"`
ReadBps *uint64 `hcl:"read_bps,optional"`
ReadIops *uint64 `hcl:"read_iops,optional"`
WriteBps *uint64 `hcl:"write_bps,optional"`
WriteIops *uint64 `hcl:"write_iops,optional"`
}

func (d *DiskThrottle) Canonicalize() {
}

// NetworkResource is used to describe required network
// resources of a given task.
type NetworkResource struct {
Expand Down
40 changes: 40 additions & 0 deletions api/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,46 @@ func TestResources_Canonicalize(t *testing.T) {
MemoryMB: pointerOf(1024),
},
},
{
name: "disk throttles",
input: &Resources{
CPU: pointerOf(500),
MemoryMB: pointerOf(1024),
DiskThrottles: []*DiskThrottle{
{
Major: pointerOf(8),
Minor: pointerOf(0),
ReadBps: pointerOf(uint64(1000)),
WriteBps: pointerOf(uint64(1000)),
},
{
Major: pointerOf(8),
Minor: pointerOf(1),
ReadBps: pointerOf(uint64(1000)),
WriteBps: pointerOf(uint64(1000)),
},
},
},
expected: &Resources{
CPU: pointerOf(500),
Cores: pointerOf(0),
MemoryMB: pointerOf(1024),
DiskThrottles: []*DiskThrottle{
{
Major: pointerOf(8),
Minor: pointerOf(0),
ReadBps: pointerOf(uint64(1000)),
WriteBps: pointerOf(uint64(1000)),
},
{
Major: pointerOf(8),
Minor: pointerOf(1),
ReadBps: pointerOf(uint64(1000)),
WriteBps: pointerOf(uint64(1000)),
},
},
},
},
}

for _, tc := range testCases {
Expand Down
41 changes: 41 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1601,6 +1601,10 @@ func ApiResourcesToStructs(in *api.Resources) *structs.Resources {
out.MemoryMaxMB = *in.MemoryMaxMB
}

if in.DiskThrottles != nil {
out.DiskThrottles = ApiDiskThrottleToStructs(in.DiskThrottles)
}

// COMPAT(0.10): Only being used to issue warnings
if in.IOPS != nil {
out.IOPS = *in.IOPS
Expand Down Expand Up @@ -1635,6 +1639,43 @@ func ApiResourcesToStructs(in *api.Resources) *structs.Resources {
return out
}

func ApiDiskThrottleToStructs(in []*api.DiskThrottle) []*structs.DiskThrottle {
if len(in) == 0 {
return nil
}

out := make([]*structs.DiskThrottle, len(in))
for i, d := range in {
out[i] = convertDiskThrottle(d)
}
return out
}
func convertDiskThrottle(in *api.DiskThrottle) *structs.DiskThrottle {
if in == nil {
return nil
}
out := &structs.DiskThrottle{}
if in.Major != nil {
out.Major = int64(*in.Major)
}
if in.Minor != nil {
out.Minor = int64(*in.Minor)
}
if in.ReadBps != nil {
out.ReadBps = *in.ReadBps
}
if in.ReadIops != nil {
out.ReadIops = *in.ReadIops
}
if in.WriteBps != nil {
out.WriteBps = *in.WriteBps
}
if in.WriteIops != nil {
out.WriteIops = *in.WriteIops
}
return out
}

func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkResource {
var out []*structs.NetworkResource
if len(in) == 0 {
Expand Down
27 changes: 27 additions & 0 deletions command/agent/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4069,6 +4069,33 @@ func TestConversion_apiResourcesToStructs(t *testing.T) {
},
},
},
{
"with disk throttles",
&api.Resources{
CPU: pointer.Of(100),
MemoryMB: pointer.Of(200),
DiskThrottles: []*api.DiskThrottle{
{
Major: pointer.Of(8),
Minor: pointer.Of(1),
ReadBps: pointer.Of(uint64(1000)),
WriteBps: pointer.Of(uint64(2000)),
},
},
},
&structs.Resources{
CPU: 100,
MemoryMB: 200,
DiskThrottles: []*structs.DiskThrottle{
{
Major: 8,
Minor: 1,
ReadBps: 1000,
WriteBps: 2000,
},
},
},
},
}

for _, c := range cases {
Expand Down
69 changes: 69 additions & 0 deletions drivers/shared/executor/executor_universal_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"os/exec"
"strconv"
"strings"
"syscall"

"github.com/hashicorp/go-set/v3"
Expand Down Expand Up @@ -213,6 +214,39 @@ func (e *UniversalExecutor) configureCG1(cgroup string, command *ExecCommand) er
_ = ed.Write("memory.swappiness", strconv.FormatInt(value, 10))
}

// write disk Throttles
for _, throttle := range command.Resources.NomadResources.DiskThrottles {
if throttle.Major == 0 && throttle.Minor == 0 {
continue
}
// write the throttle limits
if command.Resources.NomadResources != nil && len(command.Resources.NomadResources.DiskThrottles) > 0 {
ed = cgroupslib.OpenFromFreezerCG1(cgroup, "blkio")
for _, throttle := range command.Resources.NomadResources.DiskThrottles {
if throttle.Major == 0 && throttle.Minor == 0 {
continue
}

throttleRules := []struct {
Filename string
Value uint64
}{
{"blkio.throttle.read_bps_device", throttle.ReadBps},
{"blkio.throttle.write_bps_device", throttle.WriteBps},
{"blkio.throttle.read_iops_device", throttle.ReadIops},
{"blkio.throttle.write_iops_device", throttle.WriteIops},
}

for _, rule := range throttleRules {
if rule.Value > 0 {
line := fmt.Sprintf("%d:%d %d", throttle.Major, throttle.Minor, rule.Value)
_ = ed.Write(rule.Filename, line)
}
}
}
}
}

// write cpu shares
cpuShares := strconv.FormatInt(command.Resources.LinuxResources.CPUShares, 10)
ed = cgroupslib.OpenFromFreezerCG1(cgroup, "cpu")
Expand Down Expand Up @@ -255,6 +289,41 @@ func (e *UniversalExecutor) configureCG2(cgroup string, command *ExecCommand) {
_ = ed.Write("memory.swappiness", strconv.FormatInt(value, 10))
}

// write disk Throttles
var throttle_lines []string
for _, throttle := range command.Resources.NomadResources.DiskThrottles {
if throttle.Major == 0 && throttle.Minor == 0 {
continue
}

throttleRules := []struct {
Name string
Value uint64
}{
{"rbps", throttle.ReadBps},
{"wbps", throttle.WriteBps},
{"riops", throttle.ReadIops},
{"wiops", throttle.WriteIops},
}

var rules []string
for _, rule := range throttleRules {
if rule.Value > 0 {
rules = append(rules, fmt.Sprintf("%s=%d", rule.Name, rule.Value))
}
}

if len(rules) > 0 {
deviceKey := fmt.Sprintf("%d:%d", throttle.Major, throttle.Minor)
throttle_lines = append(throttle_lines, fmt.Sprintf("%s %s", deviceKey, strings.Join(rules, " ")))
}
}

if len(throttle_lines) > 0 {
content := strings.Join(throttle_lines, "\n")
_ = ed.Write("io.max", content)
}

// write cpu weight cgroup file
cpuWeight := e.computeCPU(command)
ed = cgroupslib.OpenPath(cgroup)
Expand Down
47 changes: 47 additions & 0 deletions drivers/shared/executor/executor_universal_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
package executor

import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/lib/cgroupslib"
Expand Down Expand Up @@ -162,3 +164,48 @@ func TestUniversalExecutor_cg1_no_executor_pid(t *testing.T) {
must.Eq(t, pids[0], strconv.Itoa(p.Pid))
}
}

func TestUniversalExecutor_CgroupV2_DiskThrottle(t *testing.T) {
testutil.CgroupsCompatibleV2(t)
ci.Parallel(t)

factory := universalFactory
testExecCmd := testExecutorCommand(t)
execCmd, allocDir := testExecCmd.command, testExecCmd.allocDir
execCmd.Cmd = "sleep"
execCmd.Args = []string{"3"}

execCmd.Resources.NomadResources.DiskThrottles = []*structs.DiskThrottle{
{
Major: 8,
Minor: 0,
ReadBps: 2097152,
WriteIops: 150,
},
}
factory.configureExecCmd(t, execCmd)
defer allocDir.Destroy()
executor := factory.new(testlog.HCLogger(t), compute)
defer executor.Shutdown("", 0)

_, err := executor.Launch(execCmd)
must.NoError(t, err)
time.Sleep(1000 * time.Millisecond)

alloc := filepath.Base(allocDir.AllocDirPath())
cgroupPath := execCmd.OverrideCgroupV2
if cgroupPath == "" {
cgroupPath = filepath.Join("nomad.slice", "share.slice", alloc+".web.scope")
}
ioMaxFile := filepath.Join(cgroupslib.GetDefaultRoot(), cgroupPath, "io.max")
content, err := os.ReadFile(ioMaxFile)
must.NoError(t, err)

stringContent := strings.TrimSpace(string(content))
must.True(t, strings.Contains(stringContent, "8:0"))
must.True(t, strings.Contains(stringContent, "rbps=2097152"))
must.True(t, strings.Contains(stringContent, "wiops=150"))

_, err = executor.Wait(context.Background())
must.NoError(t, err)
}
18 changes: 18 additions & 0 deletions nomad/structs/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -2591,6 +2591,11 @@ func (r *Resources) Diff(other *Resources, contextual bool) *ObjectDiff {
diff.Objects = append(diff.Objects, nDiffs...)
}

// DiskThrottle diff
if nDiffs := diskThrottleDiffs(r.DiskThrottles, other.DiskThrottles, contextual); nDiffs != nil {
diff.Objects = append(diff.Objects, nDiffs...)
}

// Requested Devices diff
if nDiffs := requestedDevicesDiffs(r.Devices, other.Devices, contextual); nDiffs != nil {
diff.Objects = append(diff.Objects, nDiffs...)
Expand Down Expand Up @@ -2651,6 +2656,19 @@ func (n *NetworkResource) Diff(other *NetworkResource, contextual bool) *ObjectD
return diff
}

func (d *DiskThrottle) DiffID() string {
return fmt.Sprintf("%d-%d", d.Major, d.Minor)
}

// Diff returns a diff of two DiskThrottle structs. If contextual diff is enabled,
func diskThrottleDiffs(old, new []*DiskThrottle, contextual bool) []*ObjectDiff {

oldSlice := interfaceSlice(old)
newSlice := interfaceSlice(new)

return primitiveObjectSetDiff(oldSlice, newSlice, nil, "DiskThrottle", contextual)
}

// Diff returns a diff of two DNSConfig structs
func (d *DNSConfig) Diff(other *DNSConfig, contextual bool) *ObjectDiff {
if reflect.DeepEqual(d, other) {
Expand Down
Loading
Loading