Skip to content

Commit cc38429

Browse files
inteonFillZpp
andcommitted
Support shutdown watches dynamically
Co-authored-by: FillZpp <[email protected]> Signed-off-by: Tim Ramlot <[email protected]>
1 parent 1e343fa commit cc38429

File tree

12 files changed

+870
-102
lines changed

12 files changed

+870
-102
lines changed

pkg/cache/informertest/fake_cache.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"k8s.io/apimachinery/pkg/runtime/schema"
2424
"k8s.io/client-go/kubernetes/scheme"
2525
toolscache "k8s.io/client-go/tools/cache"
26+
"k8s.io/utils/ptr"
2627

2728
"sigs.k8s.io/controller-runtime/pkg/cache"
2829
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -89,10 +90,7 @@ func (c *FakeInformers) RemoveInformer(ctx context.Context, obj client.Object) e
8990

9091
// WaitForCacheSync implements Informers.
9192
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
92-
if c.Synced == nil {
93-
return true
94-
}
95-
return *c.Synced
93+
return ptr.Deref(c.Synced, true)
9694
}
9795

9896
// FakeInformerFor implements Informers.
@@ -116,7 +114,7 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec
116114
return informer, nil
117115
}
118116

119-
c.InformersByGVK[gvk] = &controllertest.FakeInformer{}
117+
c.InformersByGVK[gvk] = &controllertest.FakeInformer{Synced: ptr.Deref(c.Synced, true)}
120118
return c.InformersByGVK[gvk], nil
121119
}
122120

pkg/controller/controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ type Controller interface {
7474
// in response to the events.
7575
Watch(src source.Source) error
7676

77+
// StopWatch stops watching a source that was previously registered by Watch().
78+
//
79+
// StopWatch may be called multiple times, even concurrently. All such calls will
80+
// block until all goroutines have terminated.
81+
StopWatch(src source.Source) error
82+
7783
// Start starts the controller. Start blocks until the context is closed or a
7884
// controller has an error starting.
7985
Start(ctx context.Context) error

pkg/controller/controller_integration_test.go

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,102 @@ var _ = Describe("controller", func() {
172172
List(context.Background(), &controllertest.UnconventionalListTypeList{})
173173
Expect(err).NotTo(HaveOccurred())
174174
})
175+
176+
It("should not reconcile after watch is stopped", func() {
177+
By("Creating the Manager")
178+
cm, err := manager.New(cfg, manager.Options{})
179+
Expect(err).NotTo(HaveOccurred())
180+
181+
By("Creating the Controller")
182+
instance, err := controller.New("foo-controller", cm, controller.Options{
183+
Reconciler: reconcile.Func(
184+
func(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
185+
reconciled <- request
186+
return reconcile.Result{}, nil
187+
}),
188+
})
189+
Expect(err).NotTo(HaveOccurred())
190+
191+
By("Watching Resources")
192+
deploySource := source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{})
193+
err = instance.Watch(
194+
deploySource,
195+
)
196+
Expect(err).NotTo(HaveOccurred())
197+
198+
By("Starting the Manager")
199+
ctx, cancel := context.WithCancel(context.Background())
200+
defer cancel()
201+
go func() {
202+
defer GinkgoRecover()
203+
Expect(cm.Start(ctx)).NotTo(HaveOccurred())
204+
}()
205+
206+
deploymentDefinition := &appsv1.Deployment{
207+
ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"},
208+
Spec: appsv1.DeploymentSpec{
209+
Selector: &metav1.LabelSelector{
210+
MatchLabels: map[string]string{"foo": "bar"},
211+
},
212+
Template: corev1.PodTemplateSpec{
213+
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
214+
Spec: corev1.PodSpec{
215+
Containers: []corev1.Container{
216+
{
217+
Name: "nginx",
218+
Image: "nginx",
219+
SecurityContext: &corev1.SecurityContext{
220+
Privileged: truePtr(),
221+
},
222+
},
223+
},
224+
},
225+
},
226+
},
227+
}
228+
deployment := deploymentDefinition.DeepCopy()
229+
expectedReconcileRequest := reconcile.Request{NamespacedName: types.NamespacedName{
230+
Namespace: "default",
231+
Name: "deployment-name",
232+
}}
233+
234+
By("Invoking Reconciling for Create")
235+
deployment, err = clientset.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
236+
Expect(err).NotTo(HaveOccurred())
237+
Expect(<-reconciled).To(Equal(expectedReconcileRequest))
238+
239+
By("Stopping the deployment watch")
240+
Expect(instance.StopWatch(deploySource)).NotTo(HaveOccurred())
241+
242+
By("Test No Reconciling for Update")
243+
newDeployment := deployment.DeepCopy()
244+
newDeployment.Labels = map[string]string{"foo": "bar"}
245+
_, err = clientset.AppsV1().Deployments("default").Update(ctx, newDeployment, metav1.UpdateOptions{})
246+
Expect(err).NotTo(HaveOccurred())
247+
Consistently(reconciled).ShouldNot(Receive())
248+
249+
By("Test No Reconciling for Delete")
250+
err = clientset.AppsV1().Deployments("default").
251+
Delete(ctx, "deployment-name", metav1.DeleteOptions{})
252+
Expect(err).NotTo(HaveOccurred())
253+
Consistently(reconciled).ShouldNot(Receive())
254+
255+
By("Try starting the old deployment watch")
256+
Expect(instance.Watch(deploySource)).To(MatchError("cannot start an already started Kind source"))
257+
258+
By("Starting a new deployment watch")
259+
deploySource = source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{})
260+
Expect(instance.Watch(deploySource)).NotTo(HaveOccurred())
261+
262+
deployment = deploymentDefinition.DeepCopy()
263+
264+
By("Invoking Reconciling for Update")
265+
newDeployment = deployment.DeepCopy()
266+
newDeployment.Labels = map[string]string{"foo": "bar"}
267+
_, err = clientset.AppsV1().Deployments("default").Create(ctx, newDeployment, metav1.CreateOptions{})
268+
Expect(err).NotTo(HaveOccurred())
269+
Expect(<-reconciled).To(Equal(expectedReconcileRequest))
270+
})
175271
})
176272
})
177273

