Skip to content

Commit 1517ffb

Browse files
Merge pull request #116623 from lavalamp/xfrmr
change where transformers are called Kubernetes-commit: 76d351065e504e23b7962158ef0e9067b6bd96ed
2 parents 308e6b1 + c3b84f0 commit 1517ffb

File tree

6 files changed

+135
-30
lines changed

6 files changed

+135
-30
lines changed

tools/cache/controller.go

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -394,17 +394,6 @@ func NewIndexerInformer(
394394
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
395395
}
396396

397-
// TransformFunc allows for transforming an object before it will be processed
398-
// and put into the controller cache and before the corresponding handlers will
399-
// be called on it.
400-
// TransformFunc (similarly to ResourceEventHandler functions) should be able
401-
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown
402-
//
403-
// The most common usage pattern is to clean-up some parts of the object to
404-
// reduce component memory usage if a given component doesn't care about them.
405-
// given controller doesn't care for them
406-
type TransformFunc func(interface{}) (interface{}, error)
407-
408397
// NewTransformingInformer returns a Store and a controller for populating
409398
// the store while also providing event notifications. You should only used
410399
// the returned Store for Get/List operations; Add/Modify/Deletes will cause
@@ -452,20 +441,12 @@ func processDeltas(
452441
// Object which receives event notifications from the given deltas
453442
handler ResourceEventHandler,
454443
clientState Store,
455-
transformer TransformFunc,
456444
deltas Deltas,
457445
isInInitialList bool,
458446
) error {
459447
// from oldest to newest
460448
for _, d := range deltas {
461449
obj := d.Object
462-
if transformer != nil {
463-
var err error
464-
obj, err = transformer(obj)
465-
if err != nil {
466-
return err
467-
}
468-
}
469450

470451
switch d.Type {
471452
case Sync, Replaced, Added, Updated:
@@ -517,6 +498,7 @@ func newInformer(
517498
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
518499
KnownObjects: clientState,
519500
EmitDeltaTypeReplaced: true,
501+
Transformer: transformer,
520502
})
521503

522504
cfg := &Config{
@@ -528,7 +510,7 @@ func newInformer(
528510

529511
Process: func(obj interface{}, isInInitialList bool) error {
530512
if deltas, ok := obj.(Deltas); ok {
531-
return processDeltas(h, clientState, transformer, deltas, isInInitialList)
513+
return processDeltas(h, clientState, deltas, isInInitialList)
532514
}
533515
return errors.New("object given as Process argument is not Deltas")
534516
},

tools/cache/controller_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
"testing"
2424
"time"
2525

26-
"k8s.io/api/core/v1"
26+
v1 "k8s.io/api/core/v1"
2727
apiequality "k8s.io/apimachinery/pkg/api/equality"
2828
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2929
"k8s.io/apimachinery/pkg/runtime"
@@ -32,7 +32,7 @@ import (
3232
"k8s.io/apimachinery/pkg/watch"
3333
fcache "k8s.io/client-go/tools/cache/testing"
3434

35-
"github.com/google/gofuzz"
35+
fuzz "github.com/google/gofuzz"
3636
)
3737

3838
func Example() {

tools/cache/delta_fifo.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ type DeltaFIFOOptions struct {
5151
// When true, `Replaced` events will be sent for items passed to a Replace() call.
5252
// When false, `Sync` events will be sent instead.
5353
EmitDeltaTypeReplaced bool
54+
55+
// If set, will be called for objects before enqueueing them. Please
56+
// see the comment on TransformFunc for details.
57+
Transformer TransformFunc
5458
}
5559

5660
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
@@ -129,8 +133,32 @@ type DeltaFIFO struct {
129133
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
130134
// DeltaType when Replace() is called (to preserve backwards compat).
131135
emitDeltaTypeReplaced bool
136+
137+
// Called with every object if non-nil.
138+
transformer TransformFunc
132139
}
133140

141+
// TransformFunc allows for transforming an object before it will be processed.
142+
// TransformFunc (similarly to ResourceEventHandler functions) should be able
143+
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
144+
//
145+
// New in v1.27: In such cases, the contained object will already have gone
146+
// through the transform object separately (when it was added / updated prior
147+
// to the delete), so the TransformFunc can likely safely ignore such objects
148+
// (i.e., just return the input object).
149+
//
150+
// The most common usage pattern is to clean-up some parts of the object to
151+
// reduce component memory usage if a given component doesn't care about them.
152+
//
153+
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
154+
// sees the object before any other actor, and it is now safe to mutate the
155+
// object in place instead of making a copy.
156+
//
157+
// Note that TransformFunc is called while inserting objects into the
158+
// notification queue and is therefore extremely performance sensitive; please
159+
// do not do anything that will take a long time.
160+
type TransformFunc func(interface{}) (interface{}, error)
161+
134162
// DeltaType is the type of a change (addition, deletion, etc)
135163
type DeltaType string
136164

@@ -227,6 +255,7 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
227255
knownObjects: opts.KnownObjects,
228256

229257
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
258+
transformer: opts.Transformer,
230259
}
231260
f.cond.L = &f.lock
232261
return f
@@ -415,6 +444,21 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
415444
if err != nil {
416445
return KeyError{obj, err}
417446
}
447+
448+
// Every object comes through this code path once, so this is a good
449+
// place to call the transform func. If obj is a
450+
// DeletedFinalStateUnknown tombstone, then the containted inner object
451+
// will already have gone through the transformer, but we document that
452+
// this can happen. In cases involving Replace(), such an object can
453+
// come through multiple times.
454+
if f.transformer != nil {
455+
var err error
456+
obj, err = f.transformer(obj)
457+
if err != nil {
458+
return err
459+
}
460+
}
461+
418462
oldDeltas := f.items[id]
419463
newDeltas := append(oldDeltas, Delta{actionType, obj})
420464
newDeltas = dedupDeltas(newDeltas)

tools/cache/delta_fifo_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,88 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
327327
}
328328
}
329329

330+
type rvAndXfrm struct {
331+
rv int
332+
xfrm int
333+
}
334+
335+
func TestDeltaFIFO_transformer(t *testing.T) {
336+
mk := func(name string, rv int) testFifoObject {
337+
return mkFifoObj(name, &rvAndXfrm{rv, 0})
338+
}
339+
xfrm := TransformFunc(func(obj interface{}) (interface{}, error) {
340+
switch v := obj.(type) {
341+
case testFifoObject:
342+
v.val.(*rvAndXfrm).xfrm++
343+
case DeletedFinalStateUnknown:
344+
if x := v.Obj.(testFifoObject).val.(*rvAndXfrm).xfrm; x != 1 {
345+
return nil, fmt.Errorf("object has been transformed wrong number of times: %#v", obj)
346+
}
347+
default:
348+
return nil, fmt.Errorf("unexpected object: %#v", obj)
349+
}
350+
return obj, nil
351+
})
352+
353+
must := func(err error) {
354+
if err != nil {
355+
t.Fatal(err)
356+
}
357+
}
358+
359+
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
360+
KeyFunction: testFifoObjectKeyFunc,
361+
Transformer: xfrm,
362+
})
363+
must(f.Add(mk("foo", 10)))
364+
must(f.Add(mk("bar", 11)))
365+
must(f.Update(mk("foo", 12)))
366+
must(f.Delete(mk("foo", 15)))
367+
must(f.Replace([]interface{}{}, ""))
368+
must(f.Add(mk("bar", 16)))
369+
must(f.Replace([]interface{}{}, ""))
370+
371+
// Should be empty
372+
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
373+
t.Errorf("Expected %+v, got %+v", e, a)
374+
}
375+
376+
for i := 0; i < 2; i++ {
377+
obj, err := f.Pop(func(o interface{}, isInInitialList bool) error { return nil })
378+
if err != nil {
379+
t.Fatalf("got nothing on try %v?", i)
380+
}
381+
obj = obj.(Deltas).Newest().Object
382+
switch v := obj.(type) {
383+
case testFifoObject:
384+
if v.name != "foo" {
385+
t.Errorf("expected regular deletion of foo, got %q", v.name)
386+
}
387+
rx := v.val.(*rvAndXfrm)
388+
if rx.rv != 15 {
389+
t.Errorf("expected last message, got %#v", obj)
390+
}
391+
if rx.xfrm != 1 {
392+
t.Errorf("obj %v transformed wrong number of times.", obj)
393+
}
394+
case DeletedFinalStateUnknown:
395+
tf := v.Obj.(testFifoObject)
396+
rx := tf.val.(*rvAndXfrm)
397+
if tf.name != "bar" {
398+
t.Errorf("expected tombstone deletion of bar, got %q", tf.name)
399+
}
400+
if rx.rv != 16 {
401+
t.Errorf("expected last message, got %#v", obj)
402+
}
403+
if rx.xfrm != 1 {
404+
t.Errorf("tombstoned obj %v transformed wrong number of times.", obj)
405+
}
406+
default:
407+
t.Errorf("unknown item %#v", obj)
408+
}
409+
}
410+
}
411+
330412
func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
331413
f := NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: testFifoObjectKeyFunc})
332414
f.Add(mkFifoObj("foo", 10))

