Skip to content

Commit a47133a

Browse files
authored
scheduler: coscheduling tweak and fix (#2660)
Signed-off-by: Zach Zhu <zzqshu@126.com>
1 parent af54776 commit a47133a

File tree

7 files changed

+600
-59
lines changed

7 files changed

+600
-59
lines changed

apis/extension/coscheduling.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"encoding/json"
2121
"sort"
2222
"strconv"
23+
"time"
2324

2425
corev1 "k8s.io/api/core/v1"
2526
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -84,24 +85,44 @@ const (
8485
LabelLightweightCoschedulingPodGroupMinAvailable = "pod-group.scheduling.sigs.k8s.io/min-available"
8586
)
8687

87-
func GetMinNum(pod *corev1.Pod) (int, error) {
88+
func GetGangMinNum(pod *corev1.Pod) (int, error) {
8889
minRequiredNum, err := strconv.ParseInt(pod.Annotations[AnnotationGangMinNum], 10, 32)
8990
if err != nil {
9091
return 0, err
9192
}
9293
return int(minRequiredNum), nil
9394
}
9495

96+
func GetGangTotalNum(obj metav1.Object) (int, error) {
97+
totalNumStr := obj.GetAnnotations()[AnnotationGangTotalNum]
98+
if totalNumStr == "" {
99+
return 0, nil
100+
}
101+
totalNum, err := strconv.ParseInt(totalNumStr, 10, 32)
102+
if err != nil {
103+
return 0, err
104+
}
105+
return int(totalNum), nil
106+
}
107+
95108
func GetGangName(pod *corev1.Pod) string {
96109
return pod.Annotations[AnnotationGangName]
97110
}
98111

99-
func GetGangMatchPolicy(pod *corev1.Pod) string {
100-
policy := pod.Annotations[AnnotationGangMatchPolicy]
112+
func GetGangMatchPolicy(obj metav1.Object) string {
113+
policy := obj.GetAnnotations()[AnnotationGangMatchPolicy]
101114
if policy != "" {
102115
return policy
103116
}
104-
return pod.Annotations[AnnotationAliasGangMatchPolicy]
117+
return obj.GetAnnotations()[AnnotationAliasGangMatchPolicy]
118+
}
119+
120+
func GetGangWaitTime(pod *corev1.Pod) (time.Duration, error) {
121+
waitTimeStr := pod.Annotations[AnnotationGangWaitTime]
122+
if waitTimeStr == "" {
123+
return 0, nil
124+
}
125+
return time.ParseDuration(waitTimeStr)
105126
}
106127

107128
type NetworkTopologySpec struct {

pkg/scheduler/frameworkext/networktopology/tree.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ type TreeNodeMeta struct {
107107
func (t *tree) AddNode(node *corev1.Node) {
108108
treeNodesMeta, err := t.getTreeNodesMeta(node)
109109
if err != nil {
110-
klog.Error(err)
110+
klog.V(5).ErrorS(err, "Failed to get tree nodes meta")
111111
return
112112
}
113113

pkg/scheduler/plugins/coscheduling/core/gang.go

Lines changed: 49 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package core
1818

1919
import (
2020
"fmt"
21-
"strconv"
2221
"sync"
2322
"time"
2423

@@ -113,31 +112,38 @@ func (gang *Gang) tryInitByPodConfig(pod *v1.Pod, args *config.CoschedulingArgs)
113112
}
114113
gang.MinRequiredNumber = minRequiredNumber
115114

116-
totalChildrenNum, err := strconv.ParseInt(pod.Annotations[extension.AnnotationGangTotalNum], 10, 32)
115+
totalChildrenNum, err := extension.GetGangTotalNum(pod)
117116
if err != nil {
118-
klog.V(4).ErrorS(err, "pod's annotation totalNumber illegal",
117+
klog.ErrorS(err, "pod's annotation totalNumber illegal",
119118
"gangName", gang.Name, "value", pod.Annotations[extension.AnnotationGangTotalNum])
120-
totalChildrenNum = int64(minRequiredNumber)
121-
} else if totalChildrenNum != 0 && totalChildrenNum < int64(minRequiredNumber) {
122-
123-
klog.V(4).Infof("pod's annotation totalNumber cannot less than minRequiredNumber, gangName: %v, totalNumber: %v,minRequiredNumber: %v",
119+
totalChildrenNum = minRequiredNumber
120+
} else if totalChildrenNum == 0 {
121+
totalChildrenNum = minRequiredNumber
122+
} else if totalChildrenNum < minRequiredNumber {
123+
klog.Errorf("pod's annotation totalNumber cannot less than minRequiredNumber, gangName: %v, totalNumber: %v,minRequiredNumber: %v",
124124
gang.Name, pod.Annotations[extension.AnnotationGangTotalNum], minRequiredNumber)
125-
totalChildrenNum = int64(minRequiredNumber)
125+
totalChildrenNum = minRequiredNumber
126126
}
127-
gang.TotalChildrenNum = int(totalChildrenNum)
127+
gang.TotalChildrenNum = totalChildrenNum
128128

129129
mode := pod.Annotations[extension.AnnotationGangMode]
130+
if mode == "" {
131+
mode = extension.GangModeStrict
132+
}
130133
if mode != extension.GangModeStrict && mode != extension.GangModeNonStrict {
131-
klog.V(4).Infof("pod's annotation GangModeAnnotation illegal, gangName: %v, value: %v",
134+
klog.Errorf("pod's annotation GangModeAnnotation illegal, gangName: %v, value: %v",
132135
gang.Name, pod.Annotations[extension.AnnotationGangMode])
133136
mode = extension.GangModeStrict
134137
}
135138
gang.Mode = mode
136139

137-
matchPolicy := util.GetGangMatchPolicyByPod(pod)
140+
matchPolicy := extension.GetGangMatchPolicy(pod)
141+
if matchPolicy == "" {
142+
matchPolicy = extension.GangMatchPolicyOnceSatisfied
143+
}
138144
if matchPolicy != extension.GangMatchPolicyOnlyWaiting && matchPolicy != extension.GangMatchPolicyWaitingAndRunning &&
139145
matchPolicy != extension.GangMatchPolicyOnceSatisfied {
140-
klog.V(4).Infof("pod's annotation AnnotationGangMatchPolicy illegal, gangName: %v, value: %v",
146+
klog.Errorf("pod's annotation AnnotationGangMatchPolicy illegal, gangName: %v, value: %v",
141147
gang.Name, matchPolicy)
142148
matchPolicy = extension.GangMatchPolicyOnceSatisfied
143149
}
@@ -146,17 +152,20 @@ func (gang *Gang) tryInitByPodConfig(pod *v1.Pod, args *config.CoschedulingArgs)
146152
// here we assume that Coscheduling's CreateTime equal with the pod's CreateTime
147153
gang.CreateTime = pod.CreationTimestamp.Time
148154

149-
waitTime, err := time.ParseDuration(pod.Annotations[extension.AnnotationGangWaitTime])
150-
if err != nil || waitTime <= 0 {
151-
klog.V(4).ErrorS(err, "pod's annotation GangWaitTimeAnnotation illegal",
155+
waitTime, err := extension.GetGangWaitTime(pod)
156+
if waitTime == 0 {
157+
waitTime = args.DefaultTimeout.Duration
158+
}
159+
if err != nil || waitTime < 0 {
160+
klog.ErrorS(err, "pod's annotation GangWaitTimeAnnotation illegal",
152161
"gangName", gang.Name, "value", pod.Annotations[extension.AnnotationGangWaitTime])
153162
waitTime = args.DefaultTimeout.Duration
154163
}
155164
gang.WaitTime = waitTime
156165

157166
groupSlice, err := util.StringToGangGroupSlice(pod.Annotations[extension.AnnotationGangGroups])
158167
if err != nil {
159-
klog.V(4).ErrorS(err, "pod's annotation GangGroupsAnnotation illegal",
168+
klog.ErrorS(err, "pod's annotation GangGroupsAnnotation illegal",
160169
"gangName", gang.Name, "value", pod.Annotations[extension.AnnotationGangGroups])
161170
}
162171
if len(groupSlice) == 0 {
@@ -167,9 +176,10 @@ func (gang *Gang) tryInitByPodConfig(pod *v1.Pod, args *config.CoschedulingArgs)
167176

168177
gang.NetworkTopologySpec, err = extension.GetNetworkTopologySpec(pod)
169178
if err != nil {
170-
klog.V(4).ErrorS(err, "pod's annotation AnnotationGangNetworkTopologySpec illegal",
179+
klog.ErrorS(err, "pod's annotation AnnotationGangNetworkTopologySpec illegal",
171180
"gangName", gang.Name, "value", pod.Annotations[extension.AnnotationGangNetworkTopologySpec])
172181
}
182+
173183
gang.GangFrom = GangFromPodAnnotation
174184
gang.HasGangInit = true
175185

@@ -182,33 +192,41 @@ func (gang *Gang) tryInitByPodConfig(pod *v1.Pod, args *config.CoschedulingArgs)
182192
func (gang *Gang) tryInitByPodGroup(pg *v1alpha1.PodGroup, args *config.CoschedulingArgs) {
183193
gang.lock.Lock()
184194
defer gang.lock.Unlock()
185-
minRequiredNumber := pg.Spec.MinMember
186-
gang.MinRequiredNumber = int(minRequiredNumber)
195+
minRequiredNumber := int(pg.Spec.MinMember)
196+
gang.MinRequiredNumber = minRequiredNumber
187197

188-
totalChildrenNum, err := strconv.ParseInt(pg.Annotations[extension.AnnotationGangTotalNum], 10, 32)
198+
totalChildrenNum, err := extension.GetGangTotalNum(pg)
189199
if err != nil {
190-
klog.V(4).ErrorS(err, "podGroup's annotation totalNumber illegal",
200+
klog.ErrorS(err, "podGroup's annotation totalNumber illegal",
191201
"gangName", gang.Name, "value", pg.Annotations[extension.AnnotationGangTotalNum])
192-
totalChildrenNum = int64(minRequiredNumber)
193-
} else if totalChildrenNum != 0 && totalChildrenNum < int64(minRequiredNumber) {
194-
klog.V(4).Infof("podGroup's annotation totalNumber cannot less than minRequiredNumber, gangName:%v, totalNumber: %v,minRequiredNumber: %v",
202+
totalChildrenNum = minRequiredNumber
203+
} else if totalChildrenNum == 0 {
204+
totalChildrenNum = minRequiredNumber
205+
} else if totalChildrenNum < minRequiredNumber {
206+
klog.Errorf("podGroup's annotation totalNumber cannot less than minRequiredNumber, gangName:%v, totalNumber: %v,minRequiredNumber: %v",
195207
gang.Name, pg.Annotations[extension.AnnotationGangTotalNum], minRequiredNumber)
196-
totalChildrenNum = int64(minRequiredNumber)
208+
totalChildrenNum = minRequiredNumber
197209
}
198-
gang.TotalChildrenNum = int(totalChildrenNum)
210+
gang.TotalChildrenNum = totalChildrenNum
199211

200212
mode := pg.Annotations[extension.AnnotationGangMode]
213+
if mode == "" {
214+
mode = extension.GangModeStrict
215+
}
201216
if mode != extension.GangModeStrict && mode != extension.GangModeNonStrict {
202-
klog.V(4).Infof("podGroup's annotation GangModeAnnotation illegal, gangName: %v, value: %v",
217+
klog.Errorf("podGroup's annotation GangModeAnnotation illegal, gangName: %v, value: %v",
203218
gang.Name, pg.Annotations[extension.AnnotationGangMode])
204219
mode = extension.GangModeStrict
205220
}
206221
gang.Mode = mode
207222

208-
matchPolicy := pg.Annotations[extension.AnnotationGangMatchPolicy]
223+
matchPolicy := extension.GetGangMatchPolicy(pg)
224+
if matchPolicy == "" {
225+
matchPolicy = extension.GangMatchPolicyOnceSatisfied
226+
}
209227
if matchPolicy != extension.GangMatchPolicyOnlyWaiting && matchPolicy != extension.GangMatchPolicyWaitingAndRunning &&
210228
matchPolicy != extension.GangMatchPolicyOnceSatisfied {
211-
klog.V(4).Infof("podGroup's annotation AnnotationGangMatchPolicy illegal, gangName: %v, value: %v",
229+
klog.Errorf("podGroup's annotation AnnotationGangMatchPolicy illegal, gangName: %v, value: %v",
212230
gang.Name, matchPolicy)
213231
matchPolicy = extension.GangMatchPolicyOnceSatisfied
214232
}
@@ -222,7 +240,7 @@ func (gang *Gang) tryInitByPodGroup(pg *v1alpha1.PodGroup, args *config.Coschedu
222240

223241
groupSlice, err := util.StringToGangGroupSlice(pg.Annotations[extension.AnnotationGangGroups])
224242
if err != nil {
225-
klog.V(4).ErrorS(err, "podGroup's annotation GangGroupsAnnotation illegal",
243+
klog.ErrorS(err, "podGroup's annotation GangGroupsAnnotation illegal",
226244
"gangName", gang.Name, "value", pg.Annotations[extension.AnnotationGangGroups])
227245
}
228246
if len(groupSlice) == 0 {
@@ -233,7 +251,7 @@ func (gang *Gang) tryInitByPodGroup(pg *v1alpha1.PodGroup, args *config.Coschedu
233251

234252
gang.NetworkTopologySpec, err = extension.GetNetworkTopologySpec(pg)
235253
if err != nil {
236-
klog.V(4).ErrorS(err, "podGroup's annotation AnnotationGangNetworkTopologySpec illegal",
254+
klog.ErrorS(err, "podGroup's annotation AnnotationGangNetworkTopologySpec illegal",
237255
"gangName", gang.Name, "value", pg.Annotations[extension.AnnotationGangNetworkTopologySpec])
238256
}
239257

pkg/scheduler/plugins/coscheduling/core/network_topology_solver.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (solver *networkTopologySolverImpl) calculateNodeOfferSlot(
9898
) map[string]int {
9999
topologyState := TopologyStateFromContext(ctx)
100100
topologyState.NodeOfferSlot = make(map[string]int, len(nodeInfos))
101-
topologyState.NodeToStatusMap = make(framework.NodeToStatusMap, len(nodeInfos))
101+
topologyState.NodeToStatusMap = make(framework.NodeToStatusMap)
102102
var statusLock sync.RWMutex
103103
calculateForNode := func(nodeI int) {
104104
nodeInfo := nodeInfos[nodeI]
@@ -127,7 +127,9 @@ func (solver *networkTopologySolverImpl) calculateNodeOfferSlot(
127127
}
128128
statusLock.Lock()
129129
topologyState.NodeOfferSlot[nodeInfo.Node().Name] = offerSlot
130-
topologyState.NodeToStatusMap[nodeInfo.Node().Name] = status
130+
if !status.IsSuccess() {
131+
topologyState.NodeToStatusMap[nodeInfo.Node().Name] = status
132+
}
131133
statusLock.Unlock()
132134
}
133135
solver.handle.Parallelizer().Until(ctx, len(nodeInfos), calculateForNode, OperationCalculateNodeOfferSlot)
@@ -292,14 +294,14 @@ func distributePods(
292294
for _, pod := range toSchedulePods {
293295
currentNode := topologyOrderedNodes[currentNodeIndex]
294296
offerSlot := nodeToOfferSlot[currentNode]
295-
if offerSlot > 0 {
296-
podToNode[framework.GetNamespacedName(pod.Namespace, pod.Name)] = currentNode
297-
offerSlot--
298-
nodeToOfferSlot[currentNode] = offerSlot
299-
}
300-
if offerSlot <= 0 {
301-
currentNodeIndex += 1
297+
for offerSlot <= 0 {
298+
currentNodeIndex++
299+
currentNode = topologyOrderedNodes[currentNodeIndex]
300+
offerSlot = nodeToOfferSlot[currentNode]
302301
}
302+
podToNode[framework.GetNamespacedName(pod.Namespace, pod.Name)] = currentNode
303+
offerSlot--
304+
nodeToOfferSlot[currentNode] = offerSlot
303305
}
304306
return podToNode
305307
}

pkg/scheduler/plugins/coscheduling/core/network_topology_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ type JobTopologyRequirements struct {
3737

3838
func GetMustGatherLayer(spec *extension.NetworkTopologySpec, isLayerAncestorFunc networktopology.IsLayerAncestorFunc) schedulingv1alpha1.TopologyLayer {
3939
sort.Slice(spec.GatherStrategy, func(i, j int) bool {
40-
return isLayerAncestorFunc(spec.GatherStrategy[i].Layer, spec.GatherStrategy[j].Layer)
40+
return !isLayerAncestorFunc(spec.GatherStrategy[i].Layer, spec.GatherStrategy[j].Layer)
4141
})
4242
for _, rule := range spec.GatherStrategy {
4343
if rule.Strategy == extension.NetworkTopologyGatherStrategyMustGather {

0 commit comments

Comments
 (0)