Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
10f3000
basic version of gcs backed ruler
jtlisi Jun 14, 2019
c7aafaf
fix gcs configdb config function receiver to be pointer
jtlisi Jun 14, 2019
f8c9eaf
don't return error if no config for user is currenty set
jtlisi Jun 14, 2019
1ba663c
fix adding configs when none exist
jtlisi Jun 14, 2019
fb55660
ensure setting configs in gcs performs cas
jtlisi Jun 14, 2019
9656eb3
add debug logging to gcs configdb
jtlisi Jun 14, 2019
865008d
fix unmarshalling rules from gcs
jtlisi Jun 14, 2019
dfcbc2f
initial refactor of config backend apis
jtlisi Jun 25, 2019
2bd16dc
add debug logging and fix yaml response for rules
jtlisi Jun 25, 2019
fcfcc9b
return 404 when no rule groups are found
jtlisi Jun 26, 2019
eebd400
add set rule group api and fix rule gcs backend
jtlisi Jun 27, 2019
f4a78aa
add logging statements to read logging api
jtlisi Jun 27, 2019
c2a70f3
fix list rules for GCS rule store
jtlisi Jun 27, 2019
228a898
fix listing rule namesaces
jtlisi Jun 27, 2019
502e70a
fix polling and add xtra logging
jtlisi Jun 27, 2019
e768c31
fix double printing api response
jtlisi Jun 27, 2019
a13f738
fix listing rules with namespace
jtlisi Jun 27, 2019
3e68394
fix gcs listing rules by namespace
jtlisi Jun 27, 2019
a7621f4
fix test cases
jtlisi Jun 27, 2019
2dbc85e
return map instead of array for rules api
jtlisi Jun 28, 2019
cd902a2
fix alert prefix for gcs config backend
jtlisi Jun 28, 2019
343bf9b
fix ruler test
jtlisi Jun 28, 2019
13c6b00
fix setting namespace for rule creation
jtlisi Jun 30, 2019
4bdee83
update rule group creation endpoint
jtlisi Jul 2, 2019
2b474f9
remove unused metrics and add user metric for evaluation failures
jtlisi Jul 6, 2019
71984c9
add comment for later group instrumentation
jtlisi Jul 7, 2019
299f4d2
use promauto to register metrics
jtlisi Jul 7, 2019
5fbc094
fix log messages for owning rules
jtlisi Jul 8, 2019
516da88
fix typos
jtlisi Jul 8, 2019
0a9f184
keep namespace in struct for file parsing, unexport gcs error
jtlisi Jul 8, 2019
ae19f69
use rulegroup in groupfn function
jtlisi Jul 9, 2019
325861d
refactor package names and function names
jtlisi Jul 9, 2019
4d6b5e6
rename things
jtlisi Jul 10, 2019
f9763e5
fix s3 client to panic until implemented
jtlisi Jul 11, 2019
f374d63
refactor configdb and config storage clients
jtlisi Jul 11, 2019
03e722d
update poller to return storage
jtlisi Jul 12, 2019
109f65f
fix client tests
jtlisi Jul 12, 2019
f4cc733
remove unused config client
jtlisi Jul 12, 2019
ef85704
refactor configdb client package
jtlisi Jul 12, 2019
d0ebeef
add debug log line with rule eval latency
jtlisi Jul 12, 2019
ec7aabb
update eval latency bucket sizes
jtlisi Jul 12, 2019
440fbb5
update rule eval timestamp when iteration is missed
jtlisi Jul 14, 2019
ef0ec50
reorganize scheduling instrumentation
jtlisi Jul 14, 2019
e78e356
increase size of eval duration buckets
jtlisi Jul 14, 2019
3c94e10
readd total configs metric
jtlisi Jul 15, 2019
eaa6bf1
pass user id as label to the rule group metrics
jtlisi Jul 15, 2019
f7f28bb
pass user id to prometheus rule group metrics
jtlisi Jul 16, 2019
ad0dd20
refactor to use kv store to check for config updates
jtlisi Jul 18, 2019
962db79
add logs and comments
jtlisi Jul 19, 2019
e6d8952
return 404 when rule group is not found
jtlisi Jul 19, 2019
18f96ce
add delete rule group feature
jtlisi Jul 19, 2019
1c04055
fix linting and tests
jtlisi Jul 22, 2019
8ea8cc7
fix ruler tests by adding mutex to mock store
jtlisi Jul 22, 2019
234411b
mod tidy
jtlisi Jul 22, 2019
5172d08
remove aws config backend until later
jtlisi Jul 22, 2019
12d98fc
remove unused package
jtlisi Jul 23, 2019
7f3d836
move storage definitions to storage subpackages
jtlisi Jul 23, 2019
4e0c50c
ensure alertmanager config flags are registered
jtlisi Jul 23, 2019
559771f
fix rebase error
jtlisi Jul 23, 2019
0fff90b
add lateness to work item debug log line
jtlisi Jul 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ pkg/ring/ring.pb.go: pkg/ring/ring.proto
pkg/querier/frontend/frontend.pb.go: pkg/querier/frontend/frontend.proto
pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto
pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
pkg/storage/rules/rules.pb.go: pkg/storage/rules/rules.proto
pkg/util/usertracker/usertracker.pb.go: pkg/util/usertracker/usertracker.proto
all: $(UPTODATE_FILES)
test: protos
mod-check: protos
Expand Down
116 changes: 116 additions & 0 deletions pkg/alertmanager/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package alertmanager

import (
"io/ioutil"
"net/http"

"github.com/cortexproject/cortex/pkg/storage/alerts"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/mux"
"github.com/weaveworks/common/user"
"gopkg.in/yaml.v2"
)

// RegisterRoutes attaches the per-tenant Alertmanager config endpoints (GET,
// POST and DELETE on /api/prom/alertmanager) to r. If the
// MultitenantAlertmanager has no store, no routes are registered.
func (am *MultitenantAlertmanager) RegisterRoutes(r *mux.Router) {
	// Without a backing store the handlers would have nothing to serve.
	if am.store == nil {
		return
	}

	// Every endpoint shares the same path and is distinguished by HTTP method.
	const configPath = "/api/prom/alertmanager"

	type endpoint struct {
		name    string
		method  string
		handler http.HandlerFunc
	}

	endpoints := []endpoint{
		{name: "get_config", method: "GET", handler: am.getUserConfig},
		{name: "set_config", method: "POST", handler: am.setUserConfig},
		{name: "delete_config", method: "DELETE", handler: am.deleteUserConfig},
	}
	for _, e := range endpoints {
		r.Handle(configPath, e.handler).Methods(e.method).Name(e.name)
	}
}

// getUserConfig is the GET handler for the alertmanager config route. It
// resolves the tenant from the request, reads that tenant's config from the
// store and writes it to the response as YAML.
//
// Status codes:
//   - 401 when the org ID cannot be extracted from the request or is empty
//   - 400 when the store returns an error
//   - 500 when the config cannot be marshalled to YAML
func (am *MultitenantAlertmanager) getUserConfig(w http.ResponseWriter, r *http.Request) {
	userID, _, err := user.ExtractOrgIDFromHTTPRequest(r)
	if err != nil {
		http.Error(w, err.Error(), http.StatusUnauthorized)
		return
	}

	logger := util.WithContext(r.Context(), util.Logger)

	// err is nil at this point, so the rejection reason has to be spelled out
	// here; dereferencing err would panic.
	if userID == "" {
		level.Error(logger).Log("err", "no org id")
		http.Error(w, "no org id", http.StatusUnauthorized)
		return
	}

	cfg, err := am.store.GetAlertConfig(r.Context(), userID)
	if err != nil {
		level.Error(logger).Log("err", err.Error())
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	d, err := yaml.Marshal(&cfg)
	if err != nil {
		level.Error(logger).Log("msg", "error marshalling yaml alertmanager config", "err", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/yaml")
	if _, err := w.Write(d); err != nil {
		// The status line and headers have already been sent, so a different
		// status code can no longer be reported; logging is all that is left.
		level.Error(logger).Log("msg", "error writing yaml alertmanager config response", "err", err)
	}
}

// setUserConfig is the POST handler for the alertmanager config route. It
// resolves the tenant from the request, decodes the request body as a YAML
// alerts.AlertConfig and stores it for that tenant.
//
// Status codes:
//   - 401 when the org ID cannot be extracted from the request or is empty
//   - 400 when the body cannot be read or parsed, or the store returns an error
//   - 200 on success
func (am *MultitenantAlertmanager) setUserConfig(w http.ResponseWriter, r *http.Request) {
	userID, _, err := user.ExtractOrgIDFromHTTPRequest(r)
	if err != nil {
		http.Error(w, err.Error(), http.StatusUnauthorized)
		return
	}

	logger := util.WithContext(r.Context(), util.Logger)

	// err is nil at this point, so the rejection reason has to be spelled out
	// here; dereferencing err would panic.
	if userID == "" {
		level.Error(logger).Log("err", "no org id")
		http.Error(w, "no org id", http.StatusUnauthorized)
		return
	}

	payload, err := ioutil.ReadAll(r.Body)
	if err != nil {
		level.Error(logger).Log("err", err.Error())
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	cfg := alerts.AlertConfig{}
	if err := yaml.Unmarshal(payload, &cfg); err != nil {
		level.Error(logger).Log("err", err.Error())
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	if err := am.store.SetAlertConfig(r.Context(), userID, cfg); err != nil {
		level.Error(logger).Log("err", err.Error())
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	w.WriteHeader(http.StatusOK)
}

// deleteUserConfig is the DELETE handler for the alertmanager config route.
//
// Deletion is not implemented yet. The handler answers 501 Not Implemented so
// that a client is never told (via an implicit 200 OK) that a config was
// removed when nothing happened.
func (am *MultitenantAlertmanager) deleteUserConfig(w http.ResponseWriter, r *http.Request) {
	http.Error(w, "deleting the alertmanager config is not implemented", http.StatusNotImplemented)
}
101 changes: 101 additions & 0 deletions pkg/alertmanager/kv_poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package alertmanager

import (
"context"

"github.com/cortexproject/cortex/pkg/storage/alerts"
"github.com/cortexproject/cortex/pkg/util/usertracker"
)

// trackedAlertPoller checks for updated user configs and
// retrieves the updated configuration from the backend.
type trackedAlertPoller struct {
// tracker is asked by PollAlerts for the users whose configs have changed.
tracker *usertracker.Tracker
// store is the backend that alert configurations are listed and read from.
store alerts.AlertStore

// initialized is flipped to true by PollAlerts as part of its first,
// list-everything poll; once set, polls only fetch changed users.
initialized bool
}

// newTrackedAlertPoller builds a poller that reads alert configs from store and
// asks tracker which users have changed. The returned error is always nil.
func newTrackedAlertPoller(tracker *usertracker.Tracker, store alerts.AlertStore) (*trackedAlertPoller, error) {
	poller := new(trackedAlertPoller)
	poller.tracker = tracker
	poller.store = store
	// initialized is left at its zero value (false).
	return poller, nil
}

// trackedAlertStore returns a new store wrapper that shares this poller's
// tracker and backend store.
func (p *trackedAlertPoller) trackedAlertStore() *trackedAlertStore {
	wrapped := new(trackedAlertStore)
	wrapped.tracker, wrapped.store = p.tracker, p.store
	return wrapped
}

// PollAlerts returns the alert configurations changed since the last poll,
// keyed by user ID. Until one full listing has succeeded, every call returns
// all alert configurations from the store instead.
//
// On any store error it returns a nil map and that error.
func (p *trackedAlertPoller) PollAlerts(ctx context.Context) (map[string]alerts.AlertConfig, error) {
	// The first successful poll returns all alert configs.
	if !p.initialized {
		cfgs, err := p.store.ListAlertConfigs(ctx)
		if err != nil {
			// Leave initialized unset so the next call retries the full
			// listing rather than falling through to the changed-users path
			// and silently skipping every existing config.
			return nil, err
		}
		p.initialized = true
		return cfgs, nil
	}

	// Get the changed users from the user update tracker.
	users := p.tracker.GetUpdatedUsers(ctx)

	// Retrieve each changed user's configuration from the alert store.
	// TODO: Add Retry logic for failed requests
	// TODO: store users that were failed to be updated and reattempt to retrieve on the next poll
	updatedConfigs := make(map[string]alerts.AlertConfig, len(users))
	for _, u := range users {
		cfg, err := p.store.GetAlertConfig(ctx, u)
		if err != nil {
			return nil, err
		}

		updatedConfigs[u] = cfg
	}

	return updatedConfigs, nil
}

// Stop stops the poller's user tracker.
func (p *trackedAlertPoller) Stop() {
p.tracker.Stop()
}

// trackedAlertStore wraps an alerts.AlertStore and records a user update in the
// tracker after every successful SetAlertConfig or DeleteAlertConfig.
type trackedAlertStore struct {
// tracker is notified of the user ID after a successful write or delete.
tracker *usertracker.Tracker
// store is the wrapped backend that every call is forwarded to.
store alerts.AlertStore
}

// ListAlertConfigs passes through to the wrapped alert store and returns its
// result unchanged; the tracker is not involved.
func (w *trackedAlertStore) ListAlertConfigs(ctx context.Context) (map[string]alerts.AlertConfig, error) {
return w.store.ListAlertConfigs(ctx)
}

// GetAlertConfig passes through to the wrapped alert store and returns its
// result for the given id unchanged; the tracker is not involved.
func (w *trackedAlertStore) GetAlertConfig(ctx context.Context, id string) (alerts.AlertConfig, error) {
return w.store.GetAlertConfig(ctx, id)
}

// SetAlertConfig writes cfg for id to the wrapped alert store and, only if
// that write succeeds, records the user as updated in the tracker. A store
// error is returned as-is; otherwise the tracker's result is returned.
func (w *trackedAlertStore) SetAlertConfig(ctx context.Context, id string, cfg alerts.AlertConfig) error {
	if storeErr := w.store.SetAlertConfig(ctx, id, cfg); storeErr != nil {
		return storeErr
	}
	return w.tracker.UpdateUser(ctx, id)
}

// DeleteAlertConfig removes the config for id from the wrapped alert store
// and, only if that removal succeeds, records the user as updated in the
// tracker. A store error is returned as-is; otherwise the tracker's result is
// returned.
func (w *trackedAlertStore) DeleteAlertConfig(ctx context.Context, id string) error {
	if storeErr := w.store.DeleteAlertConfig(ctx, id); storeErr != nil {
		return storeErr
	}
	return w.tracker.UpdateUser(ctx, id)
}
52 changes: 25 additions & 27 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ import (
"github.com/weaveworks/common/user"
"github.com/weaveworks/mesh"

"github.com/cortexproject/cortex/pkg/configs"
configs_client "github.com/cortexproject/cortex/pkg/configs/client"
"github.com/cortexproject/cortex/pkg/storage/alerts"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
)
Expand Down Expand Up @@ -185,10 +184,13 @@ type MultitenantAlertmanagerConfig struct {
FallbackConfigFile string
AutoWebhookRoot string
AutoSlackRoot string

AlertStore AlertStoreConfig
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) {
cfg.AlertStore.RegisterFlags(f)
flag.StringVar(&cfg.DataDir, "alertmanager.storage.path", "data/", "Base path for data storage.")
flag.DurationVar(&cfg.Retention, "alertmanager.storage.retention", 5*24*time.Hour, "How long to keep data for.")

Expand All @@ -214,22 +216,20 @@ func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) {
type MultitenantAlertmanager struct {
cfg *MultitenantAlertmanagerConfig

configsAPI configs_client.Client
store alerts.AlertStore
poller alerts.AlertPoller

// The fallback config is stored as a string and parsed every time it's needed
// because we mutate the parsed results and don't want those changes to take
// effect here.
fallbackConfig string

// All the organization configurations that we have. Only used for instrumentation.
cfgs map[string]configs.Config
cfgs map[string]alerts.AlertConfig

alertmanagersMtx sync.Mutex
alertmanagers map[string]*Alertmanager

latestConfig configs.ID
latestMutex sync.RWMutex

meshRouter *gossipFactory
srvDiscovery *srvDiscovery

Expand All @@ -238,17 +238,12 @@ type MultitenantAlertmanager struct {
}

// NewMultitenantAlertmanager creates a new MultitenantAlertmanager.
func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, cfgCfg configs_client.Config) (*MultitenantAlertmanager, error) {
func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig) (*MultitenantAlertmanager, error) {
err := os.MkdirAll(cfg.DataDir, 0777)
if err != nil {
return nil, fmt.Errorf("unable to create Alertmanager data directory %q: %s", cfg.DataDir, err)
}

configsAPI, err := configs_client.New(cfgCfg)
if err != nil {
return nil, err
}

mrouter := initMesh(cfg.MeshListenAddr, cfg.MeshHWAddr, cfg.MeshNickname, cfg.MeshPassword)
mrouter.Start()

Expand All @@ -265,11 +260,17 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, cfgCfg confi
}

gf := newGossipFactory(mrouter)

poller, store, err := NewAlertStore(cfg.AlertStore)
if err != nil {
return nil, fmt.Errorf("unable to create Alertmanager storage, %s", err)
}

am := &MultitenantAlertmanager{
cfg: cfg,
configsAPI: configsAPI,
poller: poller,
store: store,
fallbackConfig: string(fallbackConfig),
cfgs: map[string]configs.Config{},
alertmanagers: map[string]*Alertmanager{},
meshRouter: &gf,
srvDiscovery: newSRVDiscovery(cfg.MeshPeerService, cfg.MeshPeerHost, cfg.MeshPeerRefreshInterval),
Expand Down Expand Up @@ -314,6 +315,7 @@ func (am *MultitenantAlertmanager) Run() {

// Stop stops the MultitenantAlertmanager.
func (am *MultitenantAlertmanager) Stop() {
am.poller.Stop()
am.srvDiscovery.Stop()
close(am.stop)
<-am.done
Expand All @@ -326,7 +328,7 @@ func (am *MultitenantAlertmanager) Stop() {

// Load the full set of configurations from the server, retrying with backoff
// until we can get them.
func (am *MultitenantAlertmanager) loadAllConfigs() map[string]configs.View {
func (am *MultitenantAlertmanager) loadAllConfigs() map[string]alerts.AlertConfig {
backoff := util.NewBackoff(context.Background(), backoffConfig)
for {
cfgs, err := am.poll()
Expand All @@ -349,25 +351,21 @@ func (am *MultitenantAlertmanager) updateConfigs(now time.Time) error {
}

// poll the configuration server. Not re-entrant.
func (am *MultitenantAlertmanager) poll() (map[string]configs.View, error) {
configID := am.latestConfig
cfgs, err := am.configsAPI.GetAlerts(context.Background(), configID)
func (am *MultitenantAlertmanager) poll() (map[string]alerts.AlertConfig, error) {
cfgs, err := am.poller.PollAlerts(context.Background())
if err != nil {
level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: configs server poll failed", "err", err)
return nil, err
}
am.latestMutex.Lock()
am.latestConfig = cfgs.GetLatestConfigID()
am.latestMutex.Unlock()
return cfgs.Configs, nil
return cfgs, nil
}

func (am *MultitenantAlertmanager) addNewConfigs(cfgs map[string]configs.View) {
func (am *MultitenantAlertmanager) addNewConfigs(cfgs map[string]alerts.AlertConfig) {
// TODO: instrument how many configs we have, both valid & invalid.
level.Debug(util.Logger).Log("msg", "adding configurations", "num_configs", len(cfgs))
for userID, config := range cfgs {

err := am.setConfig(userID, config.Config)
err := am.setConfig(userID, config)
if err != nil {
level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: error applying config", "err", err)
continue
Expand Down Expand Up @@ -426,7 +424,7 @@ func (am *MultitenantAlertmanager) createTemplatesFile(userID, fn, content strin

// setConfig applies the given configuration to the alertmanager for `userID`,
// creating an alertmanager if it doesn't already exist.
func (am *MultitenantAlertmanager) setConfig(userID string, config configs.Config) error {
func (am *MultitenantAlertmanager) setConfig(userID string, config alerts.AlertConfig) error {
_, hasExisting := am.alertmanagers[userID]
var amConfig *amconfig.Config
var err error
Expand Down Expand Up @@ -489,7 +487,7 @@ func (am *MultitenantAlertmanager) setConfig(userID string, config configs.Confi
}

// alertmanagerConfigFromConfig returns the Alertmanager config from the Cortex configuration.
func alertmanagerConfigFromConfig(c configs.Config) (*amconfig.Config, error) {
func alertmanagerConfigFromConfig(c alerts.AlertConfig) (*amconfig.Config, error) {
cfg, err := amconfig.Load(c.AlertmanagerConfig)
if err != nil {
return nil, fmt.Errorf("error parsing Alertmanager config: %s", err)
Expand Down
Loading