pkg/controller/controllertest/util.go

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package controllertest
1818

1919
import (
20+
"fmt"
21+
"sync"
2022
"time"
2123

2224
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -33,7 +35,8 @@ type FakeInformer struct {
3335
// RunCount is incremented each time RunInformersAndControllers is called
3436
RunCount int
3537

36-
handlers []eventHandlerWrapper
38+
mu sync.RWMutex
39+
handlers []*eventHandlerWrapper
3740
}
3841

3942
type modernResourceEventHandler interface {
@@ -51,7 +54,8 @@ type legacyResourceEventHandler interface {
5154
// eventHandlerWrapper wraps a ResourceEventHandler in a manner that is compatible with client-go 1.27+ and older.
5255
// The interface was changed in these versions.
5356
type eventHandlerWrapper struct {
54-
handler any
57+
handler any
58+
hasSynced bool
5559
}
5660

5761
func (e eventHandlerWrapper) OnAdd(obj interface{}) {
@@ -78,6 +82,10 @@ func (e eventHandlerWrapper) OnDelete(obj interface{}) {
7882
e.handler.(legacyResourceEventHandler).OnDelete(obj)
7983
}
8084

85+
func (e eventHandlerWrapper) HasSynced() bool {
86+
return e.hasSynced
87+
}
88+
8189
// AddIndexers does nothing. TODO(community): Implement this.
8290
func (f *FakeInformer) AddIndexers(indexers cache.Indexers) error {
8391
return nil
@@ -98,10 +106,13 @@ func (f *FakeInformer) HasSynced() bool {
98106
return f.Synced
99107
}
100108

101-
// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration.
109+
// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers.
102110
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
103-
f.handlers = append(f.handlers, eventHandlerWrapper{handler})
104-
return nil, nil
111+
f.mu.Lock()
112+
defer f.mu.Unlock()
113+
eh := &eventHandlerWrapper{handler, f.Synced}
114+
f.handlers = append(f.handlers, eh)
115+
return eh, nil
105116
}
106117

107118
// Run implements the Informer interface. Increments f.RunCount.
@@ -111,20 +122,26 @@ func (f *FakeInformer) Run(<-chan struct{}) {
111122

112123
// Add fakes an Add event for obj.
113124
func (f *FakeInformer) Add(obj metav1.Object) {
125+
f.mu.RLock()
126+
defer f.mu.RUnlock()
114127
for _, h := range f.handlers {
115128
h.OnAdd(obj)
116129
}
117130
}
118131

119132
// Update fakes an Update event for obj.
120133
func (f *FakeInformer) Update(oldObj, newObj metav1.Object) {
134+
f.mu.RLock()
135+
defer f.mu.RUnlock()
121136
for _, h := range f.handlers {
122137
h.OnUpdate(oldObj, newObj)
123138
}
124139
}
125140

126141
// Delete fakes an Delete event for obj.
127142
func (f *FakeInformer) Delete(obj metav1.Object) {
143+
f.mu.RLock()
144+
defer f.mu.RUnlock()
128145
for _, h := range f.handlers {
129146
h.OnDelete(obj)
130147
}
@@ -135,8 +152,25 @@ func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEve
135152
return nil, nil
136153
}
137154

138-
// RemoveEventHandler does nothing. TODO(community): Implement this.
155+
// RemoveEventHandler removes an EventHandler to the fake Informers.
139156
func (f *FakeInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error {
157+
eh, ok := handle.(*eventHandlerWrapper)
158+
if !ok {
159+
return fmt.Errorf("invalid registration type %t", handle)
160+
}
161+
162+
f.mu.Lock()
163+
defer f.mu.Unlock()
164+
165+
handlers := make([]*eventHandlerWrapper, 0, len(f.handlers))
166+
for _, h := range f.handlers {
167+
if h == eh {
168+
continue
169+
}
170+
handlers = append(handlers, h)
171+
}
172+
f.handlers = handlers
173+
140174
return nil
141175
}
142176

0 commit comments

Comments
 (0)