diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 269d3f589f..2b7881aa9c 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/grafana/dskit/modules" + "github.com/grafana/dskit/runtimeconfig" "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -60,7 +61,6 @@ import ( "github.com/cortexproject/cortex/pkg/util/grpc/healthcheck" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/process" - "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -120,7 +120,7 @@ type Config struct { Configs configs.Config `yaml:"configs"` Alertmanager alertmanager.MultitenantAlertmanagerConfig `yaml:"alertmanager"` AlertmanagerStorage alertstore.Config `yaml:"alertmanager_storage"` - RuntimeConfig runtimeconfig.ManagerConfig `yaml:"runtime_config"` + RuntimeConfig runtimeconfig.Config `yaml:"runtime_config"` MemberlistKV memberlist.KVConfig `yaml:"memberlist"` QueryScheduler scheduler.Config `yaml:"query_scheduler"` } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 9e58bf675b..557066e72c 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/grafana/dskit/modules" + "github.com/grafana/dskit/runtimeconfig" "github.com/grafana/dskit/services" "github.com/opentracing-contrib/go-stdlib/nethttp" "github.com/opentracing/opentracing-go" @@ -32,7 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/configs/db" "github.com/cortexproject/cortex/pkg/distributor" "github.com/cortexproject/cortex/pkg/flusher" - frontend "github.com/cortexproject/cortex/pkg/frontend" + "github.com/cortexproject/cortex/pkg/frontend" "github.com/cortexproject/cortex/pkg/frontend/transport" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/querier" @@ -46,7 +47,6 @@ import ( "github.com/cortexproject/cortex/pkg/scheduler" "github.com/cortexproject/cortex/pkg/storegateway" util_log "github.com/cortexproject/cortex/pkg/util/log" - "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -157,7 +157,7 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) { // make sure to set default limits before we start loading configuration into memory validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig) - serv, err := runtimeconfig.NewRuntimeConfigManager(t.Cfg.RuntimeConfig, prometheus.DefaultRegisterer) + serv, err := runtimeconfig.New(t.Cfg.RuntimeConfig, prometheus.WrapRegistererWithPrefix("cortex_", prometheus.DefaultRegisterer), util_log.Logger) if err == nil { // TenantLimits just delegates to RuntimeConfig and doesn't have any state or need to do // anything in the start/stopping phase. Thus we can create it as part of runtime config diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go index 150c25727b..c45caa0c13 100644 --- a/pkg/cortex/runtime_config.go +++ b/pkg/cortex/runtime_config.go @@ -5,12 +5,12 @@ import ( "io" "net/http" + "github.com/grafana/dskit/runtimeconfig" "gopkg.in/yaml.v2" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ring/kv" "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "github.com/cortexproject/cortex/pkg/util/validation" ) diff --git a/pkg/util/runtimeconfig/manager_test.go b/pkg/util/runtimeconfig/manager_test.go deleted file mode 100644 index f14e1e5735..0000000000 --- a/pkg/util/runtimeconfig/manager_test.go +++ /dev/null @@ -1,279 +0,0 @@ -package runtimeconfig - -import ( - "context" - "crypto/sha256" - "fmt" - "io" - "io/ioutil" - "os" - "strings" - "testing" - "time" - - "github.com/grafana/dskit/services" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/atomic" - "gopkg.in/yaml.v2" -) - -type TestLimits struct { - Limit1 int `json:"limit1"` - Limit2 int `json:"limit2"` -} - -// WARNING: THIS GLOBAL VARIABLE COULD LEAD TO UNEXPECTED BEHAVIOUR WHEN RUNNING MULTIPLE DIFFERENT TESTS -var defaultTestLimits *TestLimits - -type testOverrides struct { - Overrides map[string]*TestLimits `yaml:"overrides"` -} - -// UnmarshalYAML implements the yaml.Unmarshaler interface. -func (l *TestLimits) UnmarshalYAML(unmarshal func(interface{}) error) error { - if defaultTestLimits != nil { - *l = *defaultTestLimits - } - type plain TestLimits - return unmarshal((*plain)(l)) -} - -func testLoadOverrides(r io.Reader) (interface{}, error) { - var overrides = &testOverrides{} - - decoder := yaml.NewDecoder(r) - decoder.SetStrict(true) - if err := decoder.Decode(&overrides); err != nil { - return nil, err - } - return overrides, nil -} - -func newTestOverridesManagerConfig(t *testing.T, i int32) (*atomic.Int32, ManagerConfig) { - var config = atomic.NewInt32(i) - - // create empty file - tempFile, err := ioutil.TempFile("", "test-validation") - require.NoError(t, err) - - t.Cleanup(func() { - tempFile.Close() - os.Remove(tempFile.Name()) - }) - - // testing NewRuntimeConfigManager with overrides reload config set - return config, ManagerConfig{ - ReloadPeriod: 5 * time.Second, - LoadPath: tempFile.Name(), - Loader: func(_ io.Reader) (i interface{}, err error) { - val := int(config.Load()) - return val, nil - }, - } -} - -func TestNewOverridesManager(t *testing.T) { - tempFile, err := ioutil.TempFile("", "test-validation") - require.NoError(t, err) - - defer func() { - // Clean up - require.NoError(t, tempFile.Close()) - require.NoError(t, os.Remove(tempFile.Name())) - }() - - _, err = tempFile.WriteString(`overrides: - user1: - limit2: 150`) - require.NoError(t, err) - - defaultTestLimits = &TestLimits{Limit1: 100} - - // testing NewRuntimeConfigManager with overrides reload config set - overridesManagerConfig := ManagerConfig{ - ReloadPeriod: time.Second, - LoadPath: tempFile.Name(), - Loader: testLoadOverrides, - } - - overridesManager, err := NewRuntimeConfigManager(overridesManagerConfig, nil) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), overridesManager)) - - // Cleaning up - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), overridesManager)) - - // Make sure test limits were loaded. - require.NotNil(t, overridesManager.GetConfig()) -} - -func TestManager_ListenerWithDefaultLimits(t *testing.T) { - tempFile, err := ioutil.TempFile("", "test-validation") - require.NoError(t, err) - require.NoError(t, tempFile.Close()) - - defer func() { - // Clean up - require.NoError(t, os.Remove(tempFile.Name())) - }() - - config := []byte(`overrides: - user1: - limit2: 150`) - err = ioutil.WriteFile(tempFile.Name(), config, 0600) - require.NoError(t, err) - - defaultTestLimits = &TestLimits{Limit1: 100} - - // testing NewRuntimeConfigManager with overrides reload config set - overridesManagerConfig := ManagerConfig{ - ReloadPeriod: time.Second, - LoadPath: tempFile.Name(), - Loader: testLoadOverrides, - } - - reg := prometheus.NewPedanticRegistry() - - overridesManager, err := NewRuntimeConfigManager(overridesManagerConfig, reg) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), overridesManager)) - - // check if the metrics is set to the config map value before - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` - # HELP cortex_runtime_config_hash Hash of the currently active runtime config file. - # TYPE cortex_runtime_config_hash gauge - cortex_runtime_config_hash{sha256="%s"} 1 - # HELP cortex_runtime_config_last_reload_successful Whether the last runtime-config reload attempt was successful. - # TYPE cortex_runtime_config_last_reload_successful gauge - cortex_runtime_config_last_reload_successful 1 - `, fmt.Sprintf("%x", sha256.Sum256(config)))))) - - // need to use buffer, otherwise loadConfig will throw away update - ch := overridesManager.CreateListenerChannel(1) - - // rewrite file - config = []byte(`overrides: - user2: - limit2: 200`) - err = ioutil.WriteFile(tempFile.Name(), config, 0600) - require.NoError(t, err) - - // reload - err = overridesManager.loadConfig() - require.NoError(t, err) - - var newValue interface{} - select { - case newValue = <-ch: - // ok - case <-time.After(time.Second): - t.Fatal("listener was not called") - } - - to := newValue.(*testOverrides) - require.Equal(t, 200, to.Overrides["user2"].Limit2) // new overrides - require.Equal(t, 100, to.Overrides["user2"].Limit1) // from defaults - - // check if the metrics have been updated - assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(` - # HELP cortex_runtime_config_hash Hash of the currently active runtime config file. - # TYPE cortex_runtime_config_hash gauge - cortex_runtime_config_hash{sha256="%s"} 1 - # HELP cortex_runtime_config_last_reload_successful Whether the last runtime-config reload attempt was successful. - # TYPE cortex_runtime_config_last_reload_successful gauge - cortex_runtime_config_last_reload_successful 1 - `, fmt.Sprintf("%x", sha256.Sum256(config)))))) - - // Cleaning up - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), overridesManager)) - - // Make sure test limits were loaded. - require.NotNil(t, overridesManager.GetConfig()) -} - -func TestManager_ListenerChannel(t *testing.T) { - config, overridesManagerConfig := newTestOverridesManagerConfig(t, 555) - - overridesManager, err := NewRuntimeConfigManager(overridesManagerConfig, nil) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), overridesManager)) - - // need to use buffer, otherwise loadConfig will throw away update - ch := overridesManager.CreateListenerChannel(1) - - err = overridesManager.loadConfig() - require.NoError(t, err) - - select { - case newValue := <-ch: - require.Equal(t, 555, newValue) - case <-time.After(time.Second): - t.Fatal("listener was not called") - } - - config.Store(1111) - err = overridesManager.loadConfig() - require.NoError(t, err) - - select { - case newValue := <-ch: - require.Equal(t, 1111, newValue) - case <-time.After(time.Second): - t.Fatal("listener was not called") - } - - overridesManager.CloseListenerChannel(ch) - select { - case _, ok := <-ch: - require.False(t, ok) - case <-time.After(time.Second): - t.Fatal("channel not closed") - } -} - -func TestManager_StopClosesListenerChannels(t *testing.T) { - _, overridesManagerConfig := newTestOverridesManagerConfig(t, 555) - - overridesManager, err := NewRuntimeConfigManager(overridesManagerConfig, nil) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), overridesManager)) - - // need to use buffer, otherwise loadConfig will throw away update - ch := overridesManager.CreateListenerChannel(0) - - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), overridesManager)) - - select { - case _, ok := <-ch: - require.False(t, ok) - case <-time.After(time.Second): - t.Fatal("channel not closed") - } -} - -func TestManager_ShouldFastFailOnInvalidConfigAtStartup(t *testing.T) { - // Create an invalid runtime config file. - tempFile, err := ioutil.TempFile("", "invalid-config") - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, os.Remove(tempFile.Name())) - }) - - _, err = tempFile.Write([]byte("!invalid!")) - require.NoError(t, err) - require.NoError(t, tempFile.Close()) - - // Create the config manager and start it. - cfg := ManagerConfig{ - ReloadPeriod: time.Second, - LoadPath: tempFile.Name(), - Loader: testLoadOverrides, - } - - m, err := NewRuntimeConfigManager(cfg, nil) - require.NoError(t, err) - require.Error(t, services.StartAndAwaitRunning(context.Background(), m)) -} diff --git a/pkg/util/runtimeconfig/manager.go b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go similarity index 87% rename from pkg/util/runtimeconfig/manager.go rename to vendor/github.com/grafana/dskit/runtimeconfig/manager.go index 67d9eec3f2..f650663694 100644 --- a/pkg/util/runtimeconfig/manager.go +++ b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go @@ -11,21 +11,21 @@ import ( "sync" "time" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/grafana/dskit/services" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/grafana/dskit/services" ) // Loader loads the configuration from file. type Loader func(r io.Reader) (interface{}, error) -// ManagerConfig holds the config for an Manager instance. +// Config holds the config for an Manager instance. // It holds config related to loading per-tenant config. -type ManagerConfig struct { +type Config struct { ReloadPeriod time.Duration `yaml:"period"` // LoadPath contains the path to the runtime config file, requires an // non-empty value @@ -34,7 +34,7 @@ type ManagerConfig struct { } // RegisterFlags registers flags. -func (mc *ManagerConfig) RegisterFlags(f *flag.FlagSet) { +func (mc *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&mc.LoadPath, "runtime-config.file", "", "File with the configuration that can be updated in runtime.") f.DurationVar(&mc.ReloadPeriod, "runtime-config.reload-period", 10*time.Second, "How often to check runtime config file.") } @@ -44,7 +44,8 @@ func (mc *ManagerConfig) RegisterFlags(f *flag.FlagSet) { type Manager struct { services.Service - cfg ManagerConfig + cfg Config + logger log.Logger listenersMtx sync.Mutex listeners []chan interface{} @@ -56,8 +57,8 @@ type Manager struct { configHash *prometheus.GaugeVec } -// NewRuntimeConfigManager creates an instance of Manager and starts reload config loop based on config -func NewRuntimeConfigManager(cfg ManagerConfig, registerer prometheus.Registerer) (*Manager, error) { +// New creates an instance of Manager and starts reload config loop based on config +func New(cfg Config, registerer prometheus.Registerer, logger log.Logger) (*Manager, error) { if cfg.LoadPath == "" { return nil, errors.New("LoadPath is empty") } @@ -65,13 +66,14 @@ func NewRuntimeConfigManager(cfg ManagerConfig, registerer prometheus.Registerer mgr := Manager{ cfg: cfg, configLoadSuccess: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{ - Name: "cortex_runtime_config_last_reload_successful", + Name: "runtime_config_last_reload_successful", Help: "Whether the last runtime-config reload attempt was successful.", }), configHash: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ - Name: "cortex_runtime_config_hash", + Name: "runtime_config_hash", Help: "Hash of the currently active runtime config file.", }, []string{"sha256"}), + logger: logger, } mgr.Service = services.NewBasicService(mgr.starting, mgr.loop, mgr.stopping) @@ -118,7 +120,7 @@ func (om *Manager) CloseListenerChannel(listener <-chan interface{}) { func (om *Manager) loop(ctx context.Context) error { if om.cfg.LoadPath == "" { - level.Info(util_log.Logger).Log("msg", "runtime config disabled: file not specified") + level.Info(om.logger).Log("msg", "runtime config disabled: file not specified") <-ctx.Done() return nil } @@ -132,7 +134,7 @@ func (om *Manager) loop(ctx context.Context) error { err := om.loadConfig() if err != nil { // Log but don't stop on error - we don't want to halt all ingesters because of a typo - level.Error(util_log.Logger).Log("msg", "failed to load config", "err", err) + level.Error(om.logger).Log("msg", "failed to load config", "err", err) } case <-ctx.Done(): return nil diff --git a/vendor/modules.txt b/vendor/modules.txt index ccac2c7be2..c598b07939 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -298,6 +298,7 @@ github.com/gorilla/websocket ## explicit github.com/grafana/dskit/backoff github.com/grafana/dskit/modules +github.com/grafana/dskit/runtimeconfig github.com/grafana/dskit/services # github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 ## explicit