Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pkg/scheduler/plugins/loadaware/load_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -102,6 +103,12 @@ func (p *Plugin) Filter(ctx context.Context, state *framework.CycleState, pod *c

nodeMetric, err := p.nodeMetricLister.Get(node.Name)
if err != nil {
// For nodes that lack load information, fall back to scheduling without load awareness.
// Some nodes in the cluster may not have the koordlet installed, yet newly created Pods may still be scheduled by koord-scheduler.
// Load-aware scheduling is only an optimization, so we skip the load check for such nodes and let them pass the filter.
if errors.IsNotFound(err) {
return nil
}
return framework.NewStatus(framework.Error, err.Error())
}

Expand Down Expand Up @@ -168,7 +175,12 @@ func (p *Plugin) Score(ctx context.Context, state *framework.CycleState, pod *co
}
nodeMetric, err := p.nodeMetricLister.Get(nodeName)
if err != nil {
return 0, framework.NewStatus(framework.Error, "nodeMetric not found")
// Load-aware scheduling is only an optimization, so when the node has no NodeMetric
// we skip load-aware scoring and give the node a score of 0 instead of returning an error.
if errors.IsNotFound(err) {
return 0, nil
}
return 0, framework.NewStatus(framework.Error, err.Error())
}
if p.args.NodeMetricExpirationSeconds != nil && isNodeMetricExpired(nodeMetric, *p.args.NodeMetricExpirationSeconds) {
return 0, nil
Expand Down
88 changes: 69 additions & 19 deletions pkg/scheduler/plugins/loadaware/load_aware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,13 @@ func TestFilterUsage(t *testing.T) {
name string
usageThresholds map[corev1.ResourceName]int64
customUsageThresholds map[corev1.ResourceName]int64
nodeName string
nodeMetric *slov1alpha1.NodeMetric
wantStatus *framework.Status
}{
{
name: "filter normal usage",
name: "filter normal usage",
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand All @@ -322,7 +324,13 @@ func TestFilterUsage(t *testing.T) {
wantStatus: nil,
},
{
name: "filter exceed cpu usage",
name: "filter node missing NodeMetrics",
nodeName: "test-node-1",
wantStatus: nil,
},
{
name: "filter exceed cpu usage",
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand All @@ -349,7 +357,8 @@ func TestFilterUsage(t *testing.T) {
wantStatus: framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonUsageExceedThreshold, corev1.ResourceCPU)),
},
{
name: "filter exceed memory usage",
name: "filter exceed memory usage",
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -380,6 +389,7 @@ func TestFilterUsage(t *testing.T) {
customUsageThresholds: map[corev1.ResourceName]int64{
corev1.ResourceMemory: 60,
},
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -410,6 +420,7 @@ func TestFilterUsage(t *testing.T) {
usageThresholds: map[corev1.ResourceName]int64{
corev1.ResourceMemory: 0,
},
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -480,7 +491,7 @@ func TestFilterUsage(t *testing.T) {
nodes := []*corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: tt.nodeMetric.Name,
Name: tt.nodeName,
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
Expand Down Expand Up @@ -515,15 +526,17 @@ func TestFilterUsage(t *testing.T) {
assert.NotNil(t, p)
assert.Nil(t, err)

_, err = koordClientSet.SloV1alpha1().NodeMetrics().Create(context.TODO(), tt.nodeMetric, metav1.CreateOptions{})
assert.NoError(t, err)
if tt.nodeMetric != nil {
_, err = koordClientSet.SloV1alpha1().NodeMetrics().Create(context.TODO(), tt.nodeMetric, metav1.CreateOptions{})
assert.NoError(t, err)
}

koordSharedInformerFactory.Start(context.TODO().Done())
koordSharedInformerFactory.WaitForCacheSync(context.TODO().Done())

cycleState := framework.NewCycleState()

nodeInfo, err := snapshot.Get(tt.nodeMetric.Name)
nodeInfo, err := snapshot.Get(tt.nodeName)
assert.NoError(t, err)
assert.NotNil(t, nodeInfo)

Expand All @@ -538,12 +551,14 @@ func TestScore(t *testing.T) {
name string
pod *corev1.Pod
assignedPod []*podAssignInfo
nodeName string
nodeMetric *slov1alpha1.NodeMetric
wantScore int64
wantStatus *framework.Status
}{
{
name: "score node with expired nodeMetric",
name: "score node with expired nodeMetric",
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -583,6 +598,7 @@ func TestScore(t *testing.T) {
},
},
},
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand All @@ -602,7 +618,32 @@ func TestScore(t *testing.T) {
wantStatus: nil,
},
{
name: "score cert load node",
name: "score node missing NodeMetrics",
pod: &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test-container",
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("16"),
corev1.ResourceMemory: resource.MustParse("32Gi"),
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("16"),
corev1.ResourceMemory: resource.MustParse("32Gi"),
},
},
},
},
},
},
nodeName: "test-node-1",
wantScore: 0,
wantStatus: nil,
},
{
name: "score load node",
pod: &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
Expand All @@ -622,6 +663,7 @@ func TestScore(t *testing.T) {
},
},
},
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -649,7 +691,7 @@ func TestScore(t *testing.T) {
wantStatus: nil,
},
{
name: "score cert load node with just assigned pod",
name: "score load node with just assigned pod",
pod: &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
Expand Down Expand Up @@ -693,6 +735,7 @@ func TestScore(t *testing.T) {
},
},
},
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -720,7 +763,7 @@ func TestScore(t *testing.T) {
wantStatus: nil,
},
{
name: "score cert load node with just assigned pod where after updateTime",
name: "score load node with just assigned pod where after updateTime",
pod: &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
Expand Down Expand Up @@ -764,6 +807,7 @@ func TestScore(t *testing.T) {
},
},
},
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -791,7 +835,7 @@ func TestScore(t *testing.T) {
wantStatus: nil,
},
{
name: "score cert load node with just assigned pod where before updateTime",
name: "score load node with just assigned pod where before updateTime",
pod: &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
Expand Down Expand Up @@ -835,6 +879,7 @@ func TestScore(t *testing.T) {
},
},
},
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -883,6 +928,7 @@ func TestScore(t *testing.T) {
},
},
},
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -922,6 +968,7 @@ func TestScore(t *testing.T) {
},
},
},
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -951,6 +998,7 @@ func TestScore(t *testing.T) {
},
},
},
nodeName: "test-node-1",
nodeMetric: &slov1alpha1.NodeMetric{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -1010,7 +1058,7 @@ func TestScore(t *testing.T) {
nodes := []*corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: tt.nodeMetric.Name,
Name: tt.nodeName,
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
Expand All @@ -1033,30 +1081,32 @@ func TestScore(t *testing.T) {
assert.NotNil(t, p)
assert.Nil(t, err)

_, err = koordClientSet.SloV1alpha1().NodeMetrics().Create(context.TODO(), tt.nodeMetric, metav1.CreateOptions{})
assert.NoError(t, err)
if tt.nodeMetric != nil {
_, err = koordClientSet.SloV1alpha1().NodeMetrics().Create(context.TODO(), tt.nodeMetric, metav1.CreateOptions{})
assert.NoError(t, err)
}

koordSharedInformerFactory.Start(context.TODO().Done())
koordSharedInformerFactory.WaitForCacheSync(context.TODO().Done())

cycleState := framework.NewCycleState()

nodeInfo, err := snapshot.Get(tt.nodeMetric.Name)
nodeInfo, err := snapshot.Get(tt.nodeName)
assert.NoError(t, err)
assert.NotNil(t, nodeInfo)

assignCache := p.(*Plugin).podAssignCache
for _, v := range tt.assignedPod {
m := assignCache.podInfoItems[tt.nodeMetric.Name]
m := assignCache.podInfoItems[tt.nodeName]
if m == nil {
m = map[types.UID]*podAssignInfo{}
assignCache.podInfoItems[tt.nodeMetric.Name] = m
assignCache.podInfoItems[tt.nodeName] = m
}
v.pod.UID = uuid.NewUUID()
m[v.pod.UID] = v
}

score, status := p.(*Plugin).Score(context.TODO(), cycleState, tt.pod, tt.nodeMetric.Name)
score, status := p.(*Plugin).Score(context.TODO(), cycleState, tt.pod, tt.nodeName)
assert.Equal(t, tt.wantScore, score)
assert.True(t, tt.wantStatus.Equal(status), "want status: %s, but got %s", tt.wantStatus.Message(), status.Message())
})
Expand Down