Skip to content

🌱 Handlers: Use low priority when object is unchanged and priority queue #3152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions pkg/handler/enqueue_mapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package handler

import (
"context"
"reflect"

"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
Expand Down Expand Up @@ -81,7 +83,15 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create(
q workqueue.TypedRateLimitingInterface[request],
) {
reqs := map[request]empty{}
e.mapAndEnqueue(ctx, q, evt.Object, reqs)

var lowPriority bool
if reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) && isPriorityQueue(q) && !isNil(evt.Object) {
clientObjectEvent := event.CreateEvent{Object: any(evt.Object).(client.Object)}
if isObjectUnchanged(clientObjectEvent) {
lowPriority = true
}
}
e.mapAndEnqueue(ctx, q, evt.Object, reqs, lowPriority)
}

// Update implements EventHandler.
Expand All @@ -90,9 +100,13 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Update(
evt event.TypedUpdateEvent[object],
q workqueue.TypedRateLimitingInterface[request],
) {
var lowPriority bool
if reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) && isPriorityQueue(q) && !isNil(evt.ObjectOld) && !isNil(evt.ObjectNew) {
lowPriority = any(evt.ObjectOld).(client.Object).GetResourceVersion() == any(evt.ObjectNew).(client.Object).GetResourceVersion()
}
reqs := map[request]empty{}
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs, lowPriority)
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs, lowPriority)
}

// Delete implements EventHandler.
Expand All @@ -102,7 +116,7 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Delete(
q workqueue.TypedRateLimitingInterface[request],
) {
reqs := map[request]empty{}
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
e.mapAndEnqueue(ctx, q, evt.Object, reqs, false)
}

// Generic implements EventHandler.
Expand All @@ -112,14 +126,26 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Generic(
q workqueue.TypedRateLimitingInterface[request],
) {
reqs := map[request]empty{}
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
e.mapAndEnqueue(ctx, q, evt.Object, reqs, false)
}

func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(ctx context.Context, q workqueue.TypedRateLimitingInterface[request], o object, reqs map[request]empty) {
func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(
ctx context.Context,
q workqueue.TypedRateLimitingInterface[request],
o object,
reqs map[request]empty,
unchanged bool,
) {
for _, req := range e.toRequests(ctx, o) {
_, ok := reqs[req]
if !ok {
q.Add(req)
if unchanged {
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(priorityqueue.AddOpts{
Priority: LowPriority,
}, req)
} else {
q.Add(req)
}
reqs[req] = empty{}
}
}
Expand Down
76 changes: 56 additions & 20 deletions pkg/handler/eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package handler

import (
"context"
"reflect"
"time"

"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -108,10 +109,40 @@ type TypedFuncs[object any, request comparable] struct {
GenericFunc func(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
}

func isPriorityQueue[request comparable](q workqueue.TypedRateLimitingInterface[request]) bool {
_, ok := q.(priorityqueue.PriorityQueue[request])
return ok
}

// Create implements EventHandler.
func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
if h.CreateFunc != nil {
h.CreateFunc(ctx, e, q)
if !reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) || !isPriorityQueue(q) || isNil(e.Object) {
h.CreateFunc(ctx, e, q)
return
}
wq := workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: q,
// We already know that we have a priority queue, that event.Object implements
// client.Object and that its not nil
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
// We construct a new event typed to client.Object because isObjectUnchanged
// is a generic and hence has to know at compile time the type of the event
// it gets. We only figure that out at runtime though, but we know for sure
// that it implements client.Object at this point so we can hardcode the event
// type to that.
evt := event.CreateEvent{Object: any(e.Object).(client.Object)}
var priority int
if isObjectUnchanged(evt) {
priority = LowPriority
}
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
priorityqueue.AddOpts{Priority: priority},
item,
)
},
}
h.CreateFunc(ctx, e, wq)
}
}

Expand All @@ -125,7 +156,27 @@ func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDe
// Update implements EventHandler.
func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
if h.UpdateFunc != nil {
h.UpdateFunc(ctx, e, q)
if !reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) || !isPriorityQueue(q) || isNil(e.ObjectOld) || isNil(e.ObjectNew) {
h.UpdateFunc(ctx, e, q)
return
}

wq := workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: q,
// We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement
// client.Object and that they are not nil
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
var priority int
if any(e.ObjectOld).(client.Object).GetResourceVersion() == any(e.ObjectNew).(client.Object).GetResourceVersion() {
priority = LowPriority
}
q.(priorityqueue.PriorityQueue[request]).AddWithOpts(
priorityqueue.AddOpts{Priority: priority},
item,
)
},
}
h.UpdateFunc(ctx, e, wq)
}
}

Expand All @@ -142,25 +193,10 @@ const LowPriority = -100
// WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if
// and only if a priorityqueue.PriorityQueue is used. If not, it does nothing.
func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u TypedEventHandler[object, request]) TypedEventHandler[object, request] {
// TypedFuncs already implements this so just wrap
return TypedFuncs[object, request]{
CreateFunc: func(ctx context.Context, tce event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
// Due to how the handlers are factored, we have to wrap the workqueue to be able
// to inject custom behavior.
u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: trli,
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
addToQueueCreate(q, tce, item)
},
})
},
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
TypedRateLimitingInterface: trli,
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
addToQueueUpdate(q, tue, item)
},
})
},
CreateFunc: u.Create,
UpdateFunc: u.Update,
DeleteFunc: u.Delete,
GenericFunc: u.Generic,
}
Expand Down
Loading
Loading