Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .licenseignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ pkg/descheduler
pkg/descheduler/controllers/migration/controllerfinder
pkg/scheduler/plugins/coscheduling
pkg/util/kubelet
pkg/util/cpuset/cpuset.go
pkg/util/cpuset/cpuset_test.go
test/e2e/common
test/e2e/framework
test/e2e/generated
Expand Down
26 changes: 13 additions & 13 deletions pkg/koordlet/resmanager/cpu_suppress.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
Expand All @@ -40,6 +39,7 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/util"
"github.com/koordinator-sh/koordinator/pkg/util/cpuset"
"github.com/koordinator-sh/koordinator/pkg/util/system"
)

Expand Down Expand Up @@ -324,11 +324,11 @@ func calculateBESuppressCPUSetPolicy(cpus int32, processorInfos []util.Processor
}

// applyCPUSetWithNonePolicy applies the be suppress policy by writing best-effort cgroups
func applyCPUSetWithNonePolicy(cpuset []int32, oldCPUSet []int32) error {
func applyCPUSetWithNonePolicy(cpus []int32, oldCPUSet []int32) error {
// 1. get current be cgroups cpuset
// 2. temporarily write with a union of old cpuset and new cpuset from upper to lower, to avoid cgroup conflicts
// 3. write with the new cpuset from lower to upper to apply the real policy
if len(cpuset) <= 0 {
if len(cpus) <= 0 {
klog.Warningf("applyCPUSetWithNonePolicy skipped due to the empty cpuset")
return nil
}
Expand All @@ -340,23 +340,23 @@ func applyCPUSetWithNonePolicy(cpuset []int32, oldCPUSet []int32) error {
}

// write a loose cpuset for all be cgroups before applying the real policy
mergedCPUSet := util.MergeCPUSet(oldCPUSet, cpuset)
mergedCPUSetStr := util.GenerateCPUSetStr(mergedCPUSet)
mergedCPUSet := cpuset.MergeCPUSet(oldCPUSet, cpus)
mergedCPUSetStr := cpuset.GenerateCPUSetStr(mergedCPUSet)
klog.V(6).Infof("applyCPUSetWithNonePolicy temporarily writes cpuset from upper cgroup to lower, cpuset %v",
mergedCPUSet)
writeBECgroupsCPUSet(cpusetCgroupPaths, mergedCPUSetStr, false)

// apply the suppress policy from lower to upper
cpusetStr := util.GenerateCPUSetStr(cpuset)
cpusetStr := cpuset.GenerateCPUSetStr(cpus)
klog.V(6).Infof("applyCPUSetWithNonePolicy writes suppressed cpuset from lower cgroup to upper, cpuset %v",
cpuset)
cpus)
writeBECgroupsCPUSet(cpusetCgroupPaths, cpusetStr, true)
metrics.RecordBESuppressCores(string(slov1alpha1.CPUSetPolicy), float64(len(cpuset)))
metrics.RecordBESuppressCores(string(slov1alpha1.CPUSetPolicy), float64(len(cpus)))
return nil
}

func applyCPUSetWithStaticPolicy(cpuset []int32) error {
if len(cpuset) <= 0 {
func applyCPUSetWithStaticPolicy(cpus []int32) error {
if len(cpus) <= 0 {
klog.Warningf("applyCPUSetWithStaticPolicy skipped due to the empty cpuset")
return nil
}
Expand All @@ -367,10 +367,10 @@ func applyCPUSetWithStaticPolicy(cpuset []int32) error {
return fmt.Errorf("apply be suppress policy failed, err: %s", err)
}

cpusetStr := util.GenerateCPUSetStr(cpuset)
klog.V(6).Infof("applyCPUSetWithStaticPolicy writes suppressed cpuset to containers, cpuset %v", cpuset)
cpusetStr := cpuset.GenerateCPUSetStr(cpus)
klog.V(6).Infof("applyCPUSetWithStaticPolicy writes suppressed cpuset to containers, cpuset %v", cpus)
writeBECgroupsCPUSet(containerPaths, cpusetStr, false)
metrics.RecordBESuppressCores(string(slov1alpha1.CPUSetPolicy), float64(len(cpuset)))
metrics.RecordBESuppressCores(string(slov1alpha1.CPUSetPolicy), float64(len(cpus)))
return nil

}
Expand Down
10 changes: 5 additions & 5 deletions pkg/koordlet/resmanager/cpu_suppress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func Test_cpuSuppress_suppressBECPU(t *testing.T) {
},
wantBECFSQuota: -1,
wantCFSQuotaPolicyStatus: &policyRecovered,
wantBECPUSet: "0,1",
wantBECPUSet: "0-1",
wantCPUSetPolicyStatus: &policyUsing,
},
{
Expand Down Expand Up @@ -1171,7 +1171,7 @@ func Test_applyCPUSetWithNonePolicy(t *testing.T) {
testingPrepareBECgroupData(helper, podDirs, "1,2")

cpuset := []int32{3, 2, 1}
wantCPUSetStr := "3,2,1"
wantCPUSetStr := "1-3"

oldCPUSet, err := util.GetRootCgroupCurCPUSet(corev1.PodQOSBestEffort)
assert.NoError(t, err)
Expand Down Expand Up @@ -1231,7 +1231,7 @@ func Test_adjustByCPUSet(t *testing.T) {
},
oldCPUSets: "7,6,3,2",
},
wantCPUSet: "2,3,4",
wantCPUSet: "2-4",
},
{
name: "test scale up by cpuset.",
Expand All @@ -1251,7 +1251,7 @@ func Test_adjustByCPUSet(t *testing.T) {
},
oldCPUSets: "7,6",
},
wantCPUSet: "2,3,4",
wantCPUSet: "2-4",
},
}
ctrl := gomock.NewController(t)
Expand Down Expand Up @@ -1544,7 +1544,7 @@ func TestCPUSuppress_applyBESuppressCPUSet(t *testing.T) {
wants: wants{
beDirCPUSet: "0-15",
podDirCPUSet: "0-15",
containerDirCPUSet: "0,1,2,3",
containerDirCPUSet: "0-3",
},
},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/koordlet/statesinformer/states_noderesourcetopology.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"

