6
6
"math/rand"
7
7
"net/url"
8
8
"sort"
9
+ "strconv"
9
10
"strings"
10
11
"sync"
11
12
"sync/atomic"
@@ -139,9 +140,10 @@ type Store interface {
139
140
140
141
// StoreConfig specifies config for a ChunkStore
141
142
type StoreConfig struct {
142
- S3URL string
143
- DynamoDBURL string
144
- ChunkCache * Cache
143
+ S3URL string
144
+ DynamoDBURL string
145
+ ChunkCache * Cache
146
+ DailyBucketsFrom model.Time
145
147
146
148
// Not exported as only used by tests to inject mocks
147
149
dynamodb dynamodbClient
@@ -206,6 +208,10 @@ type AWSStore struct {
206
208
tableName string
207
209
bucketName string
208
210
211
+ // Around which time to start bucketing indexes by day instead of by hour.
212
+ // Only the day matters, not the time within the day.
213
+ dailyBucketsFrom model.Time
214
+
209
215
dynamoRequests chan dynamoOp
210
216
dynamoRequestsDone sync.WaitGroup
211
217
}
@@ -332,25 +338,45 @@ func (c *AWSStore) CreateTables() error {
332
338
return err
333
339
}
334
340
335
- func bigBuckets (from , through model.Time ) []int64 {
341
+ func bigBuckets (from , through model.Time , dailyBucketsFrom model. Time ) []string {
336
342
var (
337
343
secondsInHour = int64 (time .Hour / time .Second )
338
344
fromHour = from .Unix () / secondsInHour
339
345
throughHour = through .Unix () / secondsInHour
340
- result []int64
346
+
347
+ secondsInDay = int64 (24 * time .Hour / time .Second )
348
+ fromDay = from .Unix () / secondsInDay
349
+ throughDay = through .Unix () / secondsInDay
350
+
351
+ firstDailyBucket = dailyBucketsFrom .Unix () / secondsInDay
352
+ lastHourlyBucket = firstDailyBucket * 24
353
+
354
+ result []string
341
355
)
356
+
342
357
for i := fromHour ; i <= throughHour ; i ++ {
343
- result = append (result , i )
358
+ if i > lastHourlyBucket {
359
+ break
360
+ }
361
+ result = append (result , strconv .Itoa (int (i )))
344
362
}
363
+
364
+ for i := fromDay ; i <= throughDay ; i ++ {
365
+ if i < firstDailyBucket {
366
+ continue
367
+ }
368
+ result = append (result , fmt .Sprintf ("d%d" , int (i )))
369
+ }
370
+
345
371
return result
346
372
}
347
373
348
374
func chunkName (userID , chunkID string ) string {
349
375
return fmt .Sprintf ("%s/%s" , userID , chunkID )
350
376
}
351
377
352
- func hashValue (userID string , hour int64 , metricName model.LabelValue ) string {
353
- return fmt .Sprintf ("%s:%d :%s" , userID , hour , metricName )
378
+ func hashValue (userID string , bucket string , metricName model.LabelValue ) string {
379
+ return fmt .Sprintf ("%s:%s :%s" , userID , bucket , metricName )
354
380
}
355
381
356
382
func rangeValue (label model.LabelName , value model.LabelValue , chunkID string ) []byte {
@@ -456,8 +482,8 @@ func (c *AWSStore) calculateDynamoWrites(userID string, chunks []Chunk) ([]*dyna
456
482
}
457
483
458
484
entries := 0
459
- for _ , hour := range bigBuckets (chunk .From , chunk .Through ) {
460
- hashValue := hashValue (userID , hour , metricName )
485
+ for _ , bucket := range bigBuckets (chunk .From , chunk .Through , c . dailyBucketsFrom ) {
486
+ hashValue := hashValue (userID , bucket , metricName )
461
487
for label , value := range chunk .Metric {
462
488
if label == model .MetricNameLabel {
463
489
continue
@@ -556,18 +582,18 @@ func (c *AWSStore) lookupChunks(ctx context.Context, userID string, from, throug
556
582
557
583
incomingChunkSets := make (chan ByID )
558
584
incomingErrors := make (chan error )
559
- buckets := bigBuckets (from , through )
585
+ buckets := bigBuckets (from , through , c . dailyBucketsFrom )
560
586
totalLookups := int32 (0 )
561
- for _ , hour := range buckets {
562
- go func (hour int64 ) {
563
- incoming , lookups , err := c .lookupChunksFor (ctx , userID , hour , metricName , matchers )
587
+ for _ , b := range buckets {
588
+ go func (bucket string ) {
589
+ incoming , lookups , err := c .lookupChunksFor (ctx , userID , bucket , metricName , matchers )
564
590
atomic .AddInt32 (& totalLookups , lookups )
565
591
if err != nil {
566
592
incomingErrors <- err
567
593
} else {
568
594
incomingChunkSets <- incoming
569
595
}
570
- }(hour )
596
+ }(b )
571
597
}
572
598
573
599
var chunks ByID
@@ -592,17 +618,17 @@ func next(s string) string {
592
618
return result
593
619
}
594
620
595
- func (c * AWSStore ) lookupChunksFor (ctx context.Context , userID string , hour int64 , metricName model.LabelValue , matchers []* metric.LabelMatcher ) (ByID , int32 , error ) {
621
+ func (c * AWSStore ) lookupChunksFor (ctx context.Context , userID string , bucket string , metricName model.LabelValue , matchers []* metric.LabelMatcher ) (ByID , int32 , error ) {
596
622
if len (matchers ) == 0 {
597
- return c .lookupChunksForMetricName (ctx , userID , hour , metricName )
623
+ return c .lookupChunksForMetricName (ctx , userID , bucket , metricName )
598
624
}
599
625
600
626
incomingChunkSets := make (chan ByID )
601
627
incomingErrors := make (chan error )
602
628
603
629
for _ , matcher := range matchers {
604
630
go func (matcher * metric.LabelMatcher ) {
605
- incoming , err := c .lookupChunksForMatcher (ctx , userID , hour , metricName , matcher )
631
+ incoming , err := c .lookupChunksForMatcher (ctx , userID , bucket , metricName , matcher )
606
632
if err != nil {
607
633
incomingErrors <- err
608
634
} else {
@@ -624,8 +650,8 @@ func (c *AWSStore) lookupChunksFor(ctx context.Context, userID string, hour int6
624
650
return nWayIntersect (chunkSets ), int32 (len (matchers )), lastErr
625
651
}
626
652
627
- func (c * AWSStore ) lookupChunksForMetricName (ctx context.Context , userID string , hour int64 , metricName model.LabelValue ) (ByID , int32 , error ) {
628
- hashValue := hashValue (userID , hour , metricName )
653
+ func (c * AWSStore ) lookupChunksForMetricName (ctx context.Context , userID string , bucket string , metricName model.LabelValue ) (ByID , int32 , error ) {
654
+ hashValue := hashValue (userID , bucket , metricName )
629
655
input := & dynamodb.QueryInput {
630
656
TableName : aws .String (c .tableName ),
631
657
KeyConditions : map [string ]* dynamodb.Condition {
@@ -665,8 +691,8 @@ func (c *AWSStore) lookupChunksForMetricName(ctx context.Context, userID string,
665
691
return chunkSet , 1 , nil
666
692
}
667
693
668
- func (c * AWSStore ) lookupChunksForMatcher (ctx context.Context , userID string , hour int64 , metricName model.LabelValue , matcher * metric.LabelMatcher ) (ByID , error ) {
669
- hashValue := hashValue (userID , hour , metricName )
694
+ func (c * AWSStore ) lookupChunksForMatcher (ctx context.Context , userID string , bucket string , metricName model.LabelValue , matcher * metric.LabelMatcher ) (ByID , error ) {
695
+ hashValue := hashValue (userID , bucket , metricName )
670
696
var rangeMinValue , rangeMaxValue []byte
671
697
if matcher .Type == metric .Equal {
672
698
nextValue := model .LabelValue (next (string (matcher .Value )))
0 commit comments