Skip to content

Commit 86097cc

Browse files
authored
feature: report cpu info to noderesourcetopology (#312)
Signed-off-by: Yue Zhang <huaihuan.zy@alibaba-inc.com>
1 parent ea82b20 commit 86097cc

File tree

8 files changed

+363
-4
lines changed

8 files changed

+363
-4
lines changed

pkg/koordlet/koordlet.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"os"
2222
"time"
2323

24+
topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
2425
apiruntime "k8s.io/apimachinery/pkg/runtime"
2526
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2627
"k8s.io/apimachinery/pkg/util/wait"
@@ -100,18 +101,20 @@ func NewDaemon(config *config.Configuration) (Daemon, error) {
100101

101102
kubeClient := clientset.NewForConfigOrDie(config.KubeRestConf)
102103
crdClient := clientsetbeta1.NewForConfigOrDie(config.KubeRestConf)
104+
topologyClient := topologyclientset.NewForConfigOrDie(config.KubeRestConf)
103105

104106
pleg, err := pleg.NewPLEG(system.Conf.CgroupRootDir)
105107
if err != nil {
106108
return nil, err
107109
}
108110

109-
statesInformer := statesinformer.NewStatesInformer(config.StatesInformerConf, kubeClient, crdClient, pleg, nodeName)
110111
metricCache, err := metriccache.NewMetricCache(config.MetricCacheConf)
111112
if err != nil {
112113
return nil, err
113114
}
114115

116+
statesInformer := statesinformer.NewStatesInformer(config.StatesInformerConf, kubeClient, crdClient, topologyClient, metricCache, pleg, nodeName)
117+
115118
collectorService := metricsadvisor.NewCollector(config.CollectorConf, statesInformer, metricCache)
116119
reporterService := reporter.NewReporter(config.ReporterConf, kubeClient, crdClient, nodeName, metricCache, statesInformer)
117120

pkg/koordlet/statesinformer/config.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type Config struct {
2929
KubeletSyncTimeout time.Duration
3030
InsecureKubeletTLS bool
3131
KubeletReadOnlyPort uint
32+
NodeTopologySyncInterval time.Duration
3233
}
3334

3435
func NewDefaultConfig() *Config {
@@ -38,6 +39,7 @@ func NewDefaultConfig() *Config {
3839
KubeletSyncTimeout: 3 * time.Second,
3940
InsecureKubeletTLS: false,
4041
KubeletReadOnlyPort: 10255,
42+
NodeTopologySyncInterval: 3 * time.Second,
4143
}
4244
}
4345

@@ -47,4 +49,5 @@ func (c *Config) InitFlags(fs *flag.FlagSet) {
4749
fs.DurationVar(&c.KubeletSyncTimeout, "kubelet-sync-timeout", c.KubeletSyncTimeout, "The length of time to wait before giving up on a single request to Kubelet. Non-zero values should contain a corresponding time unit (e.g. 1s, 2m, 3h).")
4850
fs.BoolVar(&c.InsecureKubeletTLS, "kubelet-insecure-tls", c.InsecureKubeletTLS, "Using read-only port to communicate with Kubelet. For testing purposes only, not recommended for production use.")
4951
fs.UintVar(&c.KubeletReadOnlyPort, "kubelet-read-only-port", c.KubeletReadOnlyPort, "The read-only port for the kubelet to serve on with no authentication/authorization. Default: 10255.")
52+
fs.DurationVar(&c.NodeTopologySyncInterval, "node-topology-sync-interval", c.NodeTopologySyncInterval, "The interval which Koordlet will report the node topology info")
5053
}

pkg/koordlet/statesinformer/config_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func TestNewDefaultConfig(t *testing.T) {
3838
KubeletSyncTimeout: 3 * time.Second,
3939
InsecureKubeletTLS: false,
4040
KubeletReadOnlyPort: 10255,
41+
NodeTopologySyncInterval: 3 * time.Second,
4142
},
4243
},
4344
}
@@ -58,6 +59,7 @@ func TestConfig_InitFlags(t *testing.T) {
5859
"--kubelet-sync-timeout=10s",
5960
"--kubelet-insecure-tls=true",
6061
"--kubelet-read-only-port=10258",
62+
"--node-topology-sync-interval=10s",
6163
}
6264
fs := flag.NewFlagSet(cmdArgs[0], flag.ExitOnError)
6365

