Skip to content

Commit 5a8d444

Browse files
committed
make variables consistently named and add logs
Signed-off-by: Jacob Lisi <[email protected]>
1 parent e745e45 commit 5a8d444

File tree

2 files changed

+16
-16
lines changed

2 files changed

+16
-16
lines changed

pkg/ruler/ruler.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,10 @@ type Ruler struct {
149149

150150
// NewRuler creates a new ruler from a distributor and chunk store.
151151
func NewRuler(cfg Config, engine *promql.Engine, queryable storage.Queryable, d *distributor.Distributor, rulesAPI RulesAPI) (*Ruler, error) {
152+
if cfg.NumWorkers <= 0 {
153+
return nil, fmt.Errorf("must have at least 1 worker, got %d", cfg.NumWorkers)
154+
}
155+
152156
ncfg, err := buildNotifierConfig(&cfg)
153157
if err != nil {
154158
return nil, err
@@ -180,10 +184,6 @@ func NewRuler(cfg Config, engine *promql.Engine, queryable storage.Queryable, d
180184
}
181185
}
182186

183-
if cfg.NumWorkers <= 0 {
184-
return nil, fmt.Errorf("must have at least 1 worker, got %d", cfg.NumWorkers)
185-
}
186-
187187
workers := make([]worker, cfg.NumWorkers)
188188
for i := 0; i < cfg.NumWorkers; i++ {
189189
workers[i] = newWorker(ruler)
@@ -217,8 +217,10 @@ func (r *Ruler) Stop() {
217217
w.Stop()
218218
}
219219

220-
r.lifecycler.Shutdown()
221-
r.ring.Stop()
220+
if r.cfg.EnableSharding {
221+
r.lifecycler.Shutdown()
222+
r.ring.Stop()
223+
}
222224
}
223225

224226
func (r *Ruler) newGroup(userID string, groupName string, rls []rules.Rule) (*group, error) {
@@ -309,12 +311,9 @@ func (r *Ruler) getOrCreateNotifier(userID string) (*notifier.Manager, error) {
309311
func (r *Ruler) Evaluate(userID string, item *workItem) {
310312
ctx := user.InjectOrgID(context.Background(), userID)
311313
logger := util.WithContext(ctx, util.Logger)
312-
if r.cfg.EnableSharding {
313-
owned := r.checkRule(item.hash)
314-
if !owned {
315-
level.Debug(util.Logger).Log("msg", "ruler: skipping evaluation, not owned", "user_id", item.userID, "group", item.groupName)
316-
return
317-
}
314+
if r.cfg.EnableSharding && !r.ownsRule(item.hash) {
315+
level.Debug(util.Logger).Log("msg", "ruler: skipping evaluation, not owned", "user_id", item.userID, "group", item.groupName)
316+
return
318317
}
319318
level.Debug(logger).Log("msg", "evaluating rules...", "num_rules", len(item.group.Rules()))
320319
ctx, cancelTimeout := context.WithTimeout(ctx, r.cfg.GroupTimeout)
@@ -335,18 +334,20 @@ func (r *Ruler) Evaluate(userID string, item *workItem) {
335334
rulesProcessed.Add(float64(len(item.group.Rules())))
336335
}
337336

338-
func (r *Ruler) checkRule(hash uint32) bool {
337+
func (r *Ruler) ownsRule(hash uint32) bool {
339338
rlrs, err := r.ring.Get(hash, ring.Read)
340-
// If an error occurs keep evaluating a rule as if it is owned
339+
// If an error occurs evaluate a rule as if it is owned
341340
// better to have extra datapoints for a rule than none at all
341+
// TODO: add a temporary cache of owned rule values or something to fall back on
342342
if err != nil {
343+
level.Warn(util.Logger).Log("msg", "error reading ring to verify rule group ownership", "err", err)
343344
ringCheckErrors.Inc()
344345
return true
345346
}
346347
if rlrs.Ingesters[0].Addr == r.lifecycler.Addr {
347348
return true
348349
}
349-
level.Debug(util.Logger).Log("msg", "rule not owned, address does not match", "owner", rlrs.Ingesters[0].Addr, "current", r.cfg.LifecyclerConfig.Addr)
350+
level.Debug(util.Logger).Log("msg", "rule group not owned, address does not match", "owner", rlrs.Ingesters[0].Addr, "current", r.cfg.LifecyclerConfig.Addr)
350351
return false
351352
}
352353

pkg/ruler/scheduler.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ type scheduler struct {
107107
groupFn groupFactory // function to create a new group
108108
sync.RWMutex
109109

110-
stop chan struct{}
111110
done chan struct{}
112111
}
113112

0 commit comments

Comments
 (0)