Skip to content

Commit 4b19e36

Browse files
committed
Update to Prometheus 2.0 YAML-based rule format
The choice of return format for parsing is tricky: it has to preserve the ability to track alert states, yet final rule groups cannot be created at that point. The trade-off is laid out in the comment above RulesConfig.Parse(). Fixes #622
1 parent 41a2a7b commit 4b19e36

File tree

6 files changed

+98
-96
lines changed

6 files changed

+98
-96
lines changed

pkg/configs/api/api.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414

1515
"github.com/weaveworks/common/user"
1616
"github.com/weaveworks/cortex/pkg/configs"
17-
configs_client "github.com/weaveworks/cortex/pkg/configs/client"
1817
"github.com/weaveworks/cortex/pkg/configs/db"
1918
"github.com/weaveworks/cortex/pkg/util"
2019
)
@@ -177,7 +176,8 @@ func validateAlertmanagerConfig(cfg string) error {
177176
}
178177

179178
func validateRulesFiles(c configs.Config) error {
180-
_, err := configs_client.RulesFromConfig(c)
179+
rc := configs.RulesConfig(c.RulesFiles)
180+
_, err := rc.Parse()
181181
return err
182182
}
183183

pkg/configs/client/configs.go

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
1010

1111
"github.com/go-kit/kit/log/level"
1212
"github.com/prometheus/alertmanager/config"
13-
"github.com/prometheus/prometheus/promql"
14-
"github.com/prometheus/prometheus/rules"
1513
"github.com/weaveworks/cortex/pkg/configs"
1614
"github.com/weaveworks/cortex/pkg/util"
1715
)
@@ -46,36 +44,6 @@ func (c ConfigsResponse) GetLatestConfigID() configs.ID {
4644
return latest
4745
}
4846

