Skip to content

Commit 82b9ee8

Browse files
committed
Use rules DB directly from ruler
Will fall back to using configs service if no DB information supplied.
1 parent 029f17c commit 82b9ee8

File tree

8 files changed

+187
-35
lines changed

8 files changed

+187
-35
lines changed

cmd/lite/main.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func main() {
4242
chunkStoreConfig chunk.StoreConfig
4343
distributorConfig distributor.Config
4444
ingesterConfig ingester.Config
45+
configStoreConfig ruler.ConfigStoreConfig
4546
rulerConfig ruler.Config
4647
schemaConfig chunk.SchemaConfig
4748
storageConfig storage.Config
@@ -52,7 +53,7 @@ func main() {
5253
// Ingester needs to know our gRPC listen port.
5354
ingesterConfig.ListenPort = &serverConfig.GRPCListenPort
5455
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig,
55-
&ingesterConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel)
56+
&ingesterConfig, &configStoreConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel)
5657
flag.BoolVar(&unauthenticated, "unauthenticated", false, "Set to true to disable multitenancy.")
5758
flag.Parse()
5859
schemaConfig.MaxChunkAge = ingesterConfig.MaxChunkAge
@@ -122,15 +123,20 @@ func main() {
122123
tableManager.Start()
123124
defer tableManager.Stop()
124125

125-
if rulerConfig.ConfigsAPIURL.String() != "" {
126+
if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" {
127+
rulesAPI, err := ruler.NewRulesAPI(configStoreConfig)
128+
if err != nil {
129+
level.Error(util.Logger).Log("msg", "error initializing ruler config store", "err", err)
130+
os.Exit(1)
131+
}
126132
rlr, err := ruler.NewRuler(rulerConfig, dist, chunkStore)
127133
if err != nil {
128134
level.Error(util.Logger).Log("msg", "error initializing ruler", "err", err)
129135
os.Exit(1)
130136
}
131137
defer rlr.Stop()
132138

133-
rulerServer, err := ruler.NewServer(rulerConfig, rlr)
139+
rulerServer, err := ruler.NewServer(rulerConfig, rlr, rulesAPI)
134140
if err != nil {
135141
level.Error(util.Logger).Log("msg", "error initializing ruler server", "err", err)
136142
os.Exit(1)

cmd/ruler/main.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ func main() {
3232
chunkStoreConfig chunk.StoreConfig
3333
schemaConfig chunk.SchemaConfig
3434
storageConfig storage.Config
35+
configStoreConfig ruler.ConfigStoreConfig
3536
logLevel util.LogLevel
3637
)
3738
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig,
38-
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &logLevel)
39+
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, &logLevel)
3940
flag.Parse()
4041

4142
util.InitLogger(logLevel.AllowedLevel)
@@ -75,7 +76,13 @@ func main() {
7576
}
7677
defer rlr.Stop()
7778

78-
rulerServer, err := ruler.NewServer(rulerConfig, rlr)
79+
rulesAPI, err := ruler.NewRulesAPI(configStoreConfig)
80+
if err != nil {
81+
level.Error(util.Logger).Log("msg", "error initializing rules API", "err", err)
82+
os.Exit(1)
83+
}
84+
85+
rulerServer, err := ruler.NewServer(rulerConfig, rlr, rulesAPI)
7986
if err != nil {
8087
level.Error(util.Logger).Log("msg", "error initializing ruler server: %v", err)
8188
os.Exit(1)

pkg/configs/client/configs.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ func AlertmanagerConfigFromConfig(c configs.Config) (*config.Config, error) {
8585
return cfg, nil
8686
}
8787

88-
func getConfigs(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) {
88+
// GetConfigs gets configurations from the configs server.
89+
func GetConfigs(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) {
8990
req, err := http.NewRequest("GET", endpoint, nil)
9091
if err != nil {
9192
return nil, err
@@ -120,7 +121,7 @@ func (c *AlertManagerConfigsAPI) GetConfigs(since configs.ID) (*ConfigsResponse,
120121
suffix = fmt.Sprintf("?since=%d", since)
121122
}
122123
endpoint := fmt.Sprintf("%s/private/api/prom/configs/alertmanager%s", c.URL.String(), suffix)
123-
return getConfigs(endpoint, c.Timeout, since)
124+
return GetConfigs(endpoint, c.Timeout, since)
124125
}
125126

126127
// RulesAPI allows retrieving recording and alerting rules.
@@ -137,5 +138,5 @@ func (c *RulesAPI) GetConfigs(since configs.ID) (*ConfigsResponse, error) {
137138
suffix = fmt.Sprintf("?since=%d", since)
138139
}
139140
endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix)
140-
return getConfigs(endpoint, c.Timeout, since)
141+
return GetConfigs(endpoint, c.Timeout, since)
141142
}

pkg/configs/configs.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
package configs
22

3+
import (
4+
"fmt"
5+
6+
"github.com/prometheus/prometheus/promql"
7+
"github.com/prometheus/prometheus/rules"
8+
"github.com/weaveworks/cortex/pkg/util"
9+
)
10+
311
// An ID is the ID of a single user's Cortex configuration. When a
412
// configuration changes, it gets a new ID.
513
type ID int
@@ -33,6 +41,36 @@ func (v View) GetVersionedRulesConfig() VersionedRulesConfig {
3341
// RulesConfig are the set of rules files for a particular organization.
3442
type RulesConfig map[string]string
3543

44+
// Parse rules from the Cortex configuration.
45+
//
46+
// Strongly inspired by `loadGroups` in Prometheus.
47+
func (c RulesConfig) Parse() ([]rules.Rule, error) {
48+
result := []rules.Rule{}
49+
for fn, content := range c {
50+
stmts, err := promql.ParseStmts(content)
51+
if err != nil {
52+
return nil, fmt.Errorf("error parsing %s: %s", fn, err)
53+
}
54+
55+
for _, stmt := range stmts {
56+
var rule rules.Rule
57+
58+
switch r := stmt.(type) {
59+
case *promql.AlertStmt:
60+
rule = rules.NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Annotations, util.Logger)
61+
62+
case *promql.RecordStmt:
63+
rule = rules.NewRecordingRule(r.Name, r.Expr, r.Labels)
64+
65+
default:
66+
return nil, fmt.Errorf("ruler.GetRules: unknown statement type")
67+
}
68+
result = append(result, rule)
69+
}
70+
}
71+
return result, nil
72+
}
73+
3674
// VersionedRulesConfig is a RulesConfig together with a version.
3775
// `data Versioned a = Versioned { id :: ID , config :: a }`
3876
type VersionedRulesConfig struct {

pkg/configs/db/db.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,12 @@ func New(cfg Config) (DB, error) {
6969
}
7070
return traced{timed{d}}, nil
7171
}
72+
73+
// NewRulesDB creates a new rules config database.
74+
func NewRulesDB(cfg Config) (RulesDB, error) {
75+
db, err := New(cfg)
76+
if err != nil {
77+
return nil, err
78+
}
79+
return db, err
80+
}

pkg/ruler/configs.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package ruler
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"net/url"
7+
"time"
8+
9+
"github.com/weaveworks/cortex/pkg/configs"
10+
configs_client "github.com/weaveworks/cortex/pkg/configs/client"
11+
"github.com/weaveworks/cortex/pkg/configs/db"
12+
"github.com/weaveworks/cortex/pkg/util"
13+
)
14+
15+
// ConfigStoreConfig says where we can find the ruler configs.
16+
type ConfigStoreConfig struct {
17+
DBConfig db.Config
18+
19+
// DEPRECATED
20+
ConfigsAPIURL util.URLValue
21+
22+
// DEPRECATED. HTTP timeout duration for requests made to the Weave Cloud
23+
// configs service.
24+
ClientTimeout time.Duration
25+
}
26+
27+
// RegisterFlags adds the flags required to config this to the given FlagSet
28+
func (cfg *ConfigStoreConfig) RegisterFlags(f *flag.FlagSet) {
29+
cfg.DBConfig.RegisterFlags(f)
30+
f.Var(&cfg.ConfigsAPIURL, "ruler.configs.url", "DEPRECATED. URL of configs API server.")
31+
f.DurationVar(&cfg.ClientTimeout, "ruler.client-timeout", 5*time.Second, "DEPRECATED. Timeout for requests to Weave Cloud configs service.")
32+
}
33+
34+
// RulesAPI is what the ruler needs from a config store to process rules.
35+
type RulesAPI interface {
36+
// GetConfigs returns all Cortex configurations from a configs API server
37+
// that have been updated after the given configs.ID was last updated.
38+
GetConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error)
39+
}
40+
41+
// NewRulesAPI creates a new RulesAPI.
42+
func NewRulesAPI(cfg ConfigStoreConfig) (RulesAPI, error) {
43+
// All of this falderal is to allow for a smooth transition away from
44+
// using the configs server and toward directly connecting to the database.
45+
// See https://github.com/weaveworks/cortex/issues/619
46+
if cfg.DBConfig.URI == "" {
47+
return configsClient{
48+
URL: cfg.ConfigsAPIURL.URL,
49+
Timeout: cfg.ClientTimeout,
50+
}, nil
51+
}
52+
db, err := db.NewRulesDB(cfg.DBConfig)
53+
if err != nil {
54+
return nil, err
55+
}
56+
return dbStore{db: db}, nil
57+
}
58+
59+
// configsClient allows retrieving recording and alerting rules from the
// configs server over HTTP.
type configsClient struct {
	// URL is the base URL of the configs server; the rules endpoint path is
	// appended to it for every request.
	URL *url.URL
	// Timeout is passed along with every request to the configs server.
	Timeout time.Duration
}
64+
65+
// GetConfigs implements RulesAPI.
66+
func (c configsClient) GetConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
67+
suffix := ""
68+
if since != 0 {
69+
suffix = fmt.Sprintf("?since=%d", since)
70+
}
71+
endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix)
72+
response, err := configs_client.GetConfigs(endpoint, c.Timeout, since)
73+
if err != nil {
74+
return nil, err
75+
}
76+
configs := map[string]configs.VersionedRulesConfig{}
77+
for id, view := range response.Configs {
78+
configs[id] = view.GetVersionedRulesConfig()
79+
}
80+
return configs, nil
81+
}
82+
83+
type dbStore struct {
84+
db db.RulesDB
85+
}
86+
87+
// GetConfigs implements RulesAPI.
88+
func (d dbStore) GetConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
89+
if since == 0 {
90+
return d.db.GetAllRulesConfigs()
91+
}
92+
return d.db.GetRulesConfigs(since)
93+
}
94+
95+
// getLatestConfigID gets the latest configs ID.
96+
// max [latest, max (map getID cfgs)]
97+
func getLatestConfigID(cfgs map[string]configs.VersionedRulesConfig, latest configs.ID) configs.ID {
98+
ret := latest
99+
for _, config := range cfgs {
100+
if config.ID > ret {
101+
ret = config.ID
102+
}
103+
}
104+
return ret
105+
}

pkg/ruler/ruler.go

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222

2323
"github.com/weaveworks/common/user"
2424
"github.com/weaveworks/cortex/pkg/chunk"
25-
configs "github.com/weaveworks/cortex/pkg/configs/client"
2625
"github.com/weaveworks/cortex/pkg/distributor"
2726
"github.com/weaveworks/cortex/pkg/querier"
2827
"github.com/weaveworks/cortex/pkg/util"
@@ -54,12 +53,6 @@ func init() {
5453

5554
// Config is the configuration for the recording rules server.
5655
type Config struct {
57-
ConfigsAPIURL util.URLValue
58-
59-
// HTTP timeout duration for requests made to the Weave Cloud configs
60-
// service.
61-
ClientTimeout time.Duration
62-
6356
// This is used for template expansion in alerts; must be a valid URL
6457
ExternalURL util.URLValue
6558

@@ -84,10 +77,8 @@ type Config struct {
8477
// RegisterFlags adds the flags required to config this to the given FlagSet
8578
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
8679
cfg.ExternalURL.URL, _ = url.Parse("") // Must be non-nil
87-
f.Var(&cfg.ConfigsAPIURL, "ruler.configs.url", "URL of configs API server.")
8880
f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.")
8981
f.DurationVar(&cfg.EvaluationInterval, "ruler.evaluation-interval", 15*time.Second, "How frequently to evaluate rules")
90-
f.DurationVar(&cfg.ClientTimeout, "ruler.client-timeout", 5*time.Second, "Timeout for requests to Weave Cloud configs service.")
9182
f.IntVar(&cfg.NumWorkers, "ruler.num-workers", 1, "Number of rule evaluator worker routines in this process")
9283
f.Var(&cfg.AlertmanagerURL, "ruler.alertmanager-url", "URL of the Alertmanager to send notifications to.")
9384
f.BoolVar(&cfg.AlertmanagerDiscovery, "ruler.alertmanager-discovery", false, "Use DNS SRV records to discover alertmanager hosts.")
@@ -280,13 +271,9 @@ type Server struct {
280271
}
281272

282273
// NewServer makes a new rule processing server.
283-
func NewServer(cfg Config, ruler *Ruler) (*Server, error) {
284-
c := configs.RulesAPI{
285-
URL: cfg.ConfigsAPIURL.URL,
286-
Timeout: cfg.ClientTimeout,
287-
}
274+
func NewServer(cfg Config, ruler *Ruler, rulesAPI RulesAPI) (*Server, error) {
288275
// TODO: Separate configuration for polling interval.
289-
s := newScheduler(c, cfg.EvaluationInterval, cfg.EvaluationInterval)
276+
s := newScheduler(rulesAPI, cfg.EvaluationInterval, cfg.EvaluationInterval)
290277
if cfg.NumWorkers <= 0 {
291278
return nil, fmt.Errorf("must have at least 1 worker, got %d", cfg.NumWorkers)
292279
}

pkg/ruler/scheduler.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414

1515
"github.com/weaveworks/common/instrument"
1616
"github.com/weaveworks/cortex/pkg/configs"
17-
configs_client "github.com/weaveworks/cortex/pkg/configs/client"
1817
"github.com/weaveworks/cortex/pkg/util"
1918
)
2019

@@ -79,7 +78,7 @@ func (w workItem) String() string {
7978
}
8079

8180
type scheduler struct {
82-
configsAPI configs_client.RulesAPI
81+
rulesAPI RulesAPI
8382
evaluationInterval time.Duration // how often we re-evaluate each rule set
8483
q *SchedulingQueue
8584

@@ -94,9 +93,9 @@ type scheduler struct {
9493
}
9594

9695
// newScheduler makes a new scheduler.
97-
func newScheduler(configsAPI configs_client.RulesAPI, evaluationInterval, pollInterval time.Duration) scheduler {
96+
func newScheduler(rulesAPI RulesAPI, evaluationInterval, pollInterval time.Duration) scheduler {
9897
return scheduler{
99-
configsAPI: configsAPI,
98+
rulesAPI: rulesAPI,
10099
evaluationInterval: evaluationInterval,
101100
pollInterval: pollInterval,
102101
q: NewSchedulingQueue(clockwork.NewRealClock()),
@@ -137,7 +136,7 @@ func (s *scheduler) Stop() {
137136

138137
// Load the full set of configurations from the server, retrying with backoff
139138
// until we can get them.
140-
func (s *scheduler) loadAllConfigs() map[string]configs.View {
139+
func (s *scheduler) loadAllConfigs() map[string]configs.VersionedRulesConfig {
141140
backoff := util.NewBackoff(context.Background(), backoffConfig)
142141
for {
143142
cfgs, err := s.poll()
@@ -160,31 +159,31 @@ func (s *scheduler) updateConfigs(now time.Time) error {
160159
}
161160

162161
// poll the configuration server. Not re-entrant.
163-
func (s *scheduler) poll() (map[string]configs.View, error) {
162+
func (s *scheduler) poll() (map[string]configs.VersionedRulesConfig, error) {
164163
s.Lock()
165164
configID := s.latestConfig
166165
s.Unlock()
167-
var cfgs *configs_client.ConfigsResponse
166+
var cfgs map[string]configs.VersionedRulesConfig
168167
err := instrument.TimeRequestHistogram(context.Background(), "Configs.GetConfigs", configsRequestDuration, func(_ context.Context) error {
169168
var err error
170-
cfgs, err = s.configsAPI.GetConfigs(configID)
169+
cfgs, err = s.rulesAPI.GetConfigs(configID)
171170
return err
172171
})
173172
if err != nil {
174173
level.Warn(util.Logger).Log("msg", "scheduler: configs server poll failed", "err", err)
175174
return nil, err
176175
}
177176
s.Lock()
178-
s.latestConfig = cfgs.GetLatestConfigID()
177+
s.latestConfig = getLatestConfigID(cfgs, configID)
179178
s.Unlock()
180-
return cfgs.Configs, nil
179+
return cfgs, nil
181180
}
182181

183-
func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.View) {
182+
func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.VersionedRulesConfig) {
184183
// TODO: instrument how many configs we have, both valid & invalid.
185184
level.Debug(util.Logger).Log("msg", "adding configurations", "num_configs", len(cfgs))
186185
for userID, config := range cfgs {
187-
rules, err := configs_client.RulesFromConfig(config.Config)
186+
rules, err := config.Config.Parse()
188187
if err != nil {
189188
// XXX: This means that if a user has a working configuration and
190189
// they submit a broken one, we'll keep processing the last known

0 commit comments

Comments
 (0)