tools/cache/shared_informer.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -205,10 +205,7 @@ type SharedInformer interface {
205205
//
206206
// Must be set before starting the informer.
207207
//
208-
// Note: Since the object given to the handler may be already shared with
209-
// other goroutines, it is advisable to copy the object being
210-
// transform before mutating it at all and returning the copy to prevent
211-
// data races.
208+
// Please see the comment on TransformFunc for more details.
212209
SetTransform(handler TransformFunc) error
213210

214211
// IsStopped reports whether the informer has already been stopped.
@@ -465,6 +462,7 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
465462
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
466463
KnownObjects: s.indexer,
467464
EmitDeltaTypeReplaced: true,
465+
Transformer: s.transform,
468466
})
469467

470468
cfg := &Config{
@@ -637,7 +635,7 @@ func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool
637635
defer s.blockDeltas.Unlock()
638636

639637
if deltas, ok := obj.(Deltas); ok {
640-
return processDeltas(s, s.indexer, s.transform, deltas, isInInitialList)
638+
return processDeltas(s, s.indexer, deltas, isInInitialList)
641639
}
642640
return errors.New("object given as Process argument is not Deltas")
643641
}

tools/cache/shared_informer_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -406,9 +406,8 @@ func TestSharedInformerTransformer(t *testing.T) {
406406
name := pod.GetName()
407407

408408
if upper := strings.ToUpper(name); upper != name {
409-
copied := pod.DeepCopyObject().(*v1.Pod)
410-
copied.SetName(upper)
411-
return copied, nil
409+
pod.SetName(upper)
410+
return pod, nil
412411
}
413412
}
414413
return obj, nil

0 commit comments

Comments
 (0)