From d6aa67ab390b1bde31c5ee94baeaf8a7f16a880d Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Thu, 23 May 2019 18:33:26 -0600 Subject: [PATCH 1/3] Use ingester prefix for LifecyclerConfig Many ingester flags were recently broken. This fixes the flags that were broken, but if anyone is running the new HA Ruler they will need to update their consul flags. Signed-off-by: Chris Marchbanks --- pkg/ingester/ingester.go | 2 +- pkg/ring/consul_client.go | 13 +++++----- pkg/ring/kvstore.go | 3 ++- pkg/ring/lifecycler.go | 10 ++++---- pkg/ring/ring.go | 14 ++++------- pkg/util/flagext/once.go | 50 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 70 insertions(+), 22 deletions(-) create mode 100644 pkg/util/flagext/once.go diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 8ab08a4110..12e9f11540 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -100,7 +100,7 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.LifecyclerConfig.RegisterFlags(f) + cfg.LifecyclerConfig.RegisterFlagsWithPrefix("ingester.", f) f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing.") f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.") diff --git a/pkg/ring/consul_client.go b/pkg/ring/consul_client.go index 3b7d1f7e3d..5a231fe9f7 100644 --- a/pkg/ring/consul_client.go +++ b/pkg/ring/consul_client.go @@ -13,6 +13,7 @@ import ( cleanhttp "github.com/hashicorp/go-cleanhttp" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/instrument" ) @@ -30,14 +31,14 @@ type ConsulConfig struct { ConsistentReads bool } -// RegisterFlags adds the flags required to config this to the given FlagSet +// RegisterFlags adds the flags required to config this to the given FlagSet. // If prefix is not an empty string it should end with a period. func (cfg *ConsulConfig) RegisterFlags(f *flag.FlagSet, prefix string) { - f.StringVar(&cfg.Host, prefix+"consul.hostname", "localhost:8500", "Hostname and port of Consul.") - f.StringVar(&cfg.Prefix, prefix+"consul.prefix", "collectors/", "Prefix for keys in Consul. Should end with a /.") - f.StringVar(&cfg.ACLToken, prefix+"consul.acltoken", "", "ACL Token used to interact with Consul.") - f.DurationVar(&cfg.HTTPClientTimeout, prefix+"consul.client-timeout", 2*longPollDuration, "HTTP timeout when talking to consul") - f.BoolVar(&cfg.ConsistentReads, prefix+"consul.consistent-reads", true, "Enable consistent reads to consul.") + flagext.StringVarOnce(f, &cfg.Host, prefix+"consul.hostname", "localhost:8500", "Hostname and port of Consul.") + flagext.StringVarOnce(f, &cfg.Prefix, prefix+"consul.prefix", "collectors/", "Prefix for keys in Consul. Should end with a /.") + flagext.StringVarOnce(f, &cfg.ACLToken, prefix+"consul.acltoken", "", "ACL Token used to interact with Consul.") + flagext.DurationVarOnce(f, &cfg.HTTPClientTimeout, prefix+"consul.client-timeout", 2*longPollDuration, "HTTP timeout when talking to consul") + flagext.BoolVarOnce(f, &cfg.ConsistentReads, prefix+"consul.consistent-reads", true, "Enable consistent reads to consul.") } type kv interface { diff --git a/pkg/ring/kvstore.go b/pkg/ring/kvstore.go index b9d5101fb6..6a76c002cb 100644 --- a/pkg/ring/kvstore.go +++ b/pkg/ring/kvstore.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/golang/protobuf/proto" "github.com/golang/snappy" ) @@ -47,7 +48,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { if prefix == "" { prefix = "ring." } - f.StringVar(&cfg.Store, prefix+"store", "consul", "Backend storage to use for the ring (consul, inmemory).") + flagext.StringVarOnce(f, &cfg.Store, prefix+"store", "consul", "Backend storage to use for the ring (consul, inmemory).") } // CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil. diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index e75abdef41..4ad047a644 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -41,7 +41,7 @@ var ( type LifecyclerConfig struct { RingConfig Config `yaml:"ring,omitempty"` - // Config for the ingester lifecycle control + // Config for the ingester lifecycle control. ListenPort *int NumTokens int `yaml:"num_tokens,omitempty"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period,omitempty"` @@ -52,21 +52,21 @@ type LifecyclerConfig struct { InfNames []string `yaml:"interface_names"` FinalSleep time.Duration `yaml:"final_sleep"` - // For testing, you can override the address and ID of this ingester + // For testing, you can override the address and ID of this ingester. Addr string `yaml:"address"` Port int ID string SkipUnregister bool } -// RegisterFlags adds the flags required to config this to the given FlagSet +// RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", f) } -// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet. func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.RingConfig.RegisterFlagsWithPrefix(prefix, f) + cfg.RingConfig.RegisterFlags(f) f.IntVar(&cfg.NumTokens, prefix+"num-tokens", 128, "Number of tokens for each ingester.") f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.") diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 61c1aa3128..fab453fd2d 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" ) const ( @@ -61,17 +62,12 @@ type Config struct { ReplicationFactor int `yaml:"replication_factor,omitempty"` } -// RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix +// RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("", f) -} - -// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet with a specified prefix -func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.KVStore.RegisterFlagsWithPrefix(prefix, f) + cfg.KVStore.RegisterFlagsWithPrefix("", f) - f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") - f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.") + flagext.DurationVarOnce(f, &cfg.HeartbeatTimeout, "ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") + flagext.IntVarOnce(f, &cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.") } // Ring holds the information about the members of the consistent hash ring. diff --git a/pkg/util/flagext/once.go b/pkg/util/flagext/once.go new file mode 100644 index 0000000000..fd6d6a0365 --- /dev/null +++ b/pkg/util/flagext/once.go @@ -0,0 +1,50 @@ +package flagext + +import ( + "flag" + "time" +) + +// IntVarOnce will check to see if a flag has already been registered. If +// so it will set the value to the value already parsed. If not, it will +// register the new flag. +func IntVarOnce(f *flag.FlagSet, p *int, name string, value int, usage string) { + if fl := f.Lookup(name); fl == nil { + f.IntVar(p, name, value, usage) + } else { + *p = fl.Value.(flag.Getter).Get().(int) + } +} + +// BoolVarOnce will check to see if a flag has already been registered. If +// so it will set the value to the value already parsed. If not, it will +// register the new flag. +func BoolVarOnce(f *flag.FlagSet, p *bool, name string, value bool, usage string) { + if fl := f.Lookup(name); fl == nil { + f.BoolVar(p, name, value, usage) + } else { + *p = fl.Value.(flag.Getter).Get().(bool) + } +} + +// StringVarOnce will check to see if a flag has already been registered. If +// so it will set the value to the value already parsed. If not, it will +// register the new flag. +func StringVarOnce(f *flag.FlagSet, p *string, name string, value string, usage string) { + if fl := f.Lookup(name); fl == nil { + f.StringVar(p, name, value, usage) + } else { + *p = fl.Value.(flag.Getter).Get().(string) + } +} + +// DurationVarOnce will check to see if a flag has already been registered. If +// so it will set the value to the value already parsed. If not, it will +// register the new flag. +func DurationVarOnce(f *flag.FlagSet, p *time.Duration, name string, value time.Duration, usage string) { + if fl := f.Lookup(name); fl == nil { + f.DurationVar(p, name, value, usage) + } else { + *p = fl.Value.(flag.Getter).Get().(time.Duration) + } +} From 97de9f4d1c4ff466351d524aee5ae822c3a57be1 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Fri, 24 May 2019 11:53:15 -0600 Subject: [PATCH 2/3] Revert "Use ingester prefix for LifecyclerConfig" This reverts commit d6aa67ab390b1bde31c5ee94baeaf8a7f16a880d. Signed-off-by: Chris Marchbanks --- pkg/ingester/ingester.go | 2 +- pkg/ring/consul_client.go | 13 +++++----- pkg/ring/kvstore.go | 3 +-- pkg/ring/lifecycler.go | 10 ++++---- pkg/ring/ring.go | 14 +++++++---- pkg/util/flagext/once.go | 50 --------------------------------------- 6 files changed, 22 insertions(+), 70 deletions(-) delete mode 100644 pkg/util/flagext/once.go diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 12e9f11540..8ab08a4110 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -100,7 +100,7 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.LifecyclerConfig.RegisterFlagsWithPrefix("ingester.", f) + cfg.LifecyclerConfig.RegisterFlags(f) f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing.") f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.") diff --git a/pkg/ring/consul_client.go b/pkg/ring/consul_client.go index 5a231fe9f7..3b7d1f7e3d 100644 --- a/pkg/ring/consul_client.go +++ b/pkg/ring/consul_client.go @@ -13,7 +13,6 @@ import ( cleanhttp "github.com/hashicorp/go-cleanhttp" "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/instrument" ) @@ -31,14 +30,14 @@ type ConsulConfig struct { ConsistentReads bool } -// RegisterFlags adds the flags required to config this to the given FlagSet. +// RegisterFlags adds the flags required to config this to the given FlagSet // If prefix is not an empty string it should end with a period. func (cfg *ConsulConfig) RegisterFlags(f *flag.FlagSet, prefix string) { - flagext.StringVarOnce(f, &cfg.Host, prefix+"consul.hostname", "localhost:8500", "Hostname and port of Consul.") - flagext.StringVarOnce(f, &cfg.Prefix, prefix+"consul.prefix", "collectors/", "Prefix for keys in Consul. Should end with a /.") - flagext.StringVarOnce(f, &cfg.ACLToken, prefix+"consul.acltoken", "", "ACL Token used to interact with Consul.") - flagext.DurationVarOnce(f, &cfg.HTTPClientTimeout, prefix+"consul.client-timeout", 2*longPollDuration, "HTTP timeout when talking to consul") - flagext.BoolVarOnce(f, &cfg.ConsistentReads, prefix+"consul.consistent-reads", true, "Enable consistent reads to consul.") + f.StringVar(&cfg.Host, prefix+"consul.hostname", "localhost:8500", "Hostname and port of Consul.") + f.StringVar(&cfg.Prefix, prefix+"consul.prefix", "collectors/", "Prefix for keys in Consul. Should end with a /.") + f.StringVar(&cfg.ACLToken, prefix+"consul.acltoken", "", "ACL Token used to interact with Consul.") + f.DurationVar(&cfg.HTTPClientTimeout, prefix+"consul.client-timeout", 2*longPollDuration, "HTTP timeout when talking to consul") + f.BoolVar(&cfg.ConsistentReads, prefix+"consul.consistent-reads", true, "Enable consistent reads to consul.") } type kv interface { diff --git a/pkg/ring/kvstore.go b/pkg/ring/kvstore.go index 6a76c002cb..b9d5101fb6 100644 --- a/pkg/ring/kvstore.go +++ b/pkg/ring/kvstore.go @@ -6,7 +6,6 @@ import ( "fmt" "sync" - "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/golang/protobuf/proto" "github.com/golang/snappy" ) @@ -48,7 +47,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { if prefix == "" { prefix = "ring." } - flagext.StringVarOnce(f, &cfg.Store, prefix+"store", "consul", "Backend storage to use for the ring (consul, inmemory).") + f.StringVar(&cfg.Store, prefix+"store", "consul", "Backend storage to use for the ring (consul, inmemory).") } // CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil. diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 4ad047a644..e75abdef41 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -41,7 +41,7 @@ var ( type LifecyclerConfig struct { RingConfig Config `yaml:"ring,omitempty"` - // Config for the ingester lifecycle control. + // Config for the ingester lifecycle control ListenPort *int NumTokens int `yaml:"num_tokens,omitempty"` HeartbeatPeriod time.Duration `yaml:"heartbeat_period,omitempty"` @@ -52,21 +52,21 @@ type LifecyclerConfig struct { InfNames []string `yaml:"interface_names"` FinalSleep time.Duration `yaml:"final_sleep"` - // For testing, you can override the address and ID of this ingester. + // For testing, you can override the address and ID of this ingester Addr string `yaml:"address"` Port int ID string SkipUnregister bool } -// RegisterFlags adds the flags required to config this to the given FlagSet. +// RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) { cfg.RegisterFlagsWithPrefix("", f) } -// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet. +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.RingConfig.RegisterFlags(f) + cfg.RingConfig.RegisterFlagsWithPrefix(prefix, f) f.IntVar(&cfg.NumTokens, prefix+"num-tokens", 128, "Number of tokens for each ingester.") f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.") diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index fab453fd2d..61c1aa3128 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -16,7 +16,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/flagext" ) const ( @@ -62,12 +61,17 @@ type Config struct { ReplicationFactor int `yaml:"replication_factor,omitempty"` } -// RegisterFlags adds the flags required to config this to the given FlagSet. +// RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.KVStore.RegisterFlagsWithPrefix("", f) + cfg.RegisterFlagsWithPrefix("", f) +} + +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet with a specified prefix +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.KVStore.RegisterFlagsWithPrefix(prefix, f) - flagext.DurationVarOnce(f, &cfg.HeartbeatTimeout, "ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") - flagext.IntVarOnce(f, &cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.") + f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") + f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.") } // Ring holds the information about the members of the consistent hash ring. diff --git a/pkg/util/flagext/once.go b/pkg/util/flagext/once.go deleted file mode 100644 index fd6d6a0365..0000000000 --- a/pkg/util/flagext/once.go +++ /dev/null @@ -1,50 +0,0 @@ -package flagext - -import ( - "flag" - "time" -) - -// IntVarOnce will check to see if a flag has already been registered. If -// so it will set the value to the value already parsed. If not, it will -// register the new flag. -func IntVarOnce(f *flag.FlagSet, p *int, name string, value int, usage string) { - if fl := f.Lookup(name); fl == nil { - f.IntVar(p, name, value, usage) - } else { - *p = fl.Value.(flag.Getter).Get().(int) - } -} - -// BoolVarOnce will check to see if a flag has already been registered. If -// so it will set the value to the value already parsed. If not, it will -// register the new flag. -func BoolVarOnce(f *flag.FlagSet, p *bool, name string, value bool, usage string) { - if fl := f.Lookup(name); fl == nil { - f.BoolVar(p, name, value, usage) - } else { - *p = fl.Value.(flag.Getter).Get().(bool) - } -} - -// StringVarOnce will check to see if a flag has already been registered. If -// so it will set the value to the value already parsed. If not, it will -// register the new flag. -func StringVarOnce(f *flag.FlagSet, p *string, name string, value string, usage string) { - if fl := f.Lookup(name); fl == nil { - f.StringVar(p, name, value, usage) - } else { - *p = fl.Value.(flag.Getter).Get().(string) - } -} - -// DurationVarOnce will check to see if a flag has already been registered. If -// so it will set the value to the value already parsed. If not, it will -// register the new flag. -func DurationVarOnce(f *flag.FlagSet, p *time.Duration, name string, value time.Duration, usage string) { - if fl := f.Lookup(name); fl == nil { - f.DurationVar(p, name, value, usage) - } else { - *p = fl.Value.(flag.Getter).Get().(time.Duration) - } -} From dad15afc24f91d65be4d85c7825d9676924a21a1 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Fri, 24 May 2019 12:04:23 -0600 Subject: [PATCH 3/3] Default Lifecycler flags to have the ingester prefix Signed-off-by: Chris Marchbanks --- pkg/ring/lifecycler.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index e75abdef41..914e1f0853 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -68,6 +68,12 @@ func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) { func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { cfg.RingConfig.RegisterFlagsWithPrefix(prefix, f) + // In order to keep backwards compatibility all of these need to be prefixed + // with "ingester." + if prefix == "" { + prefix = "ingester." + } + f.IntVar(&cfg.NumTokens, prefix+"num-tokens", 128, "Number of tokens for each ingester.") f.DurationVar(&cfg.HeartbeatPeriod, prefix+"heartbeat-period", 5*time.Second, "Period at which to heartbeat to consul.") f.DurationVar(&cfg.JoinAfter, prefix+"join-after", 0*time.Second, "Period to wait for a claim from another member; will join automatically after this.")