Skip to content

Remove deleted users from AlertManager #1558

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,10 @@ func (am *MultitenantAlertmanager) addNewConfigs(cfgs map[string]configs.View) {
// TODO: instrument how many configs we have, both valid & invalid.
level.Debug(util.Logger).Log("msg", "adding configurations", "num_configs", len(cfgs))
for userID, config := range cfgs {

if config.IsDeleted() {
am.deleteUser(userID)
continue
}
err := am.setConfig(userID, config.Config)
if err != nil {
level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: error applying config", "err", err)
Expand Down Expand Up @@ -497,6 +500,16 @@ func alertmanagerConfigFromConfig(c configs.Config) (*amconfig.Config, error) {
return cfg, nil
}

func (am *MultitenantAlertmanager) deleteUser(userID string) {
am.alertmanagersMtx.Lock()
if existing, hasExisting := am.alertmanagers[userID]; hasExisting {
existing.Stop()
}
delete(am.alertmanagers, userID)
delete(am.cfgs, userID)
am.alertmanagersMtx.Unlock()
}

func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amconfig.Config) (*Alertmanager, error) {
newAM, err := New(&Config{
UserID: userID,
Expand Down
5 changes: 5 additions & 0 deletions pkg/configs/configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ type View struct {
DeletedAt time.Time `json:"deleted_at"`
}

// IsDeleted tells you if the config is deleted.
func (v View) IsDeleted() bool {
return !v.DeletedAt.IsZero()
}

// GetVersionedRulesConfig specializes the view to just the rules config.
func (v View) GetVersionedRulesConfig() *VersionedRulesConfig {
if v.Config.RulesConfig.Files == nil {
Expand Down
14 changes: 12 additions & 2 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func NewRuler(cfg Config, engine *promql.Engine, queryable storage.Queryable, d
workerWG: &sync.WaitGroup{},
}

ruler.scheduler = newScheduler(rulesAPI, cfg.EvaluationInterval, cfg.EvaluationInterval, ruler.newGroup)
ruler.scheduler = newScheduler(rulesAPI, cfg.EvaluationInterval, cfg.EvaluationInterval, ruler.newGroup, ruler.removeUser)

// If sharding is enabled, create/join a ring to distribute tokens to
// the ruler
Expand Down Expand Up @@ -226,6 +226,17 @@ func (r *Ruler) newGroup(userID string, groupName string, rls []rules.Rule) (*gr
return newGroup(groupName, rls, appendable, opts), nil
}

func (r *Ruler) removeUser(userID string) error {
r.notifiersMtx.Lock()
defer r.notifiersMtx.Unlock()

if n, ok := r.notifiers[userID]; ok {
n.stop()
}
delete(r.notifiers, userID)
return nil
}

// sendAlerts implements a rules.NotifyFunc for a Notifier.
// It filters any non-firing alerts from the input.
//
Expand Down Expand Up @@ -287,7 +298,6 @@ func (r *Ruler) getOrCreateNotifier(userID string) (*notifier.Manager, error) {
return nil, err
}

// TODO: Remove notifiers for stale users. Right now this is a slow leak.
r.notifiers[userID] = n
return n.notifier, nil
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/ruler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type userConfig struct {
}

type groupFactory func(userID string, groupName string, rls []rules.Rule) (*group, error)
type removeFunction func(userID string) error

type scheduler struct {
ruleStore config_client.Client
Expand All @@ -94,19 +95,21 @@ type scheduler struct {
cfgs map[string]userConfig // all rules for all users
latestConfig configs.ID // # of last update received from config
groupFn groupFactory // function to create a new group
removeFn removeFunction // called when a user is deleted
sync.RWMutex
done chan struct{}
}

// newScheduler makes a new scheduler.
func newScheduler(ruleStore config_client.Client, evaluationInterval, pollInterval time.Duration, groupFn groupFactory) *scheduler {
func newScheduler(ruleStore config_client.Client, evaluationInterval, pollInterval time.Duration, groupFn groupFactory, removeFn removeFunction) *scheduler {
return &scheduler{
ruleStore: ruleStore,
evaluationInterval: evaluationInterval,
pollInterval: pollInterval,
q: NewSchedulingQueue(clockwork.NewRealClock()),
cfgs: map[string]userConfig{},
groupFn: groupFn,
removeFn: removeFn,

done: make(chan struct{}),
}
Expand Down Expand Up @@ -246,6 +249,9 @@ func (s *scheduler) addUserConfig(now time.Time, hasher hash.Hash64, generation
if config.IsDeleted() {
delete(s.cfgs, userID)
s.Unlock()
if s.removeFn != nil {
s.removeFn(userID)
}
return
}
s.cfgs[userID] = userConfig{rules: rulesByGroup, generation: generation}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestSchedulerComputeNextEvalTime(t *testing.T) {
}

func TestSchedulerRulesOverlap(t *testing.T) {
s := newScheduler(nil, 15, 15, nil)
s := newScheduler(nil, 15, 15, nil, nil)
userID := "bob"
groupName := "test"
next := time.Now()
Expand Down