Skip to content

Commit 1e343fa

Browse files
committed
move arguments from Start() to Source objects
Signed-off-by: Tim Ramlot <[email protected]>
1 parent 0cebcae commit 1e343fa

23 files changed

+474
-442
lines changed

examples/builtins/main.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,17 @@ func main() {
5959
}
6060

6161
// Watch ReplicaSets and enqueue ReplicaSet object key
62-
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
62+
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.EnqueueRequestForObject{})); err != nil {
6363
entryLog.Error(err, "unable to watch ReplicaSets")
6464
os.Exit(1)
6565
}
6666

6767
// Watch Pods and enqueue owning ReplicaSet key
68-
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
69-
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
68+
if err := c.Watch(source.Kind(
69+
mgr.GetCache(),
70+
&corev1.Pod{},
71+
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()),
72+
)); err != nil {
7073
entryLog.Error(err, "unable to watch Pods")
7174
os.Exit(1)
7275
}

pkg/builder/controller.go

Lines changed: 65 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,15 @@ const (
5353

5454
// Builder builds a Controller.
5555
type Builder struct {
56-
forInput ForInput
57-
ownsInput []OwnsInput
58-
watchesInput []WatchesInput
59-
mgr manager.Manager
60-
globalPredicates []predicate.Predicate
61-
ctrl controller.Controller
62-
ctrlOptions controller.Options
63-
name string
56+
forInput ForInput
57+
ownsInput []OwnsInput
58+
watchesObjectInput []WatchesObjectInput
59+
watchesSourceInput []WatchesSourceInput
60+
mgr manager.Manager
61+
globalPredicates []predicate.Predicate
62+
ctrl controller.Controller
63+
ctrlOptions controller.Options
64+
name string
6465
}
6566

6667
// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
@@ -79,7 +80,7 @@ type ForInput struct {
7980
// For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
8081
// update events by *reconciling the object*.
8182
// This is the equivalent of calling
82-
// Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{}).
83+
// Watches(source.Kind(mgr.GetCache(), apiType, &handler.EnqueueRequestForObject{})).
8384
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
8485
if blder.forInput.object != nil {
8586
blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
@@ -120,10 +121,10 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
120121
return blder
121122
}
122123

123-
// WatchesInput represents the information set by Watches method.
124-
type WatchesInput struct {
125-
src source.Source
126-
eventHandler handler.EventHandler
124+
// WatchesObjectInput represents the information set by Watches method.
125+
type WatchesObjectInput struct {
126+
object client.Object
127+
eventhandler handler.EventHandler
127128
predicates []predicate.Predicate
128129
objectProjection objectProjection
129130
}
@@ -132,10 +133,15 @@ type WatchesInput struct {
132133
// update events by *reconciling the object* with the given EventHandler.
133134
//
134135
// This is the equivalent of calling
135-
// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
136-
func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
137-
src := source.Kind(blder.mgr.GetCache(), object)
138-
return blder.WatchesRawSource(src, eventHandler, opts...)
136+
// WatchesRawSource(source.Kind(scheme, object, eventhandler, opts...)).
137+
func (blder *Builder) Watches(object client.Object, eventhandler handler.EventHandler, opts ...WatchesObjectOption) *Builder {
138+
input := WatchesObjectInput{object: object, eventhandler: eventhandler}
139+
for _, opt := range opts {
140+
opt.ApplyToWatchesObject(&input)
141+
}
142+
143+
blder.watchesObjectInput = append(blder.watchesObjectInput, input)
144+
return blder
139145
}
140146

141147
// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
@@ -165,29 +171,30 @@ func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHa
165171
// In the first case, controller-runtime will create another cache for the
166172
// concrete type on top of the metadata cache; this increases memory
167173
// consumption and leads to race conditions as caches are not in sync.
168-
func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
174+
func (blder *Builder) WatchesMetadata(object client.Object, eventhandler handler.EventHandler, opts ...WatchesObjectOption) *Builder {
169175
opts = append(opts, OnlyMetadata)
170-
return blder.Watches(object, eventHandler, opts...)
176+
return blder.Watches(object, eventhandler, opts...)
177+
}
178+
179+
// WatchesSourceInput represents the information set by Watches method.
180+
type WatchesSourceInput struct {
181+
src source.Source
171182
}
172183

173184
// WatchesRawSource exposes the lower-level ControllerManagedBy Watches functions through the builder.
174-
// Specified predicates are registered only for given source.
175185
//
176186
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
177-
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
178-
func (blder *Builder) WatchesRawSource(src source.Source, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
179-
input := WatchesInput{src: src, eventHandler: eventHandler}
180-
for _, opt := range opts {
181-
opt.ApplyToWatches(&input)
182-
}
183-
184-
blder.watchesInput = append(blder.watchesInput, input)
187+
// This method is only exposed for more advanced use cases, most users should use higher level functions.
188+
// This method does generally disregard all the global configuration set by the builder.
189+
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
190+
blder.watchesSourceInput = append(blder.watchesSourceInput, WatchesSourceInput{src: src})
185191
return blder
186192
}
187193

188194
// WithEventFilter sets the event filters, to filter which create/update/delete/generic events eventually
189195
// trigger reconciliations. For example, filtering on whether the resource version has changed.
190196
// Given predicate is added for all watched objects.
197+
// The predicates are not applied to sources watched with WatchesRawSource(...).
191198
// Defaults to the empty list.
192199
func (blder *Builder) WithEventFilter(p predicate.Predicate) *Builder {
193200
blder.globalPredicates = append(blder.globalPredicates, p)
@@ -271,11 +278,14 @@ func (blder *Builder) doWatch() error {
271278
if err != nil {
272279
return err
273280
}
274-
src := source.Kind(blder.mgr.GetCache(), obj)
275-
hdler := &handler.EnqueueRequestForObject{}
276281
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
277282
allPredicates = append(allPredicates, blder.forInput.predicates...)
278-
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
283+
src := source.Kind(
284+
blder.mgr.GetCache(),
285+
obj,
286+
handler.WithPredicates(&handler.EnqueueRequestForObject{}, allPredicates...),
287+
)
288+
if err := blder.ctrl.Watch(src); err != nil {
279289
return err
280290
}
281291
}
@@ -289,7 +299,6 @@ func (blder *Builder) doWatch() error {
289299
if err != nil {
290300
return err
291301
}
292-
src := source.Kind(blder.mgr.GetCache(), obj)
293302
opts := []handler.OwnerOption{}
294303
if !own.matchEveryOwner {
295304
opts = append(opts, handler.OnlyControllerOwner())
@@ -301,32 +310,43 @@ func (blder *Builder) doWatch() error {
301310
)
302311
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
303312
allPredicates = append(allPredicates, own.predicates...)
304-
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
313+
src := source.Kind(
314+
blder.mgr.GetCache(),
315+
obj,
316+
handler.WithPredicates(hdler, allPredicates...),
317+
)
318+
if err := blder.ctrl.Watch(src); err != nil {
305319
return err
306320
}
307321
}
308322

309323
// Do the watch requests
310-
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
324+
if len(blder.watchesObjectInput) == 0 && len(blder.watchesSourceInput) == 0 && blder.forInput.object == nil {
311325
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
312326
}
313-
for _, w := range blder.watchesInput {
314-
// If the source of this watch is of type Kind, project it.
315-
if srcKind, ok := w.src.(interface {
316-
ProjectObject(func(client.Object) (client.Object, error)) error
317-
}); ok {
318-
if err := srcKind.ProjectObject(func(o client.Object) (client.Object, error) {
319-
return blder.project(o, w.objectProjection)
320-
}); err != nil {
321-
return err
322-
}
327+
for _, w := range blder.watchesObjectInput {
328+
obj, err := blder.project(w.object, w.objectProjection)
329+
if err != nil {
330+
return err
323331
}
332+
324333
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
325334
allPredicates = append(allPredicates, w.predicates...)
326-
if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil {
335+
src := source.Kind(
336+
blder.mgr.GetCache(), obj,
337+
handler.WithPredicates(w.eventhandler, allPredicates...),
338+
)
339+
if err := blder.ctrl.Watch(src); err != nil {
327340
return err
328341
}
329342
}
343+
344+
for _, w := range blder.watchesSourceInput {
345+
if err := blder.ctrl.Watch(w.src); err != nil {
346+
return err
347+
}
348+
}
349+
330350
return nil
331351
}
332352

pkg/builder/options.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ type OwnsOption interface {
3434
ApplyToOwns(*OwnsInput)
3535
}
3636

37-
// WatchesOption is some configuration that modifies options for a watches request.
38-
type WatchesOption interface {
39-
// ApplyToWatches applies this configuration to the given watches options.
40-
ApplyToWatches(*WatchesInput)
37+
// WatchesObjectOption is some configuration that modifies options for a watches request.
38+
type WatchesObjectOption interface {
39+
// ApplyToWatchesObject applies this configuration to the given watches options.
40+
ApplyToWatchesObject(*WatchesObjectInput)
4141
}
4242

4343
// }}}
@@ -66,14 +66,14 @@ func (w Predicates) ApplyToOwns(opts *OwnsInput) {
6666
opts.predicates = w.predicates
6767
}
6868

69-
// ApplyToWatches applies this configuration to the given WatchesInput options.
70-
func (w Predicates) ApplyToWatches(opts *WatchesInput) {
69+
// ApplyToWatchesObject applies this configuration to the given WatchesInput options.
70+
func (w Predicates) ApplyToWatchesObject(opts *WatchesObjectInput) {
7171
opts.predicates = w.predicates
7272
}
7373

7474
var _ ForOption = &Predicates{}
7575
var _ OwnsOption = &Predicates{}
76-
var _ WatchesOption = &Predicates{}
76+
var _ WatchesObjectOption = &Predicates{}
7777

7878
// }}}
7979

@@ -94,8 +94,8 @@ func (p projectAs) ApplyToOwns(opts *OwnsInput) {
9494
opts.objectProjection = objectProjection(p)
9595
}
9696

97-
// ApplyToWatches applies this configuration to the given WatchesInput options.
98-
func (p projectAs) ApplyToWatches(opts *WatchesInput) {
97+
// ApplyToWatchesObject applies this configuration to the given WatchesObjectInput options.
98+
func (p projectAs) ApplyToWatchesObject(opts *WatchesObjectInput) {
9999
opts.objectProjection = objectProjection(p)
100100
}
101101

@@ -132,9 +132,9 @@ var (
132132
// consumption and leads to race conditions as caches are not in sync.
133133
OnlyMetadata = projectAs(projectAsMetadata)
134134

135-
_ ForOption = OnlyMetadata
136-
_ OwnsOption = OnlyMetadata
137-
_ WatchesOption = OnlyMetadata
135+
_ ForOption = OnlyMetadata
136+
_ OwnsOption = OnlyMetadata
137+
_ WatchesObjectOption = OnlyMetadata
138138
)
139139

140140
// }}}

pkg/controller/controller.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,8 @@ import (
2525
"k8s.io/client-go/util/workqueue"
2626
"k8s.io/klog/v2"
2727

28-
"sigs.k8s.io/controller-runtime/pkg/handler"
2928
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
3029
"sigs.k8s.io/controller-runtime/pkg/manager"
31-
"sigs.k8s.io/controller-runtime/pkg/predicate"
3230
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
3331
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3432
"sigs.k8s.io/controller-runtime/pkg/source"
@@ -72,13 +70,9 @@ type Controller interface {
7270
// Reconciler is called to reconcile an object by Namespace/Name
7371
reconcile.Reconciler
7472

75-
// Watch takes events provided by a Source and uses the EventHandler to
76-
// enqueue reconcile.Requests in response to the events.
77-
//
78-
// Watch may be provided one or more Predicates to filter events before
79-
// they are given to the EventHandler. Events will be passed to the
80-
// EventHandler if all provided Predicates evaluate to true.
81-
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
73+
// Watch takes events provided by a Source and enqueues reconcile.Requests
74+
// in response to the events.
75+
Watch(src source.Source) error
8276

8377
// Start starts the controller. Start blocks until the context is closed or a
8478
// controller has an error starting.

pkg/controller/controller_integration_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,14 @@ var _ = Describe("controller", func() {
6464
Expect(err).NotTo(HaveOccurred())
6565

6666
By("Watching Resources")
67-
err = instance.Watch(
68-
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}),
67+
err = instance.Watch(source.Kind(
68+
cm.GetCache(),
69+
&appsv1.ReplicaSet{},
6970
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
70-
)
71+
))
7172
Expect(err).NotTo(HaveOccurred())
7273

73-
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{})
74+
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}))
7475
Expect(err).NotTo(HaveOccurred())
7576

7677
err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})

pkg/controller/controller_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ var _ = Describe("controller.Controller", func() {
7777

7878
ctx, cancel := context.WithCancel(context.Background())
7979
watchChan := make(chan event.GenericEvent, 1)
80-
watch := source.Channel(source.NewChannelBroadcaster(watchChan))
80+
watch := source.Channel(source.NewChannelBroadcaster(watchChan), &handler.EnqueueRequestForObject{})
8181
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}
8282

8383
reconcileStarted := make(chan struct{})
@@ -99,7 +99,7 @@ var _ = Describe("controller.Controller", func() {
9999
Expect(err).NotTo(HaveOccurred())
100100

101101
c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
102-
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
102+
Expect(c.Watch(watch)).To(Succeed())
103103
Expect(err).NotTo(HaveOccurred())
104104

105105
go func() {

pkg/controller/example_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func ExampleController() {
7171
}
7272

7373
// Watch for Pod create / update / delete events and call Reconcile
74-
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{})
74+
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}))
7575
if err != nil {
7676
log.Error(err, "unable to watch pods")
7777
os.Exit(1)
@@ -108,7 +108,7 @@ func ExampleController_unstructured() {
108108
Version: "v1",
109109
})
110110
// Watch for Pod create / update / delete events and call Reconcile
111-
err = c.Watch(source.Kind(mgr.GetCache(), u), &handler.EnqueueRequestForObject{})
111+
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.EnqueueRequestForObject{}))
112112
if err != nil {
113113
log.Error(err, "unable to watch pods")
114114
os.Exit(1)
@@ -139,7 +139,7 @@ func ExampleNewUnmanaged() {
139139
os.Exit(1)
140140
}
141141

142-
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}); err != nil {
142+
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{})); err != nil {
143143
log.Error(err, "unable to watch pods")
144144
os.Exit(1)
145145
}

0 commit comments

Comments
 (0)