Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/koordlet/koordlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewDaemon(config *config.Configuration) (Daemon, error) {

qosManager := qosmanager.NewQOSManager(config.QOSManagerConf, scheme, kubeClient, crdClient, nodeName, statesInformer, metricCache, config.CollectorConf, evictVersion)

runtimeHook, err := runtimehooks.NewRuntimeHook(statesInformer, config.RuntimeHookConf)
runtimeHook, err := runtimehooks.NewRuntimeHook(statesInformer, config.RuntimeHookConf, scheme, kubeClient, nodeName)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/koordlet/runtimehooks/protocol/container_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strings"

"github.com/containerd/nri/pkg/api"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
Expand Down Expand Up @@ -217,6 +219,10 @@ type ContainerContext struct {
updaters []resourceexecutor.ResourceUpdater
}

func (c *ContainerContext) RecordEvent(r record.EventRecorder, pod *corev1.Pod) {
//TODO: Don't record pod by container level
}

func (c *ContainerContext) FromNri(pod *api.PodSandbox, container *api.Container) {
c.Request.FromNri(pod, container)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/koordlet/runtimehooks/protocol/host_qos_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package protocol

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

ext "github.com/koordinator-sh/koordinator/apis/extension"
Expand Down Expand Up @@ -49,6 +51,10 @@ type HostAppContext struct {
updaters []resourceexecutor.ResourceUpdater
}

func (c *HostAppContext) RecordEvent(r record.EventRecorder, pod *corev1.Pod) {
//TODO: don't support record pod by host level
}

func (c *HostAppContext) FromReconciler(hostAppSpec *slov1alpha1.HostApplicationSpec) {
c.Request.FromReconciler(hostAppSpec)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/koordlet/runtimehooks/protocol/kubeqos_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package protocol

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/koordlet/audit"
Expand Down Expand Up @@ -46,6 +47,10 @@ type KubeQOSContext struct {
updaters []resourceexecutor.ResourceUpdater
}

func (k *KubeQOSContext) RecordEvent(r record.EventRecorder, pod *corev1.Pod) {
//TODO: Don't record pods by QoS
}

func (k *KubeQOSContext) FromReconciler(kubeQOS corev1.PodQOSClass) {
k.Request.FromReconciler(kubeQOS)
}
Expand Down
50 changes: 45 additions & 5 deletions pkg/koordlet/runtimehooks/protocol/pod_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package protocol

import (
"fmt"
"sort"

"github.com/containerd/nri/pkg/api"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
recutil "k8s.io/client-go/tools/record/util"
"k8s.io/klog/v2"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
Expand Down Expand Up @@ -122,15 +125,52 @@ func (p *PodRequest) FromReconciler(podMeta *statesinformer.PodMeta) {
}
}

type RecorderEvent struct {
HookName string
MsgFmt string
Reason string
EventType string
}

type PodResponse struct {
Resources Resources
}

type PodContext struct {
Request PodRequest
Response PodResponse
executor resourceexecutor.ResourceUpdateExecutor
updaters []resourceexecutor.ResourceUpdater
Request PodRequest
Response PodResponse
executor resourceexecutor.ResourceUpdateExecutor
updaters []resourceexecutor.ResourceUpdater
RecorderEvents []RecorderEvent
}

func (p *PodContext) RecordEvent(r record.EventRecorder, pod *corev1.Pod) {
// Noraml, Warning => RecordEvent
events := make(map[string]RecorderEvent)
for _, event := range p.RecorderEvents {
if !recutil.ValidateEventType(event.EventType) {
klog.Warningf("EventType is not valid %v", event)
continue
}

e := event
if _, ok := events[event.EventType]; ok {
e.MsgFmt += "-" + event.MsgFmt
e.Reason += "-" + event.Reason
}
events[event.EventType] = e
}

eventTypes := make([]string, 0, len(events))
for eventType := range events {
eventTypes = append(eventTypes, eventType)
}
sort.Strings(eventTypes)

for _, eventType := range eventTypes {
event := events[eventType]
r.Eventf(pod, eventType, event.Reason, event.MsgFmt)
}
}

func (p *PodResponse) ProxyDone(resp *runtimeapi.PodSandboxHookResponse) {
Expand Down
84 changes: 84 additions & 0 deletions pkg/koordlet/runtimehooks/protocol/pod_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"testing"

"github.com/containerd/nri/pkg/api"
"github.com/golang/mock/gomock"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/koordinator-sh/koordinator/apis/extension"
apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/testutil"
)

func TestPodContext_FromNri(t *testing.T) {
Expand Down Expand Up @@ -174,3 +177,84 @@ func TestPodContext_NriRemoveDone(t *testing.T) {
})
}
}

func TestPodContext_RecordEvent(t *testing.T) {
type fields struct {
Request PodRequest
Response PodResponse
executor resourceexecutor.ResourceUpdateExecutor
updaters []resourceexecutor.ResourceUpdater
RecorderEvents []RecorderEvent
}
type args struct {
pod *corev1.Pod
}
tests := []struct {
name string
fields fields
args args
}{
{
name: "EventType is not valid",
fields: fields{
Request: PodRequest{},
Response: PodResponse{},
executor: nil,
updaters: nil,
RecorderEvents: []RecorderEvent{
{EventType: "test"},
},
},
args: args{
pod: nil,
},
},
{
name: "EventType is valid",
fields: fields{
Request: PodRequest{},
Response: PodResponse{},
executor: nil,
updaters: nil,
RecorderEvents: []RecorderEvent{
{
HookName: "resctrl",
EventType: corev1.EventTypeNormal,
MsgFmt: "test",
Reason: "test",
},
{
HookName: "cpuset",
EventType: corev1.EventTypeNormal,
MsgFmt: "test",
Reason: "test",
},
{
HookName: "resctrl",
EventType: corev1.EventTypeWarning,
MsgFmt: "test",
Reason: "test",
},
},
},
},
}
pod := testutil.MockTestPod(apiext.QoSBE, "test_be_pod")
// env
ctl := gomock.NewController(t)
defer ctl.Finish()

fakeRecorder := &testutil.FakeRecorder{}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &PodContext{
Request: tt.fields.Request,
Response: tt.fields.Response,
executor: tt.fields.executor,
updaters: tt.fields.updaters,
RecorderEvents: tt.fields.RecorderEvents,
}
p.RecordEvent(fakeRecorder, pod)
})
}
}
2 changes: 2 additions & 0 deletions pkg/koordlet/runtimehooks/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1/resource"

slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
Expand All @@ -33,6 +34,7 @@ type HooksProtocol interface {
ReconcilerDone(executor resourceexecutor.ResourceUpdateExecutor)
Update()
GetUpdaters() []resourceexecutor.ResourceUpdater
RecordEvent(r record.EventRecorder, pod *corev1.Pod)
}

type hooksProtocolBuilder struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/koordlet/runtimehooks/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
Expand Down Expand Up @@ -301,13 +302,15 @@ type Context struct {
StatesInformer statesinformer.StatesInformer
Executor resourceexecutor.ResourceUpdateExecutor
ReconcileInterval time.Duration
EventRecorder record.EventRecorder
}

func NewReconciler(ctx Context) Reconciler {
r := &reconciler{
podUpdated: make(chan struct{}, 1),
executor: ctx.Executor,
reconcileInterval: ctx.ReconcileInterval,
eventRecorder: ctx.EventRecorder,
}
// TODO register individual pod event
ctx.StatesInformer.RegisterCallbacks(statesinformer.RegisterTypeAllPods, "runtime-hooks-reconciler",
Expand All @@ -321,6 +324,7 @@ type reconciler struct {
podUpdated chan struct{}
executor resourceexecutor.ResourceUpdateExecutor
reconcileInterval time.Duration
eventRecorder record.EventRecorder
}

func (c *reconciler) Run(stopCh <-chan struct{}) error {
Expand Down Expand Up @@ -420,6 +424,7 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
klog.V(5).Infof("calling reconcile function %v for pod %v finished",
r.description, podMeta.Key())
}
podCtx.RecordEvent(c.eventRecorder, podMeta.Pod)
}

for resourceType, r := range globalCgroupReconcilers.sandboxContainerLevel {
Expand Down
11 changes: 10 additions & 1 deletion pkg/koordlet/runtimehooks/runtimehooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ package runtimehooks
import (
"fmt"

corev1 "k8s.io/api/core/v1"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/features"
Expand Down Expand Up @@ -82,7 +87,10 @@ func (r *runtimeHook) Run(stopCh <-chan struct{}) error {
return nil
}

func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook, error) {
func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config, schema *apiruntime.Scheme, kubeClient clientset.Interface, nodeName string) (RuntimeHook, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(schema, corev1.EventSource{Component: "koordlet-runtimehook", Host: nodeName})
failurePolicy, err := config.GetFailurePolicyType(cfg.RuntimeHooksFailurePolicy)
if err != nil {
return nil, err
Expand Down Expand Up @@ -136,6 +144,7 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook,
StatesInformer: si,
Executor: e,
ReconcileInterval: cfg.RuntimeHookReconcileInterval,
EventRecorder: recorder,
}

newPluginOptions := hooks.Options{
Expand Down
7 changes: 6 additions & 1 deletion pkg/koordlet/runtimehooks/runtimehooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"

"github.com/koordinator-sh/koordinator/pkg/features"
mockstatesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/mockstatesinformer"
Expand Down Expand Up @@ -116,7 +118,10 @@ func Test_runtimeHook_Run(t *testing.T) {
defer ctrl.Finish()
si := mockstatesinformer.NewMockStatesInformer(ctrl)
si.EXPECT().RegisterCallbacks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
r, err := NewRuntimeHook(si, tt.fields.config)
scheme := apiruntime.NewScheme()
kubeClient := &kubernetes.Clientset{}
nodeName := "test-node"
r, err := NewRuntimeHook(si, tt.fields.config, scheme, kubeClient, nodeName)
assert.NoError(t, err)
stop := make(chan struct{})

Expand Down