Skip to content

Spread rule evaluation over the evaluation interval #716

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion pkg/ruler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package ruler
import (
"context"
"fmt"
"hash"
"hash/fnv"
"math"
"sync"
"time"

Expand Down Expand Up @@ -180,9 +183,28 @@ func (s *scheduler) poll() (map[string]configs.VersionedRulesConfig, error) {
return cfgs, nil
}

// computeNextEvalTime Computes when a user's rules should be next evaluated, based on how far we are through an evaluation cycle
func (s *scheduler) computeNextEvalTime(hasher hash.Hash64, now time.Time, userID string) time.Time {
intervalNanos := float64(s.evaluationInterval.Nanoseconds())
// Compute how far we are into the current evaluation cycle
currentEvalCyclePoint := math.Mod(float64(now.UnixNano()), intervalNanos)

hasher.Reset()
hasher.Write([]byte(userID))
offset := math.Mod(
// We subtract our current point in the cycle to cause the entries
// before 'now' to wrap around to the end.
// We don't want this to come out negative, so we add the interval to it
float64(hasher.Sum64())+intervalNanos-currentEvalCyclePoint,
intervalNanos)
return now.Add(time.Duration(int64(offset)))
}

func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.VersionedRulesConfig) {
// TODO: instrument how many configs we have, both valid & invalid.
level.Debug(util.Logger).Log("msg", "adding configurations", "num_configs", len(cfgs))
hasher := fnv.New64a()

for userID, config := range cfgs {
rulesByFilename, err := config.Config.Parse()
if err != nil {
Expand All @@ -203,9 +225,10 @@ func (s *scheduler) addNewConfigs(now time.Time, cfgs map[string]configs.Version
}
s.Unlock()
if !config.IsDeleted() {
evalTime := s.computeNextEvalTime(hasher, now, userID)
for fn, rules := range rulesByFilename {
level.Debug(util.Logger).Log("msg", "scheduler: updating rules for user and filename", "user_id", userID, "filename", fn, "num_rules", len(rules))
s.addWorkItem(workItem{userID, fn, rules, now})
s.addWorkItem(workItem{userID, fn, rules, evalTime})
}
}
}
Expand Down
67 changes: 67 additions & 0 deletions pkg/ruler/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package ruler

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

type fakeHasher struct {
something uint32
data *[]byte
}

func (h *fakeHasher) Write(data []byte) (int, error) {
h.data = &data
return len(data), nil
}
func (h *fakeHasher) Reset() {
h.data = nil
}
func (h *fakeHasher) Size() int {
return 0
}
func (h *fakeHasher) BlockSize() int {
return 64
}
func (h *fakeHasher) Sum([]byte) []byte {
return []byte{}
}
func (h *fakeHasher) Sum64() uint64 {
i, _ := strconv.ParseUint(string(*h.data), 10, 64)
return i
}

func TestSchedulerComputeNextEvalTime(t *testing.T) {
h := fakeHasher{}
// normal intervals are in seconds; this is nanoseconds for the test
s := scheduler{evaluationInterval: 15}
evalTime := func(now, hashResult int64) int64 {
// We use the fake hasher to give us control over the hash output
// so that we can test the wrap-around behaviour of the modulo
fakeUserID := strconv.FormatInt(hashResult, 10)
return s.computeNextEvalTime(&h, time.Unix(0, now), fakeUserID).UnixNano()
}
{
cycleStartTime := int64(30)
cycleOffset := int64(0) // cycleStartTime % s.evaluationInterval
// Check simple case where hash >= current cycle position
assert.Equal(t, cycleStartTime+0, evalTime(cycleStartTime, cycleOffset+0))
assert.Equal(t, cycleStartTime+1, evalTime(cycleStartTime, cycleOffset+1))
assert.Equal(t, cycleStartTime+14, evalTime(cycleStartTime, cycleOffset+14))
// Check things are cyclic
assert.Equal(t, evalTime(cycleStartTime, 0), evalTime(cycleStartTime, int64(s.evaluationInterval)))
}
{
midCycleTime := int64(35)
cycleOffset := int64(5) // midCycleTime % s.evaluationInterval
// Check case where hash can be either greater or less than current cycle position
assert.Equal(t, midCycleTime+0, evalTime(midCycleTime, cycleOffset+0))
assert.Equal(t, midCycleTime+1, evalTime(midCycleTime, cycleOffset+1))
assert.Equal(t, midCycleTime+9, evalTime(midCycleTime, cycleOffset+9))
assert.Equal(t, midCycleTime+10, evalTime(midCycleTime, cycleOffset-5))
assert.Equal(t, midCycleTime+14, evalTime(midCycleTime, cycleOffset-1))
}
}