
Commit ef27943

Merge pull request #178 from weaveworks/daily-buckets
Allow switching to daily instead of hourly index buckets
2 parents: f6a7b1e + 4ba3082

3 files changed: +116 -41 lines


chunk/chunk_store.go

Lines changed: 48 additions & 22 deletions
@@ -6,6 +6,7 @@ import (
 	"math/rand"
 	"net/url"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -139,9 +140,10 @@ type Store interface {

 // StoreConfig specifies config for a ChunkStore
 type StoreConfig struct {
-	S3URL       string
-	DynamoDBURL string
-	ChunkCache  *Cache
+	S3URL            string
+	DynamoDBURL      string
+	ChunkCache       *Cache
+	DailyBucketsFrom model.Time

 	// Not exported as only used by tests to inject mocks
 	dynamodb dynamodbClient
@@ -206,6 +208,10 @@ type AWSStore struct {
 	tableName  string
 	bucketName string

+	// Around which time to start bucketing indexes by day instead of by hour.
+	// Only the day matters, not the time within the day.
+	dailyBucketsFrom model.Time
+
 	dynamoRequests     chan dynamoOp
 	dynamoRequestsDone sync.WaitGroup
 }
@@ -332,25 +338,45 @@ func (c *AWSStore) CreateTables() error {
 	return err
 }

-func bigBuckets(from, through model.Time) []int64 {
+func (c *AWSStore) bigBuckets(from, through model.Time) []string {
 	var (
 		secondsInHour = int64(time.Hour / time.Second)
 		fromHour      = from.Unix() / secondsInHour
 		throughHour   = through.Unix() / secondsInHour
-		result        []int64
+
+		secondsInDay = int64(24 * time.Hour / time.Second)
+		fromDay      = from.Unix() / secondsInDay
+		throughDay   = through.Unix() / secondsInDay
+
+		firstDailyBucket = c.dailyBucketsFrom.Unix() / secondsInDay
+		lastHourlyBucket = firstDailyBucket * 24
+
+		result []string
 	)
+
 	for i := fromHour; i <= throughHour; i++ {
-		result = append(result, i)
+		if i > lastHourlyBucket {
+			break
+		}
+		result = append(result, strconv.Itoa(int(i)))
 	}
+
+	for i := fromDay; i <= throughDay; i++ {
+		if i < firstDailyBucket {
+			continue
+		}
+		result = append(result, fmt.Sprintf("d%d", int(i)))
+	}
+
 	return result
 }

 func chunkName(userID, chunkID string) string {
 	return fmt.Sprintf("%s/%s", userID, chunkID)
 }

-func hashValue(userID string, hour int64, metricName model.LabelValue) string {
-	return fmt.Sprintf("%s:%d:%s", userID, hour, metricName)
+func hashValue(userID string, bucket string, metricName model.LabelValue) string {
+	return fmt.Sprintf("%s:%s:%s", userID, bucket, metricName)
 }

 func rangeValue(label model.LabelName, value model.LabelValue, chunkID string) []byte {
@@ -456,8 +482,8 @@ func (c *AWSStore) calculateDynamoWrites(userID string, chunks []Chunk) ([]*dyna
 		}

 		entries := 0
-		for _, hour := range bigBuckets(chunk.From, chunk.Through) {
-			hashValue := hashValue(userID, hour, metricName)
+		for _, bucket := range c.bigBuckets(chunk.From, chunk.Through) {
+			hashValue := hashValue(userID, bucket, metricName)
 			for label, value := range chunk.Metric {
 				if label == model.MetricNameLabel {
 					continue
@@ -542,18 +568,18 @@ func (c *AWSStore) lookupChunks(ctx context.Context, userID string, from, throug

 	incomingChunkSets := make(chan ByID)
 	incomingErrors := make(chan error)
-	buckets := bigBuckets(from, through)
+	buckets := c.bigBuckets(from, through)
 	totalLookups := int32(0)
-	for _, hour := range buckets {
-		go func(hour int64) {
-			incoming, lookups, err := c.lookupChunksFor(ctx, userID, hour, metricName, matchers)
+	for _, b := range buckets {
+		go func(bucket string) {
+			incoming, lookups, err := c.lookupChunksFor(ctx, userID, bucket, metricName, matchers)
 			atomic.AddInt32(&totalLookups, lookups)
 			if err != nil {
 				incomingErrors <- err
 			} else {
 				incomingChunkSets <- incoming
 			}
-		}(hour)
+		}(b)
 	}

 	var chunks ByID
@@ -591,17 +617,17 @@ func next(s string) string {
 	return result
 }

-func (c *AWSStore) lookupChunksFor(ctx context.Context, userID string, hour int64, metricName model.LabelValue, matchers []*metric.LabelMatcher) (ByID, int32, error) {
+func (c *AWSStore) lookupChunksFor(ctx context.Context, userID string, bucket string, metricName model.LabelValue, matchers []*metric.LabelMatcher) (ByID, int32, error) {
 	if len(matchers) == 0 {
-		return c.lookupChunksForMetricName(ctx, userID, hour, metricName)
+		return c.lookupChunksForMetricName(ctx, userID, bucket, metricName)
 	}

 	incomingChunkSets := make(chan ByID)
 	incomingErrors := make(chan error)

 	for _, matcher := range matchers {
 		go func(matcher *metric.LabelMatcher) {
-			incoming, err := c.lookupChunksForMatcher(ctx, userID, hour, metricName, matcher)
+			incoming, err := c.lookupChunksForMatcher(ctx, userID, bucket, metricName, matcher)
 			if err != nil {
 				incomingErrors <- err
 			} else {
@@ -623,8 +649,8 @@ func (c *AWSStore) lookupChunksFor(ctx context.Context, userID string, hour int6
 	return nWayIntersect(chunkSets), int32(len(matchers)), lastErr
 }

-func (c *AWSStore) lookupChunksForMetricName(ctx context.Context, userID string, hour int64, metricName model.LabelValue) (ByID, int32, error) {
-	hashValue := hashValue(userID, hour, metricName)
+func (c *AWSStore) lookupChunksForMetricName(ctx context.Context, userID string, bucket string, metricName model.LabelValue) (ByID, int32, error) {
+	hashValue := hashValue(userID, bucket, metricName)
 	input := &dynamodb.QueryInput{
 		TableName: aws.String(c.tableName),
 		KeyConditions: map[string]*dynamodb.Condition{
@@ -664,8 +690,8 @@ func (c *AWSStore) lookupChunksForMetricName(ctx context.Context, userID string,
 	return chunkSet, 1, nil
 }

-func (c *AWSStore) lookupChunksForMatcher(ctx context.Context, userID string, hour int64, metricName model.LabelValue, matcher *metric.LabelMatcher) (ByID, error) {
-	hashValue := hashValue(userID, hour, metricName)
+func (c *AWSStore) lookupChunksForMatcher(ctx context.Context, userID string, bucket string, metricName model.LabelValue, matcher *metric.LabelMatcher) (ByID, error) {
+	hashValue := hashValue(userID, bucket, metricName)
 	var rangeMinValue, rangeMaxValue []byte
 	if matcher.Type == metric.Equal {
 		nextValue := model.LabelValue(next(string(matcher.Value)))
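
In short, index buckets are now identified by strings rather than hour numbers: hourly buckets keep their plain hour index ("0", "1", ...) while daily buckets are prefixed with "d" ("d1", "d2", ...), and hashValue interpolates whichever bucket string it is given into the userID:bucket:metricName hash key. The following is a minimal, self-contained sketch of that scheme; it mirrors the committed bigBuckets/hashValue logic but works on plain Unix seconds instead of model.Time, and the user ID "userA", metric "up", and times are invented for illustration.

package main

import (
	"fmt"
	"strconv"
	"time"
)

// bigBuckets mirrors the bucketing logic added in this commit, but over plain
// Unix-second timestamps so the sketch runs without the chunk package. Hourly
// buckets are emitted as "0", "1", ... up to and including the first hour of
// the first daily bucket; daily buckets are emitted as "d<day>" from that day on.
func bigBuckets(fromSecs, throughSecs, dailyBucketsFromSecs int64) []string {
	var (
		secondsInHour = int64(time.Hour / time.Second)
		secondsInDay  = int64(24 * time.Hour / time.Second)

		firstDailyBucket = dailyBucketsFromSecs / secondsInDay
		lastHourlyBucket = firstDailyBucket * 24

		result []string
	)
	for i := fromSecs / secondsInHour; i <= throughSecs/secondsInHour; i++ {
		if i > lastHourlyBucket {
			break
		}
		result = append(result, strconv.Itoa(int(i)))
	}
	for i := fromSecs / secondsInDay; i <= throughSecs/secondsInDay; i++ {
		if i < firstDailyBucket {
			continue
		}
		result = append(result, fmt.Sprintf("d%d", int(i)))
	}
	return result
}

// hashValue matches the new string-based key format from the commit.
func hashValue(userID, bucket, metricName string) string {
	return fmt.Sprintf("%s:%s:%s", userID, bucket, metricName)
}

func main() {
	// Hypothetical query: three days starting at the epoch, with daily
	// buckets beginning on day 1. "userA" and "up" are invented names.
	day := int64(24 * 3600)
	for _, b := range bigBuckets(0, 3*day-1, 1*day) {
		fmt.Println(hashValue("userA", b, "up"))
	}
	// Prints userA:0:up ... userA:24:up, then userA:d1:up and userA:d2:up.
}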

chunk/chunk_store_test.go

Lines changed: 41 additions & 0 deletions
@@ -146,3 +146,44 @@ func diff(want, have interface{}) string {
 	})
 	return "\n" + text
 }
+
+func TestBigBuckets(t *testing.T) {
+	scenarios := []struct {
+		from, through, dailyBucketsFrom model.Time
+		buckets                         []string
+	}{
+		{
+			from:             model.TimeFromUnix(0),
+			through:          model.TimeFromUnix(0).Add(3*24*time.Hour) - 1,
+			dailyBucketsFrom: model.TimeFromUnix(0).Add(1 * 24 * time.Hour),
+			buckets:          []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "d1", "d2"},
+		},
+		{
+			from:             model.TimeFromUnix(0),
+			through:          model.TimeFromUnix(0).Add(3*24*time.Hour) - 1,
+			dailyBucketsFrom: model.TimeFromUnix(0).Add(2*24*time.Hour) - 1, // Only the day matters for the epoch start time, not the time.
+			buckets:          []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "d1", "d2"},
+		},
+		{
+			from:             model.TimeFromUnix(0),
+			through:          model.TimeFromUnix(0).Add(3*24*time.Hour) - 1,
+			dailyBucketsFrom: model.TimeFromUnix(0).Add(1*24*time.Hour) - 1,
+			buckets:          []string{"0", "d0", "d1", "d2"},
+		},
+		{
+			from:             model.TimeFromUnix(0),
+			through:          model.TimeFromUnix(0).Add(2 * 24 * time.Hour),
+			dailyBucketsFrom: model.TimeFromUnix(0).Add(99 * 24 * time.Hour),
+			buckets:          []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "40", "41", "42", "43", "44", "45", "46", "47", "48"},
+		},
+	}
+	for i, s := range scenarios {
+		cs := &AWSStore{
+			dailyBucketsFrom: s.dailyBucketsFrom,
+		}
+		buckets := cs.bigBuckets(s.from, s.through)
+		if !reflect.DeepEqual(buckets, s.buckets) {
+			t.Fatalf("%d. unexpected buckets; want %v, got %v", i, s.buckets, buckets)
+		}
+	}
+}
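
For reference, the arithmetic behind the first two scenarios: dailyBucketsFrom falls anywhere inside day 1, so firstDailyBucket = 1 and lastHourlyBucket = 1 * 24 = 24; the hourly loop therefore emits "0" through "24" and the daily loop emits "d1" and "d2". Hour bucket "24" is the first hour of day "d1", so a query spanning the cut-over reads both bucket shapes; presumably this is so that index entries written under either scheme around the transition are still found.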

cmd/cortex/main.go

Lines changed: 27 additions & 19 deletions
@@ -22,6 +22,7 @@ import (

 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/log"
+	"github.com/prometheus/common/model"
 	"github.com/prometheus/common/route"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/web/api/v1"
@@ -60,22 +61,23 @@ func init() {
 }

 type cfg struct {
-	mode                 string
-	listenPort           int
-	consulHost           string
-	consulPrefix         string
-	s3URL                string
-	dynamodbURL          string
-	dynamodbCreateTables bool
-	dynamodbPollInterval time.Duration
-	memcachedHostname    string
-	memcachedTimeout     time.Duration
-	memcachedExpiration  time.Duration
-	memcachedService     string
-	remoteTimeout        time.Duration
-	numTokens            int
-	logSuccess           bool
-	watchDynamo          bool
+	mode                     string
+	listenPort               int
+	consulHost               string
+	consulPrefix             string
+	s3URL                    string
+	dynamodbURL              string
+	dynamodbCreateTables     bool
+	dynamodbPollInterval     time.Duration
+	dynamodbDailyBucketsFrom string
+	memcachedHostname        string
+	memcachedTimeout         time.Duration
+	memcachedExpiration      time.Duration
+	memcachedService         string
+	remoteTimeout            time.Duration
+	numTokens                int
+	logSuccess               bool
+	watchDynamo              bool

 	ingesterConfig    ingester.Config
 	distributorConfig distributor.Config
@@ -95,6 +97,7 @@ func main() {
 	flag.StringVar(&cfg.dynamodbURL, "dynamodb.url", "localhost:8000", "DynamoDB endpoint URL.")
 	flag.DurationVar(&cfg.dynamodbPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.")
 	flag.BoolVar(&cfg.dynamodbCreateTables, "dynamodb.create-tables", false, "Create required DynamoDB tables on startup.")
+	flag.StringVar(&cfg.dynamodbDailyBucketsFrom, "dynamodb.daily-buckets-from", "9999-01-01", "The date in the format YYYY-MM-DD of the first day for which DynamoDB index buckets should be day-sized vs. hour-sized.")
 	flag.BoolVar(&cfg.watchDynamo, "watch-dynamo", false, "Periodically collect DynamoDB provisioned throughput.")

 	flag.StringVar(&cfg.memcachedHostname, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
@@ -246,10 +249,15 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) {
 			Expiration: cfg.memcachedExpiration,
 		}
 	}
+	dailyBucketsFrom, err := time.Parse("2006-01-02", cfg.dynamodbDailyBucketsFrom)
+	if err != nil {
+		return nil, fmt.Errorf("error parsing daily buckets begin date: %v", err)
+	}
 	chunkStore, err := chunk.NewAWSStore(chunk.StoreConfig{
-		S3URL:       cfg.s3URL,
-		DynamoDBURL: cfg.dynamodbURL,
-		ChunkCache:  chunkCache,
+		S3URL:            cfg.s3URL,
+		DynamoDBURL:      cfg.dynamodbURL,
+		ChunkCache:       chunkCache,
+		DailyBucketsFrom: model.TimeFromUnix(dailyBucketsFrom.Unix()),
 	})
 	if err != nil {
 		return nil, err
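
The new flag defaults to 9999-01-01, so existing deployments keep hourly buckets unless an operator opts in. As a purely illustrative example (the date is not from this commit), switching to daily buckets from 1 March 2017 onwards would mean starting cortex with -dynamodb.daily-buckets-from=2017-03-01 alongside the existing DynamoDB and S3 settings; the value is parsed with time.Parse("2006-01-02", ...) in setupChunkStore, and an unparseable date fails startup with the "error parsing daily buckets begin date" error shown above.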
