Skip to content

Commit 3c4ddc0

Browse files
authored
koord-scheduler: refactor ReservationInfo as public (#1280)
Signed-off-by: Joseph <joseph.t.lee@outlook.com>
1 parent f377ce8 commit 3c4ddc0

File tree

13 files changed

+236
-230
lines changed

13 files changed

+236
-230
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
Copyright 2022 The Koordinator Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package frameworkext
18+
19+
import (
20+
corev1 "k8s.io/api/core/v1"
21+
"k8s.io/apimachinery/pkg/types"
22+
quotav1 "k8s.io/apiserver/pkg/quota/v1"
23+
"k8s.io/kubernetes/pkg/api/v1/resource"
24+
25+
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
26+
reservationutil "github.com/koordinator-sh/koordinator/pkg/util/reservation"
27+
)
28+
29+
type ReservationInfo struct {
30+
Reservation *schedulingv1alpha1.Reservation
31+
ResourceNames []corev1.ResourceName
32+
Allocatable corev1.ResourceList
33+
Allocated corev1.ResourceList
34+
Pods map[types.UID]*PodRequirement
35+
}
36+
37+
type PodRequirement struct {
38+
Namespace string
39+
Name string
40+
UID types.UID
41+
Requests corev1.ResourceList
42+
}
43+
44+
func NewReservationInfo(r *schedulingv1alpha1.Reservation) *ReservationInfo {
45+
allocatable := reservationutil.ReservationRequests(r)
46+
resourceNames := quotav1.ResourceNames(allocatable)
47+
48+
return &ReservationInfo{
49+
Reservation: r.DeepCopy(),
50+
ResourceNames: resourceNames,
51+
Allocatable: allocatable,
52+
Pods: map[types.UID]*PodRequirement{},
53+
}
54+
}
55+
56+
func (ri *ReservationInfo) Clone() *ReservationInfo {
57+
resourceNames := make([]corev1.ResourceName, 0, len(ri.ResourceNames))
58+
for _, v := range ri.ResourceNames {
59+
resourceNames = append(resourceNames, v)
60+
}
61+
62+
pods := map[types.UID]*PodRequirement{}
63+
for k, v := range ri.Pods {
64+
pods[k] = &PodRequirement{
65+
Namespace: v.Namespace,
66+
Name: v.Name,
67+
UID: v.UID,
68+
Requests: v.Requests.DeepCopy(),
69+
}
70+
}
71+
72+
return &ReservationInfo{
73+
Reservation: ri.Reservation.DeepCopy(),
74+
ResourceNames: resourceNames,
75+
Allocatable: ri.Allocatable.DeepCopy(),
76+
Allocated: ri.Allocated.DeepCopy(),
77+
Pods: pods,
78+
}
79+
}
80+
81+
func (ri *ReservationInfo) UpdateReservation(r *schedulingv1alpha1.Reservation) {
82+
ri.Reservation = r.DeepCopy()
83+
ri.Allocatable = reservationutil.ReservationRequests(r)
84+
ri.ResourceNames = quotav1.ResourceNames(ri.Allocatable)
85+
ri.Allocated = quotav1.Mask(ri.Allocated, ri.ResourceNames)
86+
}
87+
88+
func (ri *ReservationInfo) AddPod(pod *corev1.Pod) {
89+
requests, _ := resource.PodRequestsAndLimits(pod)
90+
ri.Allocated = quotav1.Add(ri.Allocated, quotav1.Mask(requests, ri.ResourceNames))
91+
ri.Pods[pod.UID] = &PodRequirement{
92+
Namespace: pod.Namespace,
93+
Name: pod.Name,
94+
UID: pod.UID,
95+
Requests: requests,
96+
}
97+
}
98+
99+
func (ri *ReservationInfo) RemovePod(pod *corev1.Pod) {
100+
if requirement, ok := ri.Pods[pod.UID]; ok {
101+
if len(requirement.Requests) > 0 {
102+
ri.Allocated = quotav1.SubtractWithNonNegativeResult(ri.Allocated, quotav1.Mask(requirement.Requests, ri.ResourceNames))
103+
}
104+
delete(ri.Pods, pod.UID)
105+
}
106+
}

pkg/scheduler/plugins/reservation/cache.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,20 @@ import (
2424

2525
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
2626
schedulinglister "github.com/koordinator-sh/koordinator/pkg/client/listers/scheduling/v1alpha1"
27+
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
2728
)
2829

2930
type reservationCache struct {
3031
reservationLister schedulinglister.ReservationLister
3132
lock sync.Mutex
32-
reservationInfos map[types.UID]*reservationInfo
33+
reservationInfos map[types.UID]*frameworkext.ReservationInfo
3334
reservationsOnNode map[string]map[types.UID]*schedulingv1alpha1.Reservation
3435
}
3536

3637
func newReservationCache(reservationLister schedulinglister.ReservationLister) *reservationCache {
3738
cache := &reservationCache{
3839
reservationLister: reservationLister,
39-
reservationInfos: map[types.UID]*reservationInfo{},
40+
reservationInfos: map[types.UID]*frameworkext.ReservationInfo{},
4041
reservationsOnNode: map[string]map[types.UID]*schedulingv1alpha1.Reservation{},
4142
}
4243
return cache
@@ -79,10 +80,10 @@ func (cache *reservationCache) updateReservation(newR *schedulingv1alpha1.Reserv
7980
defer cache.lock.Unlock()
8081
rInfo := cache.reservationInfos[newR.UID]
8182
if rInfo == nil {
82-
rInfo = newReservationInfo(newR)
83+
rInfo = frameworkext.NewReservationInfo(newR)
8384
cache.reservationInfos[newR.UID] = rInfo
8485
} else {
85-
rInfo.updateReservation(newR)
86+
rInfo.UpdateReservation(newR)
8687
}
8788
if newR.Status.NodeName != "" {
8889
cache.updateReservationsOnNode(newR.Status.NodeName, newR)
@@ -110,7 +111,7 @@ func (cache *reservationCache) addPod(reservationUID types.UID, pod *corev1.Pod)
110111

111112
rInfo := cache.reservationInfos[reservationUID]
112113
if rInfo != nil {
113-
rInfo.addPod(pod)
114+
rInfo.AddPod(pod)
114115
}
115116
}
116117

@@ -120,8 +121,8 @@ func (cache *reservationCache) updatePod(reservationUID types.UID, oldPod, newPo
120121

121122
rInfo := cache.reservationInfos[reservationUID]
122123
if rInfo != nil {
123-
rInfo.removePod(oldPod)
124-
rInfo.addPod(newPod)
124+
rInfo.RemovePod(oldPod)
125+
rInfo.AddPod(newPod)
125126
}
126127
}
127128

@@ -131,19 +132,19 @@ func (cache *reservationCache) deletePod(reservationUID types.UID, pod *corev1.P
131132

132133
rInfo := cache.reservationInfos[reservationUID]
133134
if rInfo != nil {
134-
rInfo.removePod(pod)
135+
rInfo.RemovePod(pod)
135136
}
136137
}
137138

138-
func (cache *reservationCache) getReservationInfo(name string) *reservationInfo {
139+
func (cache *reservationCache) getReservationInfo(name string) *frameworkext.ReservationInfo {
139140
reservation, err := cache.reservationLister.Get(name)
140141
if err != nil {
141142
return nil
142143
}
143144
return cache.getReservationInfoByUID(reservation.UID)
144145
}
145146

146-
func (cache *reservationCache) getReservationInfoByUID(uid types.UID) *reservationInfo {
147+
func (cache *reservationCache) getReservationInfoByUID(uid types.UID) *frameworkext.ReservationInfo {
147148
cache.lock.Lock()
148149
defer cache.lock.Unlock()
149150
rInfo := cache.reservationInfos[uid]
@@ -153,14 +154,14 @@ func (cache *reservationCache) getReservationInfoByUID(uid types.UID) *reservati
153154
return nil
154155
}
155156

156-
func (cache *reservationCache) listReservationInfosOnNode(nodeName string) []*reservationInfo {
157+
func (cache *reservationCache) listReservationInfosOnNode(nodeName string) []*frameworkext.ReservationInfo {
157158
cache.lock.Lock()
158159
defer cache.lock.Unlock()
159160
rOnNode := cache.reservationsOnNode[nodeName]
160161
if len(rOnNode) == 0 {
161162
return nil
162163
}
163-
result := make([]*reservationInfo, 0, len(rOnNode))
164+
result := make([]*frameworkext.ReservationInfo, 0, len(rOnNode))
164165
for _, v := range rOnNode {
165166
rInfo := cache.reservationInfos[v.UID]
166167
if rInfo != nil {

pkg/scheduler/plugins/reservation/cache_test.go

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
quotav1 "k8s.io/apiserver/pkg/quota/v1"
3030

3131
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
32+
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
3233
)
3334

3435
func TestCacheUpdateReservation(t *testing.T) {
@@ -70,31 +71,31 @@ func TestCacheUpdateReservation(t *testing.T) {
7071
reservationInfos := cache.listReservationInfosOnNode(reservation.Status.NodeName)
7172
assert.Len(t, reservationInfos, 1)
7273
rInfo := reservationInfos[0]
73-
expectReservationInfo := &reservationInfo{
74-
reservation: reservation,
75-
resourceNames: []corev1.ResourceName{
74+
expectReservationInfo := &frameworkext.ReservationInfo{
75+
Reservation: reservation,
76+
ResourceNames: []corev1.ResourceName{
7677
corev1.ResourceCPU,
7778
corev1.ResourceMemory,
7879
},
79-
allocatable: corev1.ResourceList{
80+
Allocatable: corev1.ResourceList{
8081
corev1.ResourceCPU: resource.MustParse("4"),
8182
corev1.ResourceMemory: resource.MustParse("4Gi"),
8283
},
83-
allocated: nil,
84-
pods: map[types.UID]*podRequirement{},
84+
Allocated: nil,
85+
Pods: map[types.UID]*frameworkext.PodRequirement{},
8586
}
86-
sort.Slice(rInfo.resourceNames, func(i, j int) bool {
87-
return rInfo.resourceNames[i] < rInfo.resourceNames[j]
87+
sort.Slice(rInfo.ResourceNames, func(i, j int) bool {
88+
return rInfo.ResourceNames[i] < rInfo.ResourceNames[j]
8889
})
8990
assert.Equal(t, expectReservationInfo, rInfo)
9091

9192
cache.updateReservation(reservation)
9293
reservationInfos = cache.listReservationInfosOnNode(reservation.Status.NodeName)
9394
assert.Len(t, reservationInfos, 1)
9495
rInfo = reservationInfos[0]
95-
expectReservationInfo.allocated = corev1.ResourceList{}
96-
sort.Slice(rInfo.resourceNames, func(i, j int) bool {
97-
return rInfo.resourceNames[i] < rInfo.resourceNames[j]
96+
expectReservationInfo.Allocated = corev1.ResourceList{}
97+
sort.Slice(rInfo.ResourceNames, func(i, j int) bool {
98+
return rInfo.ResourceNames[i] < rInfo.ResourceNames[j]
9899
})
99100
assert.Equal(t, expectReservationInfo, rInfo)
100101
}
@@ -139,21 +140,21 @@ func TestCacheDeleteReservation(t *testing.T) {
139140
rInfo := cache.getReservationInfoByUID(reservation.UID)
140141
assert.NotNil(t, rInfo)
141142

142-
expectReservationInfo := &reservationInfo{
143-
reservation: reservation,
144-
resourceNames: []corev1.ResourceName{
143+
expectReservationInfo := &frameworkext.ReservationInfo{
144+
Reservation: reservation,
145+
ResourceNames: []corev1.ResourceName{
145146
corev1.ResourceCPU,
146147
corev1.ResourceMemory,
147148
},
148-
allocatable: corev1.ResourceList{
149+
Allocatable: corev1.ResourceList{
149150
corev1.ResourceCPU: resource.MustParse("4"),
150151
corev1.ResourceMemory: resource.MustParse("4Gi"),
151152
},
152-
allocated: nil,
153-
pods: map[types.UID]*podRequirement{},
153+
Allocated: nil,
154+
Pods: map[types.UID]*frameworkext.PodRequirement{},
154155
}
155-
sort.Slice(rInfo.resourceNames, func(i, j int) bool {
156-
return rInfo.resourceNames[i] < rInfo.resourceNames[j]
156+
sort.Slice(rInfo.ResourceNames, func(i, j int) bool {
157+
return rInfo.ResourceNames[i] < rInfo.ResourceNames[j]
157158
})
158159
assert.Equal(t, expectReservationInfo, rInfo)
159160

@@ -225,29 +226,29 @@ func TestCacheAddOrUpdateOrDeletePod(t *testing.T) {
225226
cache.addPod(reservation.UID, pod)
226227

227228
rInfo = cache.getReservationInfoByUID(reservation.UID)
228-
sort.Slice(rInfo.resourceNames, func(i, j int) bool {
229-
return rInfo.resourceNames[i] < rInfo.resourceNames[j]
229+
sort.Slice(rInfo.ResourceNames, func(i, j int) bool {
230+
return rInfo.ResourceNames[i] < rInfo.ResourceNames[j]
230231
})
231-
expectReservationInfo := &reservationInfo{
232-
reservation: reservation,
233-
resourceNames: []corev1.ResourceName{
232+
expectReservationInfo := &frameworkext.ReservationInfo{
233+
Reservation: reservation,
234+
ResourceNames: []corev1.ResourceName{
234235
corev1.ResourceCPU,
235236
corev1.ResourceMemory,
236237
},
237-
allocatable: corev1.ResourceList{
238+
Allocatable: corev1.ResourceList{
238239
corev1.ResourceCPU: resource.MustParse("4000m"),
239240
corev1.ResourceMemory: resource.MustParse("4Gi"),
240241
},
241-
allocated: corev1.ResourceList{
242+
Allocated: corev1.ResourceList{
242243
corev1.ResourceCPU: resource.MustParse("2000m"),
243244
corev1.ResourceMemory: resource.MustParse("2Gi"),
244245
},
245-
pods: map[types.UID]*podRequirement{
246+
Pods: map[types.UID]*frameworkext.PodRequirement{
246247
pod.UID: {
247-
namespace: pod.Namespace,
248-
name: pod.Name,
249-
uid: pod.UID,
250-
requests: corev1.ResourceList{
248+
Namespace: pod.Namespace,
249+
Name: pod.Name,
250+
UID: pod.UID,
251+
Requests: corev1.ResourceList{
251252
corev1.ResourceCPU: resource.MustParse("2000m"),
252253
corev1.ResourceMemory: resource.MustParse("2Gi"),
253254
},
@@ -258,24 +259,24 @@ func TestCacheAddOrUpdateOrDeletePod(t *testing.T) {
258259

259260
cache.updatePod(reservation.UID, pod, pod)
260261
rInfo = cache.getReservationInfoByUID(reservation.UID)
261-
sort.Slice(rInfo.resourceNames, func(i, j int) bool {
262-
return rInfo.resourceNames[i] < rInfo.resourceNames[j]
262+
sort.Slice(rInfo.ResourceNames, func(i, j int) bool {
263+
return rInfo.ResourceNames[i] < rInfo.ResourceNames[j]
263264
})
264-
expectReservationInfo.allocated = quotav1.SubtractWithNonNegativeResult(expectReservationInfo.allocated, corev1.ResourceList{
265+
expectReservationInfo.Allocated = quotav1.SubtractWithNonNegativeResult(expectReservationInfo.Allocated, corev1.ResourceList{
265266
corev1.ResourceCPU: resource.MustParse("0"),
266267
corev1.ResourceMemory: resource.MustParse("0"),
267268
})
268269
assert.Equal(t, expectReservationInfo, rInfo)
269270

270271
cache.deletePod(reservation.UID, pod)
271272
rInfo = cache.getReservationInfoByUID(reservation.UID)
272-
sort.Slice(rInfo.resourceNames, func(i, j int) bool {
273-
return rInfo.resourceNames[i] < rInfo.resourceNames[j]
273+
sort.Slice(rInfo.ResourceNames, func(i, j int) bool {
274+
return rInfo.ResourceNames[i] < rInfo.ResourceNames[j]
274275
})
275-
expectReservationInfo.allocated = corev1.ResourceList{
276+
expectReservationInfo.Allocated = corev1.ResourceList{
276277
corev1.ResourceCPU: resource.MustParse("0"),
277278
corev1.ResourceMemory: resource.MustParse("0"),
278279
}
279-
expectReservationInfo.pods = map[types.UID]*podRequirement{}
280+
expectReservationInfo.Pods = map[types.UID]*frameworkext.PodRequirement{}
280281
assert.Equal(t, expectReservationInfo, rInfo)
281282
}

pkg/scheduler/plugins/reservation/eventhandler_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func TestEventHandlerOnAdd(t *testing.T) {
106106
assert.Nil(t, rInfo)
107107
} else {
108108
rInfo := cache.getReservationInfoByUID(tt.wantReservation.UID)
109-
assert.Equal(t, tt.wantReservation, rInfo.reservation)
109+
assert.Equal(t, tt.wantReservation, rInfo.Reservation)
110110
}
111111
})
112112
}
@@ -201,7 +201,7 @@ func TestEventHandlerUpdate(t *testing.T) {
201201
assert.Nil(t, rInfo)
202202
} else {
203203
rInfo := cache.getReservationInfoByUID(tt.wantReservation.UID)
204-
assert.Equal(t, tt.wantReservation, rInfo.reservation)
204+
assert.Equal(t, tt.wantReservation, rInfo.Reservation)
205205
}
206206
})
207207
}

pkg/scheduler/plugins/reservation/nominator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@ func (pl *Plugin) NominateReservation(ctx context.Context, cycleState *framework
5353

5454
reservations := make([]*schedulingv1alpha1.Reservation, 0, len(rOnNode))
5555
for _, rInfo := range rOnNode {
56-
status := extender.RunReservationFilterPlugins(ctx, cycleState, pod, rInfo.reservation, nodeName)
56+
status := extender.RunReservationFilterPlugins(ctx, cycleState, pod, rInfo.Reservation, nodeName)
5757
if !status.IsSuccess() {
5858
continue
5959
}
60-
reservations = append(reservations, rInfo.reservation)
60+
reservations = append(reservations, rInfo.Reservation)
6161
}
6262
if len(reservations) == 0 {
6363
return nil, nil

0 commit comments

Comments
 (0)