Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 53 additions & 23 deletions providers/appconfig/appconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,27 @@ func Provider(cfg Config) (*AppConfig, error) {
}
client := appconfig.NewFromConfig(c)

return &AppConfig{client: client, config: cfg}, nil
return &AppConfig{client: client, config: cfg, input: inputFromConfig(cfg)}, nil
}

// ProviderWithClient returns an AWS AppConfig provider
// using an existing AWS appconfig client.
func ProviderWithClient(cfg Config, client *appconfig.Client) *AppConfig {
return &AppConfig{client: client, config: cfg}
return &AppConfig{client: client, config: cfg, input: inputFromConfig(cfg)}
}

func inputFromConfig(cfg Config) appconfig.GetConfigurationInput {
// better to load initially, than later, what if `Watch()` is called first and then `ReadBytes()`?
return appconfig.GetConfigurationInput{
Application: &cfg.Application,
ClientId: &cfg.ClientID,
Configuration: &cfg.Configuration,
Environment: &cfg.Environment,
}
}

// ReadBytes returns the raw bytes for parsing.
func (ac *AppConfig) ReadBytes() ([]byte, error) {
ac.input = appconfig.GetConfigurationInput{
Application: &ac.config.Application,
ClientId: &ac.config.ClientID,
Configuration: &ac.config.Configuration,
Environment: &ac.config.Environment,
}
if ac.config.ClientConfigurationVersion != "" {
ac.input.ClientConfigurationVersion = &ac.config.ClientConfigurationVersion
}
Expand All @@ -129,30 +133,56 @@ func (ac *AppConfig) Read() (map[string]interface{}, error) {

// Watch polls AWS AppConfig for configuration updates.
func (ac *AppConfig) Watch(cb func(event interface{}, err error)) error {
return ac.WatchWithContext(context.Background(), cb)
}

// WatchWitchCtx polls AWS AppConfig for configuration updates.
func (ac *AppConfig) WatchWithContext(ctx context.Context, cb func(event interface{}, err error)) error {
if ac.config.WatchInterval == 0 {
// Set default watch interval to 60 seconds.
ac.config.WatchInterval = 60 * time.Second
}

go func() {
loop:
for {
conf, err := ac.client.GetConfiguration(context.TODO(), &ac.input)
if err != nil {
cb(nil, err)
break loop
}

// Check if the configuration has been updated.
if len(conf.Content) == 0 {
// Configuration is not updated and we have the latest version.
// Sleep for WatchInterval and retry watcher.
time.Sleep(ac.config.WatchInterval)
continue
input := ac.input
currentVersion := input.ClientConfigurationVersion

conf, err := ac.client.GetConfiguration(ctx, &input)
if err != nil {
cb(nil, err)
return
}

if currentVersion != conf.ConfigurationVersion {
cb(conf.Content, nil)
currentVersion = conf.ConfigurationVersion
}

ticker := time.NewTicker(ac.config.WatchInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:

input.ClientConfigurationVersion = currentVersion
conf, err := ac.client.GetConfiguration(ctx, &input)
if err != nil {
cb(nil, err)
return
}

if conf != nil {
cb(conf.Content, nil)
currentVersion = conf.ConfigurationVersion
}

case <-ctx.Done():
cb(nil, ctx.Err())
return
}

// Trigger event.
cb(nil, nil)
}
}()

Expand Down
30 changes: 29 additions & 1 deletion providers/consul/consul.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package consul

import (
"context"
"errors"
"fmt"
"sync"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
Expand Down Expand Up @@ -129,6 +131,11 @@ func (c *Consul) Read() (map[string]interface{}, error) {

// Watch watches for changes in the Consul API and triggers a callback.
func (c *Consul) Watch(cb func(event interface{}, err error)) error {
return c.WatchWithContext(context.Background(), cb)
}

// WatchWithContext watches for changes in the Consul API and triggers a callback.
func (c *Consul) WatchWithContext(ctx context.Context, cb func(event interface{}, err error)) error {
p := make(map[string]interface{})

if c.cfg.Recurse {
Expand All @@ -148,8 +155,29 @@ func (c *Consul) Watch(cb func(event interface{}, err error)) error {
cb(val, nil)
}

var once sync.Once

// Create a new context that we can cancel internally. To link the
// lifecycle of our two goroutines.
watchCtx, cancelFunc := context.WithCancel(ctx)

go func() {
defer cancelFunc()
err := plan.Run(c.cfg.Cfg.Address)
if err != nil {
once.Do(func() {
cb(nil, err)
})
}
}()

go func() {
plan.Run(c.cfg.Cfg.Address)
<-watchCtx.Done()
plan.Stop()

once.Do(func() {
cb(nil, ctx.Err())
})
}()

return nil
Expand Down
32 changes: 27 additions & 5 deletions providers/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,42 @@ func (e *Etcd) Read() (map[string]interface{}, error) {
}

func (e *Etcd) Watch(cb func(event interface{}, err error)) error {
return e.WatchWithContext(context.Background(), cb)
}

func (e *Etcd) WatchWithContext(ctx context.Context, cb func(event interface{}, err error)) error {
var w clientv3.WatchChan

if e.cfg.Prefix {
w = e.client.Watch(ctx, e.cfg.Key, clientv3.WithPrefix())
} else {
w = e.client.Watch(ctx, e.cfg.Key)
}

go func() {
if e.cfg.Prefix {
w = e.client.Watch(context.Background(), e.cfg.Key, clientv3.WithPrefix())
} else {
w = e.client.Watch(context.Background(), e.cfg.Key)
}
var err error

for wresp := range w {

if err = wresp.Err(); err != nil {
cb(nil, err)
return
}

for _, ev := range wresp.Events {
cb(ev, nil)
}
}

// no need to check for ctx.Done().
// reference: If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
// and "WatchResponse" from this closed channel has zero events and nil "Err()"
if err = ctx.Err(); err != nil {
cb(nil, err)
return
}

cb(nil, errors.New("etcd watcher channel closed"))
}()

return nil
Expand Down
26 changes: 18 additions & 8 deletions providers/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package file

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -42,8 +43,14 @@ func (f *File) Read() (map[string]interface{}, error) {
// Watch watches the file and triggers a callback when it changes. It is a
// blocking function that internally spawns a goroutine to watch for changes.
func (f *File) Watch(cb func(event interface{}, err error)) error {
return f.WatchWithContext(context.Background(), cb)
}

// WatchWithContext watches the file and triggers a callback when it changes. It is a
// blocking function that internally spawns a goroutine to watch for changes.
func (f *File) WatchWithContext(ctx context.Context, cb func(event interface{}, err error)) error {
f.mu.Lock()

// If a watcher already exists, return an error.
if f.isWatching {
f.mu.Unlock()
Expand All @@ -54,9 +61,9 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
// can be detected.
realPath, err := filepath.EvalSymlinks(f.path)
if err != nil {
f.mu.Unlock()
return err
}
realPath = filepath.Clean(realPath)

// Although only a single file is being watched, fsnotify has to watch
// the whole parent directory to pick up all events such as symlink changes.
Expand All @@ -69,7 +76,7 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
}

f.isWatching = true

// Set up the directory watch before releasing the lock
err = f.w.Add(fDir)
if err != nil {
Expand All @@ -79,7 +86,7 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
f.mu.Unlock()
return err
}

// Release the lock before spawning goroutine
f.mu.Unlock()

Expand All @@ -92,13 +99,17 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
loop:
for {
select {
case <-ctx.Done():
cb(nil, ctx.Err())
break loop

case event, ok := <-f.w.Events:
if !ok {
// Only throw an error if we were still supposed to be watching.
f.mu.Lock()
stillWatching := f.isWatching
f.mu.Unlock()

if stillWatching {
cb(nil, errors.New("fsnotify watch channel closed"))
}
Expand All @@ -123,7 +134,6 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
cb(nil, err)
break loop
}
curPath = filepath.Clean(curPath)

onWatchedFile := evFile == realPath || evFile == f.path

Expand Down Expand Up @@ -151,7 +161,7 @@ func (f *File) Watch(cb func(event interface{}, err error)) error {
f.mu.Lock()
stillWatching := f.isWatching
f.mu.Unlock()

if stillWatching {
cb(nil, errors.New("fsnotify err channel closed"))
}
Expand Down Expand Up @@ -185,7 +195,7 @@ func (f *File) Unwatch() error {
if !f.isWatching {
return nil // Already unwatched
}

f.isWatching = false
if f.w != nil {
// Close the watcher to signal the goroutine to stop
Expand Down
27 changes: 22 additions & 5 deletions providers/nats/nats.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nats

import (
"context"
"errors"
"fmt"
"strings"
Expand Down Expand Up @@ -89,17 +90,33 @@ func (n *Nats) Read() (map[string]interface{}, error) {
}

func (n *Nats) Watch(cb func(event interface{}, err error)) error {
w, err := n.kv.Watch(fmt.Sprintf("%s.*", n.cfg.Prefix))
return n.WatchWithContext(context.Background(), cb)
}

func (n *Nats) WatchWithContext(ctx context.Context, cb func(event any, err error)) error {
w, err := n.kv.Watch(fmt.Sprintf("%s.*", n.cfg.Prefix), nats.Context(ctx))
if err != nil {
return err
}

start := time.Now()
go func(watcher nats.KeyWatcher) {
for update := range watcher.Updates() {
// ignore nil events and only callback when the event is new (nats always sends one "old" event)
if update != nil && update.Created().After(start) {
cb(update, nil)
defer watcher.Stop()

for {
select {
case <-ctx.Done():
cb(nil, ctx.Err())
return
case update, ok := <-watcher.Updates():
if !ok {
cb(nil, errors.New("nats watcher channel closed"))
return
}
// ignore nil events and only callback when the event is new (nats always sends one "old" event)
if update != nil && update.Created().After(start) {
cb(update, nil)
}
}
}
}(w)
Expand Down
Loading
Loading