-
Notifications
You must be signed in to change notification settings - Fork 816
Daily buckets #180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Daily buckets #180
Changes from all commits
6829e1b
bb27555
299f260
fa36d2b
15eb5b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ import ( | |
"math/rand" | ||
"net/url" | ||
"sort" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
|
@@ -50,6 +51,9 @@ const ( | |
dynamoMaxBatchSize = 25 | ||
|
||
provisionedThroughputExceededException = "ProvisionedThroughputExceededException" | ||
|
||
secondsInHour = int64(time.Hour / time.Second) | ||
secondsInDay = int64(24 * time.Hour / time.Second) | ||
) | ||
|
||
var ( | ||
|
@@ -139,9 +143,10 @@ type Store interface { | |
|
||
// StoreConfig specifies config for a ChunkStore | ||
type StoreConfig struct { | ||
S3URL string | ||
DynamoDBURL string | ||
ChunkCache *Cache | ||
S3URL string | ||
DynamoDBURL string | ||
ChunkCache *Cache | ||
DailyBucketsFrom model.Time | ||
|
||
// Not exported as only used by tests to inject mocks | ||
dynamodb dynamodbClient | ||
|
@@ -206,6 +211,10 @@ type AWSStore struct { | |
tableName string | ||
bucketName string | ||
|
||
// After midnight on this day, we start bucketing indexes by day instead of by | ||
// hour. Only the day matters, not the time within the day. | ||
dailyBucketsFrom model.Time | ||
|
||
dynamoRequests chan dynamoOp | ||
dynamoRequestsDone sync.WaitGroup | ||
} | ||
|
@@ -245,11 +254,12 @@ func NewAWSStore(cfg StoreConfig) (*AWSStore, error) { | |
} | ||
|
||
store := &AWSStore{ | ||
dynamodb: dynamodbClient, | ||
s3: s3Client, | ||
chunkCache: cfg.ChunkCache, | ||
tableName: tableName, | ||
bucketName: bucketName, | ||
dynamodb: dynamodbClient, | ||
s3: s3Client, | ||
chunkCache: cfg.ChunkCache, | ||
tableName: tableName, | ||
bucketName: bucketName, | ||
dailyBucketsFrom: cfg.DailyBucketsFrom, | ||
|
||
dynamoRequests: make(chan dynamoOp), | ||
} | ||
|
@@ -332,25 +342,50 @@ func (c *AWSStore) CreateTables() error { | |
return err | ||
} | ||
|
||
func bigBuckets(from, through model.Time) []int64 { | ||
// bigBuckets generates the list of "big buckets" for a given time range. | ||
// These buckets are used in the hash key of the inverted index, and need to | ||
// be deterministic for both reads and writes. | ||
// | ||
// This function deals with any changes from one bucketing scheme to another - | ||
// for instance, it know the date at which to migrate from hourly buckets to | ||
// to weekly buckets. | ||
func (c *AWSStore) bigBuckets(from, through model.Time) []string { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I reckon this function / method could use a comment saying what it does. Especially, it should give examples of the expected bucket strings. Or point to the tests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
var ( | ||
secondsInHour = int64(time.Hour / time.Second) | ||
fromHour = from.Unix() / secondsInHour | ||
throughHour = through.Unix() / secondsInHour | ||
result []int64 | ||
fromHour = from.Unix() / secondsInHour | ||
throughHour = through.Unix() / secondsInHour | ||
|
||
fromDay = from.Unix() / secondsInDay | ||
throughDay = through.Unix() / secondsInDay | ||
|
||
firstDailyBucket = c.dailyBucketsFrom.Unix() / secondsInDay | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might be a personal taste thing, but I'd just make There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
lastHourlyBucket = firstDailyBucket * 24 | ||
|
||
result []string | ||
) | ||
|
||
for i := fromHour; i <= throughHour; i++ { | ||
result = append(result, i) | ||
if i > lastHourlyBucket { | ||
break | ||
} | ||
result = append(result, strconv.Itoa(int(i))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should these be prefixed with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Backwards compatibility prevents us doing that. |
||
} | ||
|
||
for i := fromDay; i <= throughDay; i++ { | ||
if i < firstDailyBucket { | ||
continue | ||
} | ||
result = append(result, fmt.Sprintf("d%d", int(i))) | ||
} | ||
|
||
return result | ||
} | ||
|
||
func chunkName(userID, chunkID string) string { | ||
return fmt.Sprintf("%s/%s", userID, chunkID) | ||
} | ||
|
||
func hashValue(userID string, hour int64, metricName model.LabelValue) string { | ||
return fmt.Sprintf("%s:%d:%s", userID, hour, metricName) | ||
func hashValue(userID string, bucket string, metricName model.LabelValue) string { | ||
return fmt.Sprintf("%s:%s:%s", userID, bucket, metricName) | ||
} | ||
|
||
func rangeValue(label model.LabelName, value model.LabelValue, chunkID string) []byte { | ||
|
@@ -456,8 +491,8 @@ func (c *AWSStore) calculateDynamoWrites(userID string, chunks []Chunk) ([]*dyna | |
} | ||
|
||
entries := 0 | ||
for _, hour := range bigBuckets(chunk.From, chunk.Through) { | ||
hashValue := hashValue(userID, hour, metricName) | ||
for _, bucket := range c.bigBuckets(chunk.From, chunk.Through) { | ||
hashValue := hashValue(userID, bucket, metricName) | ||
for label, value := range chunk.Metric { | ||
if label == model.MetricNameLabel { | ||
continue | ||
|
@@ -542,18 +577,18 @@ func (c *AWSStore) lookupChunks(ctx context.Context, userID string, from, throug | |
|
||
incomingChunkSets := make(chan ByID) | ||
incomingErrors := make(chan error) | ||
buckets := bigBuckets(from, through) | ||
buckets := c.bigBuckets(from, through) | ||
totalLookups := int32(0) | ||
for _, hour := range buckets { | ||
go func(hour int64) { | ||
incoming, lookups, err := c.lookupChunksFor(ctx, userID, hour, metricName, matchers) | ||
for _, b := range buckets { | ||
go func(bucket string) { | ||
incoming, lookups, err := c.lookupChunksFor(ctx, userID, bucket, metricName, matchers) | ||
atomic.AddInt32(&totalLookups, lookups) | ||
if err != nil { | ||
incomingErrors <- err | ||
} else { | ||
incomingChunkSets <- incoming | ||
} | ||
}(hour) | ||
}(b) | ||
} | ||
|
||
var chunks ByID | ||
|
@@ -591,17 +626,17 @@ func next(s string) string { | |
return result | ||
} | ||
|
||
func (c *AWSStore) lookupChunksFor(ctx context.Context, userID string, hour int64, metricName model.LabelValue, matchers []*metric.LabelMatcher) (ByID, int32, error) { | ||
func (c *AWSStore) lookupChunksFor(ctx context.Context, userID string, bucket string, metricName model.LabelValue, matchers []*metric.LabelMatcher) (ByID, int32, error) { | ||
if len(matchers) == 0 { | ||
return c.lookupChunksForMetricName(ctx, userID, hour, metricName) | ||
return c.lookupChunksForMetricName(ctx, userID, bucket, metricName) | ||
} | ||
|
||
incomingChunkSets := make(chan ByID) | ||
incomingErrors := make(chan error) | ||
|
||
for _, matcher := range matchers { | ||
go func(matcher *metric.LabelMatcher) { | ||
incoming, err := c.lookupChunksForMatcher(ctx, userID, hour, metricName, matcher) | ||
incoming, err := c.lookupChunksForMatcher(ctx, userID, bucket, metricName, matcher) | ||
if err != nil { | ||
incomingErrors <- err | ||
} else { | ||
|
@@ -623,8 +658,8 @@ func (c *AWSStore) lookupChunksFor(ctx context.Context, userID string, hour int6 | |
return nWayIntersect(chunkSets), int32(len(matchers)), lastErr | ||
} | ||
|
||
func (c *AWSStore) lookupChunksForMetricName(ctx context.Context, userID string, hour int64, metricName model.LabelValue) (ByID, int32, error) { | ||
hashValue := hashValue(userID, hour, metricName) | ||
func (c *AWSStore) lookupChunksForMetricName(ctx context.Context, userID string, bucket string, metricName model.LabelValue) (ByID, int32, error) { | ||
hashValue := hashValue(userID, bucket, metricName) | ||
input := &dynamodb.QueryInput{ | ||
TableName: aws.String(c.tableName), | ||
KeyConditions: map[string]*dynamodb.Condition{ | ||
|
@@ -664,8 +699,8 @@ func (c *AWSStore) lookupChunksForMetricName(ctx context.Context, userID string, | |
return chunkSet, 1, nil | ||
} | ||
|
||
func (c *AWSStore) lookupChunksForMatcher(ctx context.Context, userID string, hour int64, metricName model.LabelValue, matcher *metric.LabelMatcher) (ByID, error) { | ||
hashValue := hashValue(userID, hour, metricName) | ||
func (c *AWSStore) lookupChunksForMatcher(ctx context.Context, userID string, bucket string, metricName model.LabelValue, matcher *metric.LabelMatcher) (ByID, error) { | ||
hashValue := hashValue(userID, bucket, metricName) | ||
var rangeMinValue, rangeMaxValue []byte | ||
if matcher.Type == metric.Equal { | ||
nextValue := model.LabelValue(next(string(matcher.Value))) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -146,3 +146,59 @@ func diff(want, have interface{}) string { | |
}) | ||
return "\n" + text | ||
} | ||
|
||
func TestBigBuckets(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Really glad you wrote tests! I found them a bit hard to understand their purpose at first, so I'm going to suggest some commentary inside. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
scenarios := []struct { | ||
from, through, dailyBucketsFrom model.Time | ||
buckets []string | ||
}{ | ||
// Buckets are by hour until we reach the `dailyBucketsFrom`, after which they are by day. | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd suggest that above this // Buckets are by hour until we reach the `dailyBucketsFrom`, after which they are by day. |
||
from: model.TimeFromUnix(0), | ||
through: model.TimeFromUnix(0).Add(3*24*time.Hour) - 1, | ||
dailyBucketsFrom: model.TimeFromUnix(0).Add(1 * 24 * time.Hour), | ||
buckets: []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "d1", "d2"}, | ||
}, | ||
|
||
// Only the day part of `dailyBucketsFrom` matters, not the time part. | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. // Only the day part of `dailyBucketsFrom` matters, not the time part. |
||
from: model.TimeFromUnix(0), | ||
through: model.TimeFromUnix(0).Add(3*24*time.Hour) - 1, | ||
dailyBucketsFrom: model.TimeFromUnix(0).Add(2*24*time.Hour) - 1, | ||
buckets: []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "d1", "d2"}, | ||
}, | ||
|
||
// Moving dailyBucketsFrom to the previous day compared to the above makes 24 1-hour buckets disappear. | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand this example well enough to suggest documentation. It looks like there's an open/closed interval thing going on that I'm not getting. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please add a comment describing the behaviour this is testing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we'll have to wait for @juliusv next week There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I guess I just wanted to show that 24 1-hour buckets disappear when you make the "// Moving But yeah, the incremental value of this test case is perhaps debatable :) |
||
from: model.TimeFromUnix(0), | ||
through: model.TimeFromUnix(0).Add(3*24*time.Hour) - 1, | ||
dailyBucketsFrom: model.TimeFromUnix(0).Add(1*24*time.Hour) - 1, | ||
buckets: []string{"0", "d0", "d1", "d2"}, | ||
}, | ||
|
||
// If `dailyBucketsFrom` is after the interval, everything will be bucketed by hour. | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. // If `dailyBucketsFrom` is after the interval, everything will be bucketed by hour. |
||
from: model.TimeFromUnix(0), | ||
through: model.TimeFromUnix(0).Add(2 * 24 * time.Hour), | ||
dailyBucketsFrom: model.TimeFromUnix(0).Add(99 * 24 * time.Hour), | ||
buckets: []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29", "30", "31", "32", "33", "34", "35", "36", "37", "38", "39", "40", "41", "42", "43", "44", "45", "46", "47", "48"}, | ||
}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe add another case that checks when There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
|
||
// Should only return daily buckets when dailyBucketsFrom is before the interval. | ||
{ | ||
from: model.TimeFromUnix(0).Add(1 * 24 * time.Hour), | ||
through: model.TimeFromUnix(0).Add(3*24*time.Hour) - 1, | ||
dailyBucketsFrom: model.TimeFromUnix(0), | ||
buckets: []string{"d1", "d2"}, | ||
}, | ||
} | ||
for i, s := range scenarios { | ||
cs := &AWSStore{ | ||
dailyBucketsFrom: s.dailyBucketsFrom, | ||
} | ||
buckets := cs.bigBuckets(s.from, s.through) | ||
if !reflect.DeepEqual(buckets, s.buckets) { | ||
t.Fatalf("%d. unexpected buckets; want %v, got %v", i, s.buckets, buckets) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ import ( | |
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/common/log" | ||
"github.com/prometheus/common/model" | ||
"github.com/prometheus/common/route" | ||
"github.com/prometheus/prometheus/promql" | ||
"github.com/prometheus/prometheus/web/api/v1" | ||
|
@@ -60,22 +61,23 @@ func init() { | |
} | ||
|
||
type cfg struct { | ||
mode string | ||
listenPort int | ||
consulHost string | ||
consulPrefix string | ||
s3URL string | ||
dynamodbURL string | ||
dynamodbCreateTables bool | ||
dynamodbPollInterval time.Duration | ||
memcachedHostname string | ||
memcachedTimeout time.Duration | ||
memcachedExpiration time.Duration | ||
memcachedService string | ||
remoteTimeout time.Duration | ||
numTokens int | ||
logSuccess bool | ||
watchDynamo bool | ||
mode string | ||
listenPort int | ||
consulHost string | ||
consulPrefix string | ||
s3URL string | ||
dynamodbURL string | ||
dynamodbCreateTables bool | ||
dynamodbPollInterval time.Duration | ||
dynamodbDailyBucketsFrom string | ||
memcachedHostname string | ||
memcachedTimeout time.Duration | ||
memcachedExpiration time.Duration | ||
memcachedService string | ||
remoteTimeout time.Duration | ||
numTokens int | ||
logSuccess bool | ||
watchDynamo bool | ||
|
||
ingesterConfig ingester.Config | ||
distributorConfig distributor.Config | ||
|
@@ -95,6 +97,7 @@ func main() { | |
flag.StringVar(&cfg.dynamodbURL, "dynamodb.url", "localhost:8000", "DynamoDB endpoint URL.") | ||
flag.DurationVar(&cfg.dynamodbPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.") | ||
flag.BoolVar(&cfg.dynamodbCreateTables, "dynamodb.create-tables", false, "Create required DynamoDB tables on startup.") | ||
flag.StringVar(&cfg.dynamodbDailyBucketsFrom, "dynamodb.daily-buckets-from", "9999-01-01", "The date in the format YYYY-MM-DD of the first day for which DynamoDB index buckets should be day-sized vs. hour-sized.") | ||
flag.BoolVar(&cfg.watchDynamo, "watch-dynamo", false, "Periodically collect DynamoDB provisioned throughput.") | ||
|
||
flag.StringVar(&cfg.memcachedHostname, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.") | ||
|
@@ -246,10 +249,15 @@ func setupChunkStore(cfg cfg) (chunk.Store, error) { | |
Expiration: cfg.memcachedExpiration, | ||
} | ||
} | ||
dailyBucketsFrom, err := time.Parse("2006-01-02", cfg.dynamodbDailyBucketsFrom) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thats not how go date parsing works - https://golang.org/pkg/time/#Parse
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh wow, OK. That's ... interesting. |
||
if err != nil { | ||
return nil, fmt.Errorf("error parsing daily buckets begin date: %v", err) | ||
} | ||
chunkStore, err := chunk.NewAWSStore(chunk.StoreConfig{ | ||
S3URL: cfg.s3URL, | ||
DynamoDBURL: cfg.dynamodbURL, | ||
ChunkCache: chunkCache, | ||
S3URL: cfg.s3URL, | ||
DynamoDBURL: cfg.dynamodbURL, | ||
ChunkCache: chunkCache, | ||
DailyBucketsFrom: model.TimeFromUnix(dailyBucketsFrom.Unix()), | ||
}) | ||
if err != nil { | ||
return nil, err | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we model this with a type that only specifies day?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can make it an int, but this just pushes the problem into main.go. I think I prefer it like this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Understood. I had to dig into the model code to make sense of that. It's just an alias to a number of milliseconds, rather than, say, a struct. I guess we could make our own type alias,
Day
, to make it clearer, but not worth it IMO.