diff --git a/pkg/ruler/scheduler.go b/pkg/ruler/scheduler.go index cc03d845e9f..766ff16acf0 100644 --- a/pkg/ruler/scheduler.go +++ b/pkg/ruler/scheduler.go @@ -3,6 +3,9 @@ package ruler import ( "context" "fmt" + "hash" + "hash/fnv" + "math" "sync" "time" @@ -180,9 +183,28 @@ func (s *scheduler) poll() (map[string]configs.VersionedRulesConfig, error) { return cfgs, nil } +// computeNextEvalTime Computes when a user's rules should be next evaluated, based on how far we are through an evaluation cycle +func (s *scheduler) computeNextEvalTime(hasher hash.Hash64, now time.Time, userID string) time.Time { + intervalNanos := float64(s.evaluationInterval.Nanoseconds()) + // Compute how far we are into the current evaluation cycle + currentEvalCyclePoint := math.Mod(float64(now.UnixNano()), intervalNanos) + + hasher.Reset() + hasher.Write([]byte(userID)) + offset := math.Mod( + // We subtract our current point in the cycle to cause the entries + // before 'now' to wrap around to the end. + // We don't want this to come out negative, so we add the interval to it + float64(hasher.Sum64())+intervalNanos-currentEvalCyclePoint, + intervalNanos) + return now.Add(time.Duration(int64(offset))) +} + func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.VersionedRulesConfig) { // TODO: instrument how many configs we have, both valid & invalid. level.Debug(util.Logger).Log("msg", "adding configurations", "num_configs", len(cfgs)) + hasher := fnv.New64a() + for userID, config := range cfgs { rulesByFilename, err := config.Config.Parse() if err != nil { @@ -203,9 +225,10 @@ func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.Version } s.Unlock() if !config.IsDeleted() { + evalTime := s.computeNextEvalTime(hasher, now, userID) for fn, rules := range rulesByFilename { level.Debug(util.Logger).Log("msg", "scheduler: updating rules for user and filename", "user_id", userID, "filename", fn, "num_rules", len(rules)) - s.addWorkItem(workItem{userID, fn, rules, now}) + s.addWorkItem(workItem{userID, fn, rules, evalTime}) } } } diff --git a/pkg/ruler/scheduler_test.go b/pkg/ruler/scheduler_test.go new file mode 100644 index 00000000000..41a8bdeb9eb --- /dev/null +++ b/pkg/ruler/scheduler_test.go @@ -0,0 +1,67 @@ +package ruler + +import ( + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type fakeHasher struct { + something uint32 + data *[]byte +} + +func (h *fakeHasher) Write(data []byte) (int, error) { + h.data = &data + return len(data), nil +} +func (h *fakeHasher) Reset() { + h.data = nil +} +func (h *fakeHasher) Size() int { + return 0 +} +func (h *fakeHasher) BlockSize() int { + return 64 +} +func (h *fakeHasher) Sum([]byte) []byte { + return []byte{} +} +func (h *fakeHasher) Sum64() uint64 { + i, _ := strconv.ParseUint(string(*h.data), 10, 64) + return i +} + +func TestSchedulerComputeNextEvalTime(t *testing.T) { + h := fakeHasher{} + // normal intervals are in seconds; this is nanoseconds for the test + s := scheduler{evaluationInterval: 15} + evalTime := func(now, hashResult int64) int64 { + // We use the fake hasher to give us control over the hash output + // so that we can test the wrap-around behaviour of the modulo + fakeUserID := strconv.FormatInt(hashResult, 10) + return s.computeNextEvalTime(&h, time.Unix(0, now), fakeUserID).UnixNano() + } + { + cycleStartTime := int64(30) + cycleOffset := int64(0) // cycleStartTime % s.evaluationInterval + // Check simple case where hash >= current cycle position + assert.Equal(t, cycleStartTime+0, evalTime(cycleStartTime, cycleOffset+0)) + assert.Equal(t, cycleStartTime+1, evalTime(cycleStartTime, cycleOffset+1)) + assert.Equal(t, cycleStartTime+14, evalTime(cycleStartTime, cycleOffset+14)) + // Check things are cyclic + assert.Equal(t, evalTime(cycleStartTime, 0), evalTime(cycleStartTime, int64(s.evaluationInterval))) + } + { + midCycleTime := int64(35) + cycleOffset := int64(5) // midCycleTime % s.evaluationInterval + // Check case where hash can be either greater or less than current cycle position + assert.Equal(t, midCycleTime+0, evalTime(midCycleTime, cycleOffset+0)) + assert.Equal(t, midCycleTime+1, evalTime(midCycleTime, cycleOffset+1)) + assert.Equal(t, midCycleTime+9, evalTime(midCycleTime, cycleOffset+9)) + assert.Equal(t, midCycleTime+10, evalTime(midCycleTime, cycleOffset-5)) + assert.Equal(t, midCycleTime+14, evalTime(midCycleTime, cycleOffset-1)) + } +}