Use weekly tables in the chunk store #181
Conversation
Force-pushed from 1c4589c to 307dae6.
Force-pushed from dbad6ec to c03660c.
- Make AWSStore.bigBuckets emit tuples of (table name, bucket name), and plumb this through the write path.
- New cortex_table_manager binary/image, which creates / updates the DynamoDB tables.
- Make the table manager responsible for exporting capacities.
- Add unit tests and instrument the table manager.
- Don't remove provisioned write capacity until after max chunk age.
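As a minimal sketch of the idea, the weekly table name can be derived from a bucket's timestamp so the chunk store and the table manager agree on it. The function name and exact rounding here are assumptions, not the PR's code; the prefix and period defaults come from the flags later in this thread:

package chunk

import (
	"strconv"
	"time"
)

// tableForBucket (hypothetical name) maps a bucket's start time, in Unix
// seconds, to the periodic table that should hold it: the table index is
// the number of whole periods elapsed since the Unix epoch.
func tableForBucket(prefix string, period time.Duration, bucketTimeSecs int64) string {
	table := bucketTimeSecs / int64(period/time.Second)
	return prefix + strconv.Itoa(int(table))
}

With the defaults seen below ("cortex_" and 7*24h), a bucket at Unix time t lands in table cortex_<t/604800>.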
Force-pushed from 8953bb9 to f4a338f.
- Fast-start the table manager ticker.
- Minimum write throughput is 1, not 0.
- Simplify the calculateExpectedTables functions, using int64 seconds (instead of time.Time) everywhere.
Force-pushed from f4a338f to 417edc4.
Thanks! This looks really solid. I've made enough comments that it's probably worth a second quick round of review.
UsePeriodicTables    bool
TablePrefix          string
TablePeriod          time.Duration
PeriodicTableStartAt time.Time
Please add a comment for this block of configuration. IIUC, the rest of these aren't used if UsePeriodicTables is false—is that right?
Correct. 👍
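The requested comment might look something like this (a sketch only — the field names come from the diff above, the wording is not the PR's):

// Periodic-table configuration. If UsePeriodicTables is false, the
// remaining fields in this block are ignored.
UsePeriodicTables    bool
TablePrefix          string
TablePeriod          time.Duration
PeriodicTableStartAt time.Time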
@@ -475,15 +407,15 @@ func (c *AWSStore) updateIndex(ctx context.Context, userID string, chunks []Chun
 		return err
 	}

-	return c.batchWriteDynamo(ctx, c.tableName, writeReqs)
+	return c.batchWriteDynamo(ctx, writeReqs)
 }

 // calculateDynamoWrites creates a set of batched WriteRequests to dynamo for all
 // the chunks it is given.
 //
 // Creates one WriteRequest per bucket per metric per chunk.
per table?
not necessarily - most of the time, buckets will live in the same table.
ah I see, it's "grouped by table".
 }

 // calculateDynamoWrites creates a set of batched WriteRequests to dynamo for all
 // the chunks it is given.
 //
 // Creates one WriteRequest per bucket per metric per chunk.
-func (c *AWSStore) calculateDynamoWrites(userID string, chunks []Chunk) ([]*dynamodb.WriteRequest, error) {
-	writeReqs := []*dynamodb.WriteRequest{}
+func (c *AWSStore) calculateDynamoWrites(userID string, chunks []Chunk) (map[string][]*dynamodb.WriteRequest, error) {
I reckon adding type aliases like type tableName = string and type bucketName string, and then using those in signatures like these, would make the interactions between these methods easier to follow. I don't feel strongly enough to insist on it, though.
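For concreteness, the suggestion would look roughly like this (a sketch; note that type tableName = string declares an alias while type bucketName string declares a distinct type — either would make the signatures more self-documenting):

type tableName = string
type bucketName string

// e.g. the map returned by calculateDynamoWrites would then read
// map[tableName][]*dynamodb.WriteRequest rather than
// map[string][]*dynamodb.WriteRequest.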
I thought about just plumbing the whole bucketSpec through... WDYT?
done.
I like this better, thanks.
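The bucketSpec that ends up plumbed through isn't shown in this thread; a plausible shape, inferred from the (table name, bucket name) tuples mentioned in the commit notes above, would be:

// bucketSpec (fields inferred, not the PR's actual definition) pairs a
// bucket with the table it lives in, so the write path can carry both
// through a single value.
type bucketSpec struct {
	tableName  string
	bucketName string
}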
 	}
 	buckets := cs.bigBuckets(s.from, s.through)
-	if !reflect.DeepEqual(buckets, s.buckets) {
+	if !reflect.DeepEqual(buckets, expected) {
Should probably add similar tests for when we're using the other bucketing strategy.
 	}
 	if err := tableManager.syncTables(context.Background()); err != nil {
 		t.Fatal(err)
 	}
Maybe extract this repeated block to a helper function that makes a new manager and syncs tables, doing t.Fatal if it doesn't work.
👍
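The suggested helper might look roughly like this (names such as newTestTableManager and NewTableManager are hypothetical, not taken from the PR):

// newTestTableManager builds a table manager and runs an initial sync,
// failing the test immediately if either step errors.
func newTestTableManager(t *testing.T, cfg TableManagerConfig, client dynamodbClient) *TableManager {
	tableManager, err := NewTableManager(cfg, client) // constructor assumed
	if err != nil {
		t.Fatal(err)
	}
	if err := tableManager.syncTables(context.Background()); err != nil {
		t.Fatal(err)
	}
	return tableManager
}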
 	for i := firstTable; i <= lastTable; i++ {
 		table := tableDescription{
 			// Name construction needs to be consistent with chunk_store.bigBuckets
Thanks for adding this comment.
It's comments like these, though, that make me think we really should have a bucketing strategy interface with two distinct implementations. That way, all of the code that's required to be consistent between the manager and the chunk store can live in the same place—increasing the odds that it stays consistent. It also means less cyclomatic complexity in the implementations.
What would that interface look like? Note that here we need something different from chunk_store: we want all tables from the first up to now, whereas in the chunk store we want the table for a given time.
Not 100% sure, and getting sure would probably require doing the refactoring myself.
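One possible shape for that interface — entirely speculative, since the thread leaves it open — that covers both callers: the chunk store needs the buckets (and their tables) for a time range, while the table manager needs every table from the first bucket up to now. Times are int64 seconds, matching the simplification in the commit notes above:

type bucketingStrategy interface {
	// bucketsForRange returns the (table, bucket) pairs covering
	// [from, through], as the chunk store's write path needs.
	bucketsForRange(from, through int64) []bucketSpec
	// tablesForRange returns every table name needed between from and
	// through, as the table manager needs.
	tablesForRange(from, through int64) []string
}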
 	}); err != nil {
 		return nil, nil, err
 	}
 	sort.Strings(existingTables)
Please extract this to a method that lists existing tables.
👍
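The extracted method might look like this — a sketch assuming the dynamodbClient interface exposes the AWS SDK's ListTablesPages:

// listTables returns the names of all existing DynamoDB tables, sorted.
func (m *TableManager) listTables() ([]string, error) {
	var existingTables []string
	if err := m.dynamodb.ListTablesPages(&dynamodb.ListTablesInput{}, func(out *dynamodb.ListTablesOutput, _ bool) bool {
		for _, name := range out.TableNames {
			existingTables = append(existingTables, *name)
		}
		return true // keep paging
	}); err != nil {
		return nil, err
	}
	sort.Strings(existingTables)
	return existingTables, nil
}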
 	}
 	for ; i < len(descriptions); i++ {
 		toCreate = append(toCreate, descriptions[i])
 	}
Please extract this to a function / method that takes a list of existing tables and a list of descriptions, and returns the toCreate & toCheckThroughput lists.
Now we've broken out the listTables function, this is basically all that's left. Still want me to do it?
I do (it's easier to test & re-use that way!) but I can understand why you don't want to.
 		i++
 		j++
 	}
 }
Took me a while to figure out that this is essentially calculating the set difference. Not sure much can be done about that.
:pike:!!!
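For reference, the walk under discussion is the classic two-pointer set difference over two name-sorted lists. Extracted as the function suggested above, it might look like this (names are assumptions based on the diff fragments in this thread):

// partitionTables (hypothetical name) splits the expected table
// descriptions into those that must be created and those that already
// exist and need a throughput check. Both inputs must be sorted by name.
func partitionTables(descriptions []tableDescription, existingTables []string) (toCreate, toCheckThroughput []tableDescription) {
	i, j := 0, 0
	for i < len(descriptions) && j < len(existingTables) {
		switch {
		case descriptions[i].name < existingTables[j]:
			// descriptions[i] doesn't exist yet: create it.
			toCreate = append(toCreate, descriptions[i])
			i++
		case descriptions[i].name > existingTables[j]:
			// existingTables[j] isn't in descriptions: ignore it.
			j++
		default:
			// Table exists: check it has the correct throughput.
			toCheckThroughput = append(toCheckThroughput, descriptions[i])
			i++
			j++
		}
	}
	// Any remaining descriptions have no existing table.
	for ; i < len(descriptions); i++ {
		toCreate = append(toCreate, descriptions[i])
	}
	return toCreate, toCheckThroughput
}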
 			// existingTables[j].name isn't in descriptions, can ignore
 			j++
 		} else {
 			// Table existis, need to check it has correct throughput
Nit: exists
👍
Thanks! Still some comments but at your discretion.
 	ProvisionedReadThroughput int64

 	// Not exported as only used by tests to inject mocks
 	dynamodb dynamodbClient
The MakeDynamoDbClient function can still be defined here. That'd keep main.go fairly small.
Given that @jml has done a deeper review, I've just given this a 30-minute read to get an impression of how it all works. It looks overall good to me. The complexity that both the code and the system are reaching now scares me, but I don't have a simpler alternative to offer either. So a rough 👍 - and crazy that you can produce so much code that fast :)
 	dynamodbURL := flag.String("dynamodb.url", "localhost:8000", "DynamoDB endpoint URL.")
 	flag.StringVar(&cfg.TablePrefix, "dynamodb.periodic-table.prefix", "cortex_", "DynamoDB table prefix for the periodic tables.")
 	flag.DurationVar(&cfg.TablePeriod, "dynamodb.periodic-table.period", 7*24*time.Hour, "DynamoDB periodic tables period.")
 	flag.DurationVar(&cfg.CreationGracePeriod, "dynamodb.periodic-table.grace-period", 10*time.Minute, "DynamoDB periodic tables grace period (duration which table will be created/delete before/after its needed).")
its -> it's
delete -> deleted
Part of #158
Introduces a new job, the table manager, which periodically calculates the required tables (and provisioned throughputs) and then creates / updates the existing tables as appropriate.
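A sketch of what that periodic job could look like, assuming the syncTables method seen in the review and a hypothetical poll-interval field; the "fast start" from the commit notes above means syncing once immediately rather than waiting for the first tick:

func (m *TableManager) loop(ctx context.Context) {
	ticker := time.NewTicker(m.pollInterval) // field name assumed
	defer ticker.Stop()
	// Fast start: sync once up front instead of waiting a full period.
	if err := m.syncTables(ctx); err != nil {
		log.Printf("error syncing tables: %v", err)
	}
	for {
		select {
		case <-ticker.C:
			if err := m.syncTables(ctx); err != nil {
				log.Printf("error syncing tables: %v", err)
			}
		case <-ctx.Done():
			return
		}
	}
}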
Also updates the chunk store to calculate the tables a given bucket should be written to.
TODO: extend bigBuckets to also pick a table for the bucket.