Skip to content

Commit 16be105

Browse files
authored
koord-scheduler: compatible with lightweight-coscheduling (#1007)
Signed-off-by: Joseph <joseph.t.lee@outlook.com>
1 parent 718bef7 commit 16be105

File tree

5 files changed

+169
-15
lines changed

5 files changed

+169
-15
lines changed

apis/extension/scheduling.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ const (
7878
GangModeNonStrict = "NonStrict"
7979
)
8080

81+
const (
82+
// LabelLightweightCoschedulingPodGroupName is the pod label key that carries the pod group (gang) name
// in the lightweight-coscheduling convention of kubernetes-sigs/scheduler-plugins.
//
// Deprecated: kubernetes-sigs/scheduler-plugins/lightweight-coscheduling
83+
LabelLightweightCoschedulingPodGroupName = "pod-group.scheduling.sigs.k8s.io/name"
84+
// LabelLightweightCoschedulingPodGroupMinAvailable is the pod label key that carries the minimum number
// of available pods of the pod group in the lightweight-coscheduling convention; its value is read as
// a base-10 integer.
//
// Deprecated: kubernetes-sigs/scheduler-plugins/lightweight-coscheduling
85+
LabelLightweightCoschedulingPodGroupMinAvailable = "pod-group.scheduling.sigs.k8s.io/min-available"
86+
)
87+
8188
// CustomUsageThresholds supports user-defined node resource utilization thresholds.
8289
type CustomUsageThresholds struct {
8390
// UsageThresholds indicates the resource utilization threshold of the whole machine.

pkg/scheduler/plugins/coscheduling/core/gang.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func (gang *Gang) tryInitByPodConfig(pod *v1.Pod, args *config.CoschedulingArgs)
104104
if gang.HasGangInit {
105105
return false
106106
}
107-
minRequiredNumber, err := extension.GetMinNum(pod)
107+
minRequiredNumber, err := util.GetGangMinNumFromPod(pod)
108108
if err != nil {
109109
klog.Errorf("pod's annotation MinRequiredNumber illegal, gangName: %v, value: %v",
110110
gang.Name, pod.Annotations[extension.AnnotationGangMinNum])

pkg/scheduler/plugins/coscheduling/core/gang_cache.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,10 @@ func (gangCache *GangCache) onPodAdd(obj interface{}) {
114114

115115
// the gang is created in Annotation way
116116
shouldCreatePg := false
117-
if _, exist := pod.Labels[v1alpha1.PodGroupLabel]; !exist {
118-
shouldCreatePg = gang.tryInitByPodConfig(pod, gangCache.pluginArgs)
117+
if pod.Labels[v1alpha1.PodGroupLabel] == "" {
118+
if gang.tryInitByPodConfig(pod, gangCache.pluginArgs) {
119+
shouldCreatePg = util.ShouldCreatePodGroup(pod)
120+
}
119121
}
120122
gang.setChild(pod)
121123
if pod.Spec.NodeName != "" {
@@ -167,6 +169,9 @@ func (gangCache *GangCache) onPodDelete(obj interface{}) {
167169
shouldDeleteGang := gang.deletePod(pod)
168170
if shouldDeleteGang {
169171
gangCache.deleteGangFromCacheByGangId(gangId)
172+
if !util.ShouldDeletePodGroup(pod) {
173+
return
174+
}
170175
// delete podGroup
171176
err := retry.OnError(
172177
retry.DefaultRetry,

pkg/scheduler/plugins/coscheduling/core/gang_cache_test.go

Lines changed: 123 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333

3434
"github.com/koordinator-sh/koordinator/apis/extension"
3535
"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
36+
"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/v1beta2"
3637
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/util"
3738
)
3839

@@ -42,7 +43,17 @@ var fakeTimeNowFn = func() time.Time {
4243
return t
4344
}
4445

46+
func getTestDefaultCoschedulingArgs(t *testing.T) *config.CoschedulingArgs {
47+
var v1beta2args v1beta2.CoschedulingArgs
48+
v1beta2.SetDefaults_CoschedulingArgs(&v1beta2args)
49+
var args config.CoschedulingArgs
50+
err := v1beta2.Convert_v1beta2_CoschedulingArgs_To_config_CoschedulingArgs(&v1beta2args, &args, nil)
51+
assert.NoError(t, err)
52+
return &args
53+
}
54+
4555
func TestGangCache_OnPodAdd(t *testing.T) {
56+
defaultArgs := getTestDefaultCoschedulingArgs(t)
4657
tests := []struct {
4758
name string
4859
pods []*corev1.Pod
@@ -240,6 +251,105 @@ func TestGangCache_OnPodAdd(t *testing.T) {
240251
},
241252
},
242253
},
254+
{
255+
name: "add pod announcing Gang in lightweight-coscheduling way",
256+
pods: []*corev1.Pod{
257+
// pod1 announce GangA
258+
{
259+
ObjectMeta: metav1.ObjectMeta{
260+
Namespace: "default",
261+
Name: "pod1",
262+
Labels: map[string]string{
263+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
264+
extension.LabelLightweightCoschedulingPodGroupName: "ganga",
265+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupMinAvailable is deprecated
266+
extension.LabelLightweightCoschedulingPodGroupMinAvailable: "2",
267+
},
268+
},
269+
Spec: corev1.PodSpec{
270+
NodeName: "nba",
271+
},
272+
},
273+
// pod2 also announces GangA with the same lightweight-coscheduling labels after pod1's announcing
274+
// so gangA in cache should only be initialized once, from pod1's labels
275+
{
276+
ObjectMeta: metav1.ObjectMeta{
277+
Namespace: "default",
278+
Name: "pod2",
279+
Labels: map[string]string{
280+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
281+
extension.LabelLightweightCoschedulingPodGroupName: "ganga",
282+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupMinAvailable is deprecated
283+
extension.LabelLightweightCoschedulingPodGroupMinAvailable: "2",
284+
},
285+
},
286+
},
287+
},
288+
wantCache: map[string]*Gang{
289+
"default/ganga": {
290+
Name: "default/ganga",
291+
WaitTime: defaultArgs.DefaultTimeout.Duration,
292+
CreateTime: fakeTimeNowFn(),
293+
Mode: extension.GangModeStrict,
294+
MinRequiredNumber: 2,
295+
TotalChildrenNum: 2,
296+
GangGroup: []string{"default/ganga"},
297+
HasGangInit: true,
298+
GangFrom: GangFromPodAnnotation,
299+
Children: map[string]*corev1.Pod{
300+
"default/pod1": {
301+
ObjectMeta: metav1.ObjectMeta{
302+
Namespace: "default",
303+
Name: "pod1",
304+
Labels: map[string]string{
305+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
306+
extension.LabelLightweightCoschedulingPodGroupName: "ganga",
307+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupMinAvailable is deprecated
308+
extension.LabelLightweightCoschedulingPodGroupMinAvailable: "2",
309+
},
310+
},
311+
Spec: corev1.PodSpec{
312+
NodeName: "nba",
313+
},
314+
},
315+
"default/pod2": {
316+
ObjectMeta: metav1.ObjectMeta{
317+
Namespace: "default",
318+
Name: "pod2",
319+
Labels: map[string]string{
320+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
321+
extension.LabelLightweightCoschedulingPodGroupName: "ganga",
322+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupMinAvailable is deprecated
323+
extension.LabelLightweightCoschedulingPodGroupMinAvailable: "2",
324+
},
325+
},
326+
},
327+
},
328+
WaitingForBindChildren: map[string]*corev1.Pod{},
329+
BoundChildren: map[string]*corev1.Pod{
330+
"default/pod1": {
331+
ObjectMeta: metav1.ObjectMeta{
332+
Namespace: "default",
333+
Name: "pod1",
334+
Labels: map[string]string{
335+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
336+
extension.LabelLightweightCoschedulingPodGroupName: "ganga",
337+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupMinAvailable is deprecated
338+
extension.LabelLightweightCoschedulingPodGroupMinAvailable: "2",
339+
},
340+
},
341+
Spec: corev1.PodSpec{
342+
NodeName: "nba",
343+
},
344+
},
345+
},
346+
ScheduleCycleValid: true,
347+
ScheduleCycle: 1,
348+
OnceResourceSatisfied: true,
349+
ChildrenScheduleRoundMap: map[string]int{},
350+
},
351+
},
352+
},
243353
{
244354
name: "add pods announcing Gang in Annotation way,but with illegal args",
245355
pods: []*corev1.Pod{
@@ -275,7 +385,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
275385
wantCache: map[string]*Gang{
276386
"default/gangb": {
277387
Name: "default/gangb",
278-
WaitTime: 0,
388+
WaitTime: defaultArgs.DefaultTimeout.Duration,
279389
CreateTime: fakeTimeNowFn(),
280390
Mode: extension.GangModeStrict,
281391
MinRequiredNumber: 2,
@@ -328,7 +438,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
328438
},
329439
},
330440
Spec: v1alpha1.PodGroupSpec{
331-
ScheduleTimeoutSeconds: pointer.Int32(0),
441+
ScheduleTimeoutSeconds: pointer.Int32(int32(defaultArgs.DefaultTimeout.Duration.Seconds())),
332442
MinMember: int32(2),
333443
},
334444
},
@@ -367,7 +477,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
367477
wantCache: map[string]*Gang{
368478
"default/gangc": {
369479
Name: "default/gangc",
370-
WaitTime: 0,
480+
WaitTime: defaultArgs.DefaultTimeout.Duration,
371481
CreateTime: fakeTimeNowFn(),
372482
Mode: extension.GangModeStrict,
373483
GangGroupId: "default/gangc",
@@ -398,7 +508,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
398508
},
399509
"default/gangd": {
400510
Name: "default/gangd",
401-
WaitTime: 0,
511+
WaitTime: defaultArgs.DefaultTimeout.Duration,
402512
CreateTime: fakeTimeNowFn(),
403513
Mode: extension.GangModeStrict,
404514
GangGroupId: "default/gangd",
@@ -439,7 +549,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
439549
},
440550
},
441551
Spec: v1alpha1.PodGroupSpec{
442-
ScheduleTimeoutSeconds: pointer.Int32(0),
552+
ScheduleTimeoutSeconds: pointer.Int32(int32(defaultArgs.DefaultTimeout.Duration.Seconds())),
443553
MinMember: int32(0),
444554
},
445555
},
@@ -453,7 +563,7 @@ func TestGangCache_OnPodAdd(t *testing.T) {
453563
},
454564
},
455565
Spec: v1alpha1.PodGroupSpec{
456-
ScheduleTimeoutSeconds: pointer.Int32(0),
566+
ScheduleTimeoutSeconds: pointer.Int32(int32(defaultArgs.DefaultTimeout.Duration.Seconds())),
457567
MinMember: int32(0),
458568
},
459569
},
@@ -472,7 +582,8 @@ func TestGangCache_OnPodAdd(t *testing.T) {
472582
pgInformerFactory := pgformers.NewSharedInformerFactory(pgClientSet, 0)
473583
pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups()
474584
pglister := pgInformer.Lister()
475-
gangCache := NewGangCache(&config.CoschedulingArgs{}, nil, pglister, pgClientSet)
585+
586+
gangCache := NewGangCache(defaultArgs, nil, pglister, pgClientSet)
476587
for _, pg := range tt.podGroups {
477588
err := retry.OnError(
478589
retry.DefaultRetry,
@@ -496,6 +607,11 @@ func TestGangCache_OnPodAdd(t *testing.T) {
496607
tt.wantCache[k].GangGroupId = util.GetGangGroupId(v.GangGroup)
497608
}
498609
assert.Equal(t, tt.wantCache, gangCache.gangItems)
610+
if len(tt.wantedPodGroupMap) == 0 {
611+
podGroupList, err := pgClientSet.SchedulingV1alpha1().PodGroups("default").List(context.TODO(), metav1.ListOptions{})
612+
assert.NoError(t, err)
613+
assert.Empty(t, podGroupList.Items)
614+
}
499615
for pgKey, targetPg := range tt.wantedPodGroupMap {
500616
var pg *v1alpha1.PodGroup
501617
err := retry.OnError(

pkg/scheduler/plugins/coscheduling/util/gang_helper.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ limitations under the License.
1717
package util
1818

1919
import (
20+
"encoding/json"
21+
"errors"
2022
"sort"
23+
"strconv"
2124
"strings"
2225
"time"
2326

@@ -26,8 +29,6 @@ import (
2629
"k8s.io/klog/v2"
2730
"sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1"
2831

29-
"encoding/json"
30-
3132
"github.com/koordinator-sh/koordinator/apis/extension"
3233
)
3334

@@ -45,13 +46,38 @@ func GetGangNameByPod(pod *v1.Pod) string {
4546
return ""
4647
}
4748
var gangName string
48-
gangName = pod.Labels[v1alpha1.PodGroupLabel]
49-
if gangName == "" {
50-
gangName = extension.GetGangName(pod)
49+
if gangName = pod.Labels[v1alpha1.PodGroupLabel]; gangName == "" {
50+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
51+
if gangName = pod.Labels[extension.LabelLightweightCoschedulingPodGroupName]; gangName == "" {
52+
gangName = extension.GetGangName(pod)
53+
}
5154
}
5255
return gangName
5356
}
5457

58+
func GetGangMinNumFromPod(pod *v1.Pod) (minNum int, err error) {
59+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupMinAvailable is deprecated
60+
if s := pod.Labels[extension.LabelLightweightCoschedulingPodGroupMinAvailable]; s != "" {
61+
val, err := strconv.ParseInt(pod.Labels[extension.LabelLightweightCoschedulingPodGroupMinAvailable], 10, 32)
62+
return int(val), err
63+
}
64+
if _, ok := pod.Annotations[extension.AnnotationGangMinNum]; ok {
65+
return extension.GetMinNum(pod)
66+
}
67+
return 0, errors.New("missing min available")
68+
}
69+
70+
func ShouldCreatePodGroup(pod *v1.Pod) bool {
71+
// nolint:staticcheck // SA1019: extension.LabelLightweightCoschedulingPodGroupName is deprecated
72+
return pod.Labels[v1alpha1.PodGroupLabel] == "" &&
73+
pod.Labels[extension.LabelLightweightCoschedulingPodGroupName] == "" &&
74+
pod.Annotations[extension.AnnotationGangName] != ""
75+
}
76+
77+
func ShouldDeletePodGroup(pod *v1.Pod) bool {
78+
return ShouldCreatePodGroup(pod)
79+
}
80+
5581
// IsPodNeedGang reports whether pod belongs to a gang, i.e. whether
// GetGangNameByPod resolves a non-empty gang name for it.
func IsPodNeedGang(pod *v1.Pod) bool {
	gangName := GetGangNameByPod(pod)
	return gangName != ""
}

0 commit comments

Comments
 (0)