diff --git a/pkg/configs/client/client.go b/pkg/configs/client/client.go index a5528b29ecc..bbd05432c5f 100644 --- a/pkg/configs/client/client.go +++ b/pkg/configs/client/client.go @@ -16,10 +16,6 @@ import ( // Client is what the ruler and altermanger needs from a config store to process rules. type Client interface { - // GetRules returns all Cortex configurations from a configs API server - // that have been updated after the given configs.ID was last updated. - GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) - // GetAlerts fetches all the alerts that have changes since since. GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error) } @@ -55,27 +51,6 @@ type configsClient struct { Timeout time.Duration } -// GetRules implements ConfigClient. -func (c configsClient) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) { - suffix := "" - if since != 0 { - suffix = fmt.Sprintf("?since=%d", since) - } - endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix) - response, err := doRequest(endpoint, c.Timeout, since) - if err != nil { - return nil, err - } - configs := map[string]configs.VersionedRulesConfig{} - for id, view := range response.Configs { - cfg := view.GetVersionedRulesConfig() - if cfg != nil { - configs[id] = *cfg - } - } - return configs, nil -} - // GetAlerts implements ConfigClient. func (c configsClient) GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error) { suffix := "" @@ -117,14 +92,6 @@ type dbStore struct { db db.DB } -// GetRules implements ConfigClient. -func (d dbStore) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) { - if since == 0 { - return d.db.GetAllRulesConfigs(ctx) - } - return d.db.GetRulesConfigs(ctx, since) -} - // GetAlerts implements ConfigClient. func (d dbStore) GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error) { var resp map[string]configs.View diff --git a/pkg/configs/client/config.go b/pkg/configs/client/config.go index 3850b967c68..4d64eae2b4e 100644 --- a/pkg/configs/client/config.go +++ b/pkg/configs/client/config.go @@ -28,8 +28,6 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.DBConfig.RegisterFlags(f) - f.Var(&cfg.ConfigsAPIURL, "ruler.configs.url", "DEPRECATED. URL of configs API server.") - f.DurationVar(&cfg.ClientTimeout, "ruler.client-timeout", 5*time.Second, "DEPRECATED. Timeout for requests to Weave Cloud configs service.") flag.Var(&cfg.ConfigsAPIURL, "alertmanager.configs.url", "URL of configs API server.") flag.DurationVar(&cfg.ClientTimeout, "alertmanager.configs.client-timeout", 5*time.Second, "Timeout for requests to Weave Cloud configs service.") } @@ -49,16 +47,6 @@ type instrumented struct { next Client } -func (i instrumented) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) { - var cfgs map[string]configs.VersionedRulesConfig - err := instrument.CollectedRequest(context.Background(), "Configs.GetConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error { - var err error - cfgs, err = i.next.GetRules(ctx, since) // Warning: this will produce an incorrect result if the configID ever overflows - return err - }) - return cfgs, err -} - func (i instrumented) GetAlerts(ctx context.Context, since configs.ID) (*ConfigsResponse, error) { var cfgs *ConfigsResponse err := instrument.CollectedRequest(context.Background(), "Configs.GetConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error { diff --git a/pkg/configs/storage/clients/configdb/client.go b/pkg/configs/storage/clients/configdb/client.go new file mode 100644 index 00000000000..ac86cadc2c7 --- /dev/null +++ b/pkg/configs/storage/clients/configdb/client.go @@ -0,0 +1,164 @@ +package configdb + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "net/http" + "net/url" + "strings" + "time" + + "github.com/cortexproject/cortex/pkg/configs" + "github.com/cortexproject/cortex/pkg/configs/storage/rules" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/go-kit/kit/log/level" +) + +// Config says where we can find the ruler configs. +type Config struct { + ConfigsAPIURL flagext.URLValue + ClientTimeout time.Duration +} + +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.Var(&cfg.ConfigsAPIURL, prefix+".configs.url", "DEPRECATED. URL of configs API server.") + f.DurationVar(&cfg.ClientTimeout, prefix+".client-timeout", 5*time.Second, "DEPRECATED. Timeout for requests to Weave Cloud configs service.") +} + +// ConfigsClient allows retrieving recording and alerting rules from the configs server. +type ConfigsClient struct { + URL *url.URL + Timeout time.Duration + + lastPoll configs.ID +} + +// New creates a new ConfigClient. +func New(cfg Config) (*ConfigsClient, error) { + return &ConfigsClient{ + URL: cfg.ConfigsAPIURL.URL, + Timeout: cfg.ClientTimeout, + + lastPoll: 0, + }, nil +} + +// GetRules implements ConfigClient. +func (c *ConfigsClient) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) { + suffix := "" + if since != 0 { + suffix = fmt.Sprintf("?since=%d", since) + } + endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix) + response, err := doRequest(endpoint, c.Timeout, since) + if err != nil { + return nil, err + } + configs := map[string]configs.VersionedRulesConfig{} + for id, view := range response.Configs { + cfg := view.GetVersionedRulesConfig() + if cfg != nil { + configs[id] = *cfg + } + } + return configs, nil +} + +// Stop stops rthe config client +func (c *ConfigsClient) Stop() {} + +func doRequest(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) { + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + return nil, err + } + + client := &http.Client{Timeout: timeout} + resp, err := client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("Invalid response from configs server: %v", resp.StatusCode) + } + + var config ConfigsResponse + if err := json.NewDecoder(resp.Body).Decode(&config); err != nil { + level.Error(util.Logger).Log("msg", "configs: couldn't decode JSON body", "err", err) + return nil, err + } + + config.since = since + return &config, nil +} + +// ConfigsResponse is a response from server for GetConfigs. +type ConfigsResponse struct { + // The version since which these configs were changed + since configs.ID + + // Configs maps user ID to their latest configs.View. + Configs map[string]configs.View `json:"configs"` +} + +// GetLatestConfigID returns the last config ID from a set of configs. +func (c ConfigsResponse) GetLatestConfigID() configs.ID { + latest := c.since + for _, config := range c.Configs { + if config.ID > latest { + latest = config.ID + } + } + return latest +} + +// PollRules polls the configdb server and returns the updated rule groups +func (c *ConfigsClient) PollRules(ctx context.Context) (map[string][]rules.RuleGroup, error) { + resp, err := c.GetRules(ctx, c.lastPoll) + if err != nil { + return nil, err + } + + newRules := map[string][]rules.RuleGroup{} + + var highestID configs.ID + for user, cfg := range resp { + if cfg.ID > highestID { + highestID = cfg.ID + } + userRules := []rules.RuleGroup{} + if cfg.IsDeleted() { + newRules[user] = []rules.RuleGroup{} + } + rMap, err := cfg.Config.Parse() + if err != nil { + return nil, err + } + for groupSlug, r := range rMap { + name, file := decomposeGroupSlug(groupSlug) + userRules = append(userRules, rules.FormattedToRuleGroup(user, file, name, r)) + } + newRules[user] = userRules + } + + if err != nil { + return nil, err + } + + c.lastPoll = highestID + + return newRules, nil +} + +// decomposeGroupSlug breaks the group slug from Parse +// into it's group name and file name +func decomposeGroupSlug(slug string) (string, string) { + components := strings.Split(slug, ";") + return components[0], components[1] +} diff --git a/pkg/configs/storage/clients/configdb/client_test.go b/pkg/configs/storage/clients/configdb/client_test.go new file mode 100644 index 00000000000..bf0d38cdbb5 --- /dev/null +++ b/pkg/configs/storage/clients/configdb/client_test.go @@ -0,0 +1,54 @@ +package configdb + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/configs" + "github.com/stretchr/testify/assert" +) + +var response = `{ + "configs": { + "2": { + "id": 1, + "config": { + "rules_files": { + "recording.rules": "groups:\n- name: demo-service-alerts\n interval: 15s\n rules:\n - alert: SomethingIsUp\n expr: up == 1\n" + }, + "rule_format_version": "2" + } + } + } +} +` + +func TestDoRequest(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, err := w.Write([]byte(response)) + require.NoError(t, err) + })) + defer server.Close() + + resp, err := doRequest(server.URL, 1*time.Second, 0) + assert.Nil(t, err) + + expected := ConfigsResponse{Configs: map[string]configs.View{ + "2": { + ID: 1, + Config: configs.Config{ + RulesConfig: configs.RulesConfig{ + Files: map[string]string{ + "recording.rules": "groups:\n- name: demo-service-alerts\n interval: 15s\n rules:\n - alert: SomethingIsUp\n expr: up == 1\n", + }, + FormatVersion: configs.RuleFormatV2, + }, + }, + }, + }} + assert.Equal(t, &expected, resp) +} diff --git a/pkg/configs/storage/rules/compat.go b/pkg/configs/storage/rules/compat.go new file mode 100644 index 00000000000..2aa8037a6dc --- /dev/null +++ b/pkg/configs/storage/rules/compat.go @@ -0,0 +1,98 @@ +package rules + +import ( + time "time" + + "github.com/cortexproject/cortex/pkg/ingester/client" + + "github.com/golang/protobuf/proto" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/rulefmt" + "github.com/prometheus/prometheus/rules" +) + +// ProtoRuleUpdateDescFactory makes new RuleUpdateDesc +func ProtoRuleUpdateDescFactory() proto.Message { + return NewRuleUpdateDesc() +} + +// NewRuleUpdateDesc returns an empty *distributor.RuleUpdateDesc. +func NewRuleUpdateDesc() *RuleUpdateDesc { + return &RuleUpdateDesc{} +} + +// ToProto transforms a formatted prometheus rulegroup to a rule group protobuf +func ToProto(user string, namespace string, rl rulefmt.RuleGroup) RuleGroupDesc { + dur := time.Duration(rl.Interval) + rg := RuleGroupDesc{ + Name: rl.Name, + Namespace: namespace, + Interval: &dur, + Rules: formattedRuleToProto(rl.Rules), + User: user, + } + return rg +} + +func formattedRuleToProto(rls []rulefmt.Rule) []*RuleDesc { + rules := make([]*RuleDesc, len(rls)) + for i := range rls { + f := time.Duration(rls[i].For) + + rules[i] = &RuleDesc{ + Expr: rls[i].Expr, + Record: rls[i].Record, + Alert: rls[i].Alert, + + For: &f, + Labels: client.FromLabelsToLabelAdapaters(labels.FromMap(rls[i].Labels)), + Annotations: client.FromLabelsToLabelAdapaters(labels.FromMap(rls[i].Labels)), + } + } + + return rules +} + +// FromProto generates a rulefmt RuleGroup +func FromProto(rg *RuleGroupDesc) *rulefmt.RuleGroup { + formattedRuleGroup := rulefmt.RuleGroup{ + Name: rg.GetName(), + Interval: model.Duration(*rg.Interval), + Rules: make([]rulefmt.Rule, len(rg.GetRules())), + } + + for i, rl := range rg.GetRules() { + formattedRuleGroup.Rules[i] = rulefmt.Rule{ + Record: rl.GetRecord(), + Alert: rl.GetAlert(), + Expr: rl.GetExpr(), + For: model.Duration(*rl.GetFor()), + Labels: client.FromLabelAdaptersToLabels(rl.Labels).Map(), + Annotations: client.FromLabelAdaptersToLabels(rl.Annotations).Map(), + } + } + + return &formattedRuleGroup +} + +// ToRuleGroup returns a functional rulegroup from a proto +func ToRuleGroup(rg *RuleGroupDesc) *Group { + return &Group{ + name: rg.GetName(), + namespace: rg.GetNamespace(), + user: rg.GetUser(), + interval: *rg.Interval, + rules: rg.Rules, + } +} + +// FormattedToRuleGroup transforms a formatted prometheus rulegroup to a rule group protobuf +func FormattedToRuleGroup(user string, namespace string, name string, rls []rules.Rule) *Group { + return &Group{ + name: name, + namespace: namespace, + user: user, + activeRules: rls, + } +} diff --git a/pkg/configs/storage/rules/group.go b/pkg/configs/storage/rules/group.go new file mode 100644 index 00000000000..ab1e3d42aa3 --- /dev/null +++ b/pkg/configs/storage/rules/group.go @@ -0,0 +1,118 @@ +package rules + +import ( + "context" + time "time" + + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/rulefmt" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/rules" +) + +// TODO: Add a lazy rule group that only loads rules when they are needed +// TODO: The cortex project should implement a separate Group struct from +// the prometheus project. This will allow for more precise instrumentation + +// Group is used as a compatibility format between storage and evaluation +type Group struct { + name string + namespace string + user string + interval time.Duration + rules []*RuleDesc + + // activeRules allows for the support of the configdb client + // TODO: figure out a better way to accomplish this + activeRules []rules.Rule +} + +// NewRuleGroup returns a Group +func NewRuleGroup(name, namespace, user string, rls []rulefmt.Rule) *Group { + return &Group{ + name: name, + namespace: namespace, + user: user, + rules: formattedRuleToProto(rls), + } +} + +// Rules returns eval ready prometheus rules +func (g *Group) Rules(ctx context.Context) ([]rules.Rule, error) { + // Used to be compatible with configdb client + if g.rules == nil && g.activeRules != nil { + return g.activeRules, nil + } + + rls := make([]rules.Rule, 0, len(g.rules)) + for _, rl := range g.rules { + expr, err := promql.ParseExpr(rl.GetExpr()) + if err != nil { + return nil, err + } + + if rl.Alert != "" { + rls = append(rls, rules.NewAlertingRule( + rl.Alert, + expr, + *rl.GetFor(), + client.FromLabelAdaptersToLabels(rl.Labels), + client.FromLabelAdaptersToLabels(rl.Annotations), + true, + log.With(util.Logger, "alert", rl.Alert), + )) + continue + } + rls = append(rls, rules.NewRecordingRule( + rl.Record, + expr, + client.FromLabelAdaptersToLabels(rl.Labels), + )) + } + return rls, nil +} + +// ID returns a unique group identifier with the namespace and name +func (g *Group) ID() string { + return g.namespace + "/" + g.name +} + +// Name returns the name of the rule group +func (g *Group) Name() string { + return g.name +} + +// Namespace returns the Namespace of the rule group +func (g *Group) Namespace() string { + return g.namespace +} + +// User returns the User of the rule group +func (g *Group) User() string { + return g.user +} + +// Formatted returns a prometheus rulefmt formatted rule group +func (g *Group) Formatted() rulefmt.RuleGroup { + formattedRuleGroup := rulefmt.RuleGroup{ + Name: g.name, + Interval: model.Duration(g.interval), + Rules: make([]rulefmt.Rule, len(g.rules)), + } + + for i, rl := range g.rules { + formattedRuleGroup.Rules[i] = rulefmt.Rule{ + Record: rl.GetRecord(), + Alert: rl.GetAlert(), + Expr: rl.GetExpr(), + For: model.Duration(*rl.GetFor()), + Labels: client.FromLabelAdaptersToLabels(rl.Labels).Map(), + Annotations: client.FromLabelAdaptersToLabels(rl.Annotations).Map(), + } + } + + return formattedRuleGroup +} diff --git a/pkg/configs/storage/rules/rules.pb.go b/pkg/configs/storage/rules/rules.pb.go new file mode 100644 index 00000000000..d4c15283775 --- /dev/null +++ b/pkg/configs/storage/rules/rules.pb.go @@ -0,0 +1,1491 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: rules.proto + +package rules + +import ( + fmt "fmt" + _ "github.com/cortexproject/cortex/pkg/ingester/client" + github_com_cortexproject_cortex_pkg_ingester_client "github.com/cortexproject/cortex/pkg/ingester/client" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" + _ "github.com/golang/protobuf/ptypes/duration" + io "io" + math "math" + reflect "reflect" + strings "strings" + time "time" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf +var _ = time.Kitchen + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type RuleUpdateDesc struct { + User string `protobuf:"bytes,1,opt,name=user,proto3" json:"user,omitempty"` + UpdatedAt int64 `protobuf:"varint,2,opt,name=updatedAt,proto3" json:"updatedAt,omitempty"` +} + +func (m *RuleUpdateDesc) Reset() { *m = RuleUpdateDesc{} } +func (*RuleUpdateDesc) ProtoMessage() {} +func (*RuleUpdateDesc) Descriptor() ([]byte, []int) { + return fileDescriptor_8e722d3e922f0937, []int{0} +} +func (m *RuleUpdateDesc) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *RuleUpdateDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_RuleUpdateDesc.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *RuleUpdateDesc) XXX_Merge(src proto.Message) { + xxx_messageInfo_RuleUpdateDesc.Merge(m, src) +} +func (m *RuleUpdateDesc) XXX_Size() int { + return m.Size() +} +func (m *RuleUpdateDesc) XXX_DiscardUnknown() { + xxx_messageInfo_RuleUpdateDesc.DiscardUnknown(m) +} + +var xxx_messageInfo_RuleUpdateDesc proto.InternalMessageInfo + +func (m *RuleUpdateDesc) GetUser() string { + if m != nil { + return m.User + } + return "" +} + +func (m *RuleUpdateDesc) GetUpdatedAt() int64 { + if m != nil { + return m.UpdatedAt + } + return 0 +} + +type RuleGroupDesc struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` + Interval *time.Duration `protobuf:"bytes,3,opt,name=interval,proto3,stdduration" json:"interval,omitempty"` + Rules []*RuleDesc `protobuf:"bytes,4,rep,name=rules,proto3" json:"rules,omitempty"` + Deleted bool `protobuf:"varint,5,opt,name=deleted,proto3" json:"deleted,omitempty"` + User string `protobuf:"bytes,6,opt,name=user,proto3" json:"user,omitempty"` +} + +func (m *RuleGroupDesc) Reset() { *m = RuleGroupDesc{} } +func (*RuleGroupDesc) ProtoMessage() {} +func (*RuleGroupDesc) Descriptor() ([]byte, []int) { + return fileDescriptor_8e722d3e922f0937, []int{1} +} +func (m *RuleGroupDesc) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *RuleGroupDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_RuleGroupDesc.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *RuleGroupDesc) XXX_Merge(src proto.Message) { + xxx_messageInfo_RuleGroupDesc.Merge(m, src) +} +func (m *RuleGroupDesc) XXX_Size() int { + return m.Size() +} +func (m *RuleGroupDesc) XXX_DiscardUnknown() { + xxx_messageInfo_RuleGroupDesc.DiscardUnknown(m) +} + +var xxx_messageInfo_RuleGroupDesc proto.InternalMessageInfo + +func (m *RuleGroupDesc) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *RuleGroupDesc) GetNamespace() string { + if m != nil { + return m.Namespace + } + return "" +} + +func (m *RuleGroupDesc) GetInterval() *time.Duration { + if m != nil { + return m.Interval + } + return nil +} + +func (m *RuleGroupDesc) GetRules() []*RuleDesc { + if m != nil { + return m.Rules + } + return nil +} + +func (m *RuleGroupDesc) GetDeleted() bool { + if m != nil { + return m.Deleted + } + return false +} + +func (m *RuleGroupDesc) GetUser() string { + if m != nil { + return m.User + } + return "" +} + +type RuleDesc struct { + Expr string `protobuf:"bytes,1,opt,name=expr,proto3" json:"expr,omitempty"` + Record string `protobuf:"bytes,2,opt,name=record,proto3" json:"record,omitempty"` + Alert string `protobuf:"bytes,3,opt,name=alert,proto3" json:"alert,omitempty"` + For *time.Duration `protobuf:"bytes,4,opt,name=for,proto3,stdduration" json:"for,omitempty"` + Labels []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `protobuf:"bytes,5,rep,name=labels,proto3,customtype=github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter" json:"labels"` + Annotations []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `protobuf:"bytes,6,rep,name=annotations,proto3,customtype=github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter" json:"annotations"` +} + +func (m *RuleDesc) Reset() { *m = RuleDesc{} } +func (*RuleDesc) ProtoMessage() {} +func (*RuleDesc) Descriptor() ([]byte, []int) { + return fileDescriptor_8e722d3e922f0937, []int{2} +} +func (m *RuleDesc) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *RuleDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_RuleDesc.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *RuleDesc) XXX_Merge(src proto.Message) { + xxx_messageInfo_RuleDesc.Merge(m, src) +} +func (m *RuleDesc) XXX_Size() int { + return m.Size() +} +func (m *RuleDesc) XXX_DiscardUnknown() { + xxx_messageInfo_RuleDesc.DiscardUnknown(m) +} + +var xxx_messageInfo_RuleDesc proto.InternalMessageInfo + +func (m *RuleDesc) GetExpr() string { + if m != nil { + return m.Expr + } + return "" +} + +func (m *RuleDesc) GetRecord() string { + if m != nil { + return m.Record + } + return "" +} + +func (m *RuleDesc) GetAlert() string { + if m != nil { + return m.Alert + } + return "" +} + +func (m *RuleDesc) GetFor() *time.Duration { + if m != nil { + return m.For + } + return nil +} + +func init() { + proto.RegisterType((*RuleUpdateDesc)(nil), "rules.RuleUpdateDesc") + proto.RegisterType((*RuleGroupDesc)(nil), "rules.RuleGroupDesc") + proto.RegisterType((*RuleDesc)(nil), "rules.RuleDesc") +} + +func init() { proto.RegisterFile("rules.proto", fileDescriptor_8e722d3e922f0937) } + +var fileDescriptor_8e722d3e922f0937 = []byte{ + // 478 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x92, 0xbf, 0x8e, 0xd4, 0x30, + 0x10, 0xc6, 0xe3, 0xdb, 0x3f, 0xec, 0x7a, 0x05, 0x08, 0x0b, 0x21, 0x73, 0x42, 0xde, 0x68, 0x25, + 0xa4, 0x34, 0x24, 0xe2, 0x28, 0xaf, 0xe1, 0x56, 0x27, 0x41, 0x41, 0x81, 0x2c, 0xd1, 0xd0, 0x79, + 0x93, 0xb9, 0x10, 0xf0, 0xc5, 0x91, 0xe3, 0x20, 0x1a, 0x24, 0x1e, 0x81, 0x92, 0x47, 0xe0, 0x51, + 0xae, 0x5c, 0x89, 0xe6, 0x44, 0x71, 0xb0, 0xd9, 0x86, 0xf2, 0x24, 0x5e, 0x00, 0xd9, 0x4e, 0x6e, + 0xaf, 0x44, 0x48, 0x54, 0x99, 0xcf, 0x33, 0xfe, 0x66, 0x7e, 0xe3, 0xe0, 0x99, 0x6e, 0x24, 0xd4, + 0x71, 0xa5, 0x95, 0x51, 0x64, 0xe4, 0xc4, 0xfe, 0xa3, 0xbc, 0x30, 0x6f, 0x9a, 0x55, 0x9c, 0xaa, + 0xd3, 0x24, 0x57, 0xb9, 0x4a, 0x5c, 0x76, 0xd5, 0x9c, 0x38, 0xe5, 0x84, 0x8b, 0xfc, 0xad, 0x7d, + 0x96, 0x2b, 0x95, 0x4b, 0xd8, 0x55, 0x65, 0x8d, 0x16, 0xa6, 0x50, 0x65, 0x97, 0x7f, 0x7a, 0xcd, + 0x2e, 0x55, 0xda, 0xc0, 0x87, 0x4a, 0xab, 0xb7, 0x90, 0x9a, 0x4e, 0x25, 0xd5, 0xbb, 0x3c, 0x29, + 0xca, 0x1c, 0x6a, 0x03, 0x3a, 0x49, 0x65, 0x01, 0x65, 0x9f, 0xf2, 0x0e, 0x8b, 0x25, 0xbe, 0xc5, + 0x1b, 0x09, 0xaf, 0xaa, 0x4c, 0x18, 0x38, 0x86, 0x3a, 0x25, 0x04, 0x0f, 0x9b, 0x1a, 0x34, 0x45, + 0x21, 0x8a, 0xa6, 0xdc, 0xc5, 0xe4, 0x01, 0x9e, 0x36, 0xae, 0x22, 0x3b, 0x32, 0x74, 0x2f, 0x44, + 0xd1, 0x80, 0xef, 0x0e, 0x16, 0xdf, 0x10, 0xbe, 0x69, 0x4d, 0x9e, 0x69, 0xd5, 0x54, 0xbd, 0x47, + 0x29, 0x4e, 0xa1, 0xf7, 0xb0, 0xb1, 0xf5, 0xb0, 0xdf, 0xba, 0x12, 0x29, 0x38, 0x8f, 0x29, 0xdf, + 0x1d, 0x90, 0x43, 0x3c, 0x29, 0x4a, 0x03, 0xfa, 0xbd, 0x90, 0x74, 0x10, 0xa2, 0x68, 0x76, 0x70, + 0x3f, 0xf6, 0xf0, 0x71, 0x0f, 0x1f, 0x1f, 0x77, 0xf0, 0xcb, 0xe1, 0x97, 0x1f, 0x73, 0xc4, 0xaf, + 0x2e, 0x90, 0x87, 0xd8, 0xaf, 0x97, 0x0e, 0xc3, 0x41, 0x34, 0x3b, 0xb8, 0x1d, 0xfb, 0xcd, 0xdb, + 0x99, 0xec, 0x38, 0xdc, 0x67, 0x09, 0xc5, 0x37, 0x32, 0x90, 0x60, 0x20, 0xa3, 0xa3, 0x10, 0x45, + 0x13, 0xde, 0xcb, 0x2b, 0xe6, 0xf1, 0x8e, 0x79, 0xf1, 0x7b, 0x0f, 0x4f, 0x7a, 0x07, 0x5b, 0x60, + 0x77, 0xdb, 0x03, 0xd9, 0x98, 0xdc, 0xc3, 0x63, 0x0d, 0xa9, 0xd2, 0x59, 0x47, 0xd3, 0x29, 0x72, + 0x17, 0x8f, 0x84, 0x04, 0x6d, 0x1c, 0xc7, 0x94, 0x7b, 0x41, 0x1e, 0xe3, 0xc1, 0x89, 0xd2, 0x74, + 0xf8, 0x77, 0x6c, 0xb6, 0x96, 0xd4, 0x78, 0x2c, 0xc5, 0x0a, 0x64, 0x4d, 0x47, 0x8e, 0xeb, 0x4e, + 0xdc, 0x3d, 0xdd, 0x0b, 0x7b, 0xfa, 0x52, 0x14, 0x7a, 0xf9, 0xfc, 0xec, 0x62, 0x1e, 0x7c, 0xbf, + 0x98, 0xff, 0xcb, 0x8f, 0xe0, 0x6d, 0x8e, 0x32, 0x51, 0x19, 0xd0, 0xbc, 0x6b, 0x45, 0x3e, 0xe2, + 0x99, 0x28, 0x4b, 0x65, 0xdc, 0x34, 0x35, 0x1d, 0xff, 0xff, 0xce, 0xd7, 0xfb, 0x2d, 0x0f, 0xd7, + 0x1b, 0x16, 0x9c, 0x6f, 0x58, 0x70, 0xb9, 0x61, 0xe8, 0x53, 0xcb, 0xd0, 0xd7, 0x96, 0xa1, 0xb3, + 0x96, 0xa1, 0x75, 0xcb, 0xd0, 0xcf, 0x96, 0xa1, 0x5f, 0x2d, 0x0b, 0x2e, 0x5b, 0x86, 0x3e, 0x6f, + 0x59, 0xb0, 0xde, 0xb2, 0xe0, 0x7c, 0xcb, 0x82, 0xd7, 0xfe, 0x81, 0x57, 0x63, 0xb7, 0xce, 0x27, + 0x7f, 0x02, 0x00, 0x00, 0xff, 0xff, 0x70, 0xb7, 0x00, 0xbd, 0x7a, 0x03, 0x00, 0x00, +} + +func (this *RuleUpdateDesc) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*RuleUpdateDesc) + if !ok { + that2, ok := that.(RuleUpdateDesc) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.User != that1.User { + return false + } + if this.UpdatedAt != that1.UpdatedAt { + return false + } + return true +} +func (this *RuleGroupDesc) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*RuleGroupDesc) + if !ok { + that2, ok := that.(RuleGroupDesc) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Name != that1.Name { + return false + } + if this.Namespace != that1.Namespace { + return false + } + if this.Interval != nil && that1.Interval != nil { + if *this.Interval != *that1.Interval { + return false + } + } else if this.Interval != nil { + return false + } else if that1.Interval != nil { + return false + } + if len(this.Rules) != len(that1.Rules) { + return false + } + for i := range this.Rules { + if !this.Rules[i].Equal(that1.Rules[i]) { + return false + } + } + if this.Deleted != that1.Deleted { + return false + } + if this.User != that1.User { + return false + } + return true +} +func (this *RuleDesc) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*RuleDesc) + if !ok { + that2, ok := that.(RuleDesc) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Expr != that1.Expr { + return false + } + if this.Record != that1.Record { + return false + } + if this.Alert != that1.Alert { + return false + } + if this.For != nil && that1.For != nil { + if *this.For != *that1.For { + return false + } + } else if this.For != nil { + return false + } else if that1.For != nil { + return false + } + if len(this.Labels) != len(that1.Labels) { + return false + } + for i := range this.Labels { + if !this.Labels[i].Equal(that1.Labels[i]) { + return false + } + } + if len(this.Annotations) != len(that1.Annotations) { + return false + } + for i := range this.Annotations { + if !this.Annotations[i].Equal(that1.Annotations[i]) { + return false + } + } + return true +} +func (this *RuleUpdateDesc) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&rules.RuleUpdateDesc{") + s = append(s, "User: "+fmt.Sprintf("%#v", this.User)+",\n") + s = append(s, "UpdatedAt: "+fmt.Sprintf("%#v", this.UpdatedAt)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *RuleGroupDesc) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 10) + s = append(s, "&rules.RuleGroupDesc{") + s = append(s, "Name: "+fmt.Sprintf("%#v", this.Name)+",\n") + s = append(s, "Namespace: "+fmt.Sprintf("%#v", this.Namespace)+",\n") + s = append(s, "Interval: "+fmt.Sprintf("%#v", this.Interval)+",\n") + if this.Rules != nil { + s = append(s, "Rules: "+fmt.Sprintf("%#v", this.Rules)+",\n") + } + s = append(s, "Deleted: "+fmt.Sprintf("%#v", this.Deleted)+",\n") + s = append(s, "User: "+fmt.Sprintf("%#v", this.User)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *RuleDesc) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 10) + s = append(s, "&rules.RuleDesc{") + s = append(s, "Expr: "+fmt.Sprintf("%#v", this.Expr)+",\n") + s = append(s, "Record: "+fmt.Sprintf("%#v", this.Record)+",\n") + s = append(s, "Alert: "+fmt.Sprintf("%#v", this.Alert)+",\n") + s = append(s, "For: "+fmt.Sprintf("%#v", this.For)+",\n") + s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n") + s = append(s, "Annotations: "+fmt.Sprintf("%#v", this.Annotations)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringRules(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func (m *RuleUpdateDesc) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *RuleUpdateDesc) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.User) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintRules(dAtA, i, uint64(len(m.User))) + i += copy(dAtA[i:], m.User) + } + if m.UpdatedAt != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintRules(dAtA, i, uint64(m.UpdatedAt)) + } + return i, nil +} + +func (m *RuleGroupDesc) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *RuleGroupDesc) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Name) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintRules(dAtA, i, uint64(len(m.Name))) + i += copy(dAtA[i:], m.Name) + } + if len(m.Namespace) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintRules(dAtA, i, uint64(len(m.Namespace))) + i += copy(dAtA[i:], m.Namespace) + } + if m.Interval != nil { + dAtA[i] = 0x1a + i++ + i = encodeVarintRules(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdDuration(*m.Interval))) + n1, err := github_com_gogo_protobuf_types.StdDurationMarshalTo(*m.Interval, dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if len(m.Rules) > 0 { + for _, msg := range m.Rules { + dAtA[i] = 0x22 + i++ + i = encodeVarintRules(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if m.Deleted { + dAtA[i] = 0x28 + i++ + if m.Deleted { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } + if len(m.User) > 0 { + dAtA[i] = 0x32 + i++ + i = encodeVarintRules(dAtA, i, uint64(len(m.User))) + i += copy(dAtA[i:], m.User) + } + return i, nil +} + +func (m *RuleDesc) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *RuleDesc) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Expr) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintRules(dAtA, i, uint64(len(m.Expr))) + i += copy(dAtA[i:], m.Expr) + } + if len(m.Record) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintRules(dAtA, i, uint64(len(m.Record))) + i += copy(dAtA[i:], m.Record) + } + if len(m.Alert) > 0 { + dAtA[i] = 0x1a + i++ + i = encodeVarintRules(dAtA, i, uint64(len(m.Alert))) + i += copy(dAtA[i:], m.Alert) + } + if m.For != nil { + dAtA[i] = 0x22 + i++ + i = encodeVarintRules(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdDuration(*m.For))) + n2, err := github_com_gogo_protobuf_types.StdDurationMarshalTo(*m.For, dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + } + if len(m.Labels) > 0 { + for _, msg := range m.Labels { + dAtA[i] = 0x2a + i++ + i = encodeVarintRules(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.Annotations) > 0 { + for _, msg := range m.Annotations { + dAtA[i] = 0x32 + i++ + i = encodeVarintRules(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func encodeVarintRules(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *RuleUpdateDesc) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.User) + if l > 0 { + n += 1 + l + sovRules(uint64(l)) + } + if m.UpdatedAt != 0 { + n += 1 + sovRules(uint64(m.UpdatedAt)) + } + return n +} + +func (m *RuleGroupDesc) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovRules(uint64(l)) + } + l = len(m.Namespace) + if l > 0 { + n += 1 + l + sovRules(uint64(l)) + } + if m.Interval != nil { + l = github_com_gogo_protobuf_types.SizeOfStdDuration(*m.Interval) + n += 1 + l + sovRules(uint64(l)) + } + if len(m.Rules) > 0 { + for _, e := range m.Rules { + l = e.Size() + n += 1 + l + sovRules(uint64(l)) + } + } + if m.Deleted { + n += 2 + } + l = len(m.User) + if l > 0 { + n += 1 + l + sovRules(uint64(l)) + } + return n +} + +func (m *RuleDesc) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Expr) + if l > 0 { + n += 1 + l + sovRules(uint64(l)) + } + l = len(m.Record) + if l > 0 { + n += 1 + l + sovRules(uint64(l)) + } + l = len(m.Alert) + if l > 0 { + n += 1 + l + sovRules(uint64(l)) + } + if m.For != nil { + l = github_com_gogo_protobuf_types.SizeOfStdDuration(*m.For) + n += 1 + l + sovRules(uint64(l)) + } + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sovRules(uint64(l)) + } + } + if len(m.Annotations) > 0 { + for _, e := range m.Annotations { + l = e.Size() + n += 1 + l + sovRules(uint64(l)) + } + } + return n +} + +func sovRules(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozRules(x uint64) (n int) { + return sovRules(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *RuleUpdateDesc) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&RuleUpdateDesc{`, + `User:` + fmt.Sprintf("%v", this.User) + `,`, + `UpdatedAt:` + fmt.Sprintf("%v", this.UpdatedAt) + `,`, + `}`, + }, "") + return s +} +func (this *RuleGroupDesc) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&RuleGroupDesc{`, + `Name:` + fmt.Sprintf("%v", this.Name) + `,`, + `Namespace:` + fmt.Sprintf("%v", this.Namespace) + `,`, + `Interval:` + strings.Replace(fmt.Sprintf("%v", this.Interval), "Duration", "duration.Duration", 1) + `,`, + `Rules:` + strings.Replace(fmt.Sprintf("%v", this.Rules), "RuleDesc", "RuleDesc", 1) + `,`, + `Deleted:` + fmt.Sprintf("%v", this.Deleted) + `,`, + `User:` + fmt.Sprintf("%v", this.User) + `,`, + `}`, + }, "") + return s +} +func (this *RuleDesc) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&RuleDesc{`, + `Expr:` + fmt.Sprintf("%v", this.Expr) + `,`, + `Record:` + fmt.Sprintf("%v", this.Record) + `,`, + `Alert:` + fmt.Sprintf("%v", this.Alert) + `,`, + `For:` + strings.Replace(fmt.Sprintf("%v", this.For), "Duration", "duration.Duration", 1) + `,`, + `Labels:` + fmt.Sprintf("%v", this.Labels) + `,`, + `Annotations:` + fmt.Sprintf("%v", this.Annotations) + `,`, + `}`, + }, "") + return s +} +func valueToStringRules(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *RuleUpdateDesc) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: RuleUpdateDesc: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: RuleUpdateDesc: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field User", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRules + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRules + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.User = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field UpdatedAt", wireType) + } + m.UpdatedAt = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.UpdatedAt |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipRules(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRules + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRules + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *RuleGroupDesc) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: RuleGroupDesc: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: RuleGroupDesc: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRules + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRules + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRules + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRules + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Namespace = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Interval", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRules + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRules + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Interval == nil { + m.Interval = new(time.Duration) + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(m.Interval, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Rules", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRules + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRules + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Rules = append(m.Rules, &RuleDesc{}) + if err := m.Rules[len(m.Rules)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Deleted", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Deleted = bool(v != 0) + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field User", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRules + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRules + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.User = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRules(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRules + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRules + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *RuleDesc) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: RuleDesc: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: RuleDesc: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Expr", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRules + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRules + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Expr = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Record", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRules + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRules + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Record = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Alert", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRules + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRules + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Alert = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field For", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRules + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRules + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.For == nil { + m.For = new(time.Duration) + } + if err := github_com_gogo_protobuf_types.StdDurationUnmarshal(m.For, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRules + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRules + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter{}) + if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Annotations", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRules + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRules + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRules + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Annotations = append(m.Annotations, github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter{}) + if err := m.Annotations[len(m.Annotations)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRules(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRules + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthRules + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipRules(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRules + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRules + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRules + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthRules + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthRules + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRules + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipRules(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthRules + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthRules = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowRules = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/configs/storage/rules/rules.proto b/pkg/configs/storage/rules/rules.proto new file mode 100644 index 00000000000..7f13695d659 --- /dev/null +++ b/pkg/configs/storage/rules/rules.proto @@ -0,0 +1,38 @@ +syntax = "proto3"; + +package rules; + +option go_package = "rules"; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "google/protobuf/duration.proto"; +import "github.com/cortexproject/cortex/pkg/ingester/client/cortex.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + +message RuleUpdateDesc { + string user = 1; + int64 updatedAt = 2; +} + +message RuleGroupDesc { + string name = 1; + string namespace = 2; + google.protobuf.Duration interval = 3 [(gogoproto.stdduration) = true]; + + repeated RuleDesc rules = 4; + + bool deleted = 5; + + string user = 6; + } + + message RuleDesc { + string expr = 1; + string record = 2; + string alert = 3; + google.protobuf.Duration for = 4 [(gogoproto.stdduration) = true]; + repeated cortex.LabelPair labels = 5 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter"];; + repeated cortex.LabelPair annotations = 6 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter"];; + } \ No newline at end of file diff --git a/pkg/configs/storage/rules/store.go b/pkg/configs/storage/rules/store.go new file mode 100644 index 00000000000..90723d773ec --- /dev/null +++ b/pkg/configs/storage/rules/store.go @@ -0,0 +1,133 @@ +package rules + +import ( + "context" + "errors" + "fmt" + + "github.com/prometheus/prometheus/pkg/rulefmt" + "github.com/prometheus/prometheus/rules" +) + +var ( + // ErrGroupNotFound is returned if a rule group does not exist + ErrGroupNotFound = errors.New("group does not exist") + // ErrGroupNamespaceNotFound is returned if a namespace does not exist + ErrGroupNamespaceNotFound = errors.New("group namespace does not exist") + // ErrUserNotFound is returned if the user does not currently exist + ErrUserNotFound = errors.New("no rule groups found for user") +) + +// RulePoller is used to poll for recently updated rules +type RulePoller interface { + PollRules(ctx context.Context) (map[string][]RuleGroup, error) + Stop() +} + +// RuleStoreConditions are used to filter retrieived results from a rule store +type RuleStoreConditions struct { + // UserID specifies to only retrieve rules with this ID + UserID string + + // Namespaces filters results only rule groups with the specified namespace + // are retrieved + Namespace string +} + +// RuleStore is used to store and retrieve rules +type RuleStore interface { + ListRuleGroups(ctx context.Context, options RuleStoreConditions) (RuleGroupList, error) + GetRuleGroup(ctx context.Context, userID, namespace, group string) (RuleGroup, error) + SetRuleGroup(ctx context.Context, userID, namespace string, group rulefmt.RuleGroup) error + DeleteRuleGroup(ctx context.Context, userID, namespace string, group string) error +} + +// RuleGroup is used to retrieve rules from the database to evaluate, +// an interface is used to allow for lazy evaluation implementations +type RuleGroup interface { + Rules(ctx context.Context) ([]rules.Rule, error) + ID() string + Name() string + Namespace() string + User() string + Formatted() rulefmt.RuleGroup +} + +// RuleGroupList contains a set of rule groups +type RuleGroupList []RuleGroup + +// Formatted returns the rule group list as a set of formatted rule groups mapped +// by namespace +func (l RuleGroupList) Formatted(user string) map[string][]rulefmt.RuleGroup { + ruleMap := map[string][]rulefmt.RuleGroup{} + for _, g := range l { + if g.User() != user { + continue + } + + if _, exists := ruleMap[g.Namespace()]; !exists { + ruleMap[g.Namespace()] = []rulefmt.RuleGroup{g.Formatted()} + } + ruleMap[g.Namespace()] = append(ruleMap[g.Namespace()], g.Formatted()) + + } + return ruleMap +} + +// RuleNamespace is used to parse a slightly modified prometheus +// rule file format, if no namespace is set, the default namespace +// is used +type RuleNamespace struct { + // Namespace field only exists for setting namespace in namespace body instead of file name + Namespace string `yaml:"namespace,omitempty"` + + Groups []rulefmt.RuleGroup `yaml:"groups"` +} + +// Validate each rule in the rule namespace is valid +func (r RuleNamespace) Validate() []error { + set := map[string]struct{}{} + var errs []error + + for _, g := range r.Groups { + if g.Name == "" { + errs = append(errs, fmt.Errorf("Groupname should not be empty")) + } + + if _, ok := set[g.Name]; ok { + errs = append( + errs, + fmt.Errorf("groupname: \"%s\" is repeated in the same namespace", g.Name), + ) + } + + set[g.Name] = struct{}{} + + errs = append(errs, ValidateRuleGroup(g)...) + } + + return errs +} + +// ValidateRuleGroup validates a rulegroup +func ValidateRuleGroup(g rulefmt.RuleGroup) []error { + var errs []error + for i, r := range g.Rules { + for _, err := range r.Validate() { + var ruleName string + if r.Alert != "" { + ruleName = r.Alert + } else { + ruleName = r.Record + } + errs = append(errs, &rulefmt.Error{ + Group: g.Name, + Rule: i, + RuleName: ruleName, + Err: err, + }) + } + } + + return errs +} diff --git a/pkg/configs/storage/testutils/testutils.go b/pkg/configs/storage/testutils/testutils.go new file mode 100644 index 00000000000..79b37382f21 --- /dev/null +++ b/pkg/configs/storage/testutils/testutils.go @@ -0,0 +1,13 @@ +package testutils + +import ( + "github.com/cortexproject/cortex/pkg/configs/storage/alerts" + "github.com/cortexproject/cortex/pkg/configs/storage/rules" +) + +// Fixture type for per-backend testing. +type Fixture interface { + Name() string + Clients() (alerts.AlertStore, rules.RuleStore, error) + Teardown() error +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index c899556cb13..eff81f9a716 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -21,7 +21,6 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/storage" "github.com/cortexproject/cortex/pkg/configs/api" - config_client "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/configs/db" "github.com/cortexproject/cortex/pkg/distributor" "github.com/cortexproject/cortex/pkg/ingester" @@ -320,27 +319,11 @@ func (t *Cortex) initRuler(cfg *Config) (err error) { cfg.Ruler.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort queryable, engine := querier.New(cfg.Querier, t.distributor, t.store) - rulesAPI, err := config_client.New(cfg.ConfigStore) - if err != nil { - return err - } - - t.ruler, err = ruler.NewRuler(cfg.Ruler, engine, queryable, t.distributor, rulesAPI) + t.ruler, err = ruler.NewRuler(cfg.Ruler, engine, queryable, t.distributor) if err != nil { return } - // Only serve the API for setting & getting rules configs if we're not - // serving configs from the configs API. Allows for smoother - // migration. See https://github.com/cortexproject/cortex/issues/619 - if cfg.ConfigStore.ConfigsAPIURL.URL == nil { - a, err := ruler.NewAPIFromConfig(cfg.ConfigStore.DBConfig) - if err != nil { - return err - } - a.RegisterRoutes(t.server.HTTP) - } - t.server.HTTP.Handle("/ruler_ring", t.ruler) return } diff --git a/pkg/ruler/api.go b/pkg/ruler/api.go deleted file mode 100644 index aeee8fe5ba2..00000000000 --- a/pkg/ruler/api.go +++ /dev/null @@ -1,118 +0,0 @@ -package ruler - -import ( - "database/sql" - "encoding/json" - "fmt" - "net/http" - - "github.com/go-kit/kit/log/level" - "github.com/gorilla/mux" - - "github.com/cortexproject/cortex/pkg/configs" - "github.com/cortexproject/cortex/pkg/configs/db" - "github.com/cortexproject/cortex/pkg/util" - "github.com/weaveworks/common/user" -) - -// API implements the configs api. -type API struct { - db db.DB - http.Handler -} - -// NewAPIFromConfig makes a new API from our database config. -func NewAPIFromConfig(cfg db.Config) (*API, error) { - db, err := db.New(cfg) - if err != nil { - return nil, err - } - return NewAPI(db), nil -} - -// NewAPI creates a new API. -func NewAPI(db db.DB) *API { - a := &API{db: db} - r := mux.NewRouter() - a.RegisterRoutes(r) - a.Handler = r - return a -} - -// RegisterRoutes registers the configs API HTTP routes with the provided Router. -func (a *API) RegisterRoutes(r *mux.Router) { - for _, route := range []struct { - name, method, path string - handler http.HandlerFunc - }{ - {"get_rules", "GET", "/api/prom/rules", a.getConfig}, - {"cas_rules", "POST", "/api/prom/rules", a.casConfig}, - } { - r.Handle(route.path, route.handler).Methods(route.method).Name(route.name) - } -} - -// getConfig returns the request configuration. -func (a *API) getConfig(w http.ResponseWriter, r *http.Request) { - userID, _, err := user.ExtractOrgIDFromHTTPRequest(r) - if err != nil { - http.Error(w, err.Error(), http.StatusUnauthorized) - return - } - logger := util.WithContext(r.Context(), util.Logger) - - cfg, err := a.db.GetRulesConfig(r.Context(), userID) - if err == sql.ErrNoRows { - http.Error(w, "No configuration", http.StatusNotFound) - return - } else if err != nil { - level.Error(logger).Log("msg", "error getting config", "err", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(cfg); err != nil { - level.Error(logger).Log("msg", "error encoding config", "err", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} - -type configUpdateRequest struct { - OldConfig configs.RulesConfig `json:"old_config"` - NewConfig configs.RulesConfig `json:"new_config"` -} - -func (a *API) casConfig(w http.ResponseWriter, r *http.Request) { - userID, _, err := user.ExtractOrgIDFromHTTPRequest(r) - if err != nil { - http.Error(w, err.Error(), http.StatusUnauthorized) - return - } - logger := util.WithContext(r.Context(), util.Logger) - - var updateReq configUpdateRequest - if err := json.NewDecoder(r.Body).Decode(&updateReq); err != nil { - level.Error(logger).Log("msg", "error decoding json body", "err", err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - if _, err = updateReq.NewConfig.Parse(); err != nil { - level.Error(logger).Log("msg", "invalid rules", "err", err) - http.Error(w, fmt.Sprintf("Invalid rules: %v", err), http.StatusBadRequest) - return - } - - updated, err := a.db.SetRulesConfig(r.Context(), userID, updateReq.OldConfig, updateReq.NewConfig) - if err != nil { - level.Error(logger).Log("msg", "error storing config", "err", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - if !updated { - http.Error(w, "Supplied configuration doesn't match current configuration", http.StatusConflict) - } - w.WriteHeader(http.StatusNoContent) -} diff --git a/pkg/ruler/api_test.go b/pkg/ruler/api_test.go deleted file mode 100644 index 3d8e4f51c26..00000000000 --- a/pkg/ruler/api_test.go +++ /dev/null @@ -1,469 +0,0 @@ -package ruler - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "net/http/httptest" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/cortexproject/cortex/pkg/configs" - "github.com/cortexproject/cortex/pkg/configs/api" - "github.com/cortexproject/cortex/pkg/configs/client" - "github.com/cortexproject/cortex/pkg/configs/db" - "github.com/cortexproject/cortex/pkg/configs/db/dbtest" - "github.com/weaveworks/common/user" -) - -const ( - endpoint = "/api/prom/rules" -) - -var ( - app *API - database db.DB - counter int - privateAPI client.Client -) - -// setup sets up the environment for the tests. -func setup(t *testing.T) { - database = dbtest.Setup(t) - app = NewAPI(database) - counter = 0 - var err error - privateAPI, err = client.New(client.Config{ - DBConfig: db.Config{ - URI: "mock", // trigger client.NewConfigClient to use the mock DB. - Mock: database, - }, - }) - require.NoError(t, err) -} - -// cleanup cleans up the environment after a test. -func cleanup(t *testing.T) { - dbtest.Cleanup(t, database) -} - -// request makes a request to the configs API. -func request(t *testing.T, handler http.Handler, method, urlStr string, body io.Reader) *httptest.ResponseRecorder { - w := httptest.NewRecorder() - r, err := http.NewRequest(method, urlStr, body) - require.NoError(t, err) - handler.ServeHTTP(w, r) - return w -} - -// requestAsUser makes a request to the configs API as the given user. -func requestAsUser(t *testing.T, handler http.Handler, userID string, method, urlStr string, body io.Reader) *httptest.ResponseRecorder { - w := httptest.NewRecorder() - r, err := http.NewRequest(method, urlStr, body) - require.NoError(t, err) - r = r.WithContext(user.InjectOrgID(r.Context(), userID)) - user.InjectOrgIDIntoHTTPRequest(r.Context(), r) - handler.ServeHTTP(w, r) - return w -} - -// makeString makes a string, guaranteed to be unique within a test. -func makeString(pattern string) string { - counter++ - return fmt.Sprintf(pattern, counter) -} - -// makeUserID makes an arbitrary user ID. Guaranteed to be unique within a test. -func makeUserID() string { - return makeString("user%d") -} - -// makeRulerConfig makes an arbitrary ruler config -func makeRulerConfig(rfv configs.RuleFormatVersion) configs.RulesConfig { - switch rfv { - case configs.RuleFormatV1: - return configs.RulesConfig{ - Files: map[string]string{ - "filename.rules": makeString(` -# Config no. %d. -ALERT ScrapeFailed - IF up != 1 - FOR 10m - LABELS { severity="warning" } - ANNOTATIONS { - summary = "Scrape of {{$labels.job}} (pod: {{$labels.instance}}) failed.", - description = "Prometheus cannot reach the /metrics page on the {{$labels.instance}} pod.", - impact = "We have no monitoring data for {{$labels.job}} - {{$labels.instance}}. At worst, it's completely down. At best, we cannot reliably respond to operational issues.", - dashboardURL = "$${base_url}/admin/prometheus/targets", - } - `), - }, - FormatVersion: configs.RuleFormatV1, - } - case configs.RuleFormatV2: - return configs.RulesConfig{ - Files: map[string]string{ - "filename.rules": makeString(` -# Config no. %d. -groups: -- name: example - rules: - - alert: ScrapeFailed - expr: 'up != 1' - for: 10m - labels: - severity: warning - annotations: - summary: "Scrape of {{$labels.job}} (pod: {{$labels.instance}}) failed." - description: "Prometheus cannot reach the /metrics page on the {{$labels.instance}} pod." - impact: "We have no monitoring data for {{$labels.job}} - {{$labels.instance}}. At worst, it's completely down. At best, we cannot reliably respond to operational issues." - dashboardURL: "$${base_url}/admin/prometheus/targets" - `), - }, - FormatVersion: configs.RuleFormatV2, - } - default: - panic("unknown rule format") - } -} - -// parseVersionedRulesConfig parses a configs.VersionedRulesConfig from JSON. -func parseVersionedRulesConfig(t *testing.T, b []byte) configs.VersionedRulesConfig { - var x configs.VersionedRulesConfig - err := json.Unmarshal(b, &x) - require.NoError(t, err, "Could not unmarshal JSON: %v", string(b)) - return x -} - -// post a config -func post(t *testing.T, userID string, oldConfig configs.RulesConfig, newConfig configs.RulesConfig) configs.VersionedRulesConfig { - updateRequest := configUpdateRequest{ - OldConfig: oldConfig, - NewConfig: newConfig, - } - b, err := json.Marshal(updateRequest) - require.NoError(t, err) - reader := bytes.NewReader(b) - w := requestAsUser(t, app, userID, "POST", endpoint, reader) - require.Equal(t, http.StatusNoContent, w.Code) - return get(t, userID) -} - -// get a config -func get(t *testing.T, userID string) configs.VersionedRulesConfig { - w := requestAsUser(t, app, userID, "GET", endpoint, nil) - return parseVersionedRulesConfig(t, w.Body.Bytes()) -} - -// configs returns 404 if no config has been created yet. -func Test_GetConfig_NotFound(t *testing.T) { - setup(t) - defer cleanup(t) - - userID := makeUserID() - w := requestAsUser(t, app, userID, "GET", endpoint, nil) - assert.Equal(t, http.StatusNotFound, w.Code) -} - -// configs returns 401 to requests without authentication. -func Test_PostConfig_Anonymous(t *testing.T) { - setup(t) - defer cleanup(t) - - w := request(t, app, "POST", endpoint, nil) - assert.Equal(t, http.StatusUnauthorized, w.Code) -} - -// Posting to a configuration sets it so that you can get it again. -func Test_PostConfig_CreatesConfig(t *testing.T) { - setup(t) - defer cleanup(t) - - userID := makeUserID() - config := makeRulerConfig(configs.RuleFormatV2) - result := post(t, userID, configs.RulesConfig{}, config) - assert.Equal(t, config, result.Config) -} - -// Posting an invalid config when there's none set returns an error and leaves the config unset. -func Test_PostConfig_InvalidNewConfig(t *testing.T) { - setup(t) - defer cleanup(t) - - userID := makeUserID() - invalidConfig := configs.RulesConfig{ - Files: map[string]string{ - "some.rules": "invalid config", - }, - FormatVersion: configs.RuleFormatV2, - } - updateRequest := configUpdateRequest{ - OldConfig: configs.RulesConfig{}, - NewConfig: invalidConfig, - } - b, err := json.Marshal(updateRequest) - require.NoError(t, err) - reader := bytes.NewReader(b) - { - w := requestAsUser(t, app, userID, "POST", endpoint, reader) - require.Equal(t, http.StatusBadRequest, w.Code) - } - { - w := requestAsUser(t, app, userID, "GET", endpoint, nil) - require.Equal(t, http.StatusNotFound, w.Code) - } -} - -// Posting a v1 rule format configuration sets it so that you can get it again. -func Test_PostConfig_UpdatesConfig_V1RuleFormat(t *testing.T) { - setup(t) - app = NewAPI(database) - defer cleanup(t) - - userID := makeUserID() - config1 := makeRulerConfig(configs.RuleFormatV1) - view1 := post(t, userID, configs.RulesConfig{}, config1) - config2 := makeRulerConfig(configs.RuleFormatV1) - view2 := post(t, userID, config1, config2) - assert.True(t, view2.ID > view1.ID, "%v > %v", view2.ID, view1.ID) - assert.Equal(t, config2, view2.Config) -} - -// Posting an invalid v1 rule format config when there's one already set returns an error and leaves the config as is. -func Test_PostConfig_InvalidChangedConfig_V1RuleFormat(t *testing.T) { - setup(t) - app = NewAPI(database) - defer cleanup(t) - - userID := makeUserID() - config := makeRulerConfig(configs.RuleFormatV1) - post(t, userID, configs.RulesConfig{}, config) - invalidConfig := configs.RulesConfig{ - Files: map[string]string{ - "some.rules": "invalid config", - }, - FormatVersion: configs.RuleFormatV1, - } - updateRequest := configUpdateRequest{ - OldConfig: configs.RulesConfig{}, - NewConfig: invalidConfig, - } - b, err := json.Marshal(updateRequest) - require.NoError(t, err) - reader := bytes.NewReader(b) - { - w := requestAsUser(t, app, userID, "POST", endpoint, reader) - require.Equal(t, http.StatusBadRequest, w.Code) - } - result := get(t, userID) - assert.Equal(t, config, result.Config) -} - -// Posting a v2 rule format configuration sets it so that you can get it again. -func Test_PostConfig_UpdatesConfig_V2RuleFormat(t *testing.T) { - setup(t) - defer cleanup(t) - - userID := makeUserID() - config1 := makeRulerConfig(configs.RuleFormatV2) - view1 := post(t, userID, configs.RulesConfig{}, config1) - config2 := makeRulerConfig(configs.RuleFormatV2) - view2 := post(t, userID, config1, config2) - assert.True(t, view2.ID > view1.ID, "%v > %v", view2.ID, view1.ID) - assert.Equal(t, config2, view2.Config) -} - -// Posting an invalid v2 rule format config when there's one already set returns an error and leaves the config as is. -func Test_PostConfig_InvalidChangedConfig_V2RuleFormat(t *testing.T) { - setup(t) - defer cleanup(t) - - userID := makeUserID() - config := makeRulerConfig(configs.RuleFormatV2) - post(t, userID, configs.RulesConfig{}, config) - invalidConfig := configs.RulesConfig{ - Files: map[string]string{ - "some.rules": "invalid config", - }, - } - updateRequest := configUpdateRequest{ - OldConfig: configs.RulesConfig{}, - NewConfig: invalidConfig, - } - b, err := json.Marshal(updateRequest) - require.NoError(t, err) - reader := bytes.NewReader(b) - { - w := requestAsUser(t, app, userID, "POST", endpoint, reader) - require.Equal(t, http.StatusBadRequest, w.Code) - } - result := get(t, userID) - assert.Equal(t, config, result.Config) -} - -// Posting a config with an invalid rule format version returns an error and leaves the config as is. -func Test_PostConfig_InvalidChangedConfig_InvalidRuleFormat(t *testing.T) { - setup(t) - defer cleanup(t) - - userID := makeUserID() - config := makeRulerConfig(configs.RuleFormatV2) - post(t, userID, configs.RulesConfig{}, config) - - // We have to provide the marshaled JSON manually here because json.Marshal() would error - // on a bad rule format version. - reader := strings.NewReader(`{"old_config":{"format_version":"1","files":null},"new_config":{"format_version":"","files":{"filename.rules":"# Empty."}}}`) - { - w := requestAsUser(t, app, userID, "POST", endpoint, reader) - require.Equal(t, http.StatusBadRequest, w.Code) - } - result := get(t, userID) - assert.Equal(t, config, result.Config) -} - -// Different users can have different configurations. -func Test_PostConfig_MultipleUsers(t *testing.T) { - setup(t) - defer cleanup(t) - - userID1 := makeUserID() - userID2 := makeUserID() - config1 := post(t, userID1, configs.RulesConfig{}, makeRulerConfig(configs.RuleFormatV2)) - config2 := post(t, userID2, configs.RulesConfig{}, makeRulerConfig(configs.RuleFormatV2)) - foundConfig1 := get(t, userID1) - assert.Equal(t, config1, foundConfig1) - foundConfig2 := get(t, userID2) - assert.Equal(t, config2, foundConfig2) - assert.True(t, config2.ID > config1.ID, "%v > %v", config2.ID, config1.ID) -} - -// GetAllConfigs returns an empty list of configs if there aren't any. -func Test_GetAllConfigs_Empty(t *testing.T) { - setup(t) - defer cleanup(t) - - configs, err := privateAPI.GetRules(context.Background(), 0) - assert.NoError(t, err, "error getting configs") - assert.Equal(t, 0, len(configs)) -} - -// GetAllConfigs returns all created configs. -func Test_GetAllConfigs(t *testing.T) { - setup(t) - defer cleanup(t) - - userID := makeUserID() - config := makeRulerConfig(configs.RuleFormatV2) - view := post(t, userID, configs.RulesConfig{}, config) - - found, err := privateAPI.GetRules(context.Background(), 0) - assert.NoError(t, err, "error getting configs") - assert.Equal(t, map[string]configs.VersionedRulesConfig{ - userID: view, - }, found) -} - -// GetAllConfigs returns the *newest* versions of all created configs. -func Test_GetAllConfigs_Newest(t *testing.T) { - setup(t) - defer cleanup(t) - - userID := makeUserID() - - config1 := post(t, userID, configs.RulesConfig{}, makeRulerConfig(configs.RuleFormatV2)) - config2 := post(t, userID, config1.Config, makeRulerConfig(configs.RuleFormatV2)) - lastCreated := post(t, userID, config2.Config, makeRulerConfig(configs.RuleFormatV2)) - - found, err := privateAPI.GetRules(context.Background(), 0) - assert.NoError(t, err, "error getting configs") - assert.Equal(t, map[string]configs.VersionedRulesConfig{ - userID: lastCreated, - }, found) -} - -func Test_GetConfigs_IncludesNewerConfigsAndExcludesOlder(t *testing.T) { - setup(t) - defer cleanup(t) - - post(t, makeUserID(), configs.RulesConfig{}, makeRulerConfig(configs.RuleFormatV2)) - config2 := post(t, makeUserID(), configs.RulesConfig{}, makeRulerConfig(configs.RuleFormatV2)) - userID3 := makeUserID() - config3 := post(t, userID3, configs.RulesConfig{}, makeRulerConfig(configs.RuleFormatV2)) - - found, err := privateAPI.GetRules(context.Background(), config2.ID) - assert.NoError(t, err, "error getting configs") - assert.Equal(t, map[string]configs.VersionedRulesConfig{ - userID3: config3, - }, found) -} - -// postAlertmanagerConfig posts an alertmanager config to the alertmanager configs API. -func postAlertmanagerConfig(t *testing.T, userID, configFile string) { - config := configs.Config{ - AlertmanagerConfig: configFile, - RulesConfig: configs.RulesConfig{}, - } - b, err := json.Marshal(config) - require.NoError(t, err) - reader := bytes.NewReader(b) - configsAPI := api.New(database) - w := requestAsUser(t, configsAPI, userID, "POST", "/api/prom/configs/alertmanager", reader) - require.Equal(t, http.StatusNoContent, w.Code) -} - -// getAlertmanagerConfig posts an alertmanager config to the alertmanager configs API. -func getAlertmanagerConfig(t *testing.T, userID string) string { - w := requestAsUser(t, api.New(database), userID, "GET", "/api/prom/configs/alertmanager", nil) - var x configs.View - b := w.Body.Bytes() - err := json.Unmarshal(b, &x) - require.NoError(t, err, "Could not unmarshal JSON: %v", string(b)) - return x.Config.AlertmanagerConfig -} - -// If a user has only got alertmanager config set, then we learn nothing about them via GetConfigs. -func Test_AlertmanagerConfig_NotInAllConfigs(t *testing.T) { - setup(t) - defer cleanup(t) - - config := makeString(` - # Config no. %d. - route: - receiver: noop - - receivers: - - name: noop`) - postAlertmanagerConfig(t, makeUserID(), config) - - found, err := privateAPI.GetRules(context.Background(), 0) - assert.NoError(t, err, "error getting configs") - assert.Equal(t, map[string]configs.VersionedRulesConfig{}, found) -} - -// Setting a ruler config doesn't change alertmanager config. -func Test_AlertmanagerConfig_RulerConfigDoesntChangeIt(t *testing.T) { - setup(t) - defer cleanup(t) - - userID := makeUserID() - alertmanagerConfig := makeString(` - # Config no. %d. - route: - receiver: noop - - receivers: - - name: noop`) - postAlertmanagerConfig(t, userID, alertmanagerConfig) - - rulerConfig := makeRulerConfig(configs.RuleFormatV2) - post(t, userID, configs.RulesConfig{}, rulerConfig) - - newAlertmanagerConfig := getAlertmanagerConfig(t, userID) - assert.Equal(t, alertmanagerConfig, newAlertmanagerConfig) -} diff --git a/pkg/ruler/group.go b/pkg/ruler/group.go index a9a9d34678e..be56338f35d 100644 --- a/pkg/ruler/group.go +++ b/pkg/ruler/group.go @@ -7,24 +7,24 @@ import ( "github.com/prometheus/prometheus/rules" ) -// group is a wrapper around a prometheus rules.Group, with a mutable appendable +// wrappedGroup is a wrapper around a prometheus rules.Group, with a mutable appendable // appendable stored here will be the same appendable as in promGroup.opts.Appendable -type group struct { +type wrappedGroup struct { promGroup *rules.Group appendable *appendableAppender } -func newGroup(name string, rls []rules.Rule, appendable *appendableAppender, opts *rules.ManagerOptions) *group { +func newGroup(name string, rls []rules.Rule, appendable *appendableAppender, opts *rules.ManagerOptions) *wrappedGroup { delay := 0 * time.Second // Unused, so 0 value is fine. promGroup := rules.NewGroup(name, "none", delay, rls, false, opts) - return &group{promGroup, appendable} + return &wrappedGroup{promGroup, appendable} } -func (g *group) Eval(ctx context.Context, ts time.Time) { +func (g *wrappedGroup) Eval(ctx context.Context, ts time.Time) { g.appendable.ctx = ctx g.promGroup.Eval(ctx, ts) } -func (g *group) Rules() []rules.Rule { +func (g *wrappedGroup) Rules() []rules.Rule { return g.promGroup.Rules() } diff --git a/pkg/ruler/mock_store.go b/pkg/ruler/mock_store.go new file mode 100644 index 00000000000..750a6f81863 --- /dev/null +++ b/pkg/ruler/mock_store.go @@ -0,0 +1,116 @@ +package ruler + +import ( + "context" + "strings" + "sync" + + "github.com/cortexproject/cortex/pkg/configs/storage/rules" + "github.com/prometheus/prometheus/pkg/rulefmt" +) + +type mockRuleStore struct { + sync.Mutex + rules map[string]rules.RuleGroup + + pollPayload map[string][]rules.RuleGroup +} + +func (m *mockRuleStore) PollRules(ctx context.Context) (map[string][]rules.RuleGroup, error) { + m.Lock() + defer m.Unlock() + pollPayload := m.pollPayload + m.pollPayload = map[string][]rules.RuleGroup{} + return pollPayload, nil +} + +func (m *mockRuleStore) Stop() {} + +// RuleStore returns an RuleStore from the client +func (m *mockRuleStore) RuleStore() rules.RuleStore { + return m +} + +func (m *mockRuleStore) ListRuleGroups(ctx context.Context, options rules.RuleStoreConditions) (rules.RuleGroupList, error) { + m.Lock() + defer m.Unlock() + + groupPrefix := options.UserID + ":" + + namespaces := []string{} + nss := rules.RuleGroupList{} + for n := range m.rules { + if strings.HasPrefix(n, groupPrefix) { + components := strings.Split(n, ":") + if len(components) != 3 { + continue + } + namespaces = append(namespaces, components[1]) + } + } + + if len(namespaces) == 0 { + return nss, rules.ErrUserNotFound + } + + for _, n := range namespaces { + ns, err := m.getRuleNamespace(ctx, options.UserID, n) + if err != nil { + continue + } + + nss = append(nss, ns...) + } + + return nss, nil +} + +func (m *mockRuleStore) getRuleNamespace(ctx context.Context, userID string, namespace string) (rules.RuleGroupList, error) { + groupPrefix := userID + ":" + namespace + ":" + + ns := rules.RuleGroupList{} + for n, g := range m.rules { + if strings.HasPrefix(n, groupPrefix) { + ns = append(ns, g) + } + } + + if len(ns) == 0 { + return ns, rules.ErrGroupNamespaceNotFound + } + + return ns, nil +} + +func (m *mockRuleStore) GetRuleGroup(ctx context.Context, userID string, namespace string, group string) (rules.RuleGroup, error) { + m.Lock() + defer m.Unlock() + + groupID := userID + ":" + namespace + ":" + group + g, ok := m.rules[groupID] + + if !ok { + return nil, rules.ErrGroupNotFound + } + + return g, nil + +} + +func (m *mockRuleStore) SetRuleGroup(ctx context.Context, userID string, namespace string, group rulefmt.RuleGroup) error { + m.Lock() + defer m.Unlock() + + groupID := userID + ":" + namespace + ":" + group.Name + m.rules[groupID] = rules.NewRuleGroup(group.Name, namespace, userID, group.Rules) + return nil +} + +func (m *mockRuleStore) DeleteRuleGroup(ctx context.Context, userID string, namespace string, group string) error { + m.Lock() + defer m.Unlock() + + groupID := userID + ":" + namespace + ":" + group + delete(m.rules, groupID) + return nil +} diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 790d0a12f00..63b089daac5 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -10,21 +10,21 @@ import ( "time" "github.com/go-kit/kit/log/level" - opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/notifier" "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/rules" - "github.com/prometheus/prometheus/storage" + promRules "github.com/prometheus/prometheus/rules" + promStorage "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/strutil" "golang.org/x/net/context" "golang.org/x/net/context/ctxhttp" - "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/distributor" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/configs/storage/rules" + store "github.com/cortexproject/cortex/pkg/configs/storage/rules" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/weaveworks/common/instrument" @@ -36,24 +36,17 @@ var ( Namespace: "cortex", Name: "group_evaluation_duration_seconds", Help: "The duration for a rule group to execute.", - Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 25}, - }) - rulesProcessed = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "rules_processed_total", - Help: "How many rules have been processed.", + Buckets: []float64{.5, 1, 2.5, 5, 10, 25, 60, 120}, }) ringCheckErrors = promauto.NewCounter(prometheus.CounterOpts{ Namespace: "cortex", Name: "ruler_ring_check_errors_total", Help: "Number of errors that have occurred when checking the ring for ownership", }) - ruleMetrics *rules.Metrics ) func init() { evalDuration.Register() - ruleMetrics = rules.NewGroupMetrics(prometheus.DefaultRegisterer) } // Config is the configuration for the recording rules server. @@ -85,11 +78,14 @@ type Config struct { SearchPendingFor time.Duration LifecyclerConfig ring.LifecyclerConfig FlushCheckPeriod time.Duration + + StoreConfig RuleStoreConfig } // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.LifecyclerConfig.RegisterFlagsWithPrefix("ruler.", f) + cfg.StoreConfig.RegisterFlags(f) cfg.ExternalURL.URL, _ = url.Parse("") // Must be non-nil f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.") @@ -113,7 +109,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { type Ruler struct { cfg Config engine *promql.Engine - queryable storage.Queryable + queryable promStorage.Queryable pusher Pusher alertURL *url.URL notifierCfg *config.Config @@ -124,13 +120,19 @@ type Ruler struct { lifecycler *ring.Lifecycler ring *ring.Ring + store rules.RuleStore + // Per-user notifiers with separate queues. notifiersMtx sync.Mutex notifiers map[string]*rulerNotifier + + // Per-user rules metrics + userMetricsMtx sync.Mutex + userMetrics map[string]*promRules.Metrics } // NewRuler creates a new ruler from a distributor and chunk store. -func NewRuler(cfg Config, engine *promql.Engine, queryable storage.Queryable, d *distributor.Distributor, rulesAPI client.Client) (*Ruler, error) { +func NewRuler(cfg Config, engine *promql.Engine, queryable promStorage.Queryable, d *distributor.Distributor) (*Ruler, error) { if cfg.NumWorkers <= 0 { return nil, fmt.Errorf("must have at least 1 worker, got %d", cfg.NumWorkers) } @@ -140,6 +142,11 @@ func NewRuler(cfg Config, engine *promql.Engine, queryable storage.Queryable, d return nil, err } + rulePoller, ruleStore, err := NewRuleStorage(cfg.StoreConfig) + if err != nil { + return nil, err + } + ruler := &Ruler{ cfg: cfg, engine: engine, @@ -149,9 +156,11 @@ func NewRuler(cfg Config, engine *promql.Engine, queryable storage.Queryable, d notifierCfg: ncfg, notifiers: map[string]*rulerNotifier{}, workerWG: &sync.WaitGroup{}, + userMetrics: map[string]*promRules.Metrics{}, + store: ruleStore, } - ruler.scheduler = newScheduler(rulesAPI, cfg.EvaluationInterval, cfg.EvaluationInterval, ruler.newGroup) + ruler.scheduler = newScheduler(rulePoller, cfg.EvaluationInterval, cfg.EvaluationInterval, ruler.newGroup) // If sharding is enabled, create/join a ring to distribute tokens to // the ruler @@ -208,35 +217,53 @@ func (r *Ruler) Stop() { } } -func (r *Ruler) newGroup(userID string, groupName string, rls []rules.Rule) (*group, error) { +func (r *Ruler) newGroup(ctx context.Context, g store.RuleGroup) (*wrappedGroup, error) { + user := g.User() appendable := &appendableAppender{pusher: r.pusher} - notifier, err := r.getOrCreateNotifier(userID) + notifier, err := r.getOrCreateNotifier(user) + if err != nil { + return nil, err + } + + rls, err := g.Rules(ctx) if err != nil { return nil, err } - opts := &rules.ManagerOptions{ + + // Get the rule group metrics for set user or create it if it does not exist + r.userMetricsMtx.Lock() + metrics, exists := r.userMetrics[user] + if !exists { + // Wrap the default register with the users ID and pass + reg := prometheus.WrapRegistererWith(prometheus.Labels{"user": user}, prometheus.DefaultRegisterer) + metrics = promRules.NewGroupMetrics(reg) + r.userMetrics[user] = metrics + } + r.userMetricsMtx.Unlock() + + opts := &promRules.ManagerOptions{ Appendable: appendable, - QueryFunc: rules.EngineQueryFunc(r.engine, r.queryable), + QueryFunc: promRules.EngineQueryFunc(r.engine, r.queryable), Context: context.Background(), ExternalURL: r.alertURL, NotifyFunc: sendAlerts(notifier, r.alertURL.String()), Logger: util.Logger, - Metrics: ruleMetrics, + Metrics: metrics, } - return newGroup(groupName, rls, appendable, opts), nil + return newGroup(g.ID(), rls, appendable, opts), nil } -// sendAlerts implements a rules.NotifyFunc for a Notifier. +// sendAlerts implements a promRules.NotifyFunc for a Notifier. // It filters any non-firing alerts from the input. // // Copied from Prometheus's main.go. -func sendAlerts(n *notifier.Manager, externalURL string) rules.NotifyFunc { - return func(ctx native_ctx.Context, expr string, alerts ...*rules.Alert) { +func sendAlerts(n *notifier.Manager, externalURL string) promRules.NotifyFunc { + return func(ctx native_ctx.Context, expr string, alerts ...*promRules.Alert) { var res []*notifier.Alert for _, alert := range alerts { // Only send actually firing alerts. - if alert.State == rules.StatePending { + if alert.State == promRules.StatePending { continue } a := ¬ifier.Alert{ @@ -292,33 +319,6 @@ func (r *Ruler) getOrCreateNotifier(userID string) (*notifier.Manager, error) { return n.notifier, nil } -// Evaluate a list of rules in the given context. -func (r *Ruler) Evaluate(userID string, item *workItem) { - ctx := user.InjectOrgID(context.Background(), userID) - logger := util.WithContext(ctx, util.Logger) - if r.cfg.EnableSharding && !r.ownsRule(item.hash) { - level.Debug(util.Logger).Log("msg", "ruler: skipping evaluation, not owned", "user_id", item.userID, "group", item.groupName) - return - } - level.Debug(logger).Log("msg", "evaluating rules...", "num_rules", len(item.group.Rules())) - ctx, cancelTimeout := context.WithTimeout(ctx, r.cfg.GroupTimeout) - instrument.CollectedRequest(ctx, "Evaluate", evalDuration, nil, func(ctx native_ctx.Context) error { - if span := opentracing.SpanFromContext(ctx); span != nil { - span.SetTag("instance", userID) - span.SetTag("groupName", item.groupName) - } - item.group.Eval(ctx, time.Now()) - return nil - }) - if err := ctx.Err(); err == nil { - cancelTimeout() // release resources - } else { - level.Warn(logger).Log("msg", "context error", "error", err) - } - - rulesProcessed.Add(float64(len(item.group.Rules()))) -} - func (r *Ruler) ownsRule(hash uint32) bool { rlrs, err := r.ring.Get(hash, ring.Read) // If an error occurs evaluate a rule as if it is owned @@ -330,9 +330,10 @@ func (r *Ruler) ownsRule(hash uint32) bool { return true } if rlrs.Ingesters[0].Addr == r.lifecycler.Addr { + level.Debug(util.Logger).Log("msg", "rule group owned", "owner_addr", rlrs.Ingesters[0].Addr, "addr", r.lifecycler.Addr) return true } - level.Debug(util.Logger).Log("msg", "rule group not owned, address does not match", "owner", rlrs.Ingesters[0].Addr, "current", r.cfg.LifecyclerConfig.Addr) + level.Debug(util.Logger).Log("msg", "rule group not owned, address does not match", "owner_addr", rlrs.Ingesters[0].Addr, "addr", r.lifecycler.Addr) return false } @@ -353,6 +354,9 @@ func (r *Ruler) ServeHTTP(w http.ResponseWriter, req *http.Request) { ` w.WriteHeader(http.StatusOK) - w.Write([]byte(unshardedPage)) + _, err := w.Write([]byte(unshardedPage)) + if err != nil { + level.Error(util.Logger).Log("msg", "unable to serve status page", "err", err) + } } } diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 51725da5c7b..c125a85beb1 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -1,42 +1,34 @@ package ruler import ( - "context" "net/http" "net/http/httptest" "sync" "testing" "time" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/promql" + "github.com/stretchr/testify/require" - "github.com/cortexproject/cortex/pkg/configs" - client_config "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/codec" "github.com/cortexproject/cortex/pkg/ring/kv/consul" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/prometheus/prometheus/notifier" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" "github.com/stretchr/testify/assert" "github.com/weaveworks/common/user" ) -type mockRuleStore struct{} - -func (m *mockRuleStore) GetRules(ctx context.Context, since configs.ID) (map[string]configs.VersionedRulesConfig, error) { - return map[string]configs.VersionedRulesConfig{}, nil -} - -func (m *mockRuleStore) GetAlerts(ctx context.Context, since configs.ID) (*client_config.ConfigsResponse, error) { - return nil, nil -} - func defaultRulerConfig() Config { codec := codec.Proto{Factory: ring.ProtoDescFactory} consul := consul.NewInMemoryClient(codec) - cfg := Config{} + cfg := Config{ + StoreConfig: RuleStoreConfig{ + mock: &mockRuleStore{}, + }, + } flagext.DefaultValues(&cfg) flagext.DefaultValues(&cfg.LifecyclerConfig) cfg.LifecyclerConfig.RingConfig.ReplicationFactor = 1 @@ -59,7 +51,8 @@ func newTestRuler(t *testing.T, cfg Config) *Ruler { Timeout: 2 * time.Minute, }) queryable := querier.NewQueryable(nil, nil, nil, 0) - ruler, err := NewRuler(cfg, engine, queryable, nil, &mockRuleStore{}) + ruler, err := NewRuler(cfg, engine, queryable, nil) + ruler.store = &mockRuleStore{} if err != nil { t.Fatal(err) } @@ -78,7 +71,8 @@ func TestNotifierSendsUserIDHeader(t *testing.T) { defer ts.Close() cfg := defaultRulerConfig() - cfg.AlertmanagerURL.Set(ts.URL) + err := cfg.AlertmanagerURL.Set(ts.URL) + require.NoError(t, err) cfg.AlertmanagerDiscovery = false r := newTestRuler(t, cfg) diff --git a/pkg/ruler/scheduler.go b/pkg/ruler/scheduler.go index 3a9ff8ec704..8f515a0c3bb 100644 --- a/pkg/ruler/scheduler.go +++ b/pkg/ruler/scheduler.go @@ -9,24 +9,14 @@ import ( "sync" "time" + "github.com/cortexproject/cortex/pkg/configs/storage/rules" + "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" "github.com/jonboulle/clockwork" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/prometheus/rules" - - "github.com/cortexproject/cortex/pkg/configs" - config_client "github.com/cortexproject/cortex/pkg/configs/client" - "github.com/cortexproject/cortex/pkg/util" ) -var backoffConfig = util.BackoffConfig{ - // Backoff for loading initial configuration set. - MinBackoff: 100 * time.Millisecond, - MaxBackoff: 2 * time.Second, -} - const ( timeLogFormat = "2006-01-02T15:04:05" ) @@ -35,32 +25,39 @@ var ( totalConfigs = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "cortex", Name: "scheduler_configs_total", - Help: "How many configs the scheduler knows about.", + Help: "How many user configs the scheduler knows about.", }) totalRuleGroups = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "cortex", Name: "scheduler_groups_total", Help: "How many rule groups the scheduler is currently evaluating", }) - configUpdates = promauto.NewCounter(prometheus.CounterOpts{ + evalLatency = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: "cortex", - Name: "scheduler_config_updates_total", - Help: "How many config updates the scheduler has made.", + Name: "group_evaluation_latency_seconds", + Help: "How far behind the target time each rule group executed.", + Buckets: []float64{.025, .05, .1, .25, .5, 1, 2.5, 5, 10, 25, 60}, }) + iterationsMissed = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "rule_group_iterations_missed_total", + Help: "The total number of rule group evaluations missed due to slow rule group evaluation.", + }, []string{"user"}) ) type workItem struct { - userID string - groupName string - hash uint32 - group *group - scheduled time.Time - generation configs.ID // a monotonically increasing number used to spot out of date work items + userID string + groupID string + hash uint32 + group *wrappedGroup + scheduled time.Time + + done chan struct{} } // Key implements ScheduledItem func (w workItem) Key() string { - return w.userID + ":" + w.groupName + return w.userID + ":" + w.groupID } // Scheduled implements ScheduledItem @@ -68,40 +65,35 @@ func (w workItem) Scheduled() time.Time { return w.scheduled } -// Defer returns a work item with updated rules, rescheduled to a later time. -func (w workItem) Defer(interval time.Duration) workItem { - return workItem{w.userID, w.groupName, w.hash, w.group, w.scheduled.Add(interval), w.generation} -} - func (w workItem) String() string { - return fmt.Sprintf("%s:%s:%d@%s", w.userID, w.groupName, len(w.group.Rules()), w.scheduled.Format(timeLogFormat)) + return fmt.Sprintf("%s:%s:%d@%s", w.userID, w.groupID, len(w.group.Rules()), w.scheduled.Format(timeLogFormat)) } type userConfig struct { - rules map[string][]rules.Rule - generation configs.ID // a monotonically increasing number used to spot out of date work items + done chan struct{} + id string + rules []rules.RuleGroup } -type groupFactory func(userID string, groupName string, rls []rules.Rule) (*group, error) +type groupFactory func(context.Context, rules.RuleGroup) (*wrappedGroup, error) type scheduler struct { - ruleStore config_client.Client + poller rules.RulePoller evaluationInterval time.Duration // how often we re-evaluate each rule set q *SchedulingQueue pollInterval time.Duration // how often we check for new config - cfgs map[string]userConfig // all rules for all users - latestConfig configs.ID // # of last update received from config - groupFn groupFactory // function to create a new group + cfgs map[string]userConfig // all rules for all users + groupFn groupFactory // function to create a new group sync.RWMutex done chan struct{} } // newScheduler makes a new scheduler. -func newScheduler(ruleStore config_client.Client, evaluationInterval, pollInterval time.Duration, groupFn groupFactory) *scheduler { +func newScheduler(poller rules.RulePoller, evaluationInterval, pollInterval time.Duration, groupFn groupFactory) *scheduler { return &scheduler{ - ruleStore: ruleStore, + poller: poller, evaluationInterval: evaluationInterval, pollInterval: pollInterval, q: NewSchedulingQueue(clockwork.NewRealClock()), @@ -116,13 +108,16 @@ func newScheduler(ruleStore config_client.Client, evaluationInterval, pollInterv func (s *scheduler) Run() { level.Debug(util.Logger).Log("msg", "scheduler started") - // Load initial set of all configurations before polling for new ones. - s.addNewConfigs(time.Now(), s.loadAllConfigs()) + err := s.updateConfigs(context.TODO()) + if err != nil { + level.Error(util.Logger).Log("msg", "scheduler: error updating rule groups", "err", err) + } + ticker := time.NewTicker(s.pollInterval) for { select { - case now := <-ticker.C: - err := s.updateConfigs(now) + case <-ticker.C: + err := s.updateConfigs(context.TODO()) if err != nil { level.Warn(util.Logger).Log("msg", "scheduler: error updating configs", "err", err) } @@ -135,72 +130,97 @@ func (s *scheduler) Run() { } func (s *scheduler) Stop() { + s.poller.Stop() close(s.done) s.q.Close() level.Debug(util.Logger).Log("msg", "scheduler stopped") } -// Load the full set of configurations from the server, retrying with backoff -// until we can get them. -func (s *scheduler) loadAllConfigs() map[string]configs.VersionedRulesConfig { - backoff := util.NewBackoff(context.Background(), backoffConfig) - for { - cfgs, err := s.poll() - if err == nil { - level.Debug(util.Logger).Log("msg", "scheduler: initial configuration load", "num_configs", len(cfgs)) - return cfgs - } - level.Warn(util.Logger).Log("msg", "scheduler: error fetching all configurations, backing off", "err", err) - backoff.Wait() - } -} - -func (s *scheduler) updateConfigs(now time.Time) error { - cfgs, err := s.poll() +func (s *scheduler) updateConfigs(ctx context.Context) error { + cfgs, err := s.poller.PollRules(ctx) if err != nil { return err } - s.addNewConfigs(now, cfgs) + + for user, cfg := range cfgs { + s.addUserConfig(ctx, user, cfg) + } + + totalConfigs.Set(float64(len(s.cfgs))) return nil } -// poll the configuration server. Not re-entrant. -func (s *scheduler) poll() (map[string]configs.VersionedRulesConfig, error) { - s.Lock() - configID := s.latestConfig - s.Unlock() +func (s *scheduler) addUserConfig(ctx context.Context, userID string, rgs []rules.RuleGroup) { + level.Info(util.Logger).Log("msg", "scheduler: updating rules for user", "user_id", userID, "num_groups", len(rgs)) - cfgs, err := s.ruleStore.GetRules(context.Background(), configID) // Warning: this will produce an incorrect result if the configID ever overflows - if err != nil { - level.Warn(util.Logger).Log("msg", "scheduler: configs server poll failed", "err", err) - return nil, err + // create a new userchan for rulegroups of this user + userChan := make(chan struct{}) + + ringHasher := fnv.New32a() + workItems := []workItem{} + evalTime := s.determineEvalTime(userID) + + for _, rg := range rgs { + level.Debug(util.Logger).Log("msg", "scheduler: updating rules for user and group", "user_id", userID, "group", rg.ID()) + grp, err := s.groupFn(ctx, rg) + if err != nil { + level.Error(util.Logger).Log("msg", "scheduler: failed to create group for user", "user_id", userID, "group", rg.ID(), "err", err) + return + } + + ringHasher.Reset() + _, err = ringHasher.Write([]byte(rg.ID())) + if err != nil { + level.Error(util.Logger).Log("msg", "scheduler: failed to create group for user", "user_id", userID, "group", rg.ID(), "err", err) + return + } + + hash := ringHasher.Sum32() + workItems = append(workItems, workItem{userID, rg.ID(), hash, grp, evalTime, userChan}) + } + + s.updateUserConfig(ctx, userConfig{ + id: userID, + rules: rgs, + done: userChan, + }) + + for _, i := range workItems { + s.addWorkItem(i) } + + totalRuleGroups.Add(float64(len(workItems))) +} + +func (s *scheduler) updateUserConfig(ctx context.Context, cfg userConfig) { + // Retrieve any previous configuration and update to the new configuration s.Lock() - s.latestConfig = getLatestConfigID(cfgs, configID) + curr, exists := s.cfgs[cfg.id] + s.cfgs[cfg.id] = cfg s.Unlock() - return cfgs, nil -} -// getLatestConfigID gets the latest configs ID. -// max [latest, max (map getID cfgs)] -func getLatestConfigID(cfgs map[string]configs.VersionedRulesConfig, latest configs.ID) configs.ID { - ret := latest - for _, config := range cfgs { - if config.ID > ret { - ret = config.ID - } + if exists { + close(curr.done) // If a previous configuration exists, ensure it is closed } - return ret +} + +func (s *scheduler) determineEvalTime(userID string) time.Time { + now := time.Now() + hasher := fnv.New64a() + return computeNextEvalTime(hasher, now, float64(s.evaluationInterval.Nanoseconds()), userID) } // computeNextEvalTime Computes when a user's rules should be next evaluated, based on how far we are through an evaluation cycle -func (s *scheduler) computeNextEvalTime(hasher hash.Hash64, now time.Time, userID string) time.Time { - intervalNanos := float64(s.evaluationInterval.Nanoseconds()) +func computeNextEvalTime(hasher hash.Hash64, now time.Time, intervalNanos float64, userID string) time.Time { // Compute how far we are into the current evaluation cycle currentEvalCyclePoint := math.Mod(float64(now.UnixNano()), intervalNanos) hasher.Reset() - hasher.Write([]byte(userID)) + _, err := hasher.Write([]byte(userID)) + if err != nil { + // if an error occurs just return the current time plus a minute + return now.Add(time.Minute) + } offset := math.Mod( // We subtract our current point in the cycle to cause the entries // before 'now' to wrap around to the end. @@ -210,78 +230,13 @@ func (s *scheduler) computeNextEvalTime(hasher hash.Hash64, now time.Time, userI return now.Add(time.Duration(int64(offset))) } -func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.VersionedRulesConfig) { - // TODO: instrument how many configs we have, both valid & invalid. - level.Debug(util.Logger).Log("msg", "adding configurations", "num_configs", len(cfgs)) - hasher := fnv.New64a() - s.Lock() - generation := s.latestConfig - s.Unlock() - - for userID, config := range cfgs { - s.addUserConfig(now, hasher, generation, userID, config) - } - - configUpdates.Add(float64(len(cfgs))) - s.Lock() - lenCfgs := len(s.cfgs) - s.Unlock() - totalConfigs.Set(float64(lenCfgs)) -} - -func (s *scheduler) addUserConfig(now time.Time, hasher hash.Hash64, generation configs.ID, userID string, config configs.VersionedRulesConfig) { - rulesByGroup, err := config.Config.Parse() - if err != nil { - // XXX: This means that if a user has a working configuration and - // they submit a broken one, we'll keep processing the last known - // working configuration, and they'll never know. - // TODO: Provide a way of deleting / cancelling recording rules. - level.Warn(util.Logger).Log("msg", "scheduler: invalid Cortex configuration", "user_id", userID, "err", err) - return - } - - level.Info(util.Logger).Log("msg", "scheduler: updating rules for user", "user_id", userID, "num_groups", len(rulesByGroup), "is_deleted", config.IsDeleted()) - s.Lock() - // if deleted remove from map, otherwise - update map - if config.IsDeleted() { - delete(s.cfgs, userID) - s.Unlock() - return - } - s.cfgs[userID] = userConfig{rules: rulesByGroup, generation: generation} - s.Unlock() - - ringHasher := fnv.New32a() - evalTime := s.computeNextEvalTime(hasher, now, userID) - workItems := []workItem{} - for group, rules := range rulesByGroup { - level.Debug(util.Logger).Log("msg", "scheduler: updating rules for user and group", "user_id", userID, "group", group, "num_rules", len(rules)) - g, err := s.groupFn(userID, group, rules) - if err != nil { - // XXX: similarly to above if a user has a working configuration and - // for some reason we cannot create a group for the new one we'll use - // the last known working configuration - level.Warn(util.Logger).Log("msg", "scheduler: failed to create group for user", "user_id", userID, "group", group, "err", err) - return - } - ringHasher.Reset() - ringHasher.Write([]byte(userID + ":" + group)) - hash := ringHasher.Sum32() - workItems = append(workItems, workItem{userID, group, hash, g, evalTime, generation}) - } - for _, i := range workItems { - totalRuleGroups.Inc() - s.addWorkItem(i) - } -} - func (s *scheduler) addWorkItem(i workItem) { select { case <-s.done: level.Debug(util.Logger).Log("msg", "scheduler: work item not added, scheduler stoped", "item", i) return default: - // The queue is keyed by userID+groupName, so items for existing userID+groupName will be replaced. + // The queue is keyed by userID+groupID, so items for existing userID+groupID will be replaced. s.q.Enqueue(i) level.Debug(util.Logger).Log("msg", "scheduler: work item added", "item", i) } @@ -302,26 +257,34 @@ func (s *scheduler) nextWorkItem() *workItem { } item := op.(workItem) level.Debug(util.Logger).Log("msg", "scheduler: work item granted", "item", item) + + // Record the latency of the items evaluation here + latency := time.Since(item.scheduled) + evalLatency.Observe(latency.Seconds()) + level.Debug(util.Logger).Log("msg", "sheduler: returning item", "item", item, "latency", latency.String()) + return &item } // workItemDone marks the given item as being ready to be rescheduled. func (s *scheduler) workItemDone(i workItem) { - s.Lock() - config, found := s.cfgs[i.userID] - var currentRules []rules.Rule - if found { - currentRules = config.rules[i.groupName] - } - s.Unlock() - if !found || len(currentRules) == 0 || i.generation < config.generation { - // Warning: this test will produce an incorrect result if the generation ever overflows - level.Debug(util.Logger).Log("msg", "scheduler: stopping item", "user_id", i.userID, "group", i.groupName, "found", found, "len", len(currentRules)) - totalRuleGroups.Dec() + select { + case <-i.done: + // Unschedule the work item + level.Debug(util.Logger).Log("msg", "scheduler: work item dropped", "item", i) return - } + default: + // If the evaluation of the item took longer than it's evaluation interval, skip to the next valid interval + // and record any evaluation misses. This must be differentiated from lateness due to scheduling which is + // caused by the overall workload, not the result of latency within a single rule group. + missed := (time.Since(i.scheduled) / s.evaluationInterval) - 1 + if missed > 0 { + level.Warn(util.Logger).Log("msg", "scheduler: work item missed evaluation", "item", i, "late_by", missed.String) + iterationsMissed.WithLabelValues(i.userID).Add(float64(missed)) + } - next := i.Defer(s.evaluationInterval) - level.Debug(util.Logger).Log("msg", "scheduler: work item rescheduled", "item", i, "time", next.scheduled.Format(timeLogFormat)) - s.addWorkItem(next) + i.scheduled = i.scheduled.Add((missed + 1) * s.evaluationInterval) + level.Debug(util.Logger).Log("msg", "scheduler: work item rescheduled", "item", i, "time", i.scheduled.Format(timeLogFormat)) + s.addWorkItem(i) + } } diff --git a/pkg/ruler/scheduler_test.go b/pkg/ruler/scheduler_test.go index adbcc2a21e7..cc4e7202541 100644 --- a/pkg/ruler/scheduler_test.go +++ b/pkg/ruler/scheduler_test.go @@ -1,13 +1,14 @@ package ruler import ( + "context" "strconv" "testing" "time" + "github.com/cortexproject/cortex/pkg/configs/storage/rules" + "github.com/prometheus/prometheus/pkg/rulefmt" "github.com/stretchr/testify/assert" - - "github.com/prometheus/prometheus/rules" ) type fakeHasher struct { @@ -43,7 +44,7 @@ func TestSchedulerComputeNextEvalTime(t *testing.T) { // We use the fake hasher to give us control over the hash output // so that we can test the wrap-around behaviour of the modulo fakeUserID := strconv.FormatInt(hashResult, 10) - return s.computeNextEvalTime(&h, time.Unix(0, now), fakeUserID).UnixNano() + return computeNextEvalTime(&h, time.Unix(0, now), 15, fakeUserID).UnixNano() } { cycleStartTime := int64(30) @@ -70,27 +71,43 @@ func TestSchedulerComputeNextEvalTime(t *testing.T) { func TestSchedulerRulesOverlap(t *testing.T) { s := newScheduler(nil, 15, 15, nil) userID := "bob" - groupName := "test" + groupOne := "test1" + groupTwo := "test2" next := time.Now() - ruleSet := []rules.Rule{ - nil, + ruleSetsOne := []rules.RuleGroup{ + rules.NewRuleGroup(groupOne, "default", userID, []rulefmt.Rule{}), } - ruleSets := map[string][]rules.Rule{} - ruleSets[groupName] = ruleSet - cfg := userConfig{generation: 1, rules: ruleSets} - s.cfgs[userID] = cfg - w1 := workItem{userID: userID, groupName: groupName, scheduled: next, generation: cfg.generation} - s.workItemDone(w1) + ruleSetsTwo := []rules.RuleGroup{ + rules.NewRuleGroup(groupTwo, "default", userID, []rulefmt.Rule{}), + } + userChanOne := make(chan struct{}) + userChanTwo := make(chan struct{}) + + cfgOne := userConfig{rules: ruleSetsOne, done: userChanOne} + cfgTwo := userConfig{rules: ruleSetsTwo, done: userChanTwo} + + s.updateUserConfig(context.Background(), cfgOne) + w0 := workItem{userID: userID, groupID: groupOne, scheduled: next, done: userChanOne} + s.workItemDone(w0) item := s.q.Dequeue().(workItem) - assert.Equal(t, w1.generation, item.generation) + assert.Equal(t, item.groupID, groupOne) - w0 := workItem{userID: userID, groupName: groupName, scheduled: next, generation: cfg.generation - 1} - s.workItemDone(w1) + // create a new workitem for the updated ruleset + w1 := workItem{userID: userID, groupID: groupTwo, scheduled: next, done: userChanTwo} + + // Apply the new config, scheduling the previous config to be dropped + s.updateUserConfig(context.Background(), cfgTwo) + + // Reschedule the old config first, then the new config s.workItemDone(w0) + s.workItemDone(w1) + + // Ensure the old config was dropped due to the done channel being closed + // when the new user config was updated item = s.q.Dequeue().(workItem) - assert.Equal(t, w1.generation, item.generation) + assert.Equal(t, item.groupID, groupTwo) s.q.Close() assert.Equal(t, nil, s.q.Dequeue()) diff --git a/pkg/ruler/scheduling_queue.go b/pkg/ruler/scheduling_queue.go index d4c0233916d..23808d8685c 100644 --- a/pkg/ruler/scheduling_queue.go +++ b/pkg/ruler/scheduling_queue.go @@ -73,7 +73,7 @@ func (q *queueState) Enqueue(op ScheduledItem) { func (q *queueState) Dequeue() ScheduledItem { item := heap.Pop(q).(ScheduledItem) - itemEvaluationLatency.Observe(time.Now().Sub(item.Scheduled()).Seconds()) + itemEvaluationLatency.Observe(time.Since(item.Scheduled()).Seconds()) return item } diff --git a/pkg/ruler/storage.go b/pkg/ruler/storage.go new file mode 100644 index 00000000000..47f76c9824c --- /dev/null +++ b/pkg/ruler/storage.go @@ -0,0 +1,38 @@ +package ruler + +import ( + "flag" + "fmt" + + "github.com/cortexproject/cortex/pkg/configs/storage/clients/configdb" + "github.com/cortexproject/cortex/pkg/configs/storage/rules" +) + +// RuleStoreConfig conigures a rule store +type RuleStoreConfig struct { + Type string `yaml:"type"` + ConfigDB configdb.Config + + mock *mockRuleStore +} + +// RegisterFlags registers flags. +func (cfg *RuleStoreConfig) RegisterFlags(f *flag.FlagSet) { + cfg.ConfigDB.RegisterFlagsWithPrefix("ruler", f) + f.StringVar(&cfg.Type, "ruler.storage.type", "configdb", "Method to use for backend rule storage (configdb)") +} + +// NewRuleStorage returns a new rule storage backend poller and store +func NewRuleStorage(cfg RuleStoreConfig) (rules.RulePoller, rules.RuleStore, error) { + if cfg.mock != nil { + return cfg.mock, cfg.mock, nil + } + + switch cfg.Type { + case "configdb": + poller, err := configdb.New(cfg.ConfigDB) + return poller, nil, err + default: + return nil, nil, fmt.Errorf("Unrecognized rule storage mode %v, choose one of: configdb, gcs", cfg.Type) + } +} diff --git a/pkg/ruler/worker.go b/pkg/ruler/worker.go index a56eae0f707..29be0f707d1 100644 --- a/pkg/ruler/worker.go +++ b/pkg/ruler/worker.go @@ -1,30 +1,30 @@ package ruler import ( + "context" + native_ctx "context" "time" "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" + opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/instrument" + "github.com/weaveworks/common/user" ) var ( - blockedWorkers = prometheus.NewGauge(prometheus.GaugeOpts{ + blockedWorkers = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "cortex", Name: "blocked_workers", Help: "How many workers are waiting on an item to be ready.", }) - workerIdleTime = prometheus.NewCounter(prometheus.CounterOpts{ + workerIdleTime = promauto.NewCounter(prometheus.CounterOpts{ Namespace: "cortex", Name: "worker_idle_seconds_total", Help: "How long workers have spent waiting for work.", }) - evalLatency = prometheus.NewHistogram(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "group_evaluation_latency_seconds", - Help: "How far behind the target time each rule group executed.", - Buckets: []float64{.1, .25, .5, 1, 2.5, 5, 10, 25}, - }) ) // Worker does a thing until it's told to stop. @@ -47,21 +47,48 @@ func newWorker(ruler *Ruler) worker { func (w *worker) Run() { for { + // Grab next scheduled item from the queue + level.Debug(util.Logger).Log("msg", "waiting for next work item") waitStart := time.Now() + blockedWorkers.Inc() - level.Debug(util.Logger).Log("msg", "waiting for next work item") item := w.scheduler.nextWorkItem() blockedWorkers.Dec() - waitElapsed := time.Now().Sub(waitStart) + + waitElapsed := time.Since(waitStart) + workerIdleTime.Add(waitElapsed.Seconds()) + + // If no item is returned, worker is safe to terminate if item == nil { level.Debug(util.Logger).Log("msg", "queue closed and empty; terminating worker") return } - evalLatency.Observe(time.Since(item.scheduled).Seconds()) - workerIdleTime.Add(waitElapsed.Seconds()) - level.Debug(util.Logger).Log("msg", "processing item", "item", item) - w.ruler.Evaluate(item.userID, item) + + w.Evaluate(item.userID, item) w.scheduler.workItemDone(*item) level.Debug(util.Logger).Log("msg", "item handed back to queue", "item", item) } } + +// Evaluate a list of rules in the given context. +func (w *worker) Evaluate(userID string, item *workItem) { + ctx := user.InjectOrgID(context.Background(), userID) + logger := util.WithContext(ctx, util.Logger) + if w.ruler.cfg.EnableSharding && !w.ruler.ownsRule(item.hash) { + level.Debug(util.Logger).Log("msg", "ruler: skipping evaluation, not owned", "user_id", item.userID, "group", item.groupID) + return + } + level.Debug(logger).Log("msg", "evaluating rules...", "num_rules", len(item.group.Rules())) + + err := instrument.CollectedRequest(ctx, "Evaluate", evalDuration, nil, func(ctx native_ctx.Context) error { + if span := opentracing.SpanFromContext(ctx); span != nil { + span.SetTag("instance", userID) + span.SetTag("groupID", item.groupID) + } + item.group.Eval(ctx, time.Now()) + return nil + }) + if err != nil { + level.Debug(logger).Log("msg", "failed instrumented worker evaluation", "err", err) + } +}