@@ -67,6 +69,7 @@ func TestConfig_InitFlags(t *testing.T) {
6769
KubeletSyncTimeout time.Duration
6870
InsecureKubeletTLS bool
6971
KubeletReadOnlyPort uint
72+
NodeTopologySyncInterval time.Duration
7073
}
7174
type args struct {
7275
fs *flag.FlagSet
@@ -84,6 +87,7 @@ func TestConfig_InitFlags(t *testing.T) {
8487
KubeletSyncTimeout: 10 * time.Second,
8588
InsecureKubeletTLS: true,
8689
KubeletReadOnlyPort: 10258,
90+
NodeTopologySyncInterval: 10 * time.Second,
8791
},
8892
args: args{fs: fs},
8993
},
@@ -97,6 +101,7 @@ func TestConfig_InitFlags(t *testing.T) {
97101
KubeletSyncTimeout: tt.fields.KubeletSyncTimeout,
98102
InsecureKubeletTLS: tt.fields.InsecureKubeletTLS,
99103
KubeletReadOnlyPort: tt.fields.KubeletReadOnlyPort,
104+
NodeTopologySyncInterval: tt.fields.NodeTopologySyncInterval,
100105
}
101106
c := NewDefaultConfig()
102107
c.InitFlags(tt.args.fs)

pkg/koordlet/statesinformer/states_informer.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,22 @@ import (
2424
"sync"
2525
"time"
2626

27+
topologyclientset "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned"
28+
_ "github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/generated/clientset/versioned/scheme"
2729
"go.uber.org/atomic"
2830
corev1 "k8s.io/api/core/v1"
2931
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3032
apiruntime "k8s.io/apimachinery/pkg/runtime"
3133
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
34+
"k8s.io/apimachinery/pkg/util/wait"
3235
"k8s.io/apimachinery/pkg/watch"
3336
clientset "k8s.io/client-go/kubernetes"
3437
"k8s.io/client-go/tools/cache"
3538
"k8s.io/klog/v2"
3639

3740
slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
3841
koordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned"
42+
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
3943
"github.com/koordinator-sh/koordinator/pkg/koordlet/pleg"
4044
"github.com/koordinator-sh/koordinator/pkg/util"
4145
)
@@ -72,15 +76,18 @@ type statesInformer struct {
7276
nodeSLORWMutex sync.RWMutex
7377
nodeSLO *slov1alpha1.NodeSLO
7478

79+
topologyClient topologyclientset.Interface
80+
7581
podRWMutex sync.RWMutex
7682
podMap map[string]*PodMeta
7783
podUpdatedTime time.Time
84+
metricsCache metriccache.MetricCache
7885

7986
callbackChans map[reflect.Type]chan struct{}
8087
stateUpdateCallbacks map[reflect.Type][]updateCallback
8188
}
8289

