Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ func NewCommand() *cobra.Command {
eventBroadcaster.StartLogging(func(format string, args ...interface{}) { mlog.V(3).Info(fmt.Sprintf(format, args...)) })
eventBroadcaster.StartRecordingToSink(&clientv1.EventSinkImpl{Interface: cl.CoreV1().Events("")})

scheme := trustapi.GlobalScheme
mgr, err := ctrl.NewManager(opts.RestConfig, ctrl.Options{
Scheme: trustapi.GlobalScheme,
Scheme: scheme,
EventBroadcaster: eventBroadcaster,
LeaderElection: true,
LeaderElectionNamespace: opts.Bundle.Namespace,
NewCache: bundle.NewCacheFunc(opts.Bundle),
ClientDisableCacheFor: bundle.ClientDisableCacheFor(),
NewCache: bundle.NewCacheFunc(scheme, opts.Bundle),
LeaderElectionID: "trust-manager-leader-election",
LeaderElectionReleaseOnCancel: true,
ReadinessEndpointName: opts.ReadyzPath,
Expand Down
11 changes: 3 additions & 8 deletions pkg/bundle/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -194,12 +195,6 @@ func (b *bundle) mustBundleList(ctx context.Context) *trustapi.BundleList {

// NewCacheFunc will return a multi-scoped controller-runtime NewCacheFunc
// where Secret resources will only be watched within the trust Namespace.
func NewCacheFunc(opts Options) cache.NewCacheFunc {
return internal.NewMultiScopedCache(opts.Namespace, []schema.GroupKind{{Kind: "Secret"}})
}

// ClientDisableCacheFor returns resources which should only be watched within
// the Trust Namespace, and not at the cluster level.
func ClientDisableCacheFor() []client.Object {
return []client.Object{new(corev1.Secret)}
func NewCacheFunc(scheme *runtime.Scheme, opts Options) cache.NewCacheFunc {
return internal.NewMultiScopedCache(scheme, opts.Namespace, []schema.GroupKind{{Kind: "Secret"}})
}
62 changes: 53 additions & 9 deletions pkg/bundle/internal/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package internal

import (
"context"
"fmt"
"sync"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
Expand All @@ -36,6 +38,9 @@ var _ cache.Cache = &multiScopedCache{}
// namespace, whilst the other Namespaced resources in all namespaces.
// It wraps both the default and Namespaced controller-runtime Cache.
type multiScopedCache struct {
// scheme is the scheme used to determine the GVK for objects.
scheme *runtime.Scheme

// namespacedInformers is the set of resource types that should only be
// watched in the namespace pool.
namespacedInformers []schema.GroupKind
Expand All @@ -51,23 +56,23 @@ type multiScopedCache struct {
// resources in the given namespace. namespacedInformers is the set of resource
// types which should only be watched in the given namespace.
// namespacedInformers expects Namespaced resource types.
func NewMultiScopedCache(namespace string, namespacedInformers []schema.GroupKind) cache.NewCacheFunc {
func NewMultiScopedCache(scheme *runtime.Scheme, namespace string, namespacedInformers []schema.GroupKind) cache.NewCacheFunc {
return func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
namespacedCache, err := cache.New(config, cache.Options{
Scheme: opts.Scheme,
Mapper: opts.Mapper,
Namespace: namespace,
Resync: opts.Resync,
SelectorsByObject: opts.SelectorsByObject,
})
namespacedOpts := opts
namespacedOpts.Namespace = namespace
clusterOpts := opts
clusterOpts.Namespace = ""

namespacedCache, err := cache.New(config, namespacedOpts)
if err != nil {
return nil, err
}
clusterCache, err := cache.New(config, opts)
clusterCache, err := cache.New(config, clusterOpts)
if err != nil {
return nil, err
}
return &multiScopedCache{
scheme: scheme,
namespacedCache: namespacedCache,
clusterCache: clusterCache,
namespacedInformers: namespacedInformers,
Expand All @@ -77,6 +82,9 @@ func NewMultiScopedCache(namespace string, namespacedInformers []schema.GroupKin

// GetInformer returns the underlying cache's GetInformer based on resource type.
func (b *multiScopedCache) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) {
if err := setGroupVersionKind(b.scheme, obj); err != nil {
return nil, err
}
return b.cacheFromGVK(obj.GetObjectKind().GroupVersionKind()).GetInformer(ctx, obj)
}

Expand Down Expand Up @@ -126,26 +134,62 @@ func (b *multiScopedCache) WaitForCacheSync(ctx context.Context) bool {

// IndexField returns the underlying cache's IndexField based on resource type.
func (b *multiScopedCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
if err := setGroupVersionKind(b.scheme, obj); err != nil {
return err
}
return b.cacheFromGVK(obj.GetObjectKind().GroupVersionKind()).IndexField(ctx, obj, field, extractValue)
}

// Get returns the underlying cache's Get based on resource type.
func (b *multiScopedCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error {
if err := setGroupVersionKind(b.scheme, obj); err != nil {
return err
}
return b.cacheFromGVK(obj.GetObjectKind().GroupVersionKind()).Get(ctx, key, obj)
}

// List returns the underlying cache's List based on resource type.
func (b *multiScopedCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
if err := setGroupVersionKind(b.scheme, list); err != nil {
return err
}
return b.cacheFromGVK(list.GetObjectKind().GroupVersionKind()).List(ctx, list, opts...)
}

// cacheFromGVK returns either the cluster or namespaced cache, based on the
// resource type given.
func (b *multiScopedCache) cacheFromGVK(gvk schema.GroupVersionKind) cache.Cache {
if gvk.Group == "" && gvk.Kind == "" {
panic("The Group and/or Kind must be set")
}

for _, namespacedInformer := range b.namespacedInformers {
if namespacedInformer.Group == gvk.Group && namespacedInformer.Kind == gvk.Kind {
return b.namespacedCache
}
}
return b.clusterCache
}

// setGroupVersionKind populates the Group and Kind fields of obj using the
// scheme type registry.
// Inspired by https://github.com/kubernetes-sigs/controller-runtime/issues/1735#issuecomment-984763173
func setGroupVersionKind(scheme *runtime.Scheme, obj runtime.Object) error {
gvk := obj.GetObjectKind().GroupVersionKind()
if gvk.Group != "" || gvk.Kind != "" {
return nil // eg. in case of PartialMetadata, we don't want to overwrite the Group/ Kind
}

gvks, unversioned, err := scheme.ObjectKinds(obj)
if err != nil {
return err
}
if unversioned {
return fmt.Errorf("ObjectKinds unexpectedly returned unversioned: %#v", unversioned)
}
if len(gvks) != 1 {
return fmt.Errorf("ObjectKinds unexpectedly returned zero or multiple gvks: %#v", gvks)
}
obj.GetObjectKind().SetGroupVersionKind(gvks[0])
return nil
}
5 changes: 3 additions & 2 deletions test/integration/bundle/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ var _ = Describe("Integration", func() {
DefaultPackageLocation: tmpFileName,
}

scheme := trustapi.GlobalScheme
mgr, err = ctrl.NewManager(env.Config, ctrl.Options{
Scheme: trustapi.GlobalScheme,
NewCache: bundle.NewCacheFunc(opts),
Scheme: scheme,
NewCache: bundle.NewCacheFunc(scheme, opts),
// TODO: can we disable leader election here? The mgr goroutine prints extra output we probably don't need
// and it might not be valuable to enable leader election here
LeaderElection: true,
Expand Down