From f4ad346f74a56bbacf62028a2653ce72eea8a645 Mon Sep 17 00:00:00 2001 From: Stefan Bueringer Date: Sun, 16 Mar 2025 10:48:52 +0100 Subject: [PATCH] Leverage Informer OnAdd IsInInitialList MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Stefan Büringer buringerst@vmware.com --- pkg/cache/multi_namespace_cache.go | 13 ++----- pkg/controller/controllertest/util.go | 52 +++------------------------ pkg/event/event.go | 3 ++ pkg/handler/enqueue_mapped.go | 5 ++- pkg/handler/eventhandler.go | 18 ++-------- pkg/handler/eventhandler_test.go | 9 +++-- pkg/internal/source/event_handler.go | 18 ++++------ pkg/internal/source/internal_test.go | 18 +++++----- pkg/internal/source/kind.go | 2 +- pkg/source/source.go | 2 +- 10 files changed, 38 insertions(+), 102 deletions(-) diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index f033f85e77..525d93e0ab 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -337,18 +337,11 @@ type handlerRegistration struct { handles map[string]toolscache.ResourceEventHandlerRegistration } -type syncer interface { - HasSynced() bool -} - // HasSynced asserts that the handler has been called for the full initial state of the informer. -// This uses syncer to be compatible between client-go 1.27+ and older versions when the interface changed. func (h handlerRegistration) HasSynced() bool { - for _, reg := range h.handles { - if s, ok := reg.(syncer); ok { - if !s.HasSynced() { - return false - } + for _, h := range h.handles { + if !h.HasSynced() { + return false } } return true diff --git a/pkg/controller/controllertest/util.go b/pkg/controller/controllertest/util.go index 0b9c43c347..2c9a248899 100644 --- a/pkg/controller/controllertest/util.go +++ b/pkg/controller/controllertest/util.go @@ -34,49 +34,7 @@ type FakeInformer struct { // RunCount is incremented each time RunInformersAndControllers is called RunCount int - handlers []eventHandlerWrapper -} - -type modernResourceEventHandler interface { - OnAdd(obj interface{}, isInInitialList bool) - OnUpdate(oldObj, newObj interface{}) - OnDelete(obj interface{}) -} - -type legacyResourceEventHandler interface { - OnAdd(obj interface{}) - OnUpdate(oldObj, newObj interface{}) - OnDelete(obj interface{}) -} - -// eventHandlerWrapper wraps a ResourceEventHandler in a manner that is compatible with client-go 1.27+ and older. -// The interface was changed in these versions. -type eventHandlerWrapper struct { - handler any -} - -func (e eventHandlerWrapper) OnAdd(obj interface{}) { - if m, ok := e.handler.(modernResourceEventHandler); ok { - m.OnAdd(obj, false) - return - } - e.handler.(legacyResourceEventHandler).OnAdd(obj) -} - -func (e eventHandlerWrapper) OnUpdate(oldObj, newObj interface{}) { - if m, ok := e.handler.(modernResourceEventHandler); ok { - m.OnUpdate(oldObj, newObj) - return - } - e.handler.(legacyResourceEventHandler).OnUpdate(oldObj, newObj) -} - -func (e eventHandlerWrapper) OnDelete(obj interface{}) { - if m, ok := e.handler.(modernResourceEventHandler); ok { - m.OnDelete(obj) - return - } - e.handler.(legacyResourceEventHandler).OnDelete(obj) + handlers []cache.ResourceEventHandler } // AddIndexers does nothing. TODO(community): Implement this. @@ -101,19 +59,19 @@ func (f *FakeInformer) HasSynced() bool { // AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration. func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) { - f.handlers = append(f.handlers, eventHandlerWrapper{handler}) + f.handlers = append(f.handlers, handler) return nil, nil } // AddEventHandlerWithResyncPeriod implements the Informer interface. Adds an EventHandler to the fake Informers (ignores resyncPeriod). TODO(community): Implement Registration. func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) { - f.handlers = append(f.handlers, eventHandlerWrapper{handler}) + f.handlers = append(f.handlers, handler) return nil, nil } // AddEventHandlerWithOptions implements the Informer interface. Adds an EventHandler to the fake Informers (ignores options). TODO(community): Implement Registration. func (f *FakeInformer) AddEventHandlerWithOptions(handler cache.ResourceEventHandler, _ cache.HandlerOptions) (cache.ResourceEventHandlerRegistration, error) { - f.handlers = append(f.handlers, eventHandlerWrapper{handler}) + f.handlers = append(f.handlers, handler) return nil, nil } @@ -129,7 +87,7 @@ func (f *FakeInformer) RunWithContext(_ context.Context) { // Add fakes an Add event for obj. func (f *FakeInformer) Add(obj metav1.Object) { for _, h := range f.handlers { - h.OnAdd(obj) + h.OnAdd(obj, false) } } diff --git a/pkg/event/event.go b/pkg/event/event.go index 81229fc2d3..82b1793f53 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -40,6 +40,9 @@ type GenericEvent = TypedGenericEvent[client.Object] type TypedCreateEvent[object any] struct { // Object is the object from the event Object object + + // IsInInitialList is true if the Create event was triggered by the initial list. + IsInInitialList bool } // TypedUpdateEvent is an event where a Kubernetes object was updated. TypedUpdateEvent should be generated diff --git a/pkg/handler/enqueue_mapped.go b/pkg/handler/enqueue_mapped.go index be97fa3781..fe78f21a2c 100644 --- a/pkg/handler/enqueue_mapped.go +++ b/pkg/handler/enqueue_mapped.go @@ -86,9 +86,8 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create( reqs := map[request]empty{} var lowPriority bool - if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.Object) { - clientObjectEvent := event.CreateEvent{Object: any(evt.Object).(client.Object)} - if isObjectUnchanged(clientObjectEvent) { + if isPriorityQueue(q) && !isNil(evt.Object) { + if evt.IsInInitialList { lowPriority = true } } diff --git a/pkg/handler/eventhandler.go b/pkg/handler/eventhandler.go index e41b69d2b6..7e63030371 100644 --- a/pkg/handler/eventhandler.go +++ b/pkg/handler/eventhandler.go @@ -19,7 +19,6 @@ package handler import ( "context" "reflect" - "time" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" @@ -132,14 +131,8 @@ func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCr // We already know that we have a priority queue, that event.Object implements // client.Object and that its not nil addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) { - // We construct a new event typed to client.Object because isObjectUnchanged - // is a generic and hence has to know at compile time the type of the event - // it gets. We only figure that out at runtime though, but we know for sure - // that it implements client.Object at this point so we can hardcode the event - // type to that. - evt := event.CreateEvent{Object: any(e.Object).(client.Object)} var priority int - if isObjectUnchanged(evt) { + if e.IsInInitialList { priority = LowPriority } q.(priorityqueue.PriorityQueue[request]).AddWithOpts( @@ -217,13 +210,6 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) { w.addFunc(item, w.TypedRateLimitingInterface) } -// isObjectUnchanged checks if the object in a create event is unchanged, for example because -// we got it in our initial listwatch. The heuristic it uses is to check if the object is older -// than one minute. -func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool { - return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute)) -} - // addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler // for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request] func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) { @@ -234,7 +220,7 @@ func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRate } var priority int - if isObjectUnchanged(evt) { + if evt.IsInInitialList { priority = LowPriority } priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item) diff --git a/pkg/handler/eventhandler_test.go b/pkg/handler/eventhandler_test.go index e4dfb44977..38f76dab37 100644 --- a/pkg/handler/eventhandler_test.go +++ b/pkg/handler/eventhandler_test.go @@ -831,7 +831,7 @@ var _ = Describe("Eventhandler", func() { } for _, test := range handlerPriorityTests { When("handler is "+test.name, func() { - It("should lower the priority of a create request for an object that was created more than one minute in the past", func() { + It("should lower the priority of a create request for an object that was part of the initial list", func() { actualOpts := priorityqueue.AddOpts{} var actualRequests []reconcile.Request wq := &fakePriorityQueue{ @@ -843,19 +843,21 @@ var _ = Describe("Eventhandler", func() { test.handler().Create(ctx, event.CreateEvent{ Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{ - Name: "my-pod", + Name: "my-pod", + CreationTimestamp: metav1.Now(), OwnerReferences: []metav1.OwnerReference{{ Kind: "Pod", Name: "my-pod", }}, }}, + IsInInitialList: true, }, wq) Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority})) Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}})) }) - It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() { + It("should not lower the priority of a create request for an object that was not part of the initial list", func() { actualOpts := priorityqueue.AddOpts{} var actualRequests []reconcile.Request wq := &fakePriorityQueue{ @@ -874,6 +876,7 @@ var _ = Describe("Eventhandler", func() { Name: "my-pod", }}, }}, + IsInInitialList: false, }, wq) Expect(actualOpts).To(Equal(priorityqueue.AddOpts{})) diff --git a/pkg/internal/source/event_handler.go b/pkg/internal/source/event_handler.go index 38432a1a79..7cc8c51555 100644 --- a/pkg/internal/source/event_handler.go +++ b/pkg/internal/source/event_handler.go @@ -32,6 +32,8 @@ import ( var log = logf.RuntimeLog.WithName("source").WithName("EventHandler") +var _ cache.ResourceEventHandler = &EventHandler[client.Object, any]{} + // NewEventHandler creates a new EventHandler. func NewEventHandler[object client.Object, request comparable]( ctx context.Context, @@ -57,19 +59,11 @@ type EventHandler[object client.Object, request comparable] struct { predicates []predicate.TypedPredicate[object] } -// HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs -// TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27 -func (e *EventHandler[object, request]) HandlerFuncs() cache.ResourceEventHandlerFuncs { - return cache.ResourceEventHandlerFuncs{ - AddFunc: e.OnAdd, - UpdateFunc: e.OnUpdate, - DeleteFunc: e.OnDelete, - } -} - // OnAdd creates CreateEvent and calls Create on EventHandler. -func (e *EventHandler[object, request]) OnAdd(obj interface{}) { - c := event.TypedCreateEvent[object]{} +func (e *EventHandler[object, request]) OnAdd(obj interface{}, isInInitialList bool) { + c := event.TypedCreateEvent[object]{ + IsInInitialList: isInInitialList, + } // Pull Object out of the object if o, ok := obj.(object); ok { diff --git a/pkg/internal/source/internal_test.go b/pkg/internal/source/internal_test.go index 4de8628ebf..6e4e8924da 100644 --- a/pkg/internal/source/internal_test.go +++ b/pkg/internal/source/internal_test.go @@ -97,7 +97,7 @@ var _ = Describe("Internal", func() { defer GinkgoRecover() Expect(evt.Object).To(Equal(pod)) } - instance.OnAdd(pod) + instance.OnAdd(pod, false) }) It("should used Predicates to filter CreateEvents", func() { @@ -105,14 +105,14 @@ var _ = Describe("Internal", func() { predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, }) set = false - instance.OnAdd(pod) + instance.OnAdd(pod, false) Expect(set).To(BeFalse()) set = false instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{ predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, }) - instance.OnAdd(pod) + instance.OnAdd(pod, false) Expect(set).To(BeTrue()) set = false @@ -120,7 +120,7 @@ var _ = Describe("Internal", func() { predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, }) - instance.OnAdd(pod) + instance.OnAdd(pod, false) Expect(set).To(BeFalse()) set = false @@ -128,7 +128,7 @@ var _ = Describe("Internal", func() { predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, }) - instance.OnAdd(pod) + instance.OnAdd(pod, false) Expect(set).To(BeFalse()) set = false @@ -136,16 +136,16 @@ var _ = Describe("Internal", func() { predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }}, }) - instance.OnAdd(pod) + instance.OnAdd(pod, false) Expect(set).To(BeTrue()) }) It("should not call Create EventHandler if the object is not a runtime.Object", func() { - instance.OnAdd(&metav1.ObjectMeta{}) + instance.OnAdd(&metav1.ObjectMeta{}, false) }) It("should not call Create EventHandler if the object does not have metadata", func() { - instance.OnAdd(FooRuntimeObject{}) + instance.OnAdd(FooRuntimeObject{}, false) }) It("should create an UpdateEvent", func() { @@ -281,7 +281,7 @@ var _ = Describe("Internal", func() { instance.OnDelete(tombstone) }) It("should ignore objects without meta", func() { - instance.OnAdd(Foo{}) + instance.OnAdd(Foo{}, false) instance.OnUpdate(Foo{}, Foo{}) instance.OnDelete(Foo{}) }) diff --git a/pkg/internal/source/kind.go b/pkg/internal/source/kind.go index 6844239180..2854244523 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/internal/source/kind.go @@ -91,7 +91,7 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type return } - _, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates).HandlerFuncs(), toolscache.HandlerOptions{ + _, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{ Logger: &logKind, }) if err != nil { diff --git a/pkg/source/source.go b/pkg/source/source.go index ed59925eef..c2c2dc4e07 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -286,7 +286,7 @@ func (is *Informer) Start(ctx context.Context, queue workqueue.TypedRateLimiting return errors.New("must specify Informer.Handler") } - _, err := is.Informer.AddEventHandlerWithOptions(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates).HandlerFuncs(), toolscache.HandlerOptions{ + _, err := is.Informer.AddEventHandlerWithOptions(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates), toolscache.HandlerOptions{ Logger: &logInformer, }) if err != nil {