Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions apis/extension/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ const (
GangModeNonStrict = "NonStrict"
)

const (
// Deprecated: kubernetes-sigs/scheduler-plugins/lightweight-coscheduling
LabelLightweightCoschedulingPodGroupName = "pod-group.scheduling.sigs.k8s.io/name"
// Deprecated: kubernetes-sigs/scheduler-plugins/lightweight-coscheduling
LabelLightweightCoschedulingPodGroupMinAvailable = "pod-group.scheduling.sigs.k8s.io/min-available"
)

// CustomUsageThresholds supports user-defined node resource utilization thresholds.
type CustomUsageThresholds struct {
// UsageThresholds indicates the resource utilization threshold of the whole machine.
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/coscheduling/core/gang.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (gang *Gang) tryInitByPodConfig(pod *v1.Pod, args *config.CoschedulingArgs)
if gang.HasGangInit {
return false
}
minRequiredNumber, err := extension.GetMinNum(pod)
minRequiredNumber, err := util.GetGangMinNumFromPod(pod)
if err != nil {
klog.Errorf("pod's annotation MinRequiredNumber illegal, gangName: %v, value: %v",
gang.Name, pod.Annotations[extension.AnnotationGangMinNum])
Expand Down
9 changes: 7 additions & 2 deletions pkg/scheduler/plugins/coscheduling/core/gang_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ func (gangCache *GangCache) onPodAdd(obj interface{}) {

// the gang is created in Annotation way
shouldCreatePg := false
if _, exist := pod.Labels[v1alpha1.PodGroupLabel]; !exist {
shouldCreatePg = gang.tryInitByPodConfig(pod, gangCache.pluginArgs)
if pod.Labels[v1alpha1.PodGroupLabel] == "" {
if gang.tryInitByPodConfig(pod, gangCache.pluginArgs) {
shouldCreatePg = util.ShouldCreatePodGroup(pod)
}
}
gang.setChild(pod)
if pod.Spec.NodeName != "" {
Expand Down Expand Up @@ -167,6 +169,9 @@ func (gangCache *GangCache) onPodDelete(obj interface{}) {
shouldDeleteGang := gang.deletePod(pod)
if shouldDeleteGang {
gangCache.deleteGangFromCacheByGangId(gangId)
if !util.ShouldDeletePodGroup(pod) {
return
}
// delete podGroup
err := retry.OnError(
retry.DefaultRetry,
Expand Down
130 changes: 123 additions & 7 deletions pkg/scheduler/plugins/coscheduling/core/gang_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/v1beta2"
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/util"
)

Expand All @@ -42,7 +43,17 @@ var fakeTimeNowFn = func() time.Time {
return t
}

func getTestDefaultCoschedulingArgs(t *testing.T) *config.CoschedulingArgs {
var v1beta2args v1beta2.CoschedulingArgs
v1beta2.SetDefaults_CoschedulingArgs(&v1beta2args)
var args config.CoschedulingArgs
err := v1beta2.Convert_v1beta2_CoschedulingArgs_To_config_CoschedulingArgs(&v1beta2args, &args, nil)
assert.NoError(t, err)
return &args
}

func TestGangCache_OnPodAdd(t *testing.T) {
defaultArgs := getTestDefaultCoschedulingArgs(t)
tests := []struct {
name string
pods []*corev1.Pod
Expand Down Expand Up @@ -240,6 +251,105 @@ func TestGangCache_OnPodAdd(t *testing.T) {
},
},
},
{
name: "add pod announcing Gang in lightweight-coscheduling way",
pods: []*corev1.Pod{
// pod1 announce GangA
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
Labels: map[string]string{
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
extension.LabelLightweightCoschedulingPodGroupName: "ganga",
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupMinAvailable is deprecated
extension.LabelLightweightCoschedulingPodGroupMinAvailable: "2",
},
},
Spec: corev1.PodSpec{
NodeName: "nba",
},
},
// pod2 also announce GangA but with different annotations after pod1's announcing
// so gangA in cache should only be created with pod1's Annotations
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod2",
Labels: map[string]string{
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
extension.LabelLightweightCoschedulingPodGroupName: "ganga",
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupMinAvailable is deprecated
extension.LabelLightweightCoschedulingPodGroupMinAvailable: "2",
},
},
},
},
wantCache: map[string]*Gang{
"default/ganga": {
Name: "default/ganga",
WaitTime: defaultArgs.DefaultTimeout.Duration,
CreateTime: fakeTimeNowFn(),
Mode: extension.GangModeStrict,
MinRequiredNumber: 2,
TotalChildrenNum: 2,
GangGroup: []string{"default/ganga"},
HasGangInit: true,
GangFrom: GangFromPodAnnotation,
Children: map[string]*corev1.Pod{
"default/pod1": {
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
Labels: map[string]string{
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
extension.LabelLightweightCoschedulingPodGroupName: "ganga",
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupMinAvailable is deprecated
extension.LabelLightweightCoschedulingPodGroupMinAvailable: "2",
},
},
Spec: corev1.PodSpec{
NodeName: "nba",
},
},
"default/pod2": {
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod2",
Labels: map[string]string{
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
extension.LabelLightweightCoschedulingPodGroupName: "ganga",
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupMinAvailable is deprecated
extension.LabelLightweightCoschedulingPodGroupMinAvailable: "2",
},
},
},
},
WaitingForBindChildren: map[string]*corev1.Pod{},
BoundChildren: map[string]*corev1.Pod{
"default/pod1": {
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
Labels: map[string]string{
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
extension.LabelLightweightCoschedulingPodGroupName: "ganga",
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupMinAvailable
extension.LabelLightweightCoschedulingPodGroupMinAvailable: "2",
},
},
Spec: corev1.PodSpec{
NodeName: "nba",
},
},
},
ScheduleCycleValid: true,
ScheduleCycle: 1,
OnceResourceSatisfied: true,
ChildrenScheduleRoundMap: map[string]int{},
},
},
},
{
name: "add pods announcing Gang in Annotation way,but with illegal args",
pods: []*corev1.Pod{
Expand Down Expand Up @@ -275,7 +385,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
wantCache: map[string]*Gang{
"default/gangb": {
Name: "default/gangb",
WaitTime: 0,
WaitTime: defaultArgs.DefaultTimeout.Duration,
CreateTime: fakeTimeNowFn(),
Mode: extension.GangModeStrict,
MinRequiredNumber: 2,
Expand Down Expand Up @@ -328,7 +438,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
},
},
Spec: v1alpha1.PodGroupSpec{
ScheduleTimeoutSeconds: pointer.Int32(0),
ScheduleTimeoutSeconds: pointer.Int32(int32(defaultArgs.DefaultTimeout.Duration.Seconds())),
MinMember: int32(2),
},
},
Expand Down Expand Up @@ -367,7 +477,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
wantCache: map[string]*Gang{
"default/gangc": {
Name: "default/gangc",
WaitTime: 0,
WaitTime: defaultArgs.DefaultTimeout.Duration,
CreateTime: fakeTimeNowFn(),
Mode: extension.GangModeStrict,
GangGroupId: "default/gangc",
Expand Down Expand Up @@ -398,7 +508,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
},
"default/gangd": {
Name: "default/gangd",
WaitTime: 0,
WaitTime: defaultArgs.DefaultTimeout.Duration,
CreateTime: fakeTimeNowFn(),
Mode: extension.GangModeStrict,
GangGroupId: "default/gangd",
Expand Down Expand Up @@ -439,7 +549,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
},
},
Spec: v1alpha1.PodGroupSpec{
ScheduleTimeoutSeconds: pointer.Int32(0),
ScheduleTimeoutSeconds: pointer.Int32(int32(defaultArgs.DefaultTimeout.Duration.Seconds())),
MinMember: int32(0),
},
},
Expand All @@ -453,7 +563,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
},
},
Spec: v1alpha1.PodGroupSpec{
ScheduleTimeoutSeconds: pointer.Int32(0),
ScheduleTimeoutSeconds: pointer.Int32(int32(defaultArgs.DefaultTimeout.Duration.Seconds())),
MinMember: int32(0),
},
},
Expand All @@ -472,7 +582,8 @@ func TestGangCache_OnPodAdd(t *testing.T) {
pgInformerFactory := pgformers.NewSharedInformerFactory(pgClientSet, 0)
pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()
pglister := pgInformer.Lister()
gangCache := NewGangCache(&config.CoschedulingArgs{}, nil, pglister, pgClientSet)

gangCache := NewGangCache(defaultArgs, nil, pglister, pgClientSet)
for _, pg := range tt.podGroups {
err := retry.OnError(
retry.DefaultRetry,
Expand All @@ -496,6 +607,11 @@ func TestGangCache_OnPodAdd(t *testing.T) {
tt.wantCache[k].GangGroupId = util.GetGangGroupId(v.GangGroup)
}
assert.Equal(t, tt.wantCache, gangCache.gangItems)
if len(tt.wantedPodGroupMap) == 0 {
podGroupList, err := pgClientSet.SchedulingV1alpha1().PodGroups("default").List(context.TODO(), metav1.ListOptions{})
assert.NoError(t, err)
assert.Empty(t, podGroupList.Items)
}
for pgKey, targetPg := range tt.wantedPodGroupMap {
var pg *v1alpha1.PodGroup
err := retry.OnError(
Expand Down
36 changes: 31 additions & 5 deletions pkg/scheduler/plugins/coscheduling/util/gang_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package util

import (
"encoding/json"
"errors"
"sort"
"strconv"
"strings"
"time"

Expand All @@ -26,8 +29,6 @@ import (
"k8s.io/klog/v2"
"sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1"

"encoding/json"

"github.com/koordinator-sh/koordinator/apis/extension"
)

Expand All @@ -45,13 +46,38 @@ func GetGangNameByPod(pod *v1.Pod) string {
return ""
}
var gangName string
gangName = pod.Labels[v1alpha1.PodGroupLabel]
if gangName == "" {
gangName = extension.GetGangName(pod)
if gangName = pod.Labels[v1alpha1.PodGroupLabel]; gangName == "" {
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
if gangName = pod.Labels[extension.LabelLightweightCoschedulingPodGroupName]; gangName == "" {
gangName = extension.GetGangName(pod)
}
}
return gangName
}

func GetGangMinNumFromPod(pod *v1.Pod) (minNum int, err error) {
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupMinAvailable is deprecated
if s := pod.Labels[extension.LabelLightweightCoschedulingPodGroupMinAvailable]; s != "" {
val, err := strconv.ParseInt(pod.Labels[extension.LabelLightweightCoschedulingPodGroupMinAvailable], 10, 32)
return int(val), err
}
if _, ok := pod.Annotations[extension.AnnotationGangMinNum]; ok {
return extension.GetMinNum(pod)
}
return 0, errors.New("missing min available")
}

func ShouldCreatePodGroup(pod *v1.Pod) bool {
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
return pod.Labels[v1alpha1.PodGroupLabel] == "" &&
pod.Labels[extension.LabelLightweightCoschedulingPodGroupName] == "" &&
pod.Annotations[extension.AnnotationGangName] != ""
}

func ShouldDeletePodGroup(pod *v1.Pod) bool {
return ShouldCreatePodGroup(pod)
}

func IsPodNeedGang(pod *v1.Pod) bool {
return GetGangNameByPod(pod) != ""
}
Expand Down