Store chunks in DynamoDB #418
Conversation
lgtm - @jml could you take a look as well please?
waiting for some tests as well ^
pkg/chunk/storage_client.go
Outdated
@@ -19,8 +19,8 @@ type StorageClient interface {
	QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error

	// For storing and retrieving chunks.
	PutChunk(ctx context.Context, key string, data []byte) error
	GetChunk(ctx context.Context, key string) ([]byte, error)
	PutChunks(ctx context.Context, chunks []Chunk, keys []string, data [][]byte) error
What are the constraints on how chunks, keys, and data relate to each other? Can we pick better types (e.g. map[string]Chunk) to make this less easy to mess up?
The lists must be the same length, and the ordering within them must be consistent.
You could probably consider this a micro-optimisation, as the chunk can generate the key and the buffer. I'll see if I can factor it out.
Have managed to tidy this up.
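For illustration, a hedged sketch of where that tidy-up plausibly lands, assuming each Chunk can derive its own key and serialised buffer (a sketch, not the merged code):

```go
// Once the key and buffer come from the Chunk itself, the three parallel
// slices collapse into one, so callers can no longer mismatch them.
type StorageClient interface {
	QueryPages(ctx context.Context, query IndexQuery, callback func(result ReadBatch, lastPage bool) (shouldContinue bool)) error

	// For storing and retrieving chunks.
	PutChunks(ctx context.Context, chunks []Chunk) error
	GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error)
}
```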
pkg/chunk/schema.go
Outdated
@@ -49,6 +49,9 @@ type IndexQuery struct {
	RangeValuePrefix []byte
	RangeValueStart []byte

	// Used when fetching chunks
Better to say why we need it when fetching chunks.
Old code I should have removed, sorry!
pkg/chunk/storage_client.go
Outdated
	PutChunk(ctx context.Context, key string, data []byte) error
	GetChunk(ctx context.Context, key string) ([]byte, error)
	PutChunks(ctx context.Context, chunks []Chunk, keys []string, data [][]byte) error
	GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error)
I don't understand this interface. You already have chunks (chunks []Chunk) and this returns the same thing. Why are the inputs & outputs the same types?
The chunks you're passing in are "empty", in that they just describe what to fetch.
I could separate out the parsed chunk ID (ChunkDescriptor) from the chunk itself, and embed one in the other. Then it could take a ChunkDescriptor and return a Chunk. WDYT?
SGTM
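To make the ChunkDescriptor idea concrete, a hypothetical sketch (field names invented for illustration, not taken from the PR; model is github.com/prometheus/common/model):

```go
import "github.com/prometheus/common/model"

// ChunkDescriptor is the parsed chunk ID: everything needed to say
// *what* to fetch, and nothing else.
type ChunkDescriptor struct {
	UserID      string
	Fingerprint model.Fingerprint
	From        model.Time
	Through     model.Time
}

// Chunk embeds its descriptor next to the payload a fetch fills in.
type Chunk struct {
	ChunkDescriptor
	Data []byte
}

// GetChunks would then read naturally as:
//   GetChunks(ctx context.Context, descriptors []ChunkDescriptor) ([]Chunk, error)
```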
f.Int64Var(&cfg.ChunkTableProvisionedWriteThroughput, "dynamodb.chunk-table.write-throughput", 3000, "DynamoDB chunk tables write throughput")
f.Int64Var(&cfg.ChunkTableProvisionedReadThroughput, "dynamodb.chunk-table.read-throughput", 300, "DynamoDB chunk tables read throughput")
f.Int64Var(&cfg.ChunkTableInactiveWriteThroughput, "dynamodb.chunk-table.inactive-write-throughput", 1, "DynamoDB chunk tables write throughput for inactive tables.")
f.Int64Var(&cfg.ChunkTableInactiveReadThroughput, "dynamodb.chunk-table.inactive-read-throughput", 300, "DynamoDB chunk tables read throughput for inactive tables")
Shouldn't these flags be on PeriodicChunkTableConfig? Better yet, could these be implemented in such a way that we don't have duplication with the periodic table config?
> Shouldn't these flags be on PeriodicChunkTableConfig?

I followed the pattern we used for PeriodicTableConfig, where only the flags that need to be shared are actually put in the shared struct.

> Better yet, could these be implemented in such a way that we don't have duplication with the periodic table config?

Eventually I want Cortex to self-tune its provisioned throughput, but for now the chunk table will need different throughput levels from the other tables.
Fair enough.
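For reference, a minimal sketch of the pattern being described, with struct and flag names assumed rather than taken from the PR: shared flags go on the shared struct, while deployment-specific throughput knobs stay on the enclosing config.

```go
// Hypothetical shared struct: only flags common to all periodic tables.
func (cfg *PeriodicChunkTableConfig) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.ChunkTablePrefix, "dynamodb.chunk-table.prefix", "cortex_chunks_", "Prefix for periodic chunk tables.")
}

// Enclosing config: throughput levels differ per table type, so they
// live here, as in the diff above.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	cfg.PeriodicChunkTableConfig.RegisterFlags(f)
	f.Int64Var(&cfg.ChunkTableProvisionedWriteThroughput, "dynamodb.chunk-table.write-throughput", 3000, "DynamoDB chunk tables write throughput")
	// ...remaining chunk-table throughput flags as in the diff above...
}
```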
dynamoDBChunks, err = a.getDynamoDBChunks(ctx, dynamoDBChunks)
if err != nil {
	return nil, err
}
Any particular reason to do these sequentially?
I don't expect us to be doing both simultaneously except when we migrate, and then it will only occur for a couple of hours. So I didn't think it was worth the extra code to parallelise. Will add a comment to this effect.
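For contrast, a rough sketch of the parallel version that was deliberately skipped; getS3Chunks is assumed by symmetry with getDynamoDBChunks, and this is a fragment of the surrounding method, not self-contained code:

```go
// Fetch from S3 and DynamoDB concurrently. Not worth it today, since
// both stores are only populated simultaneously during migration.
var (
	wg    sync.WaitGroup
	s3Err error
)
wg.Add(1)
go func() {
	defer wg.Done()
	s3Chunks, s3Err = a.getS3Chunks(ctx, s3Chunks)
}()
dynamoDBChunks, err = a.getDynamoDBChunks(ctx, dynamoDBChunks)
wg.Wait()
if s3Err != nil {
	return nil, s3Err
}
if err != nil {
	return nil, err
}
```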
pkg/chunk/aws_storage_client.go
Outdated
// All other errors are fatal.
if err != nil {
	return result, err
}
Wouldn't it make more sense to push these to immediately after we get the error, just after we do the "record dynamodb error" bit? As it is, it's disconnected from that logic, and it's not clear whether it matters that this happens after turning the responses into chunks.
Sure, yeah, the logic is actually a little more subtle. Have restructured.
pkg/chunk/aws_storage_client.go
Outdated
func (b dynamoDBReadBatch) Len() int {
	return len(b)

// Fill 'to' with WriteRequests from 'from' until 'to' has at most max requests. Remove those requests from 'from'.
func (b dynamoDBWriteBatch) takeReqs(from dynamoDBWriteBatch, max int) {
'to' isn't defined.
Also, why is this private when Len and Add are public?
Made them all public.
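For readers following the thread, a reconstruction of what takeReqs does, with the receiver spelled 'b' so the doc comment matches; the logic is inferred from the fragments quoted here, not copied from the PR (uses "github.com/aws/aws-sdk-go/service/dynamodb"):

```go
type dynamoDBWriteBatch map[string][]*dynamodb.WriteRequest

// Len counts the requests across all tables in the batch.
func (b dynamoDBWriteBatch) Len() int {
	result := 0
	for _, reqs := range b {
		result += len(reqs)
	}
	return result
}

// takeReqs fills 'b' with WriteRequests from 'from' until 'b' has at
// most max requests, removing the taken requests from 'from'.
func (b dynamoDBWriteBatch) takeReqs(from dynamoDBWriteBatch, max int) {
	toFill := from.Len()
	if max > 0 && max-b.Len() < toFill {
		toFill = max - b.Len()
	}
	for tableName, fromReqs := range from {
		taken := len(fromReqs)
		if taken > toFill {
			taken = toFill
		}
		if taken > 0 {
			b[tableName] = append(b[tableName], fromReqs[:taken]...)
			from[tableName] = fromReqs[taken:]
			toFill -= taken
		}
	}
}
```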
pkg/chunk/aws_storage_client.go
Outdated
}

// Fill 'to' with WriteRequests from 'from' until 'to' has at most max requests. Remove those requests from 'from'.
func (b dynamoDBReadRequest) takeReqs(from dynamoDBReadRequest, max int) {
Ditto comment on the WriteRequest version.
		from[tableName].Keys = fromReqs.Keys[taken:]
		toFill -= taken
	}
}
I think you'll have fewer bugs if you make this (and the equivalent write version) immutable, i.e. have it return a new request populated from 'from' with 'max' or more, without changing 'b'.
The challenge is that this needs to be done twice: first picking requests from one list, then from another.
Hmm, I see.

take maxRequests (mappend from1 from2)

I think it's probably OK as-is, but if you wanted to pursue immutability, you could make 'max' the first parameter and accept a variadic number of requests.
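A hedged sketch of that immutable shape, purely to illustrate the suggestion (not what the PR does):

```go
// takeReqs returns a new batch with up to max requests drawn from the
// sources in order. The sources are left untouched, which is exactly
// the wrinkle raised below: the caller must still learn what was taken.
func takeReqs(max int, froms ...dynamoDBWriteBatch) dynamoDBWriteBatch {
	to := dynamoDBWriteBatch{}
	remaining := max
	for _, from := range froms {
		for tableName, reqs := range from {
			if remaining <= 0 {
				return to
			}
			n := len(reqs)
			if n > remaining {
				n = remaining
			}
			to[tableName] = append(to[tableName], reqs[:n]...)
			remaining -= n
		}
	}
	return to
}
```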
}
return chunkValue.B
This is the same as takeReqs on dynamoDBWriteBatch, right? Is there a way to avoid this duplication, perhaps using an interface?
Yeah, I tried. Will have another look.
Nah, can't find a nice way to unify these two. Open to suggestions though!
We're going to need a bigger type system.
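For what it's worth, Go 1.18 generics (which arrived years after this PR) would cover most of it, assuming the read side were first reshaped from map[string]*dynamodb.KeysAndAttributes into a map-of-slices like the write side; a rough sketch only:

```go
// One takeReqs over any element type. dynamoDBWriteBatch already has
// the shape map[string][]T; dynamoDBReadRequest would need reshaping
// into a map of key slices first.
func takeReqs[T any](to, from map[string][]T, max int) {
	have, avail := 0, 0
	for _, reqs := range to {
		have += len(reqs)
	}
	for _, reqs := range from {
		avail += len(reqs)
	}
	toFill := avail
	if max > 0 && max-have < toFill {
		toFill = max - have
	}
	for tableName, reqs := range from {
		taken := len(reqs)
		if taken > toFill {
			taken = toFill
		}
		if taken > 0 {
			to[tableName] = append(to[tableName], reqs[:taken]...)
			from[tableName] = reqs[taken:]
			toFill -= taken
		}
	}
}
```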
@jml thank you for the high-quality feedback! I think I've addressed most of it, but there are a few open questions. PTAL?
Probably the last round. Haven't looked at the tests, sorry.
pkg/chunk/aws_storage_client.go
Outdated
}

if err != nil {
	for tableName := range outstanding {
Shouldn't this be 'requests'? It's entirely possible that:
a) not all the tables in 'outstanding' got sent to dynamodb
b) some of the tables in 'unprocessed' did
Yep, good catch.
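That is, a hedged sketch of the fix, with the error-recording helper assumed rather than taken from the PR:

```go
if err != nil {
	// Back off everything attempted in this round: on a batch error,
	// some tables in 'outstanding' may not have been sent at all, and
	// some previously 'unprocessed' tables may have been.
	for tableName := range requests {
		recordDynamoError(tableName, err) // assumed helper
	}
}
```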
pkg/chunk/aws_storage_client.go
Outdated
// If we get provisionedThroughputExceededException, then no items were processed,
// so back off and retry all.
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == provisionedThroughputExceededException {
I don't see how this code is reachable. 'err' is last set by processChunkResponse, which doesn't do AWS stuff.
I think it makes more sense to put it before processChunkResponse, since according to the comment, no items were processed anyway and we're just going to retry them all.
At that point, it might as well go inside the 'if err != nil' block that's starting on what's currently line 463.
+1
Thanks. Comment + code change makes the logic easier to follow too.
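A sketch of the reordered flow inside the retry loop, assuming the AWS call looks roughly like this; recordDynamoError and nextBackoff are assumed helper names, and result/backoff/continue belong to the enclosing loop:

```go
resp, err := a.DynamoDB.BatchGetItemWithContext(ctx, input)
if err != nil {
	for tableName := range requests {
		recordDynamoError(tableName, err) // record right where it happens
	}

	// If we get provisionedThroughputExceededException, then no items
	// were processed, so back off and retry them all.
	if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == provisionedThroughputExceededException {
		time.Sleep(backoff)
		backoff = nextBackoff(backoff)
		continue
	}

	// All other errors are fatal.
	return result, err
}

// Only now turn the responses we actually got into chunks.
```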
		from[tableName].Keys = fromReqs.Keys[taken:]
		toFill -= taken
	}
}
> Hmm, I see.
>
> take maxRequests (mappend from1 from2)
>
> I think it's probably OK as-is, but if you wanted to pursue immutability, you could make 'max' the first parameter and accept a variadic number of requests.

Right, but then you'd also need to have it return the new, immutable input maps too. Let's face it, it's just not pretty in Go.
pkg/chunk/aws_storage_client.go
Outdated
@@ -427,14 +427,18 @@ func (a awsStorageClient) getS3Chunk(ctx context.Context, chunk Chunk) (Chunk, e
	return chunk, nil
}

// As we're resuing the DynamoDB schema from the index for the chunk tables, |
re-using
Part #141