Skip to content

✨ Leverage isInInitialList of ResourceEventHandler.OnAdd, Add IsInInitialList to TypedCreateEvent #3162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions pkg/cache/multi_namespace_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,18 +337,11 @@ type handlerRegistration struct {
handles map[string]toolscache.ResourceEventHandlerRegistration
}

type syncer interface {
HasSynced() bool
}

// HasSynced asserts that the handler has been called for the full initial state of the informer.
// This uses syncer to be compatible between client-go 1.27+ and older versions when the interface changed.
func (h handlerRegistration) HasSynced() bool {
for _, reg := range h.handles {
if s, ok := reg.(syncer); ok {
if !s.HasSynced() {
return false
}
for _, h := range h.handles {
if !h.HasSynced() {
return false
}
}
return true
Expand Down
52 changes: 5 additions & 47 deletions pkg/controller/controllertest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,49 +34,7 @@ type FakeInformer struct {
// RunCount is incremented each time RunInformersAndControllers is called
RunCount int

handlers []eventHandlerWrapper
}

type modernResourceEventHandler interface {
OnAdd(obj interface{}, isInInitialList bool)
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}

type legacyResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}

// eventHandlerWrapper wraps a ResourceEventHandler in a manner that is compatible with client-go 1.27+ and older.
// The interface was changed in these versions.
type eventHandlerWrapper struct {
handler any
}

func (e eventHandlerWrapper) OnAdd(obj interface{}) {
if m, ok := e.handler.(modernResourceEventHandler); ok {
m.OnAdd(obj, false)
return
}
e.handler.(legacyResourceEventHandler).OnAdd(obj)
}

func (e eventHandlerWrapper) OnUpdate(oldObj, newObj interface{}) {
if m, ok := e.handler.(modernResourceEventHandler); ok {
m.OnUpdate(oldObj, newObj)
return
}
e.handler.(legacyResourceEventHandler).OnUpdate(oldObj, newObj)
}

func (e eventHandlerWrapper) OnDelete(obj interface{}) {
if m, ok := e.handler.(modernResourceEventHandler); ok {
m.OnDelete(obj)
return
}
e.handler.(legacyResourceEventHandler).OnDelete(obj)
handlers []cache.ResourceEventHandler
}

// AddIndexers does nothing. TODO(community): Implement this.
Expand All @@ -101,19 +59,19 @@ func (f *FakeInformer) HasSynced() bool {

// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, eventHandlerWrapper{handler})
f.handlers = append(f.handlers, handler)
return nil, nil
}

// AddEventHandlerWithResyncPeriod implements the Informer interface. Adds an EventHandler to the fake Informers (ignores resyncPeriod). TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, _ time.Duration) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, eventHandlerWrapper{handler})
f.handlers = append(f.handlers, handler)
return nil, nil
}

// AddEventHandlerWithOptions implements the Informer interface. Adds an EventHandler to the fake Informers (ignores options). TODO(community): Implement Registration.
func (f *FakeInformer) AddEventHandlerWithOptions(handler cache.ResourceEventHandler, _ cache.HandlerOptions) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, eventHandlerWrapper{handler})
f.handlers = append(f.handlers, handler)
return nil, nil
}

