Skip to content

Commit cb84080

Browse files
committed
Revert "Revert "New ruler endpoints (#620)" (#648)"
This reverts commit 860d2fb.
1 parent 860d2fb commit cb84080

File tree

15 files changed

+952
-77
lines changed

15 files changed

+952
-77
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ shell: build-image/$(UPTODATE)
106106
bash
107107

108108
configs-integration-test:
109-
/bin/bash -c "go test -tags 'netgo integration' -timeout 30s ./pkg/configs/..."
109+
/bin/bash -c "go test -tags 'netgo integration' -timeout 30s ./pkg/configs/... ./pkg/ruler/..."
110110

111111
endif
112112

cmd/lite/main.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func main() {
4242
chunkStoreConfig chunk.StoreConfig
4343
distributorConfig distributor.Config
4444
ingesterConfig ingester.Config
45+
configStoreConfig ruler.ConfigStoreConfig
4546
rulerConfig ruler.Config
4647
schemaConfig chunk.SchemaConfig
4748
storageConfig storage.Config
@@ -52,7 +53,7 @@ func main() {
5253
// Ingester needs to know our gRPC listen port.
5354
ingesterConfig.ListenPort = &serverConfig.GRPCListenPort
5455
util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig,
55-
&ingesterConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel)
56+
&ingesterConfig, &configStoreConfig, &rulerConfig, &storageConfig, &schemaConfig, &logLevel)
5657
flag.BoolVar(&unauthenticated, "unauthenticated", false, "Set to true to disable multitenancy.")
5758
flag.Parse()
5859
schemaConfig.MaxChunkAge = ingesterConfig.MaxChunkAge
@@ -122,15 +123,20 @@ func main() {
122123
tableManager.Start()
123124
defer tableManager.Stop()
124125

125-
if rulerConfig.ConfigsAPIURL.String() != "" {
126+
if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" {
127+
rulesAPI, err := ruler.NewRulesAPI(configStoreConfig)
128+
if err != nil {
129+
level.Error(util.Logger).Log("msg", "error initializing ruler config store", "err", err)
130+
os.Exit(1)
131+
}
126132
rlr, err := ruler.NewRuler(rulerConfig, dist, chunkStore)
127133
if err != nil {
128134
level.Error(util.Logger).Log("msg", "error initializing ruler", "err", err)
129135
os.Exit(1)
130136
}
131137
defer rlr.Stop()
132138

133-
rulerServer, err := ruler.NewServer(rulerConfig, rlr)
139+
rulerServer, err := ruler.NewServer(rulerConfig, rlr, rulesAPI)
134140
if err != nil {
135141
level.Error(util.Logger).Log("msg", "error initializing ruler server", "err", err)
136142
os.Exit(1)
@@ -163,6 +169,18 @@ func main() {
163169
})
164170
}
165171

172+
// Only serve the API for setting & getting rules configs if the database
173+
// was provided. Allows for smoother migration. See
174+
// https://github.com/weaveworks/cortex/issues/619
175+
if configStoreConfig.DBConfig.URI != "" {
176+
a, err := ruler.NewAPIFromConfig(configStoreConfig.DBConfig)
177+
if err != nil {
178+
level.Error(util.Logger).Log("msg", "error initializing public rules API", "err", err)
179+
os.Exit(1)
180+
}
181+
a.RegisterRoutes(server.HTTP)
182+
}
183+
166184
subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
167185
subrouter.PathPrefix("/api/v1").Handler(activeMiddleware.Wrap(promRouter))
168186
subrouter.Path("/read").Handler(activeMiddleware.Wrap(http.HandlerFunc(sampleQueryable.RemoteReadHandler)))

cmd/ruler/main.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ func main() {
3232
chunkStoreConfig chunk.StoreConfig
3333
schemaConfig chunk.SchemaConfig
3434
storageConfig storage.Config
35+
configStoreConfig ruler.ConfigStoreConfig
3536
logLevel util.LogLevel
3637
)
3738
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig,
38-
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &logLevel)
39+
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, &logLevel)
3940
flag.Parse()
4041

