Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions controllers/vaultdynamicsecret_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ func (r *VaultDynamicSecretReconciler) Reconcile(ctx context.Context, req ctrl.R
} else if !vault.IsLeaseNotFoundError(err) {
r.Recorder.Eventf(o, corev1.EventTypeWarning, consts.ReasonSecretLeaseRenewalError,
"Could not renew lease, lease_id=%s, err=%s", leaseID, err)
} else if !vault.IsForbiddenError(err) {
} else if vault.IsForbiddenError(err) {
logger.V(consts.LogLevelWarning).Info("Tainting client", "err", err)
vClient.Taint()
}
syncReason = consts.ReasonSecretLeaseRenewalError
Expand All @@ -275,6 +276,10 @@ func (r *VaultDynamicSecretReconciler) Reconcile(ctx context.Context, req ctrl.R
secretLease, staticCredsUpdated, err := r.syncSecret(ctx, vClient, o, transOption)
if err != nil {
r.SyncRegistry.Add(req.NamespacedName)
if vault.IsForbiddenError(err) {
logger.V(consts.LogLevelWarning).Info("Tainting client", "err", err)
vClient.Taint()
}
entry, _ := r.BackOffRegistry.Get(req.NamespacedName)
horizon := entry.NextBackOff()
r.Recorder.Eventf(o, corev1.EventTypeWarning, consts.ReasonSecretSyncError,
Expand Down Expand Up @@ -504,7 +509,7 @@ func (r *VaultDynamicSecretReconciler) SetupWithManager(mgr ctrl.Manager, opts c

r.ClientFactory.RegisterClientCallbackHandler(
vault.ClientCallbackHandler{
On: vault.ClientCallbackOnLifetimeWatcherDone,
On: vault.ClientCallbackOnLifetimeWatcherDone | vault.ClientCallbackOnCacheRemoval,
Callback: r.vaultClientCallback,
},
)
Expand Down
40 changes: 31 additions & 9 deletions internal/vault/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

type ClientOptions struct {
SkipRenewal bool
WatcherDoneCh chan<- Client
WatcherDoneCh chan<- *ClientCallbackHandlerRequest
}

func defaultClientOptions() *ClientOptions {
Expand Down Expand Up @@ -129,11 +129,10 @@ func NewClientFromStorageEntry(ctx context.Context, client ctrlclient.Client, en
return nil, fmt.Errorf("restored client's cacheKey %s does not match expected %s", cacheKey, entry.CacheKey)
}

if err := c.Validate(); err != nil {
return nil, err
}
c.Taint()
defer c.Untaint()

if _, err := c.Read(ctx, NewReadRequest("auth/token/lookup-self", nil)); err != nil {
if err := c.Validate(ctx); err != nil {
return nil, err
}

Expand All @@ -154,7 +153,7 @@ type Client interface {
Restore(context.Context, *api.Secret) error
GetTokenSecret() *api.Secret
CheckExpiry(int64) (bool, error)
Validate() error
Validate(ctx context.Context) error
GetVaultAuthObj() *secretsv1beta1.VaultAuth
GetVaultConnectionObj() *secretsv1beta1.VaultConnection
GetCredentialProvider() provider.CredentialProviderBase
Expand Down Expand Up @@ -184,7 +183,7 @@ type defaultClient struct {
inClosing bool
closed bool
lastWatcherErr error
watcherDoneCh chan<- Client
watcherDoneCh chan<- *ClientCallbackHandlerRequest
tainted bool
once sync.Once
mu sync.RWMutex
Expand Down Expand Up @@ -220,7 +219,7 @@ func (c *defaultClient) Taint() {
// Validate the client, returning an error for any validation failures.
// Typically, an invalid Client would be discarded and replaced with a new
// instance.
func (c *defaultClient) Validate() error {
func (c *defaultClient) Validate(ctx context.Context) error {
c.mu.RLock()
defer c.mu.RUnlock()

Expand All @@ -245,6 +244,16 @@ func (c *defaultClient) Validate() error {
return errors.New("client token expired")
}

if c.client == nil {
return errors.New("client not set")
}

if c.tainted {
if _, err := c.Read(ctx, NewReadRequest("auth/token/lookup-self", nil)); err != nil {
return fmt.Errorf("tainted client is invalid: %w", err)
}
}

return nil
}

Expand Down Expand Up @@ -492,7 +501,10 @@ func (c *defaultClient) startLifetimeWatcher(ctx context.Context) error {
if c.watcherDoneCh != nil {
if !c.inClosing {
logger.V(consts.LogLevelTrace).Info("Writing to watcherDone channel")
c.watcherDoneCh <- c
c.watcherDoneCh <- &ClientCallbackHandlerRequest{
Client: c,
On: ClientCallbackOnLifetimeWatcherDone,
}
} else {
logger.V(consts.LogLevelTrace).Info("In closing, not writing to watcherDone channel")
}
Expand Down Expand Up @@ -759,12 +771,22 @@ func (c *defaultClient) init(ctx context.Context, client ctrlclient.Client,
}

func (c *defaultClient) observeTime(ts time.Time, operation string) {
if c.connObj == nil {
// should not happen on a properly initialized Client
return
}

clientOperationTimes.WithLabelValues(operation, ctrlclient.ObjectKeyFromObject(c.connObj).String()).Observe(
time.Since(ts).Seconds(),
)
}

func (c *defaultClient) incrementOperationCounter(operation string, err error) {
if c.connObj == nil {
// should not happen on a properly initialized Client
return
}

vaultConn := ctrlclient.ObjectKeyFromObject(c.connObj).String()
clientOperations.WithLabelValues(operation, vaultConn).Inc()
if err != nil {
Expand Down
166 changes: 113 additions & 53 deletions internal/vault/client_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,43 @@ import (
)

// ClientCallbackOn is an enumeration of possible client callback events.
type ClientCallbackOn int
type ClientCallbackOn uint32

const (
NamePrefixVCC = "vso-cc-"

// ClientCallbackOnLifetimeWatcherDone is a ClientCallbackOn that handles client
// lifetime watcher done events.
ClientCallbackOnLifetimeWatcherDone ClientCallbackOn = iota
NamePrefixVCC = "vso-cc-"
ClientCallbackOnLifetimeWatcherDone ClientCallbackOn = 1 << iota
// ClientCallbackOnCacheRemoval is a ClientCallbackOn that handles client cache removal events.
ClientCallbackOnCacheRemoval
)

func (o ClientCallbackOn) String() string {
switch o {
case ClientCallbackOnLifetimeWatcherDone:
return "LifetimeWatcherDone"
case ClientCallbackOnCacheRemoval:
return "CacheRemoval"
default:
return "Unknown"
}
}

// ClientCallbackHandlerRequest is a struct that contains a ClientCallbackOn
// enumeration and a Client. It is used to send requests to the
// ClientCallbackHandler. The ClientCallbackHandler will call the ClientCallback
// function with the Client and the ClientCallbackOn enumeration. On is the event
// that occurred, and Client is the Client that the event occurred on. On is
// applied as a bitmask, so multiple events can be sent in a single request.
// For example:
// Setting On = ClientCallbackOnLifetimeWatcherDone | ClientCallbackOnCacheRemoval
// would match either event.
type ClientCallbackHandlerRequest struct {
On ClientCallbackOn
Client Client
}

// ClientCallback is a function type that takes a context, a Client, and an error as parameters.
// It is used in the context of a ClientCallbackHandler.
type ClientCallback func(ctx context.Context, c Client)
Expand Down Expand Up @@ -99,7 +127,7 @@ type cachingClientFactory struct {
pruneStorageOnEvict bool
ctrlClient ctrlclient.Client
clientCallbacks []ClientCallbackHandler
callbackHandlerCh chan Client
callbackHandlerCh chan *ClientCallbackHandlerRequest
mu sync.RWMutex
onceDoWatcher sync.Once
callbackHandlerCancel context.CancelFunc
Expand Down Expand Up @@ -182,7 +210,10 @@ func (m *cachingClientFactory) prune(ctx context.Context, client ctrlclient.Clie
if !skipCallbacks {
for _, c := range pruned {
// the callback handler will remove the client from the storage
m.callbackHandlerCh <- c
m.callbackHandlerCh <- &ClientCallbackHandlerRequest{
On: ClientCallbackOnCacheRemoval,
Client: c,
}
}
} else {
// for all cache entries pruned, remove the corresponding storage entries.
Expand Down Expand Up @@ -404,38 +435,31 @@ func (m *cachingClientFactory) Get(ctx context.Context, client ctrlclient.Client
if ok {
// return the Client from the cache if it is still Valid
tainted = c.Tainted()
logger.V(consts.LogLevelTrace).Info("Got client from cache", "clientID", c.ID(), "tainted", tainted)
if tainted {
// if the Client is tainted, we need to validate its token.
if _, err := c.Read(ctx, NewReadRequest("auth/token/lookup-self", nil)); err == nil {
defer c.Untaint()
tainted = false
if err := c.Validate(); err == nil {
return namespacedClient(c)
}
logger.V(consts.LogLevelTrace).Info("Got client from cache",
"clientID", c.ID(), "tainted", tainted)
if err := c.Validate(ctx); err != nil {
logger.V(consts.LogLevelDebug).Error(err, "Invalid client",
"tainted", tainted)
m.callbackHandlerCh <- &ClientCallbackHandlerRequest{
On: ClientCallbackOnCacheRemoval,
Client: c,
}
} else if err := c.Validate(); err == nil {
} else {
c.Untaint()
return namespacedClient(c)
}

logger.V(consts.LogLevelDebug).Error(err, "Invalid client",
"tainted", tainted)

// remove the parent Client from the cache in order to prune any of its clones.
m.cache.Remove(cacheKey)
} else {
logger.V(consts.LogLevelTrace).Info("Client not found in cache", "cacheKey", fmt.Sprintf("%#v", cacheKey))
}

if !ok && m.storageEnabled() {
// try and restore from Client storage cache, if properly configured to do so.
restored, err := m.restoreClientFromCacheKey(ctx, client, cacheKey)
if restored != nil {
return namespacedClient(restored)
}
if m.storageEnabled() {
// try and restore from Client storage cache, if properly configured to do so.
restored, err := m.restoreClientFromCacheKey(ctx, client, cacheKey)
if restored != nil {
return namespacedClient(restored)
}

if !IsStorageEntryNotFoundErr(err) {
logger.Error(err, "Failed to restore client from storage")
if !IsStorageEntryNotFoundErr(err) {
logger.Error(err, "Failed to restore client from storage")
}
}
}

Expand Down Expand Up @@ -642,7 +666,7 @@ func (m *cachingClientFactory) storageEncryptionClient(ctx context.Context, clie
// ensure that the cached Vault Client is not expired, and if it is then call storageEncryptionClient() again.
// This operation should be safe since we are setting m.clientCacheKeyEncrypt to empty string,
// so there should be no risk of causing a maximum recursion error.
if reason := c.Validate(); reason != nil {
if reason := c.Validate(ctx); reason != nil {
m.logger.V(consts.LogLevelWarning).Info("Restored Vault client is invalid, recreating it",
"cacheKey", m.clientCacheKeyEncrypt, "reason", reason)

Expand Down Expand Up @@ -677,7 +701,7 @@ func (m *cachingClientFactory) startClientCallbackHandler(ctx context.Context) {

go func() {
if m.callbackHandlerCh == nil {
m.callbackHandlerCh = make(chan Client)
m.callbackHandlerCh = make(chan *ClientCallbackHandlerRequest)
}
defer func() {
close(m.callbackHandlerCh)
Expand All @@ -689,16 +713,20 @@ func (m *cachingClientFactory) startClientCallbackHandler(ctx context.Context) {
case <-callbackCtx.Done():
logger.Info("Client callback handler done")
return
case c, stillOpen := <-m.callbackHandlerCh:
case req, stillOpen := <-m.callbackHandlerCh:
if !stillOpen {
logger.Info("Client callback handler channel closed")
return
}
if c.IsClone() {
if req == nil {
continue
}

cacheKey, err := c.GetCacheKey()
if req.Client.IsClone() {
continue
}

cacheKey, err := req.Client.GetCacheKey()
if err != nil {
logger.Error(err, "Invalid client, client callbacks not executed",
"cacheKey", cacheKey)
Expand All @@ -708,30 +736,62 @@ func (m *cachingClientFactory) startClientCallbackHandler(ctx context.Context) {
// remove the client from the cache, it will be recreated when a reconciler
// requests it.
logger.V(consts.LogLevelDebug).Info("Removing client from cache", "cacheKey", cacheKey)
m.cache.Remove(cacheKey)
if m.storageEnabled() {
if _, err := m.pruneStorage(ctx, m.ctrlClient, cacheKey); err != nil {
logger.Info("Warning: failed to prune storage", "cacheKey", cacheKey)
if req.On&ClientCallbackOnLifetimeWatcherDone != 0 {
m.cache.Remove(cacheKey)
if m.storageEnabled() {
if _, err := m.pruneStorage(ctx, m.ctrlClient, cacheKey); err != nil {
logger.Info("Warning: failed to prune storage", "cacheKey", cacheKey)
}
}
}

for idx, cbReq := range m.clientCallbacks {
if cbReq.On != ClientCallbackOnLifetimeWatcherDone {
continue
}

logger.Info("Calling client callback on lifetime watcher done",
"index", idx, "cacheKey", cacheKey, "clientID", c.ID())
// call in a go routine to avoid blocking the channel
go func(cbReq ClientCallbackHandler) {
cbReq.Callback(ctx, c)
}(cbReq)
}
m.callClientCallbacks(ctx, req.Client, req.On, false)
}
}
}()
}

// callClientCallbacks calls all registered client callbacks for the specified
// event. If wait is true, it will block until all callbacks have been executed.
// Note: wait is only for testing purposes.
func (m *cachingClientFactory) callClientCallbacks(ctx context.Context, c Client, on ClientCallbackOn, wait bool) {
logger := log.FromContext(ctx).WithName("callClientCallbacks")

var cbs []ClientCallbackHandler
for _, cbReq := range m.clientCallbacks {
x := on & cbReq.On
if x != 0 {
cbs = append(cbs, cbReq)
continue
}
}

if len(cbs) == 0 {
return
}

var wg sync.WaitGroup
if wait {
wg.Add(len(cbs))
}

for idx, cbReq := range cbs {
logger.Info("Calling client callback",
"index", idx, "clientID", c.ID(), "on", on)
// call in a go routine to avoid blocking the channel
go func(cbReq ClientCallbackHandler) {
if wait {
defer wg.Done()
}
cbReq.Callback(ctx, c)
}(cbReq)
}

if wait {
wg.Wait()
}
}

// NewCachingClientFactory returns a CachingClientFactory with ClientCache initialized.
// The ClientCache's onEvictCallback is registered with the factory's onClientEvict(),
// to ensure any evictions are handled by the factory (this is very important).
Expand All @@ -741,7 +801,7 @@ func NewCachingClientFactory(ctx context.Context, client ctrlclient.Client, cach
recorder: config.Recorder,
persist: config.Persist,
ctrlClient: client,
callbackHandlerCh: make(chan Client),
callbackHandlerCh: make(chan *ClientCallbackHandlerRequest),
encryptionRequired: config.StorageConfig.EnforceEncryption,
clientLocks: make(map[ClientCacheKey]*sync.RWMutex, config.ClientCacheSize),
logger: zap.New().WithName("clientCacheFactory").WithValues(
Expand Down
Loading