"github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/features"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
"github.com/koordinator-sh/koordinator/pkg/util"
"github.com/koordinator-sh/koordinator/pkg/util/cpuset"
kubeletutil "github.com/koordinator-sh/koordinator/pkg/util/kubelet"
)

Expand Down
17 changes: 9 additions & 8 deletions pkg/scheduler/plugins/nodenumaresource/cpu_accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,25 @@ import (
"k8s.io/apimachinery/pkg/util/sets"

schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
"github.com/koordinator-sh/koordinator/pkg/util/cpuset"
)

func takeCPUs(
topology *CPUTopology,
maxRefCount int,
availableCPUs CPUSet,
availableCPUs cpuset.CPUSet,
allocatedCPUs CPUDetails,
numCPUsNeeded int,
cpuBindPolicy schedulingconfig.CPUBindPolicy,
cpuExclusivePolicy schedulingconfig.CPUExclusivePolicy,
numaAllocatedStrategy schedulingconfig.NUMAAllocateStrategy,
) (CPUSet, error) {
) (cpuset.CPUSet, error) {
acc := newCPUAccumulator(topology, maxRefCount, availableCPUs, allocatedCPUs, numCPUsNeeded, cpuExclusivePolicy, numaAllocatedStrategy)
if acc.isSatisfied() {
return acc.result, nil
}
if acc.isFailed() {
return NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request")
return cpuset.NewCPUSet(), fmt.Errorf("not enough cpus available to satisfy request")
}

fullPCPUs := cpuBindPolicy == schedulingconfig.CPUBindPolicyFullPCPUs
Expand Down Expand Up @@ -169,7 +170,7 @@ func takeCPUs(
}
}

return NewCPUSet(), fmt.Errorf("failed to allocate cpus")
return cpuset.NewCPUSet(), fmt.Errorf("failed to allocate cpus")
}

type cpuAccumulator struct {
Expand All @@ -182,13 +183,13 @@ type cpuAccumulator struct {
exclusiveInNUMANodes sets.Int
exclusivePolicy schedulingconfig.CPUExclusivePolicy
numaAllocateStrategy schedulingconfig.NUMAAllocateStrategy
result CPUSet
result cpuset.CPUSet
}

func newCPUAccumulator(
topology *CPUTopology,
maxRefCount int,
availableCPUs CPUSet,
availableCPUs cpuset.CPUSet,
allocatedCPUs CPUDetails,
numCPUsNeeded int,
exclusivePolicy schedulingconfig.CPUExclusivePolicy,
Expand Down Expand Up @@ -224,7 +225,7 @@ func newCPUAccumulator(
exclusivePolicy: exclusivePolicy,
numCPUsNeeded: numCPUsNeeded,
numaAllocateStrategy: numaAllocateStrategy,
result: NewCPUSet(),
result: cpuset.NewCPUSet(),
}
}

Expand Down Expand Up @@ -630,7 +631,7 @@ func (a *cpuAccumulator) freeCPUs(filterExclusive bool) []int {

socketColoScores := make(map[int]int)
for socketID := range socketFreeScores {
socketColoScore := a.topology.CPUDetails.CPUsInSockets(socketID).Intersection(a.result).Count()
socketColoScore := a.topology.CPUDetails.CPUsInSockets(socketID).Intersection(a.result).Size()
socketColoScores[socketID] = socketColoScore
}

Expand Down
Loading