Expand All @@ -129,7 +87,7 @@ func (f *FakeInformer) RunWithContext(_ context.Context) {
// Add fakes an Add event for obj.
func (f *FakeInformer) Add(obj metav1.Object) {
for _, h := range f.handlers {
h.OnAdd(obj)
h.OnAdd(obj, false)
}
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type GenericEvent = TypedGenericEvent[client.Object]
type TypedCreateEvent[object any] struct {
// Object is the object from the event
Object object

// IsInInitialList is true if the Create event was triggered by the initial list.
IsInInitialList bool
}

// TypedUpdateEvent is an event where a Kubernetes object was updated. TypedUpdateEvent should be generated
Expand Down
5 changes: 2 additions & 3 deletions pkg/handler/enqueue_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,8 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create(
reqs := map[request]empty{}

var lowPriority bool
if e.objectImplementsClientObject && isPriorityQueue(q) && !isNil(evt.Object) {
clientObjectEvent := event.CreateEvent{Object: any(evt.Object).(client.Object)}
if isObjectUnchanged(clientObjectEvent) {
if isPriorityQueue(q) && !isNil(evt.Object) {
if evt.IsInInitialList {
lowPriority = true
}
}
Expand Down
18 changes: 2 additions & 16 deletions pkg/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package handler
import (
"context"
"reflect"
"time"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -132,14 +131,8 @@ func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCr
// We already know that we have a priority queue, that event.Object implements
// client.Object and that its not nil
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
// We construct a new event typed to client.Object because isObjectUnchanged
// is a generic and hence has to know at compile time the type of the event
// it gets. We only figure that out at runtime though, but we know for sure
// that it implements client.Object at this point so we can hardcode the event
// type to that.
evt := event.CreateEvent{Object: any(e.Object).(client.Object)}
var priority int
if isObjectUnchanged(evt) {
if e.IsInInitialList {
priority = LowPriority
}
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
Expand Down Expand Up @@ -217,13 +210,6 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
w.addFunc(item, w.TypedRateLimitingInterface)
}

// isObjectUnchanged checks if the object in a create event is unchanged, for example because
// we got it in our initial listwatch. The heuristic it uses is to check if the object is older
// than one minute.
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
}

// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) {
Expand All @@ -234,7 +220,7 @@ func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRate
}

var priority int
if isObjectUnchanged(evt) {
if evt.IsInInitialList {
priority = LowPriority
}
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
Expand Down
9 changes: 6 additions & 3 deletions pkg/handler/eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,7 +831,7 @@ var _ = Describe("Eventhandler", func() {
}
for _, test := range handlerPriorityTests {
When("handler is "+test.name, func() {
It("should lower the priority of a create request for an object that was created more than one minute in the past", func() {
It("should lower the priority of a create request for an object that was part of the initial list", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
Expand All @@ -843,19 +843,21 @@ var _ = Describe("Eventhandler", func() {

test.handler().Create(ctx, event.CreateEvent{
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "my-pod",
Name: "my-pod",
CreationTimestamp: metav1.Now(),
OwnerReferences: []metav1.OwnerReference{{
Kind: "Pod",
Name: "my-pod",
}},
}},
IsInInitialList: true,
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
})

It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
It("should not lower the priority of a create request for an object that was not part of the initial list", func() {
actualOpts := priorityqueue.AddOpts{}
var actualRequests []reconcile.Request
wq := &fakePriorityQueue{
Expand All @@ -874,6 +876,7 @@ var _ = Describe("Eventhandler", func() {
Name: "my-pod",
}},
}},
IsInInitialList: false,
}, wq)

Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
Expand Down
18 changes: 6 additions & 12 deletions pkg/internal/source/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (

var log = logf.RuntimeLog.WithName("source").WithName("EventHandler")

var _ cache.ResourceEventHandler = &EventHandler[client.Object, any]{}

// NewEventHandler creates a new EventHandler.
func NewEventHandler[object client.Object, request comparable](
ctx context.Context,
Expand All @@ -57,19 +59,11 @@ type EventHandler[object client.Object, request comparable] struct {
predicates []predicate.TypedPredicate[object]
}

// HandlerFuncs converts EventHandler to a ResourceEventHandlerFuncs
// TODO: switch to ResourceEventHandlerDetailedFuncs with client-go 1.27
func (e *EventHandler[object, request]) HandlerFuncs() cache.ResourceEventHandlerFuncs {
return cache.ResourceEventHandlerFuncs{
AddFunc: e.OnAdd,
UpdateFunc: e.OnUpdate,
DeleteFunc: e.OnDelete,
}
}

// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e *EventHandler[object, request]) OnAdd(obj interface{}) {
c := event.TypedCreateEvent[object]{}
func (e *EventHandler[object, request]) OnAdd(obj interface{}, isInInitialList bool) {
c := event.TypedCreateEvent[object]{
IsInInitialList: isInInitialList,
}

// Pull Object out of the object
if o, ok := obj.(object); ok {
Expand Down
18 changes: 9 additions & 9 deletions pkg/internal/source/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,55 +97,55 @@ var _ = Describe("Internal", func() {
defer GinkgoRecover()
Expect(evt.Object).To(Equal(pod))
}
instance.OnAdd(pod)
instance.OnAdd(pod, false)
})

It("should used Predicates to filter CreateEvents", func() {
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
})
set = false
instance.OnAdd(pod)
instance.OnAdd(pod, false)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
instance.OnAdd(pod, false)
Expect(set).To(BeTrue())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
})
instance.OnAdd(pod)
instance.OnAdd(pod, false)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
instance.OnAdd(pod, false)
Expect(set).To(BeFalse())

set = false
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
})
instance.OnAdd(pod)
instance.OnAdd(pod, false)
Expect(set).To(BeTrue())
})

It("should not call Create EventHandler if the object is not a runtime.Object", func() {
instance.OnAdd(&metav1.ObjectMeta{})
instance.OnAdd(&metav1.ObjectMeta{}, false)
})

It("should not call Create EventHandler if the object does not have metadata", func() {
instance.OnAdd(FooRuntimeObject{})
instance.OnAdd(FooRuntimeObject{}, false)
})

It("should create an UpdateEvent", func() {
Expand Down Expand Up @@ -281,7 +281,7 @@ var _ = Describe("Internal", func() {
instance.OnDelete(tombstone)
})
It("should ignore objects without meta", func() {
instance.OnAdd(Foo{})
instance.OnAdd(Foo{}, false)
instance.OnUpdate(Foo{}, Foo{})
instance.OnDelete(Foo{})
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/source/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type
return
}

_, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates).HandlerFuncs(), toolscache.HandlerOptions{
_, err := i.AddEventHandlerWithOptions(NewEventHandler(ctx, queue, ks.Handler, ks.Predicates), toolscache.HandlerOptions{
Logger: &logKind,
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (is *Informer) Start(ctx context.Context, queue workqueue.TypedRateLimiting
return errors.New("must specify Informer.Handler")
}

_, err := is.Informer.AddEventHandlerWithOptions(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates).HandlerFuncs(), toolscache.HandlerOptions{
_, err := is.Informer.AddEventHandlerWithOptions(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates), toolscache.HandlerOptions{
Logger: &logInformer,
})
if err != nil {
Expand Down
Loading