Skip to content

Allow switching to daily instead of hourly index buckets #178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 48 additions & 22 deletions chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand"
"net/url"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -139,9 +140,10 @@ type Store interface {

// StoreConfig specifies config for a ChunkStore
type StoreConfig struct {
S3URL string
DynamoDBURL string
ChunkCache *Cache
S3URL string
DynamoDBURL string
ChunkCache *Cache
DailyBucketsFrom model.Time

// Not exported as only used by tests to inject mocks
dynamodb dynamodbClient
Expand Down Expand Up @@ -206,6 +208,10 @@ type AWSStore struct {
tableName string
bucketName string

// Around which time to start bucketing indexes by day instead of by hour.
// Only the day matters, not the time within the day.
dailyBucketsFrom model.Time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is never initialised! Which means we'll default to weekly buckets for everything...

We should do two things: copy the config straight into the AWSStore struct, and change the meaning so the time-zero-value means no weekly buckets.

Will backout this PR.


dynamoRequests chan dynamoOp
dynamoRequestsDone sync.WaitGroup
}
Expand Down Expand Up @@ -332,25 +338,45 @@ func (c *AWSStore) CreateTables() error {
return err
}

func bigBuckets(from, through model.Time) []int64 {
func (c *AWSStore) bigBuckets(from, through model.Time) []string {
var (
secondsInHour = int64(time.Hour / time.Second)
fromHour = from.Unix() / secondsInHour
throughHour = through.Unix() / secondsInHour
result []int64

secondsInDay = int64(24 * time.Hour / time.Second)
fromDay = from.Unix() / secondsInDay
throughDay = through.Unix() / secondsInDay

firstDailyBucket = c.dailyBucketsFrom.Unix() / secondsInDay
lastHourlyBucket = firstDailyBucket * 24

result []string
)

for i := fromHour; i <= throughHour; i++ {
result = append(result, i)
if i > lastHourlyBucket {
break
}
result = append(result, strconv.Itoa(int(i)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure the conversion to int is safe here - they'll run out on 2038-01-19 IIRC. Should probably use https://golang.org/pkg/strconv/#FormatInt

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ignore me, these are hours, so the range is huge.

}

for i := fromDay; i <= throughDay; i++ {
if i < firstDailyBucket {
continue
}
result = append(result, fmt.Sprintf("d%d", int(i)))
}

return result
}

// chunkName returns the object key for a chunk, namespaced by user ID.
func chunkName(userID, chunkID string) string {
	return userID + "/" + chunkID
}

func hashValue(userID string, hour int64, metricName model.LabelValue) string {
return fmt.Sprintf("%s:%d:%s", userID, hour, metricName)
func hashValue(userID string, bucket string, metricName model.LabelValue) string {
return fmt.Sprintf("%s:%s:%s", userID, bucket, metricName)
}

func rangeValue(label model.LabelName, value model.LabelValue, chunkID string) []byte {
Expand Down Expand Up @@ -456,8 +482,8 @@ func (c *AWSStore) calculateDynamoWrites(userID string, chunks []Chunk) ([]*dyna
}

entries := 0
for _, hour := range bigBuckets(chunk.From, chunk.Through) {
hashValue := hashValue(userID, hour, metricName)
for _, bucket := range c.bigBuckets(chunk.From, chunk.Through) {
hashValue := hashValue(userID, bucket, metricName)
for label, value := range chunk.Metric {
if label == model.MetricNameLabel {
continue
Expand Down Expand Up @@ -556,18 +582,18 @@ func (c *AWSStore) lookupChunks(ctx context.Context, userID string, from, throug

incomingChunkSets := make(chan ByID)
incomingErrors := make(chan error)
buckets := bigBuckets(from, through)
buckets := c.bigBuckets(from, through)
totalLookups := int32(0)
for _, hour := range buckets {
go func(hour int64) {
incoming, lookups, err := c.lookupChunksFor(ctx, userID, hour, metricName, matchers)
for _, b := range buckets {
go func(bucket string) {
incoming, lookups, err := c.lookupChunksFor(ctx, userID, bucket, metricName, matchers)
atomic.AddInt32(&totalLookups, lookups)
if err != nil {
incomingErrors <- err
} else {
incomingChunkSets <- incoming
}
}(hour)
}(b)
}

var chunks ByID
Expand All @@ -592,17 +618,17 @@ func next(s string) string {
return result
}

func (c *AWSStore) lookupChunksFor(ctx context.Context, userID string, hour int64, metricName model.LabelValue, matchers []*metric.LabelMatcher) (ByID, int32, error) {
func (c *AWSStore) lookupChunksFor(ctx context.Context, userID string, bucket string, metricName model.LabelValue, matchers []*metric.LabelMatcher) (ByID, int32, error) {
if len(matchers) == 0 {
return c.lookupChunksForMetricName(ctx, userID, hour, metricName)
return c.lookupChunksForMetricName(ctx, userID, bucket, metricName)
}

incomingChunkSets := make(chan ByID)
incomingErrors := make(chan error)

for _, matcher := range matchers {
go func(matcher *metric.LabelMatcher) {
incoming, err := c.lookupChunksForMatcher(ctx, userID, hour, metricName, matcher)
incoming, err := c.lookupChunksForMatcher(ctx, userID, bucket, metricName, matcher)
if err != nil {
incomingErrors <- err
} else {
Expand All @@ -624,8 +650,8 @@ func (c *AWSStore) lookupChunksFor(ctx context.Context, userID string, hour int6
return nWayIntersect(chunkSets), int32(len(matchers)), lastErr
}

func (c *AWSStore) lookupChunksForMetricName(ctx context.Context, userID string, hour int64, metricName model.LabelValue) (ByID, int32, error) {
hashValue := hashValue(userID, hour, metricName)
func (c *AWSStore) lookupChunksForMetricName(ctx context.Context, userID string, bucket string, metricName model.LabelValue) (ByID, int32, error) {
hashValue := hashValue(userID, bucket, metricName)
input := &dynamodb.QueryInput{
TableName: aws.String(c.tableName),
KeyConditions: map[string]*dynamodb.Condition{
Expand Down Expand Up @@ -665,8 +691,8 @@ func (c *AWSStore) lookupChunksForMetricName(ctx context.Context, userID string,
return chunkSet, 1, nil
}

func (c *AWSStore) lookupChunksForMatcher(ctx context.Context, userID string, hour int64, metricName model.LabelValue, matcher *metric.LabelMatcher) (ByID, error) {
hashValue := hashValue(userID, hour, metricName)
func (c *AWSStore) lookupChunksForMatcher(ctx context.Context, userID string, bucket string, metricName model.LabelValue, matcher *metric.LabelMatcher) (ByID, error) {
hashValue := hashValue(userID, bucket, metricName)
var rangeMinValue, rangeMaxValue []byte
if matcher.Type == metric.Equal {
nextValue := model.LabelValue(next(string(matcher.Value)))
Expand Down
41 changes: 41 additions & 0 deletions chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,44 @@ func diff(want, have interface{}) string {
})
return "\n" + text
}

// TestBigBuckets exercises the hourly/daily bucket computation, in particular
// the overlap at the switch-over from hour-sized to day-sized buckets.
func TestBigBuckets(t *testing.T) {
	for i, scenario := range []struct {
		from, through, dailyBucketsFrom model.Time
		buckets                         []string
	}{
		// Daily buckets start on day 1: hours 0-24, then d1, d2.
		{
			from:             model.TimeFromUnix(0),
			through:          model.TimeFromUnix(0).Add(3*24*time.Hour) - 1,
			dailyBucketsFrom: model.TimeFromUnix(0).Add(1 * 24 * time.Hour),
			buckets:          []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "d1", "d2"},
		},
		// Only the day matters for the epoch start time, not the time.
		{
			from:             model.TimeFromUnix(0),
			through:          model.TimeFromUnix(0).Add(3*24*time.Hour) - 1,
			dailyBucketsFrom: model.TimeFromUnix(0).Add(2*24*time.Hour) - 1,
			buckets:          []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "d1", "d2"},
		},
		// Daily buckets start within day 0: only hour 0 plus d0-d2.
		{
			from:             model.TimeFromUnix(0),
			through:          model.TimeFromUnix(0).Add(3*24*time.Hour) - 1,
			dailyBucketsFrom: model.TimeFromUnix(0).Add(1*24*time.Hour) - 1,
			buckets:          []string{"0", "d0", "d1", "d2"},
		},
		// Daily buckets start far in the future: hourly buckets only.
		{
			from:             model.TimeFromUnix(0),
			through:          model.TimeFromUnix(0).Add(2 * 24 * time.Hour),
			dailyBucketsFrom: model.TimeFromUnix(0).Add(99 * 24 * time.Hour),
			buckets:          []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "40", "41", "42", "43", "44", "45", "46", "47", "48"},
		},
	} {
		store := &AWSStore{
			dailyBucketsFrom: scenario.dailyBucketsFrom,
		}
		got := store.bigBuckets(scenario.from, scenario.through)
		if !reflect.DeepEqual(got, scenario.buckets) {
			t.Fatalf("%d. unexpected buckets; want %v, got %v", i, scenario.buckets, got)
		}
	}
}
46 changes: 27 additions & 19 deletions cmd/cortex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/web/api/v1"
Expand Down Expand Up @@ -60,22 +61,23 @@ func init() {
}

type cfg struct {
mode string
listenPort int
consulHost string
consulPrefix string
s3URL string
dynamodbURL string
dynamodbCreateTables bool
dynamodbPollInterval time.Duration
memcachedHostname string
memcachedTimeout time.Duration
memcachedExpiration time.Duration
memcachedService string
remoteTimeout time.Duration
numTokens int
logSuccess bool
watchDynamo bool
mode string
listenPort int
consulHost string
consulPrefix string
s3URL string
dynamodbURL string
dynamodbCreateTables bool
dynamodbPollInterval time.Duration
dynamodbDailyBucketsFrom string
memcachedHostname string
memcachedTimeout time.Duration
memcachedExpiration time.Duration
memcachedService string
remoteTimeout time.Duration
numTokens int
logSuccess bool
watchDynamo bool

ingesterConfig ingester.Config
distributorConfig distributor.Config
Expand All @@ -95,6 +97,7 @@ func main() {
flag.StringVar(&cfg.dynamodbURL, "dynamodb.url", "localhost:8000", "DynamoDB endpoint URL.")
flag.DurationVar(&cfg.dynamodbPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.")
flag.BoolVar(&cfg.dynamodbCreateTables, "dynamodb.create-tables", false, "Create required DynamoDB tables on startup.")
flag.StringVar(&cfg.dynamodbDailyBucketsFrom, "dynamodb.daily-buckets-from", "9999-01-01", "The date in the format YYYY-MM-DD of the first day for which DynamoDB index buckets should be day-sized vs. hour-sized.")
flag.BoolVar(&cfg.watchDynamo, "watch-dynamo", false, "Periodically collect DynamoDB provisioned throughput.")

flag.StringVar(&cfg.memcachedHostname, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
Expand Down Expand Up @@ -246,10 +249,15 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) {
Expiration: cfg.memcachedExpiration,
}
}
dailyBucketsFrom, err := time.Parse("2006-01-02", cfg.dynamodbDailyBucketsFrom)
if err != nil {
return nil, fmt.Errorf("error parsing daily buckets begin date: %v", err)
}
chunkStore, err := chunk.NewAWSStore(chunk.StoreConfig{
S3URL: cfg.s3URL,
DynamoDBURL: cfg.dynamodbURL,
ChunkCache: chunkCache,
S3URL: cfg.s3URL,
DynamoDBURL: cfg.dynamodbURL,
ChunkCache: chunkCache,
DailyBucketsFrom: model.TimeFromUnix(dailyBucketsFrom.Unix()),
})
if err != nil {
return nil, err
Expand Down