83-
func NewStatesInformer(config *Config, kubeClient clientset.Interface, crdClient koordclientset.Interface, pleg pleg.Pleg, nodeName string) StatesInformer {
90+
func NewStatesInformer(config *Config, kubeClient clientset.Interface, crdClient koordclientset.Interface, topologyClient *topologyclientset.Clientset, metricsCache metriccache.MetricCache, pleg pleg.Pleg, nodeName string) StatesInformer {
8491
nodeInformer := newNodeInformer(kubeClient, nodeName)
8592
nodeSLOInformer := newNodeSLOInformer(crdClient, nodeName)
8693

@@ -102,6 +109,8 @@ func NewStatesInformer(config *Config, kubeClient clientset.Interface, crdClient
102109
stateUpdateCallbacks: map[reflect.Type][]updateCallback{
103110
reflect.TypeOf(&slov1alpha1.NodeSLO{}): {},
104111
},
112+
topologyClient: topologyClient,
113+
metricsCache: metricsCache,
105114
}
106115
}
107116

@@ -152,6 +161,8 @@ func (s *statesInformer) Run(stopCh <-chan struct{}) error {
152161

153162
go s.startCallbackRunners(stopCh)
154163

164+
go wait.Until(s.reportNodeTopology, s.config.NodeTopologySyncInterval, stopCh)
165+
155166
klog.Infof("start states informer successfully")
156167
<-stopCh
157168
klog.Infof("shutting down states informer daemon")

pkg/koordlet/statesinformer/states_informer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func Test_statesInformer_syncPods(t *testing.T) {
188188
}
189189
c := NewDefaultConfig()
190190
c.KubeletSyncInterval = 60 * time.Second
191-
m := NewStatesInformer(c, client, crdClient, pleg, "localhost")
191+
m := NewStatesInformer(c, client, crdClient, nil, nil, pleg, "localhost")
192192
m.(*statesInformer).node = testingNode
193193
m.(*statesInformer).kubelet = &testKubeletStub{pods: corev1.PodList{
194194
Items: []corev1.Pod{
@@ -303,7 +303,7 @@ func Test_statesInformer_syncKubeletLoop(t *testing.T) {
303303
c := NewDefaultConfig()
304304
c.KubeletSyncInterval = 3 * time.Second
305305

306-
m := NewStatesInformer(c, client, crdClient, pleg, "localhost")
306+
m := NewStatesInformer(c, client, crdClient, nil, nil, pleg, "localhost")
307307
m.(*statesInformer).kubelet = &testKubeletStub{pods: corev1.PodList{
308308
Items: []corev1.Pod{
309309
{},

pkg/koordlet/statesinformer/states_node.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ func (s *statesInformer) setupNodeInformer() {
3232
node, ok := obj.(*corev1.Node)
3333
if ok {
3434
s.syncNode(node)
35+
s.syncNodeResourceTopology(node)
3536
} else {
3637
klog.Errorf("node informer add func parse Node failed, obj %T", obj)
3738
}
@@ -48,6 +49,7 @@ func (s *statesInformer) setupNodeInformer() {
4849
return
4950
}
5051
s.syncNode(newNode)
52+
s.syncNodeResourceTopology(newNode)
5153
},
5254
})
5355
}
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
Copyright 2022 The Koordinator Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package statesinformer
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
23+
"github.com/k8stopologyawareschedwg/noderesourcetopology-api/pkg/apis/topology/v1alpha1"
24+
corev1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/api/errors"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
"k8s.io/client-go/util/retry"
28+
"k8s.io/klog/v2"
29+
"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
30+
31+
"github.com/koordinator-sh/koordinator/apis/extension"
32+
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
33+
)
34+
35+
func (s *statesInformer) syncNodeResourceTopology(node *corev1.Node) {
36+
topologyName := node.Name
37+
ctx := context.TODO()
38+
blocker := true
39+
_, err := s.topologyClient.TopologyV1alpha1().NodeResourceTopologies().Get(ctx, topologyName, metav1.GetOptions{ResourceVersion: "0"})
40+
if err == nil {
41+
return
42+
}
43+
if !errors.IsNotFound(err) {
44+
klog.Errorf("failed to get NodeResourceTopology %s, err: %v", topologyName, err)
45+
return
46+
}
47+
48+
topology := &v1alpha1.NodeResourceTopology{
49+
ObjectMeta: metav1.ObjectMeta{
50+
Name: topologyName,
51+
Labels: map[string]string{
52+
extension.LabelManagedBy: "Koordinator",
53+
},
54+
OwnerReferences: []metav1.OwnerReference{
55+
{
56+
APIVersion: "v1",
57+
Kind: "Node",
58+
Name: node.Name,
59+
UID: node.GetUID(),
60+
Controller: &blocker,
61+
BlockOwnerDeletion: &blocker,
62+
},
63+
},
64+
},
65+
// fields are required
66+
TopologyPolicies: []string{string(v1alpha1.None)},
67+
Zones: v1alpha1.ZoneList{v1alpha1.Zone{Name: "fake-name", Type: "fake-type"}},
68+
}
69+
//TODO: add retry if create fail
70+
_, err = s.topologyClient.TopologyV1alpha1().NodeResourceTopologies().Create(ctx, topology, metav1.CreateOptions{})
71+
if err != nil {
72+
klog.Errorf("failed to create NodeResourceTopology %s, err: %v", topologyName, err)
73+
return
74+
}
75+
}
76+
77+
func (s *statesInformer) reportNodeTopology() {
78+
s.nodeRWMutex.RLock()
79+
nodeName := s.node.Name
80+
s.nodeRWMutex.RUnlock()
81+
ctx := context.TODO()
82+
83+
cpuTopology, usedCPUs, err := s.calCpuTopology()
84+
if err != nil {
85+
return
86+
}
87+
sharePools := s.calCPUSharePools(usedCPUs)
88+
89+
cpuTopologyJson, err := json.Marshal(cpuTopology)
90+
if err != nil {
91+
klog.Errorf("failed to marshal cpu topology of node %s, err: %v", nodeName, err)
92+
return
93+
}
94+
cpuSharePoolsJson, err := json.Marshal(sharePools)
95+
if err != nil {
96+
klog.Errorf("failed to marshal cpushare pools of node %s, err: %v", nodeName, err)
97+
return
98+
}
99+
100+
err = retry.OnError(retry.DefaultBackoff, errors.IsTooManyRequests, func() error {
101+
topology, err := s.topologyClient.TopologyV1alpha1().NodeResourceTopologies().Get(ctx, nodeName, metav1.GetOptions{ResourceVersion: "0"})
102+
if err != nil {
103+
klog.Errorf("failed to get node resource topology %s, err: %v", nodeName, err)
104+
return err
105+
}
106+
if topology.Annotations == nil {
107+
topology.Annotations = make(map[string]string)
108+
}
109+
if topology.Annotations[extension.AnnotationNodeCPUTopology] == string(cpuTopologyJson) && topology.Annotations[extension.AnnotationNodeCPUSharedPools] == string(cpuSharePoolsJson) {
110+
return nil
111+
}
112+
topology.Annotations[extension.AnnotationNodeCPUTopology] = string(cpuTopologyJson)
113+
topology.Annotations[extension.AnnotationNodeCPUSharedPools] = string(cpuSharePoolsJson)
114+
_, err = s.topologyClient.TopologyV1alpha1().NodeResourceTopologies().Update(context.TODO(), topology, metav1.UpdateOptions{})
115+
if err != nil {
116+
klog.Errorf("failed to update cpu info of node %s, err: %v", nodeName, err)
117+
return err
118+
}
119+
return nil
120+
})
121+
if err != nil {
122+
klog.Errorf("failed to update NodeResourceTopology, err: %v", err)
123+
}
124+
}
125+
126+
func (s *statesInformer) calCPUSharePools(usedCPUs map[int32]*extension.CPUInfo) []extension.CPUSharedPool {
127+
podMetas := s.GetAllPods()
128+
for _, podMeta := range podMetas {
129+
status, err := extension.GetResourceStatus(podMeta.Pod.Annotations)
130+
if err != nil {
131+
klog.Errorf("failed to get resource status of pod %s, err: %v", podMeta.Pod.Name, err)
132+
continue
133+
}
134+
if status.CPUSet == "" {
135+
continue
136+
}
137+
138+
set, err := cpuset.Parse(status.CPUSet)
139+
if err != nil {
140+
klog.Errorf("failed to parse cpuset info of pod %s, err: %v", podMeta.Pod.Name, err)
141+
continue
142+
}
143+
for _, cpuID := range set.ToSliceNoSort() {
144+
delete(usedCPUs, int32(cpuID))
145+
}
146+
}
147+
148+
// nodeID -> cpulist
149+
nodeIDToCpus := make(map[int32][]int)
150+
for cpuID, info := range usedCPUs {
151+
if info != nil {
152+
nodeIDToCpus[info.Node] = append(nodeIDToCpus[info.Node], int(cpuID))
153+
}
154+
}
155+
156+
sharePools := []extension.CPUSharedPool{}
157+
for nodeID, cpus := range nodeIDToCpus {
158+
if len(cpus) <= 0 {
159+
continue
160+
}
161+
set := cpuset.NewCPUSet(cpus...)
162+
sharePools = append(sharePools, extension.CPUSharedPool{
163+
CPUSet: set.String(),
164+
Node: nodeID,
165+
Socket: usedCPUs[int32(cpus[0])].Socket,
166+
})
167+
}
168+
return sharePools
169+
}
170+
171+
func (s *statesInformer) calCpuTopology() (*extension.CPUTopology, map[int32]*extension.CPUInfo, error) {
172+
nodeCpuInfo, err := s.metricsCache.GetNodeCPUInfo(&metriccache.QueryParam{})
173+
if err != nil {
174+
klog.Errorf("failed to get node cpu info")
175+
return nil, nil, err
176+
}
177+
178+
usedCPUs := make(map[int32]*extension.CPUInfo)
179+
cpuTopology := &extension.CPUTopology{}
180+
for _, cpu := range nodeCpuInfo.ProcessorInfos {
181+
info := extension.CPUInfo{
182+
ID: cpu.CPUID,
183+
Core: cpu.CoreID,
184+
Socket: cpu.SocketID,
185+
Node: cpu.NodeID,
186+
}
187+
cpuTopology.Detail = append(cpuTopology.Detail, info)
188+
usedCPUs[cpu.CPUID] = &info
189+
}
190+
return cpuTopology, usedCPUs, nil
191+
}

0 commit comments

Comments
 (0)