diff --git a/.licenseignore b/.licenseignore index 308c12d2e..6758bcc23 100644 --- a/.licenseignore +++ b/.licenseignore @@ -3,6 +3,8 @@ pkg/descheduler pkg/descheduler/controllers/migration/controllerfinder pkg/scheduler/plugins/coscheduling pkg/util/kubelet +pkg/util/cpuset/cpuset.go +pkg/util/cpuset/cpuset_test.go test/e2e/common test/e2e/framework test/e2e/generated diff --git a/pkg/koordlet/resmanager/cpu_suppress.go b/pkg/koordlet/resmanager/cpu_suppress.go index 43ceed21d..2cca5b2c4 100644 --- a/pkg/koordlet/resmanager/cpu_suppress.go +++ b/pkg/koordlet/resmanager/cpu_suppress.go @@ -29,7 +29,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" apiext "github.com/koordinator-sh/koordinator/apis/extension" slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" @@ -40,6 +39,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" "github.com/koordinator-sh/koordinator/pkg/util" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" "github.com/koordinator-sh/koordinator/pkg/util/system" ) @@ -324,11 +324,11 @@ func calculateBESuppressCPUSetPolicy(cpus int32, processorInfos []util.Processor } // applyCPUSetWithNonePolicy applies the be suppress policy by writing best-effort cgroups -func applyCPUSetWithNonePolicy(cpuset []int32, oldCPUSet []int32) error { +func applyCPUSetWithNonePolicy(cpus []int32, oldCPUSet []int32) error { // 1. get current be cgroups cpuset // 2. temporarily write with a union of old cpuset and new cpuset from upper to lower, to avoid cgroup conflicts // 3. 
write with the new cpuset from lower to upper to apply the real policy - if len(cpuset) <= 0 { + if len(cpus) <= 0 { klog.Warningf("applyCPUSetWithNonePolicy skipped due to the empty cpuset") return nil } @@ -340,23 +340,23 @@ func applyCPUSetWithNonePolicy(cpuset []int32, oldCPUSet []int32) error { } // write a loose cpuset for all be cgroups before applying the real policy - mergedCPUSet := util.MergeCPUSet(oldCPUSet, cpuset) - mergedCPUSetStr := util.GenerateCPUSetStr(mergedCPUSet) + mergedCPUSet := cpuset.MergeCPUSet(oldCPUSet, cpus) + mergedCPUSetStr := cpuset.GenerateCPUSetStr(mergedCPUSet) klog.V(6).Infof("applyCPUSetWithNonePolicy temporarily writes cpuset from upper cgroup to lower, cpuset %v", mergedCPUSet) writeBECgroupsCPUSet(cpusetCgroupPaths, mergedCPUSetStr, false) // apply the suppress policy from lower to upper - cpusetStr := util.GenerateCPUSetStr(cpuset) + cpusetStr := cpuset.GenerateCPUSetStr(cpus) klog.V(6).Infof("applyCPUSetWithNonePolicy writes suppressed cpuset from lower cgroup to upper, cpuset %v", - cpuset) + cpus) writeBECgroupsCPUSet(cpusetCgroupPaths, cpusetStr, true) - metrics.RecordBESuppressCores(string(slov1alpha1.CPUSetPolicy), float64(len(cpuset))) + metrics.RecordBESuppressCores(string(slov1alpha1.CPUSetPolicy), float64(len(cpus))) return nil } -func applyCPUSetWithStaticPolicy(cpuset []int32) error { - if len(cpuset) <= 0 { +func applyCPUSetWithStaticPolicy(cpus []int32) error { + if len(cpus) <= 0 { klog.Warningf("applyCPUSetWithStaticPolicy skipped due to the empty cpuset") return nil } @@ -367,10 +367,10 @@ func applyCPUSetWithStaticPolicy(cpuset []int32) error { return fmt.Errorf("apply be suppress policy failed, err: %s", err) } - cpusetStr := util.GenerateCPUSetStr(cpuset) - klog.V(6).Infof("applyCPUSetWithStaticPolicy writes suppressed cpuset to containers, cpuset %v", cpuset) + cpusetStr := cpuset.GenerateCPUSetStr(cpus) + klog.V(6).Infof("applyCPUSetWithStaticPolicy writes suppressed cpuset to containers, cpuset %v", 
cpus) writeBECgroupsCPUSet(containerPaths, cpusetStr, false) - metrics.RecordBESuppressCores(string(slov1alpha1.CPUSetPolicy), float64(len(cpuset))) + metrics.RecordBESuppressCores(string(slov1alpha1.CPUSetPolicy), float64(len(cpus))) return nil } diff --git a/pkg/koordlet/resmanager/cpu_suppress_test.go b/pkg/koordlet/resmanager/cpu_suppress_test.go index 709f521a8..439213905 100644 --- a/pkg/koordlet/resmanager/cpu_suppress_test.go +++ b/pkg/koordlet/resmanager/cpu_suppress_test.go @@ -438,7 +438,7 @@ func Test_cpuSuppress_suppressBECPU(t *testing.T) { }, wantBECFSQuota: -1, wantCFSQuotaPolicyStatus: &policyRecovered, - wantBECPUSet: "0,1", + wantBECPUSet: "0-1", wantCPUSetPolicyStatus: &policyUsing, }, { @@ -1171,7 +1171,7 @@ func Test_applyCPUSetWithNonePolicy(t *testing.T) { testingPrepareBECgroupData(helper, podDirs, "1,2") cpuset := []int32{3, 2, 1} - wantCPUSetStr := "3,2,1" + wantCPUSetStr := "1-3" oldCPUSet, err := util.GetRootCgroupCurCPUSet(corev1.PodQOSBestEffort) assert.NoError(t, err) @@ -1231,7 +1231,7 @@ func Test_adjustByCPUSet(t *testing.T) { }, oldCPUSets: "7,6,3,2", }, - wantCPUSet: "2,3,4", + wantCPUSet: "2-4", }, { name: "test scale up by cpuset.", @@ -1251,7 +1251,7 @@ func Test_adjustByCPUSet(t *testing.T) { }, oldCPUSets: "7,6", }, - wantCPUSet: "2,3,4", + wantCPUSet: "2-4", }, } ctrl := gomock.NewController(t) @@ -1544,7 +1544,7 @@ func TestCPUSuppress_applyBESuppressCPUSet(t *testing.T) { wants: wants{ beDirCPUSet: "0-15", podDirCPUSet: "0-15", - containerDirCPUSet: "0,1,2,3", + containerDirCPUSet: "0-3", }, }, } diff --git a/pkg/koordlet/statesinformer/states_noderesourcetopology.go b/pkg/koordlet/statesinformer/states_noderesourcetopology.go index f0a90e842..52bbea71e 100644 --- a/pkg/koordlet/statesinformer/states_noderesourcetopology.go +++ b/pkg/koordlet/statesinformer/states_noderesourcetopology.go @@ -36,12 +36,12 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" 
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" - "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/features" "github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache" "github.com/koordinator-sh/koordinator/pkg/util" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" kubeletutil "github.com/koordinator-sh/koordinator/pkg/util/kubelet" ) diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator.go b/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator.go index b44afc008..b1316e79c 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator.go @@ -23,24 +23,25 @@ import ( "k8s.io/apimachinery/pkg/util/sets" schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) func takeCPUs( topology *CPUTopology, maxRefCount int, - availableCPUs CPUSet, + availableCPUs cpuset.CPUSet, allocatedCPUs CPUDetails, numCPUsNeeded int, cpuBindPolicy schedulingconfig.CPUBindPolicy, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy, numaAllocatedStrategy schedulingconfig.NUMAAllocateStrategy, -) (CPUSet, error) { +) (cpuset.CPUSet, error) { acc := newCPUAccumulator(topology, maxRefCount, availableCPUs, allocatedCPUs, numCPUsNeeded, cpuExclusivePolicy, numaAllocatedStrategy) if acc.isSatisfied() { return acc.result, nil } if acc.isFailed() { - return NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request") + return cpuset.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request") } fullPCPUs := cpuBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs @@ -169,7 +170,7 @@ func takeCPUs( } } - return NewCPUSet(), fmt.Errorf("failed to 
allocate cpus") + return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus") } type cpuAccumulator struct { @@ -182,13 +183,13 @@ type cpuAccumulator struct { exclusiveInNUMANodes sets.Int exclusivePolicy schedulingconfig.CPUExclusivePolicy numaAllocateStrategy schedulingconfig.NUMAAllocateStrategy - result CPUSet + result cpuset.CPUSet } func newCPUAccumulator( topology *CPUTopology, maxRefCount int, - availableCPUs CPUSet, + availableCPUs cpuset.CPUSet, allocatedCPUs CPUDetails, numCPUsNeeded int, exclusivePolicy schedulingconfig.CPUExclusivePolicy, @@ -224,7 +225,7 @@ func newCPUAccumulator( exclusivePolicy: exclusivePolicy, numCPUsNeeded: numCPUsNeeded, numaAllocateStrategy: numaAllocateStrategy, - result: NewCPUSet(), + result: cpuset.NewCPUSet(), } } @@ -630,7 +631,7 @@ func (a *cpuAccumulator) freeCPUs(filterExclusive bool) []int { socketColoScores := make(map[int]int) for socketID := range socketFreeScores { - socketColoScore := a.topology.CPUDetails.CPUsInSockets(socketID).Intersection(a.result).Count() + socketColoScore := a.topology.CPUDetails.CPUsInSockets(socketID).Intersection(a.result).Size() socketColoScores[socketID] = socketColoScore } diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator_test.go b/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator_test.go index 190f0de5c..fc4851cba 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_accumulator_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) func buildCPUTopologyForTest(numSockets, nodesPerSocket, coresPerNode, cpusPerCore int) *CPUTopology { @@ -60,95 +61,95 @@ func TestTakeFullPCPUs(t *testing.T) { name string topology *CPUTopology maxRefCount int - allocatedCPUs CPUSet + allocatedCPUs cpuset.CPUSet 
numCPUsNeeded int wantError bool - wantResult CPUSet + wantResult cpuset.CPUSet }{ { name: "allocate on non-NUMA node", topology: buildCPUTopologyForTest(1, 1, 4, 2), maxRefCount: 1, numCPUsNeeded: 2, - wantResult: NewCPUSet(0, 1), + wantResult: cpuset.NewCPUSet(0, 1), }, { name: "with allocated cpus", topology: buildCPUTopologyForTest(1, 1, 4, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 1), + allocatedCPUs: cpuset.NewCPUSet(0, 1), numCPUsNeeded: 2, - wantResult: NewCPUSet(2, 3), + wantResult: cpuset.NewCPUSet(2, 3), }, { name: "allocate whole socket", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, numCPUsNeeded: 8, - wantResult: MustParse("0-7"), + wantResult: cpuset.MustParse("0-7"), }, { name: "allocate across socket", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, numCPUsNeeded: 12, - wantResult: MustParse("0-11"), + wantResult: cpuset.MustParse("0-11"), }, { name: "allocate whole socket with partially-allocated socket", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 1), + allocatedCPUs: cpuset.NewCPUSet(0, 1), numCPUsNeeded: 8, - wantResult: MustParse("8-15"), + wantResult: cpuset.MustParse("8-15"), }, { name: "allocate in the smallest idle socket", topology: buildCPUTopologyForTest(2, 2, 4, 2), maxRefCount: 1, - allocatedCPUs: MustParse("0-5,16-23"), + allocatedCPUs: cpuset.MustParse("0-5,16-23"), numCPUsNeeded: 6, - wantResult: MustParse("24-29"), + wantResult: cpuset.MustParse("24-29"), }, { name: "allocate the most of CPUs on the same socket", topology: buildCPUTopologyForTest(2, 2, 4, 2), maxRefCount: 1, - allocatedCPUs: MustParse("0-5,16-23"), + allocatedCPUs: cpuset.MustParse("0-5,16-23"), numCPUsNeeded: 12, - wantResult: MustParse("6-15,24-25"), + wantResult: cpuset.MustParse("6-15,24-25"), }, { name: "allocate from first socket", topology: buildCPUTopologyForTest(2, 2, 4, 2), maxRefCount: 1, - allocatedCPUs: MustParse("0-3,8-11"), + allocatedCPUs: 
cpuset.MustParse("0-3,8-11"), numCPUsNeeded: 4, - wantResult: MustParse("4-7"), + wantResult: cpuset.MustParse("4-7"), }, { name: "allocate with less spread cpus", topology: buildCPUTopologyForTest(2, 2, 2, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 2, 4, 8, 12), + allocatedCPUs: cpuset.NewCPUSet(0, 2, 4, 8, 12), numCPUsNeeded: 4, - wantResult: NewCPUSet(10, 11, 14, 15), + wantResult: cpuset.NewCPUSet(10, 11, 14, 15), }, { name: "allocate with the most spread cpus", topology: buildCPUTopologyForTest(2, 2, 2, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 2, 4, 8, 10, 12), + allocatedCPUs: cpuset.NewCPUSet(0, 2, 4, 8, 10, 12), numCPUsNeeded: 6, - wantResult: NewCPUSet(5, 6, 7, 13, 14, 15), + wantResult: cpuset.NewCPUSet(5, 6, 7, 13, 14, 15), }, { name: "allocate with the most spread cpus on the smallest idle cpus socket", topology: buildCPUTopologyForTest(2, 2, 2, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 2, 4, 8, 9, 10, 12), + allocatedCPUs: cpuset.NewCPUSet(0, 2, 4, 8, 9, 10, 12), numCPUsNeeded: 6, - wantResult: NewCPUSet(6, 7, 11, 13, 14, 15), + wantResult: cpuset.NewCPUSet(6, 7, 11, 13, 14, 15), }, } @@ -176,95 +177,95 @@ func TestTakeFullPCPUsWithNUMALeastAllocated(t *testing.T) { name string topology *CPUTopology maxRefCount int - allocatedCPUs CPUSet + allocatedCPUs cpuset.CPUSet numCPUsNeeded int wantError bool - wantResult CPUSet + wantResult cpuset.CPUSet }{ { name: "allocate on non-NUMA node", topology: buildCPUTopologyForTest(1, 1, 4, 2), maxRefCount: 1, numCPUsNeeded: 2, - wantResult: NewCPUSet(0, 1), + wantResult: cpuset.NewCPUSet(0, 1), }, { name: "with allocated cpus", topology: buildCPUTopologyForTest(1, 1, 4, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 1), + allocatedCPUs: cpuset.NewCPUSet(0, 1), numCPUsNeeded: 2, - wantResult: NewCPUSet(2, 3), + wantResult: cpuset.NewCPUSet(2, 3), }, { name: "allocate whole socket", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, numCPUsNeeded: 8, - wantResult: 
MustParse("0-7"), + wantResult: cpuset.MustParse("0-7"), }, { name: "allocate across socket", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, numCPUsNeeded: 12, - wantResult: MustParse("0-11"), + wantResult: cpuset.MustParse("0-11"), }, { name: "allocate whole socket with partially-allocated socket", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 1), + allocatedCPUs: cpuset.NewCPUSet(0, 1), numCPUsNeeded: 8, - wantResult: MustParse("8-15"), + wantResult: cpuset.MustParse("8-15"), }, { name: "allocate in the most idle socket", topology: buildCPUTopologyForTest(2, 2, 4, 2), maxRefCount: 1, - allocatedCPUs: MustParse("0-5,16-23"), + allocatedCPUs: cpuset.MustParse("0-5,16-23"), numCPUsNeeded: 6, - wantResult: MustParse("8-13"), + wantResult: cpuset.MustParse("8-13"), }, { name: "allocate the most of CPUs on the same socket", topology: buildCPUTopologyForTest(2, 2, 4, 2), maxRefCount: 1, - allocatedCPUs: MustParse("0-5,16-23"), + allocatedCPUs: cpuset.MustParse("0-5,16-23"), numCPUsNeeded: 12, - wantResult: MustParse("6-15,24-25"), + wantResult: cpuset.MustParse("6-15,24-25"), }, { name: "allocate from second socket", topology: buildCPUTopologyForTest(2, 2, 4, 2), maxRefCount: 1, - allocatedCPUs: MustParse("0-3,8-11"), + allocatedCPUs: cpuset.MustParse("0-3,8-11"), numCPUsNeeded: 4, - wantResult: MustParse("16-19"), + wantResult: cpuset.MustParse("16-19"), }, { name: "allocate with less spread cpus", topology: buildCPUTopologyForTest(2, 2, 2, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 2, 4, 8, 12), + allocatedCPUs: cpuset.NewCPUSet(0, 2, 4, 8, 12), numCPUsNeeded: 4, - wantResult: NewCPUSet(10, 11, 14, 15), + wantResult: cpuset.NewCPUSet(10, 11, 14, 15), }, { name: "allocate with the less spread cpus 2", topology: buildCPUTopologyForTest(2, 2, 2, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 2, 4, 8, 10, 12), + allocatedCPUs: cpuset.NewCPUSet(0, 2, 4, 8, 10, 12), numCPUsNeeded: 6, - wantResult: 
NewCPUSet(6, 7, 14, 15, 1, 3), + wantResult: cpuset.NewCPUSet(6, 7, 14, 15, 1, 3), }, { name: "allocate with the most spread cpus on the most idle cpus socket 3", topology: buildCPUTopologyForTest(2, 2, 4, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 2, 4, 8, 9, 10, 12), + allocatedCPUs: cpuset.NewCPUSet(0, 2, 4, 8, 9, 10, 12), numCPUsNeeded: 6, - wantResult: NewCPUSet(16, 17, 18, 19, 20, 21), + wantResult: cpuset.NewCPUSet(16, 17, 18, 19, 20, 21), }, } @@ -302,41 +303,41 @@ func TestTakeSpreadByPCPUs(t *testing.T) { name string topology *CPUTopology maxRefCount int - allocatedCPUs CPUSet + allocatedCPUs cpuset.CPUSet numCPUsNeeded int wantError bool - wantResult CPUSet + wantResult cpuset.CPUSet }{ { name: "allocate on non-NUMA node", topology: buildCPUTopologyForTest(1, 1, 4, 2), maxRefCount: 1, numCPUsNeeded: 4, - wantResult: NewCPUSet(0, 2, 4, 6), + wantResult: cpuset.NewCPUSet(0, 2, 4, 6), }, { name: "allocate satisfied the partially-allocated socket", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 2), + allocatedCPUs: cpuset.NewCPUSet(0, 2), numCPUsNeeded: 4, - wantResult: NewCPUSet(1, 3, 4, 6), + wantResult: cpuset.NewCPUSet(1, 3, 4, 6), }, { name: "allocate cpus on full-free socket", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 1, 2, 3), + allocatedCPUs: cpuset.NewCPUSet(0, 1, 2, 3), numCPUsNeeded: 4, - wantResult: NewCPUSet(8, 10, 12, 14), + wantResult: cpuset.NewCPUSet(8, 10, 12, 14), }, { name: "allocate most of CPUs in the same socket and overlapped-cores", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 2), + allocatedCPUs: cpuset.NewCPUSet(0, 2), numCPUsNeeded: 6, - wantResult: MustParse("1,3-7"), + wantResult: cpuset.MustParse("1,3-7"), }, } @@ -374,41 +375,41 @@ func TestTakeSpreadByPCPUsWithNUMALeastAllocated(t *testing.T) { name string topology *CPUTopology maxRefCount int - allocatedCPUs CPUSet + 
allocatedCPUs cpuset.CPUSet numCPUsNeeded int wantError bool - wantResult CPUSet + wantResult cpuset.CPUSet }{ { name: "allocate on non-NUMA node", topology: buildCPUTopologyForTest(1, 1, 4, 2), maxRefCount: 1, numCPUsNeeded: 4, - wantResult: NewCPUSet(0, 2, 4, 6), + wantResult: cpuset.NewCPUSet(0, 2, 4, 6), }, { name: "allocate satisfied the partially-allocated socket", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 2), + allocatedCPUs: cpuset.NewCPUSet(0, 2), numCPUsNeeded: 4, - wantResult: NewCPUSet(8, 10, 12, 14), + wantResult: cpuset.NewCPUSet(8, 10, 12, 14), }, { name: "allocate cpus on full-free socket", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 1, 2, 3), + allocatedCPUs: cpuset.NewCPUSet(0, 1, 2, 3), numCPUsNeeded: 4, - wantResult: NewCPUSet(8, 10, 12, 14), + wantResult: cpuset.NewCPUSet(8, 10, 12, 14), }, { name: "allocate most of CPUs in the same socket and overlapped-cores", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedCPUs: NewCPUSet(0, 2), + allocatedCPUs: cpuset.NewCPUSet(0, 2), numCPUsNeeded: 6, - wantResult: MustParse("8,10,12,14,9,11"), + wantResult: cpuset.MustParse("8,10,12,14,9,11"), }, } @@ -436,87 +437,87 @@ func TestTakeCPUsWithExclusivePolicy(t *testing.T) { name string topology *CPUTopology maxRefCount int - allocatedExclusiveCPUs CPUSet + allocatedExclusiveCPUs cpuset.CPUSet allocatedExclusivePolicy schedulingconfig.CPUExclusivePolicy bindPolicy schedulingconfig.CPUBindPolicy exclusivePolicy schedulingconfig.CPUExclusivePolicy numCPUsNeeded int wantError bool - wantResult CPUSet + wantResult cpuset.CPUSet }{ { name: "allocate cpus on full-free socket with PCPULevel", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedExclusiveCPUs: NewCPUSet(0, 2), + allocatedExclusiveCPUs: cpuset.NewCPUSet(0, 2), numCPUsNeeded: 4, - wantResult: NewCPUSet(8, 10, 12, 14), + wantResult: cpuset.NewCPUSet(8, 10, 
12, 14), }, { name: "allocate overlapped cpus with PCPULevel", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, numCPUsNeeded: 10, - wantResult: NewCPUSet(0, 1, 2, 3, 4, 6, 8, 10, 12, 14), + wantResult: cpuset.NewCPUSet(0, 1, 2, 3, 4, 6, 8, 10, 12, 14), }, { name: "allocate cpus on large-size partially-allocated socket with PCPULevel", topology: buildCPUTopologyForTest(2, 1, 8, 2), maxRefCount: 1, - allocatedExclusiveCPUs: NewCPUSet(0, 2), + allocatedExclusiveCPUs: cpuset.NewCPUSet(0, 2), numCPUsNeeded: 4, - wantResult: NewCPUSet(4, 6, 8, 10), + wantResult: cpuset.NewCPUSet(4, 6, 8, 10), }, { name: "allocate cpus with none exclusive policy", topology: buildCPUTopologyForTest(2, 1, 8, 2), maxRefCount: 1, - allocatedExclusiveCPUs: NewCPUSet(0, 2), + allocatedExclusiveCPUs: cpuset.NewCPUSet(0, 2), exclusivePolicy: schedulingconfig.CPUExclusivePolicyNone, numCPUsNeeded: 4, - wantResult: NewCPUSet(1, 3, 4, 6), + wantResult: cpuset.NewCPUSet(1, 3, 4, 6), }, { name: "allocate cpus on full-free socket with NUMANodeLevel", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedExclusiveCPUs: NewCPUSet(0, 2), + allocatedExclusiveCPUs: cpuset.NewCPUSet(0, 2), allocatedExclusivePolicy: schedulingconfig.CPUExclusivePolicyNUMANodeLevel, exclusivePolicy: schedulingconfig.CPUExclusivePolicyNUMANodeLevel, numCPUsNeeded: 4, - wantResult: NewCPUSet(8, 10, 12, 14), + wantResult: cpuset.NewCPUSet(8, 10, 12, 14), }, { name: "allocate cpus on partially-allocated socket without NUMANodeLevel", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedExclusiveCPUs: NewCPUSet(0, 2), + allocatedExclusiveCPUs: cpuset.NewCPUSet(0, 2), allocatedExclusivePolicy: schedulingconfig.CPUExclusivePolicyNUMANodeLevel, exclusivePolicy: schedulingconfig.CPUExclusivePolicyNone, numCPUsNeeded: 4, - wantResult: NewCPUSet(1, 3, 4, 6), + wantResult: cpuset.NewCPUSet(1, 3, 4, 6), }, { name: "allocate cpus on full-free socket with NUMANodeLevel with PCPUs", 
topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedExclusiveCPUs: NewCPUSet(0, 2), + allocatedExclusiveCPUs: cpuset.NewCPUSet(0, 2), allocatedExclusivePolicy: schedulingconfig.CPUExclusivePolicyNUMANodeLevel, exclusivePolicy: schedulingconfig.CPUExclusivePolicyNUMANodeLevel, bindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, numCPUsNeeded: 4, - wantResult: NewCPUSet(8, 9, 10, 11), + wantResult: cpuset.NewCPUSet(8, 9, 10, 11), }, { name: "allocate cpus on partially-allocated socket without NUMANodeLevel with PCPUs", topology: buildCPUTopologyForTest(2, 1, 4, 2), maxRefCount: 1, - allocatedExclusiveCPUs: NewCPUSet(0, 2), + allocatedExclusiveCPUs: cpuset.NewCPUSet(0, 2), allocatedExclusivePolicy: schedulingconfig.CPUExclusivePolicyNUMANodeLevel, exclusivePolicy: schedulingconfig.CPUExclusivePolicyNone, bindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, numCPUsNeeded: 4, - wantResult: NewCPUSet(4, 5, 6, 7), + wantResult: cpuset.NewCPUSet(4, 5, 6, 7), }, } @@ -568,31 +569,31 @@ func TestTakeCPUsWithMaxRefCount(t *testing.T) { // first pod request 4 CPUs podUID := uuid.NewUUID() - availableCPUs, allocatedCPUsDetails := allocationState.getAvailableCPUs(cpuTopology, 2, NewCPUSet()) + availableCPUs, allocatedCPUsDetails := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) result, err := takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 4, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) - assert.True(t, result.Equals(MustParse("0-3"))) + assert.True(t, result.Equals(cpuset.MustParse("0-3"))) assert.NoError(t, err) allocationState.addCPUs(cpuTopology, podUID, result, schedulingconfig.CPUExclusivePolicyPCPULevel) // second pod request 5 CPUs podUID = uuid.NewUUID() - availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, NewCPUSet()) + availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 
2, cpuset.NewCPUSet()) result, err = takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 5, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) - assert.True(t, result.Equals(MustParse("0,4-7"))) + assert.True(t, result.Equals(cpuset.MustParse("0,4-7"))) assert.NoError(t, err) allocationState.addCPUs(cpuTopology, podUID, result, schedulingconfig.CPUExclusivePolicyPCPULevel) // third pod request 4 cpus podUID = uuid.NewUUID() - availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, NewCPUSet()) + availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) result, err = takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 4, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) - assert.True(t, result.Equals(MustParse("2-5"))) + assert.True(t, result.Equals(cpuset.MustParse("2-5"))) assert.NoError(t, err) allocationState.addCPUs(cpuTopology, podUID, result, schedulingconfig.CPUExclusivePolicyPCPULevel) } @@ -609,46 +610,46 @@ func TestTakeCPUsSortByRefCount(t *testing.T) { // first pod request 16 CPUs podUID := uuid.NewUUID() - availableCPUs, allocatedCPUsDetails := allocationState.getAvailableCPUs(cpuTopology, 2, NewCPUSet()) + availableCPUs, allocatedCPUsDetails := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) result, err := takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 16, schedulingconfig.CPUBindPolicySpreadByPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) - assert.True(t, result.Equals(MustParse("0,2,4,6,8,10,12,14,16,18,20,22,24,26,28,30"))) + assert.True(t, result.Equals(cpuset.MustParse("0,2,4,6,8,10,12,14,16,18,20,22,24,26,28,30"))) assert.NoError(t, err) allocationState.addCPUs(cpuTopology, podUID, result, schedulingconfig.CPUExclusivePolicyPCPULevel) // 
second pod request 16 CPUs podUID = uuid.NewUUID() - availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, NewCPUSet()) + availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) result, err = takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 16, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) - assert.True(t, result.Equals(MustParse("0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15"))) + assert.True(t, result.Equals(cpuset.MustParse("0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15"))) assert.NoError(t, err) allocationState.addCPUs(cpuTopology, podUID, result, schedulingconfig.CPUExclusivePolicyPCPULevel) // third pod request 16 cpus podUID = uuid.NewUUID() - availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, NewCPUSet()) + availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) result, err = takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 16, schedulingconfig.CPUBindPolicySpreadByPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) - assert.True(t, result.Equals(MustParse("1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31"))) + assert.True(t, result.Equals(cpuset.MustParse("1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31"))) assert.NoError(t, err) allocationState.addCPUs(cpuTopology, podUID, result, schedulingconfig.CPUExclusivePolicyPCPULevel) // forth pod request 16 cpus podUID = uuid.NewUUID() - availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, NewCPUSet()) + availableCPUs, allocatedCPUsDetails = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) result, err = takeCPUs( cpuTopology, 2, availableCPUs, allocatedCPUsDetails, 16, schedulingconfig.CPUBindPolicyFullPCPUs, schedulingconfig.CPUExclusivePolicyNone, schedulingconfig.NUMAMostAllocated) - 
assert.True(t, result.Equals(MustParse("16-31"))) + assert.True(t, result.Equals(cpuset.MustParse("16-31"))) assert.NoError(t, err) allocationState.addCPUs(cpuTopology, podUID, result, schedulingconfig.CPUExclusivePolicyPCPULevel) - availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, NewCPUSet()) - assert.Equal(t, MustParse(""), availableCPUs) + availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + assert.Equal(t, cpuset.MustParse(""), availableCPUs) } func BenchmarkTakeCPUsWithSameCoreFirst(b *testing.B) { diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_allocation.go b/pkg/scheduler/plugins/nodenumaresource/cpu_allocation.go index e5e167f0d..54dafee8e 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_allocation.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_allocation.go @@ -22,29 +22,30 @@ import ( "k8s.io/apimachinery/pkg/types" schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) type cpuAllocation struct { lock sync.Mutex nodeName string - allocatedPods map[types.UID]CPUSet + allocatedPods map[types.UID]cpuset.CPUSet allocatedCPUs CPUDetails } func newCPUAllocation(nodeName string) *cpuAllocation { return &cpuAllocation{ nodeName: nodeName, - allocatedPods: map[types.UID]CPUSet{}, + allocatedPods: map[types.UID]cpuset.CPUSet{}, allocatedCPUs: NewCPUDetails(), } } -func (n *cpuAllocation) updateAllocatedCPUSet(cpuTopology *CPUTopology, podUID types.UID, cpuset CPUSet, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) { +func (n *cpuAllocation) updateAllocatedCPUSet(cpuTopology *CPUTopology, podUID types.UID, cpuset cpuset.CPUSet, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) { n.releaseCPUs(podUID) n.addCPUs(cpuTopology, podUID, cpuset, cpuExclusivePolicy) } -func (n *cpuAllocation) addCPUs(cpuTopology *CPUTopology, podUID types.UID, cpuset CPUSet, 
exclusivePolicy schedulingconfig.CPUExclusivePolicy) { +func (n *cpuAllocation) addCPUs(cpuTopology *CPUTopology, podUID types.UID, cpuset cpuset.CPUSet, exclusivePolicy schedulingconfig.CPUExclusivePolicy) { if _, ok := n.allocatedPods[podUID]; ok { return } @@ -82,7 +83,7 @@ func (n *cpuAllocation) releaseCPUs(podUID types.UID) { } } -func (n *cpuAllocation) getAvailableCPUs(cpuTopology *CPUTopology, maxRefCount int, reservedCPUs CPUSet) (availableCPUs CPUSet, allocateInfo CPUDetails) { +func (n *cpuAllocation) getAvailableCPUs(cpuTopology *CPUTopology, maxRefCount int, reservedCPUs cpuset.CPUSet) (availableCPUs cpuset.CPUSet, allocateInfo CPUDetails) { allocateInfo = n.allocatedCPUs.Clone() allocated := allocateInfo.CPUs().Filter(func(cpuID int) bool { return allocateInfo[cpuID].RefCount >= maxRefCount diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_allocation_test.go b/pkg/scheduler/plugins/nodenumaresource/cpu_allocation_test.go index 8217c165c..90cdbeaf1 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_allocation_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_allocation_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) func TestNodeAllocationStateAddCPUs(t *testing.T) { @@ -36,14 +37,14 @@ func TestNodeAllocationStateAddCPUs(t *testing.T) { allocationState := newCPUAllocation("test-node-1") assert.NotNil(t, allocationState) podUID := uuid.NewUUID() - allocationState.addCPUs(cpuTopology, podUID, MustParse("1-4"), schedulingconfig.CPUExclusivePolicyPCPULevel) + allocationState.addCPUs(cpuTopology, podUID, cpuset.MustParse("1-4"), schedulingconfig.CPUExclusivePolicyPCPULevel) - cpuset := MustParse("1-4") - expectAllocatedPods := map[types.UID]CPUSet{ - podUID: cpuset, + cpus := cpuset.MustParse("1-4") + expectAllocatedPods := 
map[types.UID]cpuset.CPUSet{ + podUID: cpus, } expectAllocatedCPUs := CPUDetails{} - for _, cpuID := range cpuset.ToSliceNoSort() { + for _, cpuID := range cpus.ToSliceNoSort() { cpuInfo := cpuTopology.CPUDetails[cpuID] cpuInfo.ExclusivePolicy = schedulingconfig.CPUExclusivePolicyPCPULevel cpuInfo.RefCount++ @@ -53,23 +54,23 @@ assert.Equal(t, expectAllocatedPods, allocationState.allocatedPods) assert.Equal(t, expectAllocatedCPUs, allocationState.allocatedCPUs) - availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 2, NewCPUSet()) - expectAvailableCPUs := MustParse("0-15") + availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + expectAvailableCPUs := cpuset.MustParse("0-15") assert.Equal(t, expectAvailableCPUs, availableCPUs) // test with add already allocated Pod - allocationState.addCPUs(cpuTopology, podUID, MustParse("1-4"), schedulingconfig.CPUExclusivePolicyPCPULevel) + allocationState.addCPUs(cpuTopology, podUID, cpuset.MustParse("1-4"), schedulingconfig.CPUExclusivePolicyPCPULevel) assert.Equal(t, expectAllocatedPods, allocationState.allocatedPods) assert.Equal(t, expectAllocatedCPUs, allocationState.allocatedCPUs) - availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, NewCPUSet()) - MustParse("0-15") + availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + expectAvailableCPUs = cpuset.MustParse("0-15") assert.Equal(t, expectAvailableCPUs, availableCPUs) // test with add already allocated cpu(refCount > 1 but less than maxRefCount) and another pod anotherPodUID := uuid.NewUUID() - allocationState.addCPUs(cpuTopology, anotherPodUID, MustParse("2-5"), schedulingconfig.CPUExclusivePolicyPCPULevel) - anotherCPUSet := MustParse("2-5") + allocationState.addCPUs(cpuTopology, anotherPodUID, cpuset.MustParse("2-5"), schedulingconfig.CPUExclusivePolicyPCPULevel) + anotherCPUSet := cpuset.MustParse("2-5") expectAllocatedPods[anotherPodUID] = 
anotherCPUSet for _, cpuID := range anotherCPUSet.ToSliceNoSort() { cpuInfo, ok := expectAllocatedCPUs[cpuID] @@ -94,11 +95,11 @@ func TestNodeAllocationStateReleaseCPUs(t *testing.T) { allocationState := newCPUAllocation("test-node-1") assert.NotNil(t, allocationState) podUID := uuid.NewUUID() - allocationState.addCPUs(cpuTopology, podUID, MustParse("1-4"), schedulingconfig.CPUExclusivePolicyPCPULevel) + allocationState.addCPUs(cpuTopology, podUID, cpuset.MustParse("1-4"), schedulingconfig.CPUExclusivePolicyPCPULevel) allocationState.releaseCPUs(podUID) - expectAllocatedPods := map[types.UID]CPUSet{} + expectAllocatedPods := map[types.UID]cpuset.CPUSet{} expectAllocatedCPUs := CPUDetails{} assert.Equal(t, expectAllocatedPods, allocationState.allocatedPods) assert.Equal(t, expectAllocatedCPUs, allocationState.allocatedCPUs) @@ -117,21 +118,21 @@ func Test_cpuAllocation_getAvailableCPUs(t *testing.T) { allocationState := newCPUAllocation("test-node-1") assert.NotNil(t, allocationState) podUID := uuid.NewUUID() - allocationState.addCPUs(cpuTopology, podUID, MustParse("1-4"), schedulingconfig.CPUExclusivePolicyPCPULevel) + allocationState.addCPUs(cpuTopology, podUID, cpuset.MustParse("1-4"), schedulingconfig.CPUExclusivePolicyPCPULevel) - availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 2, NewCPUSet()) - expectAvailableCPUs := MustParse("0-15") + availableCPUs, _ := allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + expectAvailableCPUs := cpuset.MustParse("0-15") assert.Equal(t, expectAvailableCPUs, availableCPUs) // test with add already allocated cpu(refCount > 1 but less than maxRefCount) and another pod anotherPodUID := uuid.NewUUID() - allocationState.addCPUs(cpuTopology, anotherPodUID, MustParse("2-5"), schedulingconfig.CPUExclusivePolicyPCPULevel) - availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, NewCPUSet()) - expectAvailableCPUs = MustParse("0-1,5-15") + allocationState.addCPUs(cpuTopology, 
anotherPodUID, cpuset.MustParse("2-5"), schedulingconfig.CPUExclusivePolicyPCPULevel) + availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 2, cpuset.NewCPUSet()) + expectAvailableCPUs = cpuset.MustParse("0-1,5-15") assert.Equal(t, expectAvailableCPUs, availableCPUs) allocationState.releaseCPUs(podUID) - availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 1, NewCPUSet()) - expectAvailableCPUs = MustParse("0-1,6-15") + availableCPUs, _ = allocationState.getAvailableCPUs(cpuTopology, 1, cpuset.NewCPUSet()) + expectAvailableCPUs = cpuset.MustParse("0-1,6-15") assert.Equal(t, expectAvailableCPUs, availableCPUs) } diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_manager.go b/pkg/scheduler/plugins/nodenumaresource/cpu_manager.go index fc484dda0..a1756851b 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_manager.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_manager.go @@ -29,6 +29,7 @@ import ( "github.com/koordinator-sh/koordinator/apis/extension" schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) type CPUManager interface { @@ -36,9 +37,9 @@ type CPUManager interface { node *corev1.Node, numCPUsNeeded int, cpuBindPolicy schedulingconfig.CPUBindPolicy, - cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) (CPUSet, error) + cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) (cpuset.CPUSet, error) - UpdateAllocatedCPUSet(nodeName string, podUID types.UID, cpuset CPUSet, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) + UpdateAllocatedCPUSet(nodeName string, podUID types.UID, cpuset cpuset.CPUSet, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) Free(nodeName string, podUID types.UID) @@ -48,7 +49,7 @@ type CPUManager interface { cpuBindPolicy schedulingconfig.CPUBindPolicy, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) int64 - 
GetAvailableCPUs(nodeName string) (availableCPUs CPUSet, allocated CPUDetails, err error) + GetAvailableCPUs(nodeName string) (availableCPUs cpuset.CPUSet, allocated CPUDetails, err error) } type cpuManagerImpl struct { @@ -111,8 +112,8 @@ func (c *cpuManagerImpl) Allocate( numCPUsNeeded int, cpuBindPolicy schedulingconfig.CPUBindPolicy, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy, -) (CPUSet, error) { - result := CPUSet{} +) (cpuset.CPUSet, error) { + result := cpuset.CPUSet{} // The Pod requires the CPU to be allocated according to CPUBindPolicy, // but the current node does not have a NodeResourceTopology or a valid CPUTopology, // so this error should be exposed to the user @@ -145,7 +146,7 @@ func (c *cpuManagerImpl) Allocate( return result, err } -func (c *cpuManagerImpl) UpdateAllocatedCPUSet(nodeName string, podUID types.UID, cpuset CPUSet, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) { +func (c *cpuManagerImpl) UpdateAllocatedCPUSet(nodeName string, podUID types.UID, cpuset cpuset.CPUSet, cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy) { cpuTopologyOptions := c.topologyManager.GetCPUTopologyOptions(nodeName) if cpuTopologyOptions.CPUTopology == nil || !cpuTopologyOptions.CPUTopology.IsValid() { return @@ -276,13 +277,13 @@ func (c *cpuManagerImpl) getNUMAAllocateStrategy(node *corev1.Node) schedulingco return numaAllocateStrategy } -func (c *cpuManagerImpl) GetAvailableCPUs(nodeName string) (availableCPUs CPUSet, allocated CPUDetails, err error) { +func (c *cpuManagerImpl) GetAvailableCPUs(nodeName string) (availableCPUs cpuset.CPUSet, allocated CPUDetails, err error) { cpuTopologyOptions := c.topologyManager.GetCPUTopologyOptions(nodeName) if cpuTopologyOptions.CPUTopology == nil { - return NewCPUSet(), nil, errors.New(ErrNotFoundCPUTopology) + return cpuset.NewCPUSet(), nil, errors.New(ErrNotFoundCPUTopology) } if !cpuTopologyOptions.CPUTopology.IsValid() { - return NewCPUSet(), nil, fmt.Errorf("cpuTopology is invalid") + 
return cpuset.NewCPUSet(), nil, fmt.Errorf("cpuTopology is invalid") } allocation := c.getOrCreateAllocation(nodeName) diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_topology.go b/pkg/scheduler/plugins/nodenumaresource/cpu_topology.go index 04eba49c4..13ef23f15 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_topology.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_topology.go @@ -18,6 +18,7 @@ package nodenumaresource import ( schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) // CPUTopology contains details of node cpu @@ -130,7 +131,7 @@ func (d CPUDetails) Clone() CPUDetails { } // KeepOnly returns a new CPUDetails object with only the supplied cpus. -func (d CPUDetails) KeepOnly(cpus CPUSet) CPUDetails { +func (d CPUDetails) KeepOnly(cpus cpuset.CPUSet) CPUDetails { result := CPUDetails{} for cpu, info := range d { if cpus.Contains(cpu) { @@ -141,8 +142,8 @@ func (d CPUDetails) KeepOnly(cpus CPUSet) CPUDetails { } // NUMANodes returns the NUMANode IDs associated with the CPUs in this CPUDetails. -func (d CPUDetails) NUMANodes() CPUSet { - b := NewCPUSetBuilder() +func (d CPUDetails) NUMANodes() cpuset.CPUSet { + b := cpuset.NewCPUSetBuilder() for _, info := range d { b.Add(info.NodeID) } @@ -150,8 +151,8 @@ func (d CPUDetails) NUMANodes() CPUSet { } // NUMANodesInSockets returns the logical NUMANode IDs associated with the given socket IDs in this CPUDetails. -func (d CPUDetails) NUMANodesInSockets(ids ...int) CPUSet { - b := NewCPUSetBuilder() +func (d CPUDetails) NUMANodesInSockets(ids ...int) cpuset.CPUSet { + b := cpuset.NewCPUSetBuilder() for _, id := range ids { for _, info := range d { if info.SocketID == id { @@ -163,8 +164,8 @@ func (d CPUDetails) NUMANodesInSockets(ids ...int) CPUSet { } // Sockets returns the socket IDs associated with the CPUs in this CPUDetails. 
-func (d CPUDetails) Sockets() CPUSet { - b := NewCPUSetBuilder() +func (d CPUDetails) Sockets() cpuset.CPUSet { + b := cpuset.NewCPUSetBuilder() for _, info := range d { b.Add(info.SocketID) } @@ -172,8 +173,8 @@ func (d CPUDetails) Sockets() CPUSet { } // CPUsInSockets returns logical CPU IDs associated with the given socket IDs in this CPUDetails. -func (d CPUDetails) CPUsInSockets(ids ...int) CPUSet { - b := NewCPUSetBuilder() +func (d CPUDetails) CPUsInSockets(ids ...int) cpuset.CPUSet { + b := cpuset.NewCPUSetBuilder() for _, id := range ids { for cpu, info := range d { if info.SocketID == id { @@ -185,8 +186,8 @@ func (d CPUDetails) CPUsInSockets(ids ...int) CPUSet { } // SocketsInNUMANodes returns the socket IDs associated with the given NUMANode IDs in this CPUDetails. -func (d CPUDetails) SocketsInNUMANodes(ids ...int) CPUSet { - b := NewCPUSetBuilder() +func (d CPUDetails) SocketsInNUMANodes(ids ...int) cpuset.CPUSet { + b := cpuset.NewCPUSetBuilder() for _, id := range ids { for _, info := range d { if info.NodeID == id { @@ -198,8 +199,8 @@ func (d CPUDetails) SocketsInNUMANodes(ids ...int) CPUSet { } // Cores returns the core IDs associated with the CPUs in this CPUDetails. -func (d CPUDetails) Cores() CPUSet { - b := NewCPUSetBuilder() +func (d CPUDetails) Cores() cpuset.CPUSet { + b := cpuset.NewCPUSetBuilder() for _, info := range d { b.Add(info.CoreID) } @@ -207,8 +208,8 @@ func (d CPUDetails) Cores() CPUSet { } // CoresInNUMANodes returns the core IDs associated with the given NUMANode IDs in this CPUDetails. -func (d CPUDetails) CoresInNUMANodes(ids ...int) CPUSet { - b := NewCPUSetBuilder() +func (d CPUDetails) CoresInNUMANodes(ids ...int) cpuset.CPUSet { + b := cpuset.NewCPUSetBuilder() for _, id := range ids { for _, info := range d { if info.NodeID == id { @@ -220,8 +221,8 @@ func (d CPUDetails) CoresInNUMANodes(ids ...int) CPUSet { } // CoresInSockets returns the core IDs associated with the given socket IDs in this CPUDetails. 
-func (d CPUDetails) CoresInSockets(ids ...int) CPUSet { - b := NewCPUSetBuilder() +func (d CPUDetails) CoresInSockets(ids ...int) cpuset.CPUSet { + b := cpuset.NewCPUSetBuilder() for _, id := range ids { for _, info := range d { if info.SocketID == id { @@ -233,8 +234,8 @@ func (d CPUDetails) CoresInSockets(ids ...int) CPUSet { } // CPUs returns the logical CPU IDs in this CPUDetails. -func (d CPUDetails) CPUs() CPUSet { - b := NewCPUSetBuilder() +func (d CPUDetails) CPUs() cpuset.CPUSet { + b := cpuset.NewCPUSetBuilder() for cpuID := range d { b.Add(cpuID) } @@ -242,8 +243,8 @@ func (d CPUDetails) CPUs() CPUSet { } // CPUsInNUMANodes returns the logical CPU IDs associated with the given NUMANode IDs in this CPUDetails. -func (d CPUDetails) CPUsInNUMANodes(ids ...int) CPUSet { - b := NewCPUSetBuilder() +func (d CPUDetails) CPUsInNUMANodes(ids ...int) cpuset.CPUSet { + b := cpuset.NewCPUSetBuilder() for _, id := range ids { for cpu, info := range d { if info.NodeID == id { @@ -255,8 +256,8 @@ func (d CPUDetails) CPUsInNUMANodes(ids ...int) CPUSet { } // CPUsInCores returns the logical CPU IDs associated with the given core IDs in this CPUDetails. -func (d CPUDetails) CPUsInCores(ids ...int) CPUSet { - b := NewCPUSetBuilder() +func (d CPUDetails) CPUsInCores(ids ...int) cpuset.CPUSet { + b := cpuset.NewCPUSetBuilder() for _, id := range ids { for cpu, info := range d { if info.CoreID == id { diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_topology_manager.go b/pkg/scheduler/plugins/nodenumaresource/cpu_topology_manager.go index 6fc1733a9..65c8e9777 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_topology_manager.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_topology_manager.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) // CPUTopologyManager manages the CPU Topology and CPU assignments options. 
@@ -31,7 +32,7 @@ type CPUTopologyManager interface { type CPUTopologyOptions struct { CPUTopology *CPUTopology `json:"cpuTopology,omitempty"` - ReservedCPUs CPUSet `json:"reservedCPUs,omitempty"` + ReservedCPUs cpuset.CPUSet `json:"reservedCPUs,omitempty"` MaxRefCount int `json:"maxRefCount,omitempty"` Policy *extension.KubeletCPUManagerPolicy `json:"policy,omitempty"` } diff --git a/pkg/scheduler/plugins/nodenumaresource/cpu_topology_manager_test.go b/pkg/scheduler/plugins/nodenumaresource/cpu_topology_manager_test.go index 881a260d4..693056c83 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpu_topology_manager_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/cpu_topology_manager_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) func TestCPUTopologyManager(t *testing.T) { @@ -109,7 +110,7 @@ func TestCPUTopologyManager(t *testing.T) { assert.Equal(t, 1, cpuTopologyOptions.MaxRefCount) - expectReservedCPUs := MustParse("0-3") + expectReservedCPUs := cpuset.MustParse("0-3") assert.Equal(t, expectReservedCPUs, cpuTopologyOptions.ReservedCPUs) delete(topology.Annotations, extension.AnnotationNodeCPUAllocs) diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin.go b/pkg/scheduler/plugins/nodenumaresource/plugin.go index 557884dee..dd494a12f 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin.go @@ -30,6 +30,7 @@ import ( "github.com/koordinator-sh/koordinator/apis/extension" schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" "github.com/koordinator-sh/koordinator/pkg/util" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) const ( @@ -171,7 +172,7 @@ type preFilterState struct { preferredCPUBindPolicy 
schedulingconfig.CPUBindPolicy preferredCPUExclusivePolicy schedulingconfig.CPUExclusivePolicy numCPUsNeeded int - allocatedCPUs CPUSet + allocatedCPUs cpuset.CPUSet } func (s *preFilterState) Clone() framework.StateData { diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin_service.go b/pkg/scheduler/plugins/nodenumaresource/plugin_service.go index 82a5d4db5..229501408 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin_service.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin_service.go @@ -22,13 +22,14 @@ import ( "github.com/gin-gonic/gin" "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/services" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) var _ services.APIServiceProvider = &Plugin{} type AvailableCPUsResponse struct { - AvailableCPUs CPUSet `json:"availableCPUs,omitempty"` - Allocated CPUDetails `json:"allocated,omitempty"` + AvailableCPUs cpuset.CPUSet `json:"availableCPUs,omitempty"` + Allocated CPUDetails `json:"allocated,omitempty"` } func (p *Plugin) RegisterEndpoints(group *gin.RouterGroup) { diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin_service_test.go b/pkg/scheduler/plugins/nodenumaresource/plugin_service_test.go index d37290c05..688d78f20 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin_service_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin_service_test.go @@ -28,6 +28,7 @@ import ( "github.com/koordinator-sh/koordinator/apis/extension" schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) func TestEndpointsQueryCPUTopologyOptions(t *testing.T) { @@ -39,7 +40,7 @@ func TestEndpointsQueryCPUTopologyOptions(t *testing.T) { expectedCPUTopologyOptions := CPUTopologyOptions{ CPUTopology: buildCPUTopologyForTest(2, 1, 4, 2), - ReservedCPUs: MustParse("0-1"), + 
ReservedCPUs: cpuset.MustParse("0-1"), MaxRefCount: 1, Policy: &extension.KubeletCPUManagerPolicy{ Policy: extension.KubeletCPUManagerPolicyStatic, @@ -73,7 +74,7 @@ func TestEndpointsQueryAvailableCPUsOptions(t *testing.T) { cpuTopologyOptions := CPUTopologyOptions{ CPUTopology: buildCPUTopologyForTest(2, 1, 4, 2), - ReservedCPUs: MustParse("0-1"), + ReservedCPUs: cpuset.MustParse("0-1"), MaxRefCount: 1, Policy: &extension.KubeletCPUManagerPolicy{ Policy: extension.KubeletCPUManagerPolicyStatic, @@ -86,7 +87,7 @@ func TestEndpointsQueryAvailableCPUsOptions(t *testing.T) { *options = cpuTopologyOptions }) - p.cpuManager.UpdateAllocatedCPUSet("test-node-1", uuid.NewUUID(), MustParse("0,2,4,6"), schedulingconfig.CPUExclusivePolicyNone) + p.cpuManager.UpdateAllocatedCPUSet("test-node-1", uuid.NewUUID(), cpuset.MustParse("0,2,4,6"), schedulingconfig.CPUExclusivePolicyNone) engine := gin.Default() p.RegisterEndpoints(engine.Group("/")) @@ -99,7 +100,7 @@ func TestEndpointsQueryAvailableCPUsOptions(t *testing.T) { assert.NoError(t, err) expectedResponse := &AvailableCPUsResponse{ - AvailableCPUs: MustParse("3,5,7-15"), + AvailableCPUs: cpuset.MustParse("3,5,7-15"), Allocated: CPUDetails{}, } for _, v := range []int{0, 2, 4, 6} { diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go index dde24f9fe..0a5544567 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go @@ -40,6 +40,7 @@ import ( "github.com/koordinator-sh/koordinator/apis/extension" schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/v1beta2" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" _ "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/scheme" ) @@ -807,7 
+808,7 @@ func TestPlugin_Reserve(t *testing.T) { allocationState *cpuAllocation allocatedCPUs []int want *framework.Status - wantCPUSet CPUSet + wantCPUSet cpuset.CPUSet }{ { name: "error with missing preFilterState", @@ -854,7 +855,7 @@ func TestPlugin_Reserve(t *testing.T) { allocationState: newCPUAllocation("test-node-1"), pod: &corev1.Pod{}, want: nil, - wantCPUSet: NewCPUSet(0, 1, 2, 3), + wantCPUSet: cpuset.NewCPUSet(0, 1, 2, 3), }, { name: "error with big request cpu", @@ -888,7 +889,7 @@ func TestPlugin_Reserve(t *testing.T) { allocatedCPUs: []int{0, 1, 2, 3}, pod: &corev1.Pod{}, want: nil, - wantCPUSet: NewCPUSet(16, 17, 18, 19), + wantCPUSet: cpuset.NewCPUSet(16, 17, 18, 19), }, { name: "succeed with valid cpu topology and node numa most allocate strategy", @@ -908,7 +909,7 @@ func TestPlugin_Reserve(t *testing.T) { allocatedCPUs: []int{0, 1, 2, 3}, pod: &corev1.Pod{}, want: nil, - wantCPUSet: NewCPUSet(4, 5, 6, 7), + wantCPUSet: cpuset.NewCPUSet(4, 5, 6, 7), }, } for _, tt := range tests { @@ -943,7 +944,7 @@ func TestPlugin_Reserve(t *testing.T) { options.CPUTopology = tt.cpuTopology }) if len(tt.allocatedCPUs) > 0 { - tt.allocationState.addCPUs(tt.cpuTopology, uuid.NewUUID(), NewCPUSet(tt.allocatedCPUs...), schedulingconfig.CPUExclusivePolicyNone) + tt.allocationState.addCPUs(tt.cpuTopology, uuid.NewUUID(), cpuset.NewCPUSet(tt.allocatedCPUs...), schedulingconfig.CPUExclusivePolicyNone) } } @@ -980,7 +981,7 @@ func TestPlugin_Unreserve(t *testing.T) { resourceSpec: &extension.ResourceSpec{ PreferredCPUBindPolicy: extension.CPUBindPolicyFullPCPUs, }, - allocatedCPUs: NewCPUSet(0, 1, 2, 3), + allocatedCPUs: cpuset.NewCPUSet(0, 1, 2, 3), } cpuTopology := buildCPUTopologyForTest(2, 1, 4, 2) @@ -1038,7 +1039,7 @@ func TestPlugin_PreBind(t *testing.T) { resourceSpec: &extension.ResourceSpec{ PreferredCPUBindPolicy: extension.CPUBindPolicyFullPCPUs, }, - allocatedCPUs: NewCPUSet(0, 1, 2, 3), + allocatedCPUs: cpuset.NewCPUSet(0, 1, 2, 3), } cycleState := 
framework.NewCycleState() cycleState.Write(stateKey, state) @@ -1085,7 +1086,7 @@ func TestPlugin_PreBindWithCPUBindPolicyNone(t *testing.T) { PreferredCPUBindPolicy: extension.CPUBindPolicyDefault, }, preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs, - allocatedCPUs: NewCPUSet(0, 1, 2, 3), + allocatedCPUs: cpuset.NewCPUSet(0, 1, 2, 3), } cycleState := framework.NewCycleState() cycleState.Write(stateKey, state) diff --git a/pkg/scheduler/plugins/nodenumaresource/pod_eventhandler.go b/pkg/scheduler/plugins/nodenumaresource/pod_eventhandler.go index c505735a2..9dde650dc 100644 --- a/pkg/scheduler/plugins/nodenumaresource/pod_eventhandler.go +++ b/pkg/scheduler/plugins/nodenumaresource/pod_eventhandler.go @@ -22,6 +22,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "github.com/koordinator-sh/koordinator/pkg/util" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) type podEventHandler struct { @@ -89,8 +90,8 @@ func (c *podEventHandler) updatePod(oldPod, pod *corev1.Pod) { if err != nil { return } - cpuset, err := Parse(resourceStatus.CPUSet) - if err != nil || cpuset.IsEmpty() { + cpus, err := cpuset.Parse(resourceStatus.CPUSet) + if err != nil || cpus.IsEmpty() { return } @@ -99,7 +100,7 @@ func (c *podEventHandler) updatePod(oldPod, pod *corev1.Pod) { return } - c.cpuManager.UpdateAllocatedCPUSet(pod.Spec.NodeName, pod.UID, cpuset, resourceSpec.PreferredCPUExclusivePolicy) + c.cpuManager.UpdateAllocatedCPUSet(pod.Spec.NodeName, pod.UID, cpus, resourceSpec.PreferredCPUExclusivePolicy) } func (c *podEventHandler) deletePod(pod *corev1.Pod) { @@ -111,8 +112,8 @@ func (c *podEventHandler) deletePod(pod *corev1.Pod) { if err != nil { return } - cpuset, err := Parse(resourceStatus.CPUSet) - if err != nil || cpuset.IsEmpty() { + cpus, err := cpuset.Parse(resourceStatus.CPUSet) + if err != nil || cpus.IsEmpty() { return } diff --git a/pkg/scheduler/plugins/nodenumaresource/pod_eventhandler_test.go 
b/pkg/scheduler/plugins/nodenumaresource/pod_eventhandler_test.go index 3572f7ccf..bc589d62e 100644 --- a/pkg/scheduler/plugins/nodenumaresource/pod_eventhandler_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/pod_eventhandler_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/util/uuid" "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) func TestPodEventHandler(t *testing.T) { @@ -32,7 +33,7 @@ func TestPodEventHandler(t *testing.T) { name string pod *corev1.Pod wantAdd bool - want CPUSet + want cpuset.CPUSet }{ { name: "pending pod", @@ -95,7 +96,7 @@ func TestPodEventHandler(t *testing.T) { }, }, wantAdd: true, - want: MustParse("0-3"), + want: cpuset.MustParse("0-3"), }, } @@ -124,7 +125,7 @@ func TestPodEventHandler(t *testing.T) { t.Errorf("expect not add the Pod but found") } - cpusetBuilder := NewCPUSetBuilder() + cpusetBuilder := cpuset.NewCPUSetBuilder() for _, v := range allocation.allocatedCPUs { cpusetBuilder.Add(v.CPUID) } diff --git a/pkg/scheduler/plugins/nodenumaresource/topology_eventhandler.go b/pkg/scheduler/plugins/nodenumaresource/topology_eventhandler.go index e2fcfc811..d6b4089b9 100644 --- a/pkg/scheduler/plugins/nodenumaresource/topology_eventhandler.go +++ b/pkg/scheduler/plugins/nodenumaresource/topology_eventhandler.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/scheduler/framework" "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" ) type nodeResourceTopologyEventHandler struct { @@ -110,9 +111,9 @@ func (m *nodeResourceTopologyEventHandler) updateNodeResourceTopology(oldNodeRes if err != nil { klog.Errorf("Failed to GetKubeletCPUManagerPolicy from NodeResourceTopology %s, err: %v", newNodeResTopology.Name, err) } - var kubeletReservedCPUs CPUSet + var kubeletReservedCPUs cpuset.CPUSet if kubeletPolicy != nil { - 
kubeletReservedCPUs, err = Parse(kubeletPolicy.ReservedCPUs) + kubeletReservedCPUs, err = cpuset.Parse(kubeletPolicy.ReservedCPUs) if err != nil { klog.Errorf("Failed to Parse kubelet reserved CPUs %s, err: %v", kubeletPolicy.ReservedCPUs, err) } @@ -138,16 +139,16 @@ func (m *nodeResourceTopologyEventHandler) updateNodeResourceTopology(oldNodeRes }) } -func (m *nodeResourceTopologyEventHandler) getPodAllocsCPUSet(podCPUAllocs extension.PodCPUAllocs) CPUSet { +func (m *nodeResourceTopologyEventHandler) getPodAllocsCPUSet(podCPUAllocs extension.PodCPUAllocs) cpuset.CPUSet { if len(podCPUAllocs) == 0 { - return CPUSet{} + return cpuset.CPUSet{} } - builder := NewCPUSetBuilder() + builder := cpuset.NewCPUSetBuilder() for _, v := range podCPUAllocs { if !v.ManagedByKubelet || v.UID == "" || v.CPUSet == "" { continue } - cpuset, err := Parse(v.CPUSet) - if err != nil || cpuset.IsEmpty() { + cpus, err := cpuset.Parse(v.CPUSet) + if err != nil || cpus.IsEmpty() { continue } diff --git a/pkg/util/container.go b/pkg/util/container.go index 64a44f876..4f30c04b7 100644 --- a/pkg/util/container.go +++ b/pkg/util/container.go @@ -20,6 +20,7 @@ import ( "fmt" "os" "path" + "path/filepath" "strconv" "strings" @@ -267,3 +268,8 @@ func ParseContainerId(data string) (cType, cID string, err error) { cType, cID = parts[0], parts[1] return } + +// WriteCgroupCPUSet writes the cgroup cpuset file according to the specified cgroup dir +func WriteCgroupCPUSet(cgroupFileDir, cpusetStr string) error { + return os.WriteFile(filepath.Join(cgroupFileDir, system.CPUSFileName), []byte(cpusetStr), 0644) +} diff --git a/pkg/util/container_test.go b/pkg/util/container_test.go index e96f7a012..092afa832 100644 --- a/pkg/util/container_test.go +++ b/pkg/util/container_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset"
"github.com/koordinator-sh/koordinator/pkg/util/system" ) @@ -856,3 +857,24 @@ func writeCgroupContent(filePath string, content []byte) error { } return os.WriteFile(filePath, content, 0655) } + +func Test_UtilCgroupCPUSet(t *testing.T) { + // prepare testing files + dname := t.TempDir() + + cpus := []int32{5, 1, 0} + cpusetStr := cpuset.GenerateCPUSetStr(cpus) + + err := WriteCgroupCPUSet(dname, cpusetStr) + assert.NoError(t, err) + + rawContent, err := os.ReadFile(filepath.Join(dname, system.CPUSFileName)) + assert.NoError(t, err) + + gotCPUSetStr := string(rawContent) + assert.Equal(t, cpusetStr, gotCPUSetStr) + + gotCPUSet, err := cpuset.ParseCPUSetStr(gotCPUSetStr) + assert.NoError(t, err) + assert.Equal(t, []int32{0, 1, 5}, gotCPUSet) +} diff --git a/pkg/util/cpuset.go b/pkg/util/cpuset.go deleted file mode 100644 index 5af0c8700..000000000 --- a/pkg/util/cpuset.go +++ /dev/null @@ -1,105 +0,0 @@ -/* -Copyright 2022 The Koordinator Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "fmt" - "os" - "path/filepath" - "sort" - "strconv" - "strings" - - "github.com/koordinator-sh/koordinator/pkg/util/system" -) - -// MergeCPUSet merges the old cpuset with the new one, and also deduplicate and keeps a desc order by processor ids -// e.g. 
[1,0], [3,2,2,1] => [3,2,1,0] -func MergeCPUSet(old, new []int32) []int32 { - cpuMap := map[int32]struct{}{} - - for _, id := range old { - cpuMap[id] = struct{}{} - } - for _, id := range new { - cpuMap[id] = struct{}{} - } - - var merged []int32 - for id := range cpuMap { - merged = append(merged, id) - } - sort.Slice(merged, func(i, j int) bool { - return merged[i] > merged[j] - }) - - return merged -} - -// ParseCPUSetStr parses cpuset string into a slice -// eg. "0-5,34,46-48" => [0,1,2,3,4,5,34,46,47,48] -func ParseCPUSetStr(cpusetStr string) ([]int32, error) { - cpusetStr = strings.Trim(strings.TrimSpace(cpusetStr), "\n") - if cpusetStr == "" { - return nil, nil - } - - // split CPU list string - // eg. "0-5,34,46-48" => ["0-5", "34", "46-48"] - ranges := strings.Split(cpusetStr, ",") - - var cpuset []int32 - for _, r := range ranges { - boundaries := strings.Split(r, "-") - if len(boundaries) == 1 { - // only one element case, eg. "46" - elem, err := strconv.ParseInt(boundaries[0], 10, 32) - if err != nil { - return nil, err - } - cpuset = append(cpuset, int32(elem)) - } else if len(boundaries) == 2 { - // multi-element case, eg. "0-5" - start, err := strconv.ParseInt(boundaries[0], 10, 32) - if err != nil { - return nil, err - } - end, err := strconv.ParseInt(boundaries[1], 10, 32) - if err != nil { - return nil, err - } - // add all elements to the result. - // e.g. "0-5" => [0, 1, 2, 3, 4, 5] - for e := start; e <= end; e++ { - cpuset = append(cpuset, int32(e)) - } - } - } - - return cpuset, nil -} - -// GenerateCPUSetStr generates the cpuset string from the cpuset slice -// eg. 
[3,2,1,0] => "3,2,1,0" -func GenerateCPUSetStr(cpuset []int32) string { - return strings.Trim(strings.Join(strings.Fields(fmt.Sprint(cpuset)), ","), "[]") -} - -// WriteCgroupCPUSet writes the cgroup cpuset file according to the specified cgroup dir -func WriteCgroupCPUSet(cgroupFileDir, cpusetStr string) error { - return os.WriteFile(filepath.Join(cgroupFileDir, system.CPUSFileName), []byte(cpusetStr), 0644) -} diff --git a/pkg/scheduler/plugins/nodenumaresource/cpuset.go b/pkg/util/cpuset/cpuset.go similarity index 98% rename from pkg/scheduler/plugins/nodenumaresource/cpuset.go rename to pkg/util/cpuset/cpuset.go index 25788c019..3c0d85b6f 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpuset.go +++ b/pkg/util/cpuset/cpuset.go @@ -1,5 +1,6 @@ /* Copyright 2022 The Koordinator Authors. +Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package nodenumaresource +package cpuset import ( "bytes" @@ -87,14 +88,14 @@ func (s CPUSet) Clone() CPUSet { return b.Result() } -// Count returns the number of elements in this CPUSet. -func (s CPUSet) Count() int { +// Size returns the number of elements in this CPUSet. +func (s CPUSet) Size() int { return len(s.elems) } // IsEmpty returns true if there are zero elements in this CPUSet. func (s CPUSet) IsEmpty() bool { - return s.Count() == 0 + return s.Size() == 0 } // Contains returns true if the supplied element is present in this CPUSet. 
diff --git a/pkg/scheduler/plugins/nodenumaresource/cpuset_test.go b/pkg/util/cpuset/cpuset_test.go similarity index 98% rename from pkg/scheduler/plugins/nodenumaresource/cpuset_test.go rename to pkg/util/cpuset/cpuset_test.go index 153a3104d..e776bb602 100644 --- a/pkg/scheduler/plugins/nodenumaresource/cpuset_test.go +++ b/pkg/util/cpuset/cpuset_test.go @@ -1,5 +1,6 @@ /* Copyright 2022 The Koordinator Authors. +Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package nodenumaresource +package cpuset import ( "reflect" @@ -33,7 +34,7 @@ func TestCPUSetBuilder(t *testing.T) { t.Fatalf("expected cpuset to contain element %d: [%v]", elem, result) } } - if len(elems) != result.Count() { + if len(elems) != result.Size() { t.Fatalf("expected cpuset %s to have the same size as %v", result, elems) } } @@ -49,7 +50,7 @@ func TestCPUSetSize(t *testing.T) { } for _, c := range testCases { - actual := c.cpuset.Count() + actual := c.cpuset.Size() if actual != c.expected { t.Fatalf("expected: %d, actual: %d, cpuset: [%v]", c.expected, actual, c.cpuset) } diff --git a/pkg/util/cpuset/helper.go b/pkg/util/cpuset/helper.go new file mode 100644 index 000000000..14168380e --- /dev/null +++ b/pkg/util/cpuset/helper.go @@ -0,0 +1,61 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpuset + +func int32sToInts(values []int32) []int { + r := make([]int, 0, len(values)) + for _, v := range values { + r = append(r, int(v)) + } + return r +} + +func intsToInt32s(values []int) []int32 { + r := make([]int32, 0, len(values)) + for _, v := range values { + r = append(r, int32(v)) + } + return r +} + +// MergeCPUSet merges the old cpuset with the new one, deduplicates, and keeps an asc order by processor ids +// e.g. [1,0], [3,2,2,1] => [0,1,2,3] +func MergeCPUSet(old, new []int32) []int32 { + oldCPUSet := NewCPUSet(int32sToInts(old)...) + result := oldCPUSet.UnionSlice(int32sToInts(new)...) + return intsToInt32s(result.ToSlice()) +} + +// ParseCPUSetStr parses cpuset string into a slice +// eg. "0-5,34,46-48" => [0,1,2,3,4,5,34,46,47,48] +func ParseCPUSetStr(cpusetStr string) ([]int32, error) { + cpuset, err := Parse(cpusetStr) + if err != nil { + return nil, err + } + cpus := make([]int32, 0, cpuset.Size()) + for _, v := range cpuset.ToSlice() { + cpus = append(cpus, int32(v)) + } + return cpus, nil +} + +// GenerateCPUSetStr generates the cpuset string from the cpuset slice in canonical range notation +// eg. [3,2,1,0] => "0-3" +func GenerateCPUSetStr(cpuset []int32) string { + return NewCPUSet(int32sToInts(cpuset)...).String() +} diff --git a/pkg/util/cpuset_test.go b/pkg/util/cpuset/helper_test.go similarity index 79% rename from pkg/util/cpuset_test.go rename to pkg/util/cpuset/helper_test.go index fb57ab9c3..2f3c8879b 100644 --- a/pkg/util/cpuset_test.go +++ b/pkg/util/cpuset/helper_test.go @@ -14,16 +14,12 @@ See the License for the specific language governing permissions and limitations under the License.
*/ -package util +package cpuset import ( - "os" - "path/filepath" "testing" "github.com/stretchr/testify/assert" - - "github.com/koordinator-sh/koordinator/pkg/util/system" ) func Test_MergeCPUSet(t *testing.T) { @@ -38,6 +34,7 @@ func Test_MergeCPUSet(t *testing.T) { }{ { name: "do not panic on empty input", + want: []int32{}, }, { name: "merge and sort correctly for disjoint input", @@ -45,14 +42,14 @@ func Test_MergeCPUSet(t *testing.T) { old: []int32{0, 1, 2}, new: []int32{5, 8, 7}, }, - want: []int32{8, 7, 5, 2, 1, 0}, + want: []int32{0, 1, 2, 5, 7, 8}, }, { name: "merge and sort correctly for incomplete input", args: args{ new: []int32{1, 0, 2}, }, - want: []int32{2, 1, 0}, + want: []int32{0, 1, 2}, }, { name: "merge and sort correctly for intersecting input", @@ -60,7 +57,7 @@ func Test_MergeCPUSet(t *testing.T) { old: []int32{2, 1, 0}, new: []int32{1, 7, 5}, }, - want: []int32{7, 5, 2, 1, 0}, + want: []int32{0, 1, 2, 5, 7}, }, } for _, tt := range tests { @@ -83,6 +80,7 @@ func Test_ParseCPUSetStr(t *testing.T) { }{ { name: "do not panic on empty input", + want: []int32{}, }, { name: "parse mixed cpuset correctly", @@ -94,7 +92,7 @@ func Test_ParseCPUSetStr(t *testing.T) { name: "parse empty content", args: args{cpusetStr: " \n"}, want: nil, - wantErr: false, + wantErr: true, }, { name: "parse and throw an error for illegal input", @@ -138,7 +136,7 @@ func Test_GenerateCPUSetStr(t *testing.T) { { name: "generate for multi-element input", args: args{cpuset: []int32{5, 3, 1, 0}}, - want: "5,3,1,0", + want: "0-1,3,5", }, } for _, tt := range tests { @@ -148,24 +146,3 @@ func Test_GenerateCPUSetStr(t *testing.T) { }) } } - -func Test_UtilCgroupCPUSet(t *testing.T) { - // prepare testing files - dname := t.TempDir() - - cpuset := []int32{5, 1, 0} - cpusetStr := GenerateCPUSetStr(cpuset) - - err := WriteCgroupCPUSet(dname, cpusetStr) - assert.NoError(t, err) - - rawContent, err := os.ReadFile(filepath.Join(dname, 
system.CPUSFileName)) - assert.NoError(t, err) - - gotCPUSetStr := string(rawContent) - assert.Equal(t, cpusetStr, gotCPUSetStr) - - gotCPUSet, err := ParseCPUSetStr(gotCPUSetStr) - assert.NoError(t, err) - assert.Equal(t, cpuset, gotCPUSet) -} diff --git a/pkg/util/node.go b/pkg/util/node.go index 6164137a2..2e56bf6da 100644 --- a/pkg/util/node.go +++ b/pkg/util/node.go @@ -25,6 +25,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/koordinator-sh/koordinator/pkg/util/cpuset" "github.com/koordinator-sh/koordinator/pkg/util/system" ) @@ -52,7 +53,7 @@ func GetRootCgroupCurCPUSet(qosClass corev1.PodQOSClass) ([]int32, error) { return nil, err } - return ParseCPUSetStr(rawContent) + return cpuset.ParseCPUSetStr(rawContent) } func GetRootCgroupCurCFSQuota(qosClass corev1.PodQOSClass) (int64, error) {