49-
// RulesFromConfig gets the rules from the Cortex configuration.
50-
//
51-
// Strongly inspired by `loadGroups` in Prometheus.
52-
func RulesFromConfig(c configs.Config) ([]rules.Rule, error) {
53-
result := []rules.Rule{}
54-
for fn, content := range c.RulesFiles {
55-
stmts, err := promql.ParseStmts(content)
56-
if err != nil {
57-
return nil, fmt.Errorf("error parsing %s: %s", fn, err)
58-
}
59-
60-
for _, stmt := range stmts {
61-
var rule rules.Rule
62-
63-
switch r := stmt.(type) {
64-
case *promql.AlertStmt:
65-
rule = rules.NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Annotations, util.Logger)
66-
67-
case *promql.RecordStmt:
68-
rule = rules.NewRecordingRule(r.Name, r.Expr, r.Labels)
69-
70-
default:
71-
return nil, fmt.Errorf("ruler.GetRules: unknown statement type")
72-
}
73-
result = append(result, rule)
74-
}
75-
}
76-
return result, nil
77-
}
78-
7947
// AlertmanagerConfigFromConfig returns the Alertmanager config from the Cortex configuration.
8048
func AlertmanagerConfigFromConfig(c configs.Config) (*config.Config, error) {
8149
cfg, err := config.Load(c.AlertmanagerConfig)

pkg/configs/configs.go

Lines changed: 47 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"fmt"
55
"time"
66

7+
"github.com/go-kit/kit/log"
8+
"github.com/prometheus/prometheus/pkg/labels"
9+
"github.com/prometheus/prometheus/pkg/rulefmt"
710
"github.com/prometheus/prometheus/promql"
811
"github.com/prometheus/prometheus/rules"
912
"github.com/weaveworks/cortex/pkg/util"
@@ -63,34 +66,58 @@ func (c RulesConfig) Equal(o RulesConfig) bool {
6366
return true
6467
}
6568

66-
// Parse rules from the Cortex configuration.
69+
// Parse parses and validates the content of the rule files in a RulesConfig.
6770
//
68-
// Strongly inspired by `loadGroups` in Prometheus.
69-
func (c RulesConfig) Parse() ([]rules.Rule, error) {
70-
result := []rules.Rule{}
71+
// NOTE: On one hand, we cannot return fully-fledged lists of rules.Group
72+
// here yet, as creating a rules.Group requires already
73+
// passing in rules.ManagerOptions options (which in turn require a
74+
// notifier, appender, etc.), which we do not want to create simply
75+
// for parsing. On the other hand, we should not return barebones
76+
// rulefmt.RuleGroup sets here either, as only a fully-converted rules.Rule
77+
// is able to track alert states over multiple rule evaluations. The caller
78+
// would otherwise have to ensure to convert the rulefmt.RuleGroup only exactly
79+
// once, not for every evaluation (or risk losing alert pending states). So
80+
// it's probably better to just return a set of rules.Rule here.
81+
func (c RulesConfig) Parse() (map[string][]rules.Rule, error) {
82+
groups := map[string][]rules.Rule{}
83+
7184
for fn, content := range c {
72-
stmts, err := promql.ParseStmts(content)
73-
if err != nil {
74-
return nil, fmt.Errorf("error parsing %s: %s", fn, err)
85+
rgs, errs := rulefmt.Parse([]byte(content))
86+
if errs != nil {
87+
return nil, fmt.Errorf("error parsing %s: %v", fn, errs)
7588
}
7689

77-
for _, stmt := range stmts {
78-
var rule rules.Rule
79-
80-
switch r := stmt.(type) {
81-
case *promql.AlertStmt:
82-
rule = rules.NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Annotations, util.Logger)
83-
84-
case *promql.RecordStmt:
85-
rule = rules.NewRecordingRule(r.Name, r.Expr, r.Labels)
90+
for _, rg := range rgs.Groups {
91+
rls := make([]rules.Rule, 0, len(rg.Rules))
92+
for _, rl := range rg.Rules {
93+
expr, err := promql.ParseExpr(rl.Expr)
94+
if err != nil {
95+
return nil, err
96+
}
8697

87-
default:
88-
return nil, fmt.Errorf("ruler.GetRules: unknown statement type")
98+
if rl.Alert != "" {
99+
rls = append(rls, rules.NewAlertingRule(
100+
rl.Alert,
101+
expr,
102+
time.Duration(rl.For),
103+
labels.FromMap(rl.Labels),
104+
labels.FromMap(rl.Annotations),
105+
log.With(util.Logger, "alert", rl.Alert),
106+
))
107+
continue
108+
}
109+
rls = append(rls, rules.NewRecordingRule(
110+
rl.Record,
111+
expr,
112+
labels.FromMap(rl.Labels),
113+
))
89114
}
90-
result = append(result, rule)
115+
116+
groups[rg.Name] = rls
91117
}
92118
}
93-
return result, nil
119+
120+
return groups, nil
94121
}
95122

96123
// VersionedRulesConfig is a RulesConfig together with a version.

pkg/ruler/api_test.go

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -79,17 +79,20 @@ func makeRulerConfig() configs.RulesConfig {
7979
return configs.RulesConfig(map[string]string{
8080
"filename.rules": makeString(`
8181
# Config no. %d.
82-
ALERT ScrapeFailed
83-
IF up != 1
84-
FOR 10m
85-
LABELS { severity="warning" }
86-
ANNOTATIONS {
87-
summary = "Scrape of {{$labels.job}} (pod: {{$labels.instance}}) failed.",
88-
description = "Prometheus cannot reach the /metrics page on the {{$labels.instance}} pod.",
89-
impact = "We have no monitoring data for {{$labels.job}} - {{$labels.instance}}. At worst, it's completely down. At best, we cannot reliably respond to operational issues.",
90-
dashboardURL = "$${base_url}/admin/prometheus/targets",
91-
}
92-
`),
82+
groups:
83+
- name: example
84+
rules:
85+
- alert: ScrapeFailed
86+
expr: 'up != 1'
87+
for: 10m
88+
labels:
89+
severity: warning
90+
annotations:
91+
summary: "Scrape of {{$labels.job}} (pod: {{$labels.instance}}) failed."
92+
description: "Prometheus cannot reach the /metrics page on the {{$labels.instance}} pod."
93+
impact: "We have no monitoring data for {{$labels.job}} - {{$labels.instance}}. At worst, it's completely down. At best, we cannot reliably respond to operational issues."
94+
dashboardURL: "$${base_url}/admin/prometheus/targets"
95+
`),
9396
})
9497
}
9598

pkg/ruler/ruler.go

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func buildNotifierConfig(rulerConfig *Config) (*config.Config, error) {
261261
return promConfig, nil
262262
}
263263

264-
func (r *Ruler) newGroup(ctx context.Context, rs []rules.Rule) (*rules.Group, error) {
264+
func (r *Ruler) newGroup(ctx context.Context, rls []rules.Rule) (*rules.Group, error) {
265265
appendable := &appendableAppender{pusher: r.pusher, ctx: ctx}
266266
userID, err := user.ExtractOrgID(ctx)
267267
if err != nil {
@@ -280,8 +280,8 @@ func (r *Ruler) newGroup(ctx context.Context, rs []rules.Rule) (*rules.Group, er
280280
Logger: gklog.NewNopLogger(),
281281
Registerer: prometheus.DefaultRegisterer,
282282
}
283-
delay := 0 * time.Second // Unused, so 0 value is fine.
284-
return rules.NewGroup("default", "none", delay, rs, opts), nil
283+
interval := 0 * time.Second // Unused, so 0 value is fine.
284+
return rules.NewGroup("default", "none", interval, rls, opts), nil
285285
}
286286

287287
// sendAlerts implements a rules.NotifyFunc for a Notifier.
@@ -352,28 +352,31 @@ func (r *Ruler) getOrCreateNotifier(userID string) (*notifier.Manager, error) {
352352
}
353353

354354
// Evaluate a list of rules in the given context.
355-
func (r *Ruler) Evaluate(ctx context.Context, rs []rules.Rule) {
355+
func (r *Ruler) Evaluate(ctx context.Context, rgs map[string][]rules.Rule) {
356356
logger := util.WithContext(ctx, util.Logger)
357-
level.Debug(logger).Log("msg", "evaluating rules...", "num_rules", len(rs))
357+
level.Debug(logger).Log("msg", "evaluating rule groups...", "num_groups", len(rgs))
358358
start := time.Now()
359-
ctx, cancelTimeout := context.WithTimeout(ctx, r.groupTimeout)
360-
g, err := r.newGroup(ctx, rs)
361-
if err != nil {
362-
level.Error(logger).Log("msg", "failed to create rule group", "err", err)
363-
return
364-
}
365-
g.Eval(ctx, start)
366-
if err := ctx.Err(); err == nil {
367-
cancelTimeout() // release resources
368-
} else {
369-
level.Warn(util.Logger).Log("msg", "context error", "error", err)
359+
360+
for name, rg := range rgs {
361+
ctx, cancelTimeout := context.WithTimeout(ctx, r.groupTimeout)
362+
g, err := r.newGroup(ctx, rg)
363+
if err != nil {
364+
level.Error(logger).Log("msg", "error creating rule group", "group_name", name, "err", err)
365+
}
366+
g.Eval(ctx, start)
367+
if err := ctx.Err(); err == nil {
368+
cancelTimeout() // release resources
369+
} else {
370+
level.Warn(util.Logger).Log("msg", "context error", "error", err)
371+
}
372+
373+
rulesProcessed.Add(float64(len(g.Rules())))
370374
}
371375

372376
// The prometheus routines we're calling have their own instrumentation
373377
// but, a) it's rule-based, not group-based, b) it's a summary, not a
374378
// histogram, so we can't reliably aggregate.
375379
evalDuration.Observe(time.Since(start).Seconds())
376-
rulesProcessed.Add(float64(len(rs)))
377380
}
378381

379382
// Stop stops the Ruler.
@@ -470,7 +473,7 @@ func (w *worker) Run() {
470473
evalLatency.Observe(time.Since(item.scheduled).Seconds())
471474
level.Debug(util.Logger).Log("msg", "processing item", "item", item)
472475
ctx := user.InjectOrgID(context.Background(), item.userID)
473-
w.ruler.Evaluate(ctx, item.rules)
476+
w.ruler.Evaluate(ctx, item.ruleGroups)
474477
w.scheduler.workItemDone(*item)
475478
level.Debug(util.Logger).Log("msg", "item handed back to queue", "item", item)
476479
}

pkg/ruler/scheduler.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,12 @@ import (
66
"sync"
77
"time"
88

9+
"github.com/prometheus/prometheus/rules"
10+
911
"github.com/go-kit/kit/log/level"
1012
"github.com/jonboulle/clockwork"
1113

1214
"github.com/prometheus/client_golang/prometheus"
13-
"github.com/prometheus/prometheus/rules"
1415

1516
"github.com/weaveworks/common/instrument"
1617
"github.com/weaveworks/cortex/pkg/configs"
@@ -53,9 +54,9 @@ func init() {
5354
}
5455

5556
type workItem struct {
56-
userID string
57-
rules []rules.Rule
58-
scheduled time.Time
57+
userID string
58+
ruleGroups map[string][]rules.Rule
59+
scheduled time.Time
5960
}
6061

6162
// Key implements ScheduledItem
@@ -69,12 +70,12 @@ func (w workItem) Scheduled() time.Time {
6970
}
7071

7172
// Defer returns a work item with updated rules, rescheduled to a later time.
72-
func (w workItem) Defer(interval time.Duration, currentRules []rules.Rule) workItem {
73+
func (w workItem) Defer(interval time.Duration, currentRules map[string][]rules.Rule) workItem {
7374
return workItem{w.userID, currentRules, w.scheduled.Add(interval)}
7475
}
7576

7677
func (w workItem) String() string {
77-
return fmt.Sprintf("%s:%d@%s", w.userID, len(w.rules), w.scheduled.Format(timeLogFormat))
78+
return fmt.Sprintf("%s:%d@%s", w.userID, len(w.ruleGroups), w.scheduled.Format(timeLogFormat))
7879
}
7980

8081
type scheduler struct {
@@ -84,8 +85,8 @@ type scheduler struct {
8485

8586
pollInterval time.Duration // how often we check for new config
8687

87-
cfgs map[string][]rules.Rule // all rules for all users
88-
latestConfig configs.ID // # of last update received from config
88+
cfgs map[string]map[string][]rules.Rule // all rules for all users
89+
latestConfig configs.ID // # of last update received from config
8990
sync.RWMutex
9091

9192
stop chan struct{}
@@ -99,7 +100,7 @@ func newScheduler(rulesAPI RulesAPI, evaluationInterval, pollInterval time.Durat
99100
evaluationInterval: evaluationInterval,
100101
pollInterval: pollInterval,
101102
q: NewSchedulingQueue(clockwork.NewRealClock()),
102-
cfgs: map[string][]rules.Rule{},
103+
cfgs: map[string]map[string][]rules.Rule{},
103104

104105
stop: make(chan struct{}),
105106
done: make(chan struct{}),
@@ -183,7 +184,7 @@ func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.Version
183184
// TODO: instrument how many configs we have, both valid & invalid.
184185
level.Debug(util.Logger).Log("msg", "adding configurations", "num_configs", len(cfgs))
185186
for userID, config := range cfgs {
186-
rules, err := config.Config.Parse()
187+
ruleGroups, err := config.Config.Parse()
187188
if err != nil {
188189
// XXX: This means that if a user has a working configuration and
189190
// they submit a broken one, we'll keep processing the last known
@@ -192,17 +193,17 @@ func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.Version
192193
level.Warn(util.Logger).Log("msg", "scheduler: invalid Cortex configuration", "user_id", userID, "err", err)
193194
continue
194195
}
195-
level.Info(util.Logger).Log("msg", "scheduler: updating rules for user", "user_id", userID, "num_rules", len(rules), "is_deleted", config.IsDeleted())
196+
level.Info(util.Logger).Log("msg", "scheduler: updating rules for user", "user_id", userID, "num_groups", len(ruleGroups), "is_deleted", config.IsDeleted())
196197
s.Lock()
197198
// if deleted remove from map, otherwise - update map
198199
if config.IsDeleted() {
199200
delete(s.cfgs, userID)
200201
} else {
201-
s.cfgs[userID] = rules
202+
s.cfgs[userID] = ruleGroups
202203
}
203204
s.Unlock()
204205
if !config.IsDeleted() {
205-
s.addWorkItem(workItem{userID, rules, now})
206+
s.addWorkItem(workItem{userID, ruleGroups, now})
206207
}
207208
}
208209
configUpdates.Add(float64(len(cfgs)))

0 commit comments

Comments
 (0)