@@ -18,6 +18,7 @@ package handler
1818
1919import (
2020 "context"
21+ "reflect"
2122 "time"
2223
2324 "k8s.io/client-go/util/workqueue"
@@ -108,10 +109,46 @@ type TypedFuncs[object any, request comparable] struct {
108109 GenericFunc func (context.Context , event.TypedGenericEvent [object ], workqueue.TypedRateLimitingInterface [request ])
109110}
110111
112+ var typeForClientObject = reflect .TypeFor [client.Object ]()
113+
114+ func implementsClientObject [object any ]() bool {
115+ return reflect .TypeFor [object ]().Implements (typeForClientObject )
116+ }
117+
118+ func isPriorityQueue [request comparable ](q workqueue.TypedRateLimitingInterface [request ]) bool {
119+ _ , ok := q .(priorityqueue.PriorityQueue [request ])
120+ return ok
121+ }
122+
111123// Create implements EventHandler.
112124func (h TypedFuncs [object , request ]) Create (ctx context.Context , e event.TypedCreateEvent [object ], q workqueue.TypedRateLimitingInterface [request ]) {
113125 if h .CreateFunc != nil {
114- h .CreateFunc (ctx , e , q )
126+ if ! implementsClientObject [object ]() || ! isPriorityQueue (q ) || isNil (e .Object ) {
127+ h .CreateFunc (ctx , e , q )
128+ return
129+ }
130+ wq := workqueueWithCustomAddFunc [request ]{
131+ TypedRateLimitingInterface : q ,
132+ // We already know that we have a priority queue, that event.Object implements
133+ // client.Object and that its not nil
134+ addFunc : func (item request , q workqueue.TypedRateLimitingInterface [request ]) {
135+ // We construct a new event typed to client.Object because isObjectUnchanged
136+ // is a generic and hence has to know at compile time the type of the event
137+ // it gets. We only figure that out at runtime though, but we know for sure
138+ // that it implements client.Object at this point so we can hardcode the event
139+ // type to that.
140+ evt := event.CreateEvent {Object : any (e .Object ).(client.Object )}
141+ var priority int
142+ if isObjectUnchanged (evt ) {
143+ priority = LowPriority
144+ }
145+ q .(priorityqueue.PriorityQueue [request ]).AddWithOpts (
146+ priorityqueue.AddOpts {Priority : priority },
147+ item ,
148+ )
149+ },
150+ }
151+ h .CreateFunc (ctx , e , wq )
115152 }
116153}
117154
@@ -125,7 +162,27 @@ func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDe
125162// Update implements EventHandler.
126163func (h TypedFuncs [object , request ]) Update (ctx context.Context , e event.TypedUpdateEvent [object ], q workqueue.TypedRateLimitingInterface [request ]) {
127164 if h .UpdateFunc != nil {
128- h .UpdateFunc (ctx , e , q )
165+ if ! implementsClientObject [object ]() || ! isPriorityQueue (q ) || isNil (e .ObjectOld ) || isNil (e .ObjectNew ) {
166+ h .UpdateFunc (ctx , e , q )
167+ return
168+ }
169+
170+ wq := workqueueWithCustomAddFunc [request ]{
171+ TypedRateLimitingInterface : q ,
172+ // We already know that we have a priority queue, that event.ObjectOld and ObjectNew implement
173+ // client.Object and that they are not nil
174+ addFunc : func (item request , q workqueue.TypedRateLimitingInterface [request ]) {
175+ var priority int
176+ if any (e .ObjectOld ).(client.Object ).GetResourceVersion () == any (e .ObjectNew ).(client.Object ).GetResourceVersion () {
177+ priority = LowPriority
178+ }
179+ q .(priorityqueue.PriorityQueue [request ]).AddWithOpts (
180+ priorityqueue.AddOpts {Priority : priority },
181+ item ,
182+ )
183+ },
184+ }
185+ h .UpdateFunc (ctx , e , wq )
129186 }
130187}
131188
@@ -142,43 +199,10 @@ const LowPriority = -100
142199// WithLowPriorityWhenUnchanged reduces the priority of events stemming from the initial listwatch or from a resync if
143200// and only if a priorityqueue.PriorityQueue is used. If not, it does nothing.
144201func WithLowPriorityWhenUnchanged [object client.Object , request comparable ](u TypedEventHandler [object , request ]) TypedEventHandler [object , request ] {
202+ // TypedFuncs already implements this so just wrap
145203 return TypedFuncs [object , request ]{
146- CreateFunc : func (ctx context.Context , tce event.TypedCreateEvent [object ], trli workqueue.TypedRateLimitingInterface [request ]) {
147- // Due to how the handlers are factored, we have to wrap the workqueue to be able
148- // to inject custom behavior.
149- u .Create (ctx , tce , workqueueWithCustomAddFunc [request ]{
150- TypedRateLimitingInterface : trli ,
151- addFunc : func (item request , q workqueue.TypedRateLimitingInterface [request ]) {
152- priorityQueue , isPriorityQueue := q .(priorityqueue.PriorityQueue [request ])
153- if ! isPriorityQueue {
154- q .Add (item )
155- return
156- }
157- var priority int
158- if isObjectUnchanged (tce ) {
159- priority = LowPriority
160- }
161- priorityQueue .AddWithOpts (priorityqueue.AddOpts {Priority : priority }, item )
162- },
163- })
164- },
165- UpdateFunc : func (ctx context.Context , tue event.TypedUpdateEvent [object ], trli workqueue.TypedRateLimitingInterface [request ]) {
166- u .Update (ctx , tue , workqueueWithCustomAddFunc [request ]{
167- TypedRateLimitingInterface : trli ,
168- addFunc : func (item request , q workqueue.TypedRateLimitingInterface [request ]) {
169- priorityQueue , isPriorityQueue := q .(priorityqueue.PriorityQueue [request ])
170- if ! isPriorityQueue {
171- q .Add (item )
172- return
173- }
174- var priority int
175- if tue .ObjectOld .GetResourceVersion () == tue .ObjectNew .GetResourceVersion () {
176- priority = LowPriority
177- }
178- priorityQueue .AddWithOpts (priorityqueue.AddOpts {Priority : priority }, item )
179- },
180- })
181- },
204+ CreateFunc : u .Create ,
205+ UpdateFunc : u .Update ,
182206 DeleteFunc : u .Delete ,
183207 GenericFunc : u .Generic ,
184208 }
@@ -199,3 +223,35 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
199223func isObjectUnchanged [object client.Object ](e event.TypedCreateEvent [object ]) bool {
200224 return e .Object .GetCreationTimestamp ().Time .Before (time .Now ().Add (- time .Minute ))
201225}
226+
227+ // addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
228+ // for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
229+ func addToQueueCreate [T client.Object , request comparable ](q workqueue.TypedRateLimitingInterface [request ], evt event.TypedCreateEvent [T ], item request ) {
230+ priorityQueue , isPriorityQueue := q .(priorityqueue.PriorityQueue [request ])
231+ if ! isPriorityQueue {
232+ q .Add (item )
233+ return
234+ }
235+
236+ var priority int
237+ if isObjectUnchanged (evt ) {
238+ priority = LowPriority
239+ }
240+ priorityQueue .AddWithOpts (priorityqueue.AddOpts {Priority : priority }, item )
241+ }
242+
243+ // addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
244+ // for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
245+ func addToQueueUpdate [T client.Object , request comparable ](q workqueue.TypedRateLimitingInterface [request ], evt event.TypedUpdateEvent [T ], item request ) {
246+ priorityQueue , isPriorityQueue := q .(priorityqueue.PriorityQueue [request ])
247+ if ! isPriorityQueue {
248+ q .Add (item )
249+ return
250+ }
251+
252+ var priority int
253+ if evt .ObjectOld .GetResourceVersion () == evt .ObjectNew .GetResourceVersion () {
254+ priority = LowPriority
255+ }
256+ priorityQueue .AddWithOpts (priorityqueue.AddOpts {Priority : priority }, item )
257+ }
0 commit comments