4142
util.InitLogger(logLevel.AllowedLevel)
@@ -75,7 +76,13 @@ func main() {
7576
}
7677
defer rlr.Stop()
7778

78-
rulerServer, err := ruler.NewServer(rulerConfig, rlr)
79+
rulesAPI, err := ruler.NewRulesAPI(configStoreConfig)
80+
if err != nil {
81+
level.Error(util.Logger).Log("msg", "error initializing rules API", "err", err)
82+
os.Exit(1)
83+
}
84+
85+
rulerServer, err := ruler.NewServer(rulerConfig, rlr, rulesAPI)
7986
if err != nil {
8087
level.Error(util.Logger).Log("msg", "error initializing ruler server: %v", err)
8188
os.Exit(1)
@@ -89,6 +96,18 @@ func main() {
8996
}
9097
defer server.Shutdown()
9198

99+
// Only serve the API for setting & getting rules configs if the database
100+
// was provided. Allows for smoother migration. See
101+
// https://github.com/weaveworks/cortex/issues/619
102+
if configStoreConfig.DBConfig.URI != "" {
103+
a, err := ruler.NewAPIFromConfig(configStoreConfig.DBConfig)
104+
if err != nil {
105+
level.Error(util.Logger).Log("msg", "error initializing public rules API", "err", err)
106+
os.Exit(1)
107+
}
108+
a.RegisterRoutes(server.HTTP)
109+
}
110+
92111
server.HTTP.Handle("/ring", r)
93112
server.Run()
94113
}

pkg/configs/client/configs.go

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ func AlertmanagerConfigFromConfig(c configs.Config) (*config.Config, error) {
8585
return cfg, nil
8686
}
8787

88-
func getConfigs(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) {
88+
// GetConfigs gets configurations from the configs server.
89+
func GetConfigs(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) {
8990
req, err := http.NewRequest("GET", endpoint, nil)
9091
if err != nil {
9192
return nil, err
@@ -120,22 +121,5 @@ func (c *AlertManagerConfigsAPI) GetConfigs(since configs.ID) (*ConfigsResponse,
120121
suffix = fmt.Sprintf("?since=%d", since)
121122
}
122123
endpoint := fmt.Sprintf("%s/private/api/prom/configs/alertmanager%s", c.URL.String(), suffix)
123-
return getConfigs(endpoint, c.Timeout, since)
124-
}
125-
126-
// RulesAPI allows retrieving recording and alerting rules.
127-
type RulesAPI struct {
128-
URL *url.URL
129-
Timeout time.Duration
130-
}
131-
132-
// GetConfigs returns all Cortex configurations from a configs API server
133-
// that have been updated after the given configs.ID was last updated.
134-
func (c *RulesAPI) GetConfigs(since configs.ID) (*ConfigsResponse, error) {
135-
suffix := ""
136-
if since != 0 {
137-
suffix = fmt.Sprintf("?since=%d", since)
138-
}
139-
endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix)
140-
return getConfigs(endpoint, c.Timeout, since)
124+
return GetConfigs(endpoint, c.Timeout, since)
141125
}

pkg/configs/configs.go

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
package configs
22

3+
import (
4+
"fmt"
5+
6+
"github.com/prometheus/prometheus/promql"
7+
"github.com/prometheus/prometheus/rules"
8+
"github.com/weaveworks/cortex/pkg/util"
9+
)
10+
311
// An ID is the ID of a single users's Cortex configuration. When a
412
// configuration changes, it gets a new ID.
513
type ID int
614

715
// A Config is a Cortex configuration for a single user.
816
type Config struct {
917
// RulesFiles maps from a rules filename to file contents.
10-
RulesFiles map[string]string `json:"rules_files"`
11-
AlertmanagerConfig string `json:"alertmanager_config"`
18+
RulesFiles RulesConfig `json:"rules_files"`
19+
AlertmanagerConfig string `json:"alertmanager_config"`
1220
}
1321

1422
// View is what's returned from the Weave Cloud configs service
@@ -21,3 +29,70 @@ type View struct {
2129
ID ID `json:"id"`
2230
Config Config `json:"config"`
2331
}
32+
33+
// GetVersionedRulesConfig specializes the view to just the rules config.
34+
func (v View) GetVersionedRulesConfig() *VersionedRulesConfig {
35+
if v.Config.RulesFiles == nil {
36+
return nil
37+
}
38+
return &VersionedRulesConfig{
39+
ID: v.ID,
40+
Config: v.Config.RulesFiles,
41+
}
42+
}
43+
44+
// RulesConfig are the set of rules files for a particular organization.
45+
type RulesConfig map[string]string
46+
47+
// Equal compares two RulesConfigs for equality.
48+
//
49+
// instance Eq RulesConfig
50+
func (c RulesConfig) Equal(o RulesConfig) bool {
51+
if len(o) != len(c) {
52+
return false
53+
}
54+
for k, v1 := range c {
55+
v2, ok := o[k]
56+
if !ok || v1 != v2 {
57+
return false
58+
}
59+
}
60+
return true
61+
}
62+
63+
// Parse rules from the Cortex configuration.
64+
//
65+
// Strongly inspired by `loadGroups` in Prometheus.
66+
func (c RulesConfig) Parse() ([]rules.Rule, error) {
67+
result := []rules.Rule{}
68+
for fn, content := range c {
69+
stmts, err := promql.ParseStmts(content)
70+
if err != nil {
71+
return nil, fmt.Errorf("error parsing %s: %s", fn, err)
72+
}
73+
74+
for _, stmt := range stmts {
75+
var rule rules.Rule
76+
77+
switch r := stmt.(type) {
78+
case *promql.AlertStmt:
79+
rule = rules.NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Annotations, util.Logger)
80+
81+
case *promql.RecordStmt:
82+
rule = rules.NewRecordingRule(r.Name, r.Expr, r.Labels)
83+
84+
default:
85+
return nil, fmt.Errorf("ruler.GetRules: unknown statement type")
86+
}
87+
result = append(result, rule)
88+
}
89+
}
90+
return result, nil
91+
}
92+
93+
// VersionedRulesConfig is a RulesConfig together with a version.
94+
// `data Versioned a = Versioned { id :: ID , config :: a }`
95+
type VersionedRulesConfig struct {
96+
ID ID `json:"id"`
97+
Config RulesConfig `json:"config"`
98+
}

pkg/configs/db/db.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,26 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
2222
flag.StringVar(&cfg.MigrationsDir, "database.migrations", "", "Path where the database migration files can be found")
2323
}
2424

25+
// RulesDB has ruler-specific DB interfaces.
26+
type RulesDB interface {
27+
// GetRulesConfig gets the user's ruler config
28+
GetRulesConfig(userID string) (configs.VersionedRulesConfig, error)
29+
// SetRulesConfig does a compare-and-swap (CAS) on the user's rules config.
30+
// `oldConfig` must precisely match the current config in order to change the config to `newConfig`.
31+
// Will return `true` if the config was updated, `false` otherwise.
32+
SetRulesConfig(userID string, oldConfig, newConfig configs.RulesConfig) (bool, error)
33+
34+
// GetAllRulesConfigs gets all of the ruler configs
35+
GetAllRulesConfigs() (map[string]configs.VersionedRulesConfig, error)
36+
// GetRulesConfigs gets all of the configs that have been added or have
37+
// changed since the provided config.
38+
GetRulesConfigs(since configs.ID) (map[string]configs.VersionedRulesConfig, error)
39+
}
40+
2541
// DB is the interface for the database.
2642
type DB interface {
43+
RulesDB
44+
2745
GetConfig(userID string) (configs.View, error)
2846
SetConfig(userID string, cfg configs.Config) error
2947

@@ -53,3 +71,12 @@ func New(cfg Config) (DB, error) {
5371
}
5472
return traced{timed{d}}, nil
5573
}
74+
75+
// NewRulesDB creates a new rules config database.
76+
func NewRulesDB(cfg Config) (RulesDB, error) {
77+
db, err := New(cfg)
78+
if err != nil {
79+
return nil, err
80+
}
81+
return db, err
82+
}

0 commit comments

Comments
 (0)