Skip to content

Commit 1168c17

Browse files
authored
koordlet: PerformanceCollector supports cgroups-v2 (#984)
Signed-off-by: Re-Grh <1271013391@qq.com>
1 parent ddf7ad0 commit 1168c17

22 files changed

+392
-104
lines changed

pkg/koordlet/metrics/metrics_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2828

2929
apiext "github.com/koordinator-sh/koordinator/apis/extension"
30-
koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util"
3130
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
3231
"github.com/koordinator-sh/koordinator/pkg/util"
3332
)
@@ -77,7 +76,7 @@ func TestCommonCollectors(t *testing.T) {
7776
UID: "test01",
7877
},
7978
}
80-
testingPSI := &koordletutil.PSIByResource{
79+
testingPSI := &system.PSIByResource{
8180
CPU: system.PSIStats{
8281
Some: &system.PSILine{
8382
Avg10: 1,

pkg/koordlet/metrics/psi.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"github.com/prometheus/client_golang/prometheus"
2323
corev1 "k8s.io/api/core/v1"
2424

25-
"github.com/koordinator-sh/koordinator/pkg/koordlet/util"
2625
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
2726
)
2827

@@ -74,7 +73,7 @@ type PSIRecord struct {
7473
CPUFullSupported bool
7574
}
7675

77-
func getPSIRecords(psi *util.PSIByResource) []PSIRecord {
76+
func getPSIRecords(psi *system.PSIByResource) []PSIRecord {
7877
var psiRecordAll []PSIRecord
7978
psiRecordAll = append(psiRecordAll, makePSIRecordSlice(ResourceTypeCPU, psi.CPU)...)
8079
psiRecordAll = append(psiRecordAll, makePSIRecordSlice(ResourceTypeMem, psi.Mem)...)
@@ -135,7 +134,7 @@ func makePSIRecordSlice(resourceType string, psiStats system.PSIStats) []PSIReco
135134
return records
136135
}
137136

138-
func RecordContainerPSI(status *corev1.ContainerStatus, pod *corev1.Pod, psi *util.PSIByResource) {
137+
func RecordContainerPSI(status *corev1.ContainerStatus, pod *corev1.Pod, psi *system.PSIByResource) {
139138
psiRecords := getPSIRecords(psi)
140139
for _, record := range psiRecords {
141140
labels := genNodeLabels()
@@ -156,7 +155,7 @@ func RecordContainerPSI(status *corev1.ContainerStatus, pod *corev1.Pod, psi *ut
156155
}
157156
}
158157

159-
func RecordPodPSI(pod *corev1.Pod, psi *util.PSIByResource) {
158+
func RecordPodPSI(pod *corev1.Pod, psi *system.PSIByResource) {
160159
psiRecords := getPSIRecords(psi)
161160
for _, record := range psiRecords {
162161
labels := genNodeLabels()

pkg/koordlet/metrics/psi_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,11 @@ import (
2121

2222
"github.com/stretchr/testify/assert"
2323

24-
"github.com/koordinator-sh/koordinator/pkg/koordlet/util"
2524
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
2625
)
2726

2827
func TestGetPSIRecords(t *testing.T) {
29-
testingRecords := &util.PSIByResource{
28+
testingRecords := &system.PSIByResource{
3029
CPU: system.PSIStats{
3130
Some: &system.PSILine{},
3231
Full: &system.PSILine{},

pkg/koordlet/metricsadvisor/collector.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func (c *collector) Run(stopCh <-chan struct{}) error {
140140

141141
go wait.Until(c.collectNodeCPUInfo, time.Duration(c.config.CollectNodeCPUInfoIntervalSeconds)*time.Second, stopCh)
142142

143-
ic := NewPerformanceCollector(c.statesInformer, c.metricCache, c.config.CPICollectorTimeWindowSeconds)
143+
ic := NewPerformanceCollector(c.statesInformer, c.metricCache, c.cgroupReader, c.config.CPICollectorTimeWindowSeconds)
144144
util.RunFeature(func() {
145145
// add sync statesInformer cache check before collect pod information
146146
// because collect function will get all pods.
@@ -153,10 +153,15 @@ func (c *collector) Run(stopCh <-chan struct{}) error {
153153
}, []featuregate.Feature{features.CPICollector}, c.config.CPICollectorIntervalSeconds, stopCh)
154154

155155
util.RunFeature(func() {
156-
// psi collector support only on anolis os currently
157-
if !system.HostSystemInfo.IsAnolisOS {
158-
klog.Fatalf("collect psi fail, need anolis os")
159-
return
156+
// CgroupV1 psi collector support only on anolis os currently
157+
if system.GetCurrentCgroupVersion() == system.CgroupVersionV1 {
158+
cpuPressureCheck, _ := system.CPUAcctCPUPressure.IsSupported("")
159+
memPressureCheck, _ := system.CPUAcctMemoryPressure.IsSupported("")
160+
ioPressureCheck, _ := system.CPUAcctIOPressure.IsSupported("")
161+
if !(cpuPressureCheck && memPressureCheck && ioPressureCheck) {
162+
klog.V(5).Infof("system now not support psi feature in CgroupV1, please check pressure file exist and readable in cpuacct directory.")
163+
return
164+
}
160165
}
161166
// add sync statesInformer cache check before collect pod information
162167
// because collect function will get all pods.

pkg/koordlet/metricsadvisor/performance_collector_linux.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
3030
"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
31+
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
3132
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
3233
"github.com/koordinator-sh/koordinator/pkg/koordlet/util"
3334
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/perf"
@@ -36,13 +37,15 @@ import (
3637
type performanceCollector struct {
3738
statesInformer statesinformer.StatesInformer
3839
metricCache metriccache.MetricCache
40+
cgroupReader resourceexecutor.CgroupReader
3941
collectTimeWindow int
4042
}
4143

42-
func NewPerformanceCollector(statesInformer statesinformer.StatesInformer, metricCache metriccache.MetricCache, collectTimeWindow int) *performanceCollector {
44+
func NewPerformanceCollector(statesInformer statesinformer.StatesInformer, metricCache metriccache.MetricCache, cgroupReader resourceexecutor.CgroupReader, collectTimeWindow int) *performanceCollector {
4345
return &performanceCollector{
4446
statesInformer: statesInformer,
4547
metricCache: metricCache,
48+
cgroupReader: cgroupReader,
4649
collectTimeWindow: collectTimeWindow,
4750
}
4851
}
@@ -175,7 +178,12 @@ func (c *performanceCollector) collectContainerPSI() {
175178

176179
func (c *performanceCollector) collectSingleContainerPSI(podParentCgroupDir string, containerStatus *corev1.ContainerStatus, pod *corev1.Pod) {
177180
collectTime := time.Now()
178-
containerPSI, err := util.GetContainerPSI(podParentCgroupDir, containerStatus)
181+
containerPath, err := util.GetContainerCgroupPathWithKube(podParentCgroupDir, containerStatus)
182+
if err != nil {
183+
klog.Errorf("failed to get container path for container %v/%v/%v cgroup path failed, error: %v", pod.Namespace, pod.Name, containerStatus.Name, err)
184+
return
185+
}
186+
containerPSI, err := c.cgroupReader.ReadPSI(containerPath)
179187
if err != nil {
180188
klog.Errorf("collect container %s psi err: %v", containerStatus.Name, err)
181189
return
@@ -223,9 +231,10 @@ func (c *performanceCollector) collectPodPSI() {
223231

224232
func (c *performanceCollector) collectSinglePodPSI(pod *corev1.Pod, podCgroupDir string) {
225233
collectTime := time.Now()
226-
podPSI, err := util.GetPodPSI(podCgroupDir)
234+
paths := util.GetPodCgroupDirWithKube(podCgroupDir)
235+
podPSI, err := c.cgroupReader.ReadPSI(paths)
227236
if err != nil {
228-
klog.Errorf("collect pod %s psi err: %v", pod.Name, err)
237+
klog.Errorf("collect pod %v/%v psi err: %v", pod.Namespace, pod.Name, err)
229238
return
230239
}
231240
podPsiMetric := &metriccache.PodInterferenceMetric{

pkg/koordlet/metricsadvisor/performance_collector_linux_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929

3030
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
3131
mockmetriccache "github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache/mockmetriccache"
32+
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
3233
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
3334
mockstatesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/mockstatesinformer"
3435
"github.com/koordinator-sh/koordinator/pkg/koordlet/util"
@@ -40,6 +41,7 @@ func TestNewPerformanceCollector(t *testing.T) {
4041
cfg *Config
4142
statesInformer statesinformer.StatesInformer
4243
metricCache metriccache.MetricCache
44+
cgroupReader resourceexecutor.CgroupReader
4345
timeWindow int
4446
}
4547
tests := []struct {
@@ -52,13 +54,14 @@ func TestNewPerformanceCollector(t *testing.T) {
5254
cfg: &Config{},
5355
statesInformer: nil,
5456
metricCache: nil,
57+
cgroupReader: nil,
5558
timeWindow: 10,
5659
},
5760
},
5861
}
5962
for _, tt := range tests {
6063
t.Run(tt.name, func(t *testing.T) {
61-
if got := NewPerformanceCollector(tt.args.statesInformer, tt.args.metricCache, tt.args.timeWindow); got == nil {
64+
if got := NewPerformanceCollector(tt.args.statesInformer, tt.args.metricCache, nil, tt.args.timeWindow); got == nil {
6265
t.Errorf("NewPerformanceCollector() = %v", got)
6366
}
6467
})
@@ -75,7 +78,7 @@ func Test_collectContainerCPI(t *testing.T) {
7578
mockStatesInformer.EXPECT().GetAllPods().Return([]*statesinformer.PodMeta{}).AnyTimes()
7679
mockMetricCache.EXPECT().GetNodeCPUInfo(&metriccache.QueryParam{}).Return(cpuInfo, nil).AnyTimes()
7780

78-
c := NewPerformanceCollector(mockStatesInformer, mockMetricCache, 1)
81+
c := NewPerformanceCollector(mockStatesInformer, mockMetricCache, nil, 1)
7982
assert.NotPanics(t, func() {
8083
c.collectContainerCPI()
8184
})
@@ -91,7 +94,7 @@ func Test_collectContainerCPI_cpuInfoErr(t *testing.T) {
9194
mockStatesInformer.EXPECT().GetAllPods().Return([]*statesinformer.PodMeta{}).AnyTimes()
9295
mockMetricCache.EXPECT().GetNodeCPUInfo(&metriccache.QueryParam{}).Return(cpuInfo, fmt.Errorf("cpu_error")).AnyTimes()
9396

94-
c := NewPerformanceCollector(mockStatesInformer, mockMetricCache, 1)
97+
c := NewPerformanceCollector(mockStatesInformer, mockMetricCache, nil, 1)
9598
assert.NotPanics(t, func() {
9699
c.collectContainerCPI()
97100
})
@@ -109,7 +112,7 @@ func Test_collectContainerCPI_mockPod(t *testing.T) {
109112
mockStatesInformer.EXPECT().GetAllPods().Return([]*statesinformer.PodMeta{pod}).AnyTimes()
110113
mockMetricCache.EXPECT().GetNodeCPUInfo(&metriccache.QueryParam{}).Return(cpuInfo, nil).AnyTimes()
111114

112-
c := NewPerformanceCollector(mockStatesInformer, mockMetricCache, 1)
115+
c := NewPerformanceCollector(mockStatesInformer, mockMetricCache, nil, 1)
113116
assert.NotPanics(t, func() {
114117
c.collectContainerCPI()
115118
})
@@ -146,7 +149,7 @@ func Test_getAndStartCollectorOnSingleContainer(t *testing.T) {
146149
containerStatus := &corev1.ContainerStatus{
147150
ContainerID: "containerd://test",
148151
}
149-
c := NewPerformanceCollector(nil, nil, 0)
152+
c := NewPerformanceCollector(nil, nil, nil, 0)
150153
assert.NotPanics(t, func() {
151154
_, err := c.getAndStartCollectorOnSingleContainer(tempDir, containerStatus, 0)
152155
if err != nil {
@@ -169,7 +172,7 @@ func Test_profilePerfOnSingleContainer(t *testing.T) {
169172
f, _ := os.OpenFile(tempDir, os.O_RDONLY, os.ModeDir)
170173
perfCollector, _ := perf.NewPerfCollector(f, []int{})
171174

172-
c := NewPerformanceCollector(nil, m, 0)
175+
c := NewPerformanceCollector(nil, m, nil, 0)
173176
testingPod := &corev1.Pod{
174177
ObjectMeta: metav1.ObjectMeta{
175178
Name: "test_pod",
@@ -225,7 +228,7 @@ func Test_collectContainerPSI(t *testing.T) {
225228
t.Fatalf("got error when create psi files: %v", errCreateIO)
226229
}
227230

228-
c := NewPerformanceCollector(mockStatesInformer, mockMetricCache, 1)
231+
c := NewPerformanceCollector(mockStatesInformer, mockMetricCache, resourceexecutor.NewCgroupReader(), 1)
229232
assert.NotPanics(t, func() {
230233
c.collectContainerPSI()
231234
})
@@ -255,7 +258,7 @@ func Test_collectPodPSI(t *testing.T) {
255258
t.Fatalf("got error when create psi files: %v", errCreateIO)
256259
}
257260

258-
c := NewPerformanceCollector(mockStatesInformer, mockMetricCache, 1)
261+
c := NewPerformanceCollector(mockStatesInformer, mockMetricCache, resourceexecutor.NewCgroupReader(), 1)
259262
assert.NotPanics(t, func() {
260263
c.collectPodPSI()
261264
})

pkg/koordlet/metricsadvisor/performance_collector_unsupported.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ package metricsadvisor
2121

2222
import (
2323
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
24+
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
2425
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
2526
)
2627

2728
type performanceCollector struct{}
2829

29-
func NewPerformanceCollector(statesInformer statesinformer.StatesInformer, metricCache metriccache.MetricCache, collectTimeWindow int) *performanceCollector {
30+
func NewPerformanceCollector(statesInformer statesinformer.StatesInformer, metricCache metriccache.MetricCache, cgroupReader resourceexecutor.CgroupReader, collectTimeWindow int) *performanceCollector {
3031
return &performanceCollector{}
3132
}
3233

pkg/koordlet/resmanager/cpu_burst.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,11 @@ func (b *CPUBurst) applyCFSQuotaBurst(burstCfg *slov1alpha1.CPUBurstConfig, podM
325325
continue
326326
}
327327
containerCurCFS, err := b.cgroupReader.ReadCPUQuota(containerPath)
328-
if err != nil {
328+
if err != nil && system.IsCgroupDirErr(err) {
329+
klog.V(5).Infof("get container %s/%s/%s current cfs quota failed, maybe not exist, skip this round, reason %v",
330+
pod.Namespace, pod.Name, containerStat.Name, err)
331+
continue
332+
} else if err != nil {
329333
klog.Infof("get container %s/%s/%s current cfs quota failed, maybe not exist, skip this round, reason %v",
330334
pod.Namespace, pod.Name, containerStat.Name, err)
331335
continue

pkg/koordlet/resourceexecutor/executor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ func (e *ResourceUpdateExecutorImpl) LeveledUpdateBatch(cacheable bool, updaters
119119
}
120120

121121
mergedUpdater, err := updater.MergeUpdate()
122-
if err != nil && sysutil.IsResourceUnsupportedErr(err) {
123-
klog.V(5).Infof("failed merge update unsupported resource %s, err: %v", updater.Key(), err)
122+
if err != nil && (sysutil.IsResourceUnsupportedErr(err) || sysutil.IsCgroupDirErr(err)) {
123+
klog.V(5).Infof("failed merge update resource %s, err: %v", updater.Key(), err)
124124
continue
125125
} else if err != nil {
126126
klog.V(4).Infof("failed merge update resource %s, err: %v", updater.Key(), err)
@@ -155,8 +155,8 @@ func (e *ResourceUpdateExecutorImpl) LeveledUpdateBatch(cacheable bool, updaters
155155
continue
156156
}
157157
err = updater.Update()
158-
if err != nil && sysutil.IsResourceUnsupportedErr(err) {
159-
klog.V(5).Infof("failed update unsupported resource %s, err: %v", updater.Key(), err)
158+
if err != nil && (sysutil.IsResourceUnsupportedErr(err) || sysutil.IsCgroupDirErr(err)) {
159+
klog.V(5).Infof("failed update resource %s, err: %v", updater.Key(), err)
160160
continue
161161
} else if err != nil {
162162
klog.V(4).Infof("failed update resource %s, err: %v", updater.Key(), err)

pkg/koordlet/resourceexecutor/reader.go

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type CgroupReader interface {
3636
ReadMemoryLimit(parentDir string) (int64, error)
3737
ReadMemoryStat(parentDir string) (*sysutil.MemoryStatRaw, error)
3838
ReadCPUTasks(parentDir string) ([]int32, error)
39+
ReadPSI(parentDir string) (*sysutil.PSIByResource, error)
3940
}
4041

4142
var _ CgroupReader = &CgroupV1Reader{}
@@ -66,6 +67,32 @@ func (r *CgroupV1Reader) ReadCPUShares(parentDir string) (int64, error) {
6667
return sysutil.ReadCgroupAndParseInt64(parentDir, resource)
6768
}
6869

70+
func (r *CgroupV1Reader) ReadPSI(parentDir string) (*sysutil.PSIByResource, error) {
71+
cpuPressureResource, ok := sysutil.DefaultRegistry.Get(sysutil.CgroupVersionV1, sysutil.CPUAcctCPUPressureName)
72+
if !ok {
73+
return nil, ErrResourceNotRegistered
74+
}
75+
memPressureResource, ok := sysutil.DefaultRegistry.Get(sysutil.CgroupVersionV1, sysutil.CPUAcctMemoryPressureName)
76+
if !ok {
77+
return nil, ErrResourceNotRegistered
78+
}
79+
ioPressureResource, ok := sysutil.DefaultRegistry.Get(sysutil.CgroupVersionV1, sysutil.CPUAcctIOPressureName)
80+
if !ok {
81+
return nil, ErrResourceNotRegistered
82+
}
83+
84+
paths := sysutil.PSIPath{
85+
CPU: cpuPressureResource.Path(parentDir),
86+
Mem: memPressureResource.Path(parentDir),
87+
IO: ioPressureResource.Path(parentDir),
88+
}
89+
psi, err := sysutil.GetPSIByResource(paths)
90+
if err != nil {
91+
return nil, err
92+
}
93+
return psi, nil
94+
}
95+
6996
func (r *CgroupV1Reader) ReadCPUSet(parentDir string) (*cpuset.CPUSet, error) {
7097
resource, ok := sysutil.DefaultRegistry.Get(sysutil.CgroupVersionV1, sysutil.CPUSetCPUSName)
7198
if !ok {
@@ -162,7 +189,9 @@ func (r *CgroupV2Reader) ReadCPUQuota(parentDir string) (int64, error) {
162189
return -1, ErrResourceNotRegistered
163190
}
164191
s, err := sysutil.CgroupFileRead(parentDir, resource)
165-
if err != nil {
192+
if err != nil && sysutil.IsCgroupDirErr(err) {
193+
return -1, err
194+
} else if err != nil {
166195
return -1, fmt.Errorf("cannot read cgroup file, err: %v", err)
167196
}
168197

@@ -293,6 +322,32 @@ func (r *CgroupV2Reader) ReadCPUTasks(parentDir string) ([]int32, error) {
293322
return sysutil.ReadCgroupAndParseInt32Slice(parentDir, resource)
294323
}
295324

325+
func (r *CgroupV2Reader) ReadPSI(parentDir string) (*sysutil.PSIByResource, error) {
326+
cpuPressureResource, ok := sysutil.DefaultRegistry.Get(sysutil.CgroupVersionV2, sysutil.CPUAcctCPUPressureName)
327+
if !ok {
328+
return nil, ErrResourceNotRegistered
329+
}
330+
memPressureResource, ok := sysutil.DefaultRegistry.Get(sysutil.CgroupVersionV2, sysutil.CPUAcctMemoryPressureName)
331+
if !ok {
332+
return nil, ErrResourceNotRegistered
333+
}
334+
ioPressureResource, ok := sysutil.DefaultRegistry.Get(sysutil.CgroupVersionV2, sysutil.CPUAcctIOPressureName)
335+
if !ok {
336+
return nil, ErrResourceNotRegistered
337+
}
338+
339+
paths := sysutil.PSIPath{
340+
CPU: cpuPressureResource.Path(parentDir),
341+
Mem: memPressureResource.Path(parentDir),
342+
IO: ioPressureResource.Path(parentDir),
343+
}
344+
psi, err := sysutil.GetPSIByResource(paths)
345+
if err != nil {
346+
return nil, err
347+
}
348+
return psi, nil
349+
}
350+
296351
func NewCgroupReader() CgroupReader {
297352
if sysutil.GetCurrentCgroupVersion() == sysutil.CgroupVersionV2 {
298353
return &CgroupV2Reader{}

0 commit comments

Comments
 (0)