Skip to content

Commit 5ae26c4

Browse files
authored
Merge pull request #1558 from cortexproject/remove-users-more
Remove deleted users from AlertManager
2 parents af5cedc + 3790418 commit 5ae26c4

File tree

5 files changed

+39
-5
lines changed

5 files changed

+39
-5
lines changed

pkg/alertmanager/multitenant.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,10 @@ func (am *MultitenantAlertmanager) addNewConfigs(cfgs map[string]configs.View) {
368368
// TODO: instrument how many configs we have, both valid & invalid.
369369
level.Debug(util.Logger).Log("msg", "adding configurations", "num_configs", len(cfgs))
370370
for userID, config := range cfgs {
371-
371+
if config.IsDeleted() {
372+
am.deleteUser(userID)
373+
continue
374+
}
372375
err := am.setConfig(userID, config.Config)
373376
if err != nil {
374377
level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: error applying config", "err", err)
@@ -501,6 +504,16 @@ func alertmanagerConfigFromConfig(c configs.Config) (*amconfig.Config, error) {
501504
return cfg, nil
502505
}
503506

507+
func (am *MultitenantAlertmanager) deleteUser(userID string) {
508+
am.alertmanagersMtx.Lock()
509+
if existing, hasExisting := am.alertmanagers[userID]; hasExisting {
510+
existing.Stop()
511+
}
512+
delete(am.alertmanagers, userID)
513+
delete(am.cfgs, userID)
514+
am.alertmanagersMtx.Unlock()
515+
}
516+
504517
func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amconfig.Config) (*Alertmanager, error) {
505518
newAM, err := New(&Config{
506519
UserID: userID,

pkg/configs/configs.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,11 @@ type View struct {
127127
DeletedAt time.Time `json:"deleted_at"`
128128
}
129129

130+
// IsDeleted tells you if the config is deleted.
131+
func (v View) IsDeleted() bool {
132+
return !v.DeletedAt.IsZero()
133+
}
134+
130135
// GetVersionedRulesConfig specializes the view to just the rules config.
131136
func (v View) GetVersionedRulesConfig() *VersionedRulesConfig {
132137
if v.Config.RulesConfig.Files == nil {

pkg/ruler/ruler.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func NewRuler(cfg Config, engine *promql.Engine, queryable storage.Queryable, d
151151
workerWG: &sync.WaitGroup{},
152152
}
153153

154-
ruler.scheduler = newScheduler(rulesAPI, cfg.EvaluationInterval, cfg.EvaluationInterval, ruler.newGroup)
154+
ruler.scheduler = newScheduler(rulesAPI, cfg.EvaluationInterval, cfg.EvaluationInterval, ruler.newGroup, ruler.removeUser)
155155

156156
// If sharding is enabled, create/join a ring to distribute tokens to
157157
// the ruler
@@ -226,6 +226,17 @@ func (r *Ruler) newGroup(userID string, groupName string, rls []rules.Rule) (*gr
226226
return newGroup(groupName, rls, appendable, opts), nil
227227
}
228228

229+
func (r *Ruler) removeUser(userID string) error {
230+
r.notifiersMtx.Lock()
231+
defer r.notifiersMtx.Unlock()
232+
233+
if n, ok := r.notifiers[userID]; ok {
234+
n.stop()
235+
}
236+
delete(r.notifiers, userID)
237+
return nil
238+
}
239+
229240
// sendAlerts implements a rules.NotifyFunc for a Notifier.
230241
// It filters any non-firing alerts from the input.
231242
//
@@ -287,7 +298,6 @@ func (r *Ruler) getOrCreateNotifier(userID string) (*notifier.Manager, error) {
287298
return nil, err
288299
}
289300

290-
// TODO: Remove notifiers for stale users. Right now this is a slow leak.
291301
r.notifiers[userID] = n
292302
return n.notifier, nil
293303
}

pkg/ruler/scheduler.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ type userConfig struct {
8383
}
8484

8585
// groupFactory is the signature of the function the scheduler uses to create
// a new rule group for a user, given the group's name and its rules.
type groupFactory func(userID string, groupName string, rls []rules.Rule) (*group, error)
86+
// removeFunction is the signature of the callback the scheduler invokes with
// a user's ID when that user's configuration is found to be deleted.
// NOTE(review): the scheduler's call site currently discards the returned
// error — confirm whether it should be logged or propagated.
type removeFunction func(userID string) error
8687

8788
type scheduler struct {
8889
ruleStore config_client.Client
@@ -94,19 +95,21 @@ type scheduler struct {
9495
cfgs map[string]userConfig // all rules for all users
9596
latestConfig configs.ID // # of last update received from config
9697
groupFn groupFactory // function to create a new group
98+
removeFn removeFunction // called when a user is deleted
9799
sync.RWMutex
98100
done chan struct{}
99101
}
100102

101103
// newScheduler makes a new scheduler.
102-
func newScheduler(ruleStore config_client.Client, evaluationInterval, pollInterval time.Duration, groupFn groupFactory) *scheduler {
104+
func newScheduler(ruleStore config_client.Client, evaluationInterval, pollInterval time.Duration, groupFn groupFactory, removeFn removeFunction) *scheduler {
103105
return &scheduler{
104106
ruleStore: ruleStore,
105107
evaluationInterval: evaluationInterval,
106108
pollInterval: pollInterval,
107109
q: NewSchedulingQueue(clockwork.NewRealClock()),
108110
cfgs: map[string]userConfig{},
109111
groupFn: groupFn,
112+
removeFn: removeFn,
110113

111114
done: make(chan struct{}),
112115
}
@@ -246,6 +249,9 @@ func (s *scheduler) addUserConfig(now time.Time, hasher hash.Hash64, generation
246249
if config.IsDeleted() {
247250
delete(s.cfgs, userID)
248251
s.Unlock()
252+
if s.removeFn != nil {
253+
s.removeFn(userID)
254+
}
249255
return
250256
}
251257
s.cfgs[userID] = userConfig{rules: rulesByGroup, generation: generation}

pkg/ruler/scheduler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func TestSchedulerComputeNextEvalTime(t *testing.T) {
6868
}
6969

7070
func TestSchedulerRulesOverlap(t *testing.T) {
71-
s := newScheduler(nil, 15, 15, nil)
71+
s := newScheduler(nil, 15, 15, nil, nil)
7272
userID := "bob"
7373
groupName := "test"
7474
next := time.Now()

0 commit comments

Comments
 (0)