Skip to content

Commit 7272af7

Browse files
committed
Let users specify their own EventBroadcaster for the manager
1 parent e9f3cfc commit 7272af7

File tree

4 files changed

+17
-9
lines changed

4 files changed

+17
-9
lines changed

pkg/internal/recorder/recorder.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,13 @@ type provider struct {
3939
}
4040

4141
// NewProvider create a new Provider instance.
42-
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error) {
42+
func NewProvider(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) {
4343
clientSet, err := kubernetes.NewForConfig(config)
4444
if err != nil {
4545
return nil, fmt.Errorf("failed to init clientSet: %v", err)
4646
}
4747

48-
p := &provider{scheme: scheme, logger: logger}
49-
p.eventBroadcaster = record.NewBroadcaster()
48+
p := &provider{scheme: scheme, logger: logger, eventBroadcaster: broadcaster}
5049
p.eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: clientSet.CoreV1().Events("")})
5150
p.eventBroadcaster.StartEventWatcher(
5251
func(e *corev1.Event) {

pkg/internal/recorder/recorder_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
var _ = Describe("recorder.Provider", func() {
2828
Describe("NewProvider", func() {
2929
It("should return a provider instance and a nil error.", func() {
30-
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{})
30+
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, nil)
3131
Expect(provider).NotTo(BeNil())
3232
Expect(err).NotTo(HaveOccurred())
3333
})
@@ -36,13 +36,13 @@ var _ = Describe("recorder.Provider", func() {
3636
// Invalid the config
3737
cfg1 := *cfg
3838
cfg1.ContentType = "invalid-type"
39-
_, err := recorder.NewProvider(&cfg1, scheme.Scheme, tlog.NullLogger{})
39+
_, err := recorder.NewProvider(&cfg1, scheme.Scheme, tlog.NullLogger{}, nil)
4040
Expect(err.Error()).To(ContainSubstring("failed to init clientSet"))
4141
})
4242
})
4343
Describe("GetEventRecorder", func() {
4444
It("should return a recorder instance.", func() {
45-
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{})
45+
provider, err := recorder.NewProvider(cfg, scheme.Scheme, tlog.NullLogger{}, nil)
4646
Expect(err).NotTo(HaveOccurred())
4747

4848
recorder := provider.GetEventRecorderFor("test")

pkg/manager/manager.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,12 @@ type Options struct {
160160
// use the cache for reads and the client for writes.
161161
NewClient NewClientFunc
162162

163+
// EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API
164+
// Use this to customize the event correlator and spam filter
165+
EventBroadcaster record.EventBroadcaster
166+
163167
// Dependency injection for testing
164-
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error)
168+
newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error)
165169
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
166170
newMetricsListener func(addr string) (net.Listener, error)
167171
}
@@ -231,7 +235,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
231235
// Create the recorder provider to inject event recorders for the components.
232236
// TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific
233237
// to the particular controller that it's being injected into, rather than a generic one like is here.
234-
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"))
238+
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, log.WithName("events"), options.EventBroadcaster)
235239
if err != nil {
236240
return nil, err
237241
}
@@ -342,5 +346,9 @@ func setOptionsDefaults(options Options) Options {
342346
options.RetryPeriod = &retryPeriod
343347
}
344348

349+
if options.EventBroadcaster == nil {
350+
options.EventBroadcaster = record.NewBroadcaster()
351+
}
352+
345353
return options
346354
}

pkg/manager/manager_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"k8s.io/apimachinery/pkg/runtime"
3131
"k8s.io/client-go/rest"
3232
"k8s.io/client-go/tools/leaderelection/resourcelock"
33+
"k8s.io/client-go/tools/record"
3334
"sigs.k8s.io/controller-runtime/pkg/cache"
3435
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
3536
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -111,7 +112,7 @@ var _ = Describe("manger.Manager", func() {
111112

112113
It("should return an error it can't create a recorder.Provider", func(done Done) {
113114
m, err := New(cfg, Options{
114-
newRecorderProvider: func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger) (recorder.Provider, error) {
115+
newRecorderProvider: func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, broadcaster record.EventBroadcaster) (recorder.Provider, error) {
115116
return nil, fmt.Errorf("expected error")
116117
},
117118
})

0 commit comments

Comments
 (0)