Index series, not chunks #875
Got this deployed in our dev env and it seems to work. Hard to draw any conclusions as there aren't any taxing queries, but there seems to be a drop in bigtable latency in line with a reduction in the amount of data we're reading.
Running in prod for a few days now and seen the worst queries go from minutes to seconds, in line with a 6-10x latency improvement. Once we rotate out the index tables, will report on their size.
There was a bug in the way we handled chunks spanning the transition; fixed now.
A few thoughts.
cmd/ingester/main.go
Outdated
@@ -70,7 +70,7 @@ func main() {
 		os.Exit(1)
 	}

-	chunkStore, err := chunk.NewStore(chunkStoreConfig, schemaConfig, storageClient)
+	chunkStore, err := chunk.NewCompositeStore(chunkStoreConfig, schemaConfig, storageClient)
ISTM that you could have left the exported name as NewStore() to minimise impact elsewhere.
Good idea, done.
pkg/chunk/chunk_store.go
Outdated
@@ -93,22 +93,34 @@ func (c *store) Stop() {
 }

 // Put implements ChunkStore
 func (c *store) Put(ctx context.Context, chunks []Chunk) error {
 	for _, chunk := range chunks {
 		if err := c.PutOne(ctx, chunk.From, chunk.Through, chunk); err != nil {
This seems like rather a large change in performance characteristics, if you have a number of chunks to write.
pkg/chunk/chunk_store_test.go
Outdated
 	"github.com/weaveworks/cortex/pkg/util/extract"
 	"golang.org/x/net/context"

 	"github.com/weaveworks/common/test"
 	"github.com/weaveworks/common/user"
 )

 var schemas = []struct {
 	name string
 	fn   func(cfg SchemaConfig) Schema
I was expecting storeFn to come in this commit, even though they would all be the same at this point.
Seems to me that this schema should also allow (or lay the groundwork for) successful queries without invariant metric names... am I thinking on the right path there?
It could, but it still doesn't avoid the problem of hotspotting the "name" row. Although it would reduce the load on that row by 12x, so might be doable at this point.
Yes, it potentially is. I don't think it will affect normal operation, as I think we only flush one chunk at once normally - we've been running this for a week or so, haven't noticed anything. But it may affect shutdown flushes. I think the correct solution is to potentially split out the
How many do we have? I see the chunk caches (which are fundamentally different things), a vendored prometheus treecache (something to do with zookeeper) and some caches in k8s/client-go. I guess the grpc connection pool is a cache, but that's a different beast too. We also cache a single result in the chunk iterator, but that's just a single result. I'm not actually aware of any other caches (like this) in use in the codebase. We could use github.com/bluele/gcache, but I was never a fan of that library. OTOH, it's not clear using a heap for evictions is the best idea; we could thread a linked list through the entries and reorder it to get LRU.
I've tidied up this PR so it should be more reviewable. I see two things left todo: figure out what to do with chunks on store boundaries (try and bring back the batching) and accurately record cardinality of rows for multi-day queries. Let me know if you get a chance to take a look @bboreham @cboggs @gouthamve.
By "we" I meant the Go community. I've used
If the flush queue gets very large (and we've had it in the millions many times) this assumption breaks down.
I wrote some thoughts at #684 (comment)
 	c.cache.Stop()
 }

 // Put implements ChunkStore
-func (c *Store) Put(ctx context.Context, chunks []Chunk) error {
+func (c *store) Put(ctx context.Context, chunks []Chunk) error {
 	for _, chunk := range chunks {
Would it make more sense to PutChunks, then calculate all the index entries and write them at once? That might alleviate the performance-characteristic changes Bryan commented about.
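A hedged sketch of that batching idea (the types, `entriesForChunk`, and `PutChunks` here are hypothetical stand-ins, not Cortex's actual API): compute the index entries for every chunk first, then issue a single batched write instead of one round-trip per chunk.

```go
package main

import "fmt"

// Hypothetical stand-ins for the real chunk and index-entry types.
type Chunk struct{ ID string }
type IndexEntry struct{ HashKey, RangeKey string }

// entriesForChunk stands in for the per-schema index-entry calculation;
// this sketch emits one entry per chunk.
func entriesForChunk(c Chunk) []IndexEntry {
	return []IndexEntry{{HashKey: "series-hash", RangeKey: c.ID}}
}

// PutChunks collects every chunk's index entries and returns them as one
// batch, instead of issuing one write per chunk as a per-chunk PutOne
// loop would.
func PutChunks(chunks []Chunk) []IndexEntry {
	batch := make([]IndexEntry, 0, len(chunks))
	for _, c := range chunks {
		batch = append(batch, entriesForChunk(c)...)
	}
	// A real store would now issue a single batched write of `batch`.
	return batch
}

func main() {
	batch := PutChunks([]Chunk{{ID: "c1"}, {ID: "c2"}, {ID: "c3"}})
	fmt.Println(len(batch)) // one entry per chunk in this sketch
}
```

The point is purely about round-trips: the number of index entries is unchanged, but they travel in one request.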
pkg/chunk/schema_config.go
Outdated
@@ -60,6 +60,7 @@ func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) {
 	f.Var(&cfg.V6SchemaFrom, "dynamodb.v6-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v6 schema.")
 	f.Var(&cfg.V7SchemaFrom, "dynamodb.v7-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v7 schema (Deprecated).")
 	f.Var(&cfg.V8SchemaFrom, "dynamodb.v8-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v8 schema (Deprecated).")
+	f.Var(&cfg.V9SchemaFrom, "dynamodb.v9-schema-from", "The data (in the format YYYY-MM-DD) after which we enable v9 schema (Series indexing).")
Typo: "data" should be "date".
pkg/chunk/series_store.go
Outdated
 	level.Debug(log).Log("Chunk IDs", len(chunkIDs))

 	// Protect ourselves against OOMing.
 	if len(chunkIDs) > c.cfg.QueryChunkLimit {
Would it make more sense to do this after filtering out the chunks by time?
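A minimal sketch of that suggestion (type and field names are hypothetical, not the PR's actual code): narrow the candidate chunks to the query's time range first, and only then apply the OOM guard, so a query isn't rejected on chunks that would be discarded anyway.

```go
package main

import "fmt"

// ChunkRef is a hypothetical chunk reference with millisecond timestamps.
type ChunkRef struct {
	ID            string
	From, Through int64
}

// filterByTime keeps only chunks overlapping [from, through].
func filterByTime(chunks []ChunkRef, from, through int64) []ChunkRef {
	var out []ChunkRef
	for _, c := range chunks {
		if c.Through >= from && c.From <= through {
			out = append(out, c)
		}
	}
	return out
}

// checkLimit applies the limit only after time filtering, per the
// reviewer's suggestion.
func checkLimit(chunks []ChunkRef, from, through int64, limit int) ([]ChunkRef, error) {
	filtered := filterByTime(chunks, from, through)
	if len(filtered) > limit {
		return nil, fmt.Errorf("query would fetch %d chunks, limit is %d", len(filtered), limit)
	}
	return filtered, nil
}
```

With the limit applied before filtering, a long-lived series could trip the guard even for a narrow query window; this ordering avoids that.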
I also have some concerns about putting one chunk vs many chunks, and added a comment with a possible idea. I like the idea of splitting
Promote the composite schema abstraction to "composite chunk store" - a chunk store which delegates to different chunk stores based on time. This allows us to vary the store implementation over time, and not just the schema. This will unblock the new bigtable storage adapter (using columns instead of rows), and allow us to more easily implement the iterative intersections and indexing of series instead of chunks.

Corner case when writing chunks which span multiple stores: they are written to both stores, and instead of using the chunk start/end we use the schema start/end. This will lead to duplication of the index entries on schema migrations, but is actually already the case for day boundaries anyway. It will lead to duplicate writes of the chunk on schema migrations - they should be deduped by the underlying store.

Signed-off-by: Tom Wilkie <[email protected]>
We should index the series, not chunks; this will reduce the number of entries in the index by `replication factor * (bucket size / chunk size)`, or 3 * (24hrs / 6hrs) - ie 12x. This will however mean we need another index from series to chunks, introducing 1 extra write and N extra reads per query. Expectation is a reduction in query latency (and bigtable query usage, and memory cost) by 12x, and then an increase by 2x as we have to do a bunch of queries.

This change introduces the seriesStore, a new chunk store implementation that, combined with the v9 schema, indexes series not chunks. I tried to adapt the original chunk store to support this style of indexing - easy on the write path, but the read path became even more of a rat's nest. So I factored out the common bits as best I could and made a new chunk store.

Signed-off-by: Tom Wilkie <[email protected]>

Tidy up some of the logging.

Signed-off-by: Tom Wilkie <[email protected]>
I've looked at go-cache, it still uses a background goroutine to periodically expunge entries from the cache. AFAICT this means the cache can grow without bounds between these periods, and it also locks the entire cache to do this. The heap cache isn't ideal, I'm going to update it to use a simple FIFO list for evictions, but I think it's better than go-cache.
fifoCache is a simple string -> interface{} cache which uses a fifo to manage evictions. O(1) inserts, updates and gets. Signed-off-by: Tom Wilkie <[email protected]>
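A minimal sketch of such a FIFO-evicting cache using Go's container/list (names are hypothetical; the real fifoCache lives in the PR): a map gives O(1) lookup, a list ordered by insertion gives O(1) eviction from the front once the cache is full.

```go
package main

import "container/list"

// cacheEntry pairs the key with the value so eviction can delete from
// the map.
type cacheEntry struct {
	key   string
	value interface{}
}

// fifoCache is a string -> interface{} cache that evicts the oldest
// entry once it exceeds maxEntries. All operations are O(1).
type fifoCache struct {
	maxEntries int
	entries    map[string]*list.Element
	order      *list.List // front = oldest inserted
}

func newFifoCache(maxEntries int) *fifoCache {
	return &fifoCache{
		maxEntries: maxEntries,
		entries:    map[string]*list.Element{},
		order:      list.New(),
	}
}

// Put inserts or updates a key; on insert it evicts the oldest entry
// if the cache is full. Updates keep the entry's FIFO position.
func (c *fifoCache) Put(key string, value interface{}) {
	if elem, ok := c.entries[key]; ok {
		elem.Value.(*cacheEntry).value = value
		return
	}
	if c.order.Len() >= c.maxEntries {
		oldest := c.order.Front()
		c.order.Remove(oldest)
		delete(c.entries, oldest.Value.(*cacheEntry).key)
	}
	c.entries[key] = c.order.PushBack(&cacheEntry{key, value})
}

// Get returns the cached value and whether it was present.
func (c *fifoCache) Get(key string) (interface{}, bool) {
	elem, ok := c.entries[key]
	if !ok {
		return nil, false
	}
	return elem.Value.(*cacheEntry).value, true
}
```

Unlike the LRU idea floated above, FIFO never reorders on Get, which keeps reads lock-light at the cost of occasionally evicting a hot entry.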
Firstly, cache the length of index rows we query (by the hash and range key). Secondly, fail for rows with > 100k, either because the cache told us so, or because we read them. Finally, allow matchers to fail on cardinality errors but proceed with the query (as long as at least one matcher succeeds), and then filter results. Notably, after this change, queries on two high-cardinality labels that would result in a small number of series will fail. Signed-off-by: Tom Wilkie <[email protected]>
Signed-off-by: Tom Wilkie <[email protected]>
Signed-off-by: Goutham Veeramachaneni <[email protected]>
I am approving this since it has some nice changes. If anyone is worried about the performance characteristic change they can leave it in v6 until those are updated.
Fixes #433, fixes #884
Consists of 4 changes:
Move compositeSchema abstraction up to compositeStore.
Promote the composite schema abstraction to "composite chunk store" - a chunk store which delegates to different chunk stores based on time. This allows us to vary the store implementation over time, and not just the schema. This will unblock the new bigtable storage adapter (using columns instead of rows), and allow us to more easily implement the iterative intersections and indexing of series instead of chunks.
Corner case when writing chunks which span multiple stores: they are written to both stores, and instead of using the chunk start/end we use the schema start/end. This will lead to duplication of the index entries on schema migrations, but is actually already the case for day boundaries anyway. It will lead to duplicate writes of the chunk on schema migrations - they should be deduped by the underlying store.
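A hedged sketch of that corner case (all names hypothetical, not the PR's code): stores are keyed by the time they take effect, and a chunk spanning a boundary is assigned to both sides, with each side's time range clamped to the store's boundaries rather than the chunk's own start/end.

```go
package main

import "fmt"

// storeEntry is a hypothetical store with the time it comes into effect.
type storeEntry struct {
	start int64 // millisecond timestamp this store takes effect
	name  string
}

// storesForChunk returns, for each store whose active period overlaps
// [from, through], the clamped range the chunk is indexed under there.
// Assumes stores are sorted by start time.
func storesForChunk(stores []storeEntry, from, through int64) map[string][2]int64 {
	out := map[string][2]int64{}
	for i, s := range stores {
		end := int64(1) << 62 // last store runs forever
		if i+1 < len(stores) {
			end = stores[i+1].start - 1
		}
		if through < s.start || from > end {
			continue // chunk doesn't overlap this store's period
		}
		lo, hi := from, through
		if lo < s.start {
			lo = s.start
		}
		if hi > end {
			hi = end
		}
		out[s.name] = [2]int64{lo, hi}
	}
	return out
}

func main() {
	stores := []storeEntry{{0, "v6"}, {100, "v9"}}
	// A chunk spanning the v6 -> v9 boundary lands in both stores.
	fmt.Println(storesForChunk(stores, 90, 110))
}
```

A chunk wholly inside one store's period maps to exactly one store; only boundary-spanning chunks get the duplicated entries described above.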
Index series, not chunks
We should index the series, not chunks; this will reduce the number of entries in the index by
replication factor * (bucket size / chunk size)
, or 3 * (24hrs / 6hrs) - ie 12x. This will however mean we need another index from series to chunks, introducing 1 extra write and N extra reads per query. Expectation is a reduction in query latency (and bigtable query usage, and memory cost) by 12x, and then an increase by 2x as we have to do a bunch of queries.

This change introduces the seriesStore, a new chunk store implementation that, combined with the v9 schema, indexes series not chunks.
I tried to adapt the original chunk store to support this style of indexing - easy on the write path, but the read path became even more of a rat's nest. So I factored out the common bits as best I could and made a new chunk store.
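The read path described above becomes a two-stage lookup; here is a hedged in-memory sketch (the index structure, `chunksFor`, and the string-keyed matchers are illustrative stand-ins, not the seriesStore's API): resolve each matcher to series IDs and intersect, then fetch chunk IDs per surviving series.

```go
package main

import (
	"fmt"
	"sort"
)

// index models the two v9 mappings: label entries -> series IDs, and
// series IDs -> chunk IDs.
type index struct {
	labelToSeries map[string][]string // "name=value" -> series IDs
	seriesToChunk map[string][]string // series ID -> chunk IDs
}

// intersect returns the elements of b that also appear in a.
func intersect(a, b []string) []string {
	set := map[string]bool{}
	for _, s := range a {
		set[s] = true
	}
	var out []string
	for _, s := range b {
		if set[s] {
			out = append(out, s)
		}
	}
	return out
}

// chunksFor runs the two-stage lookup: matchers -> series -> chunks.
// The extra stage is the "1 extra write and N extra reads" mentioned
// in the description.
func (ix index) chunksFor(matchers []string) []string {
	var series []string
	for i, m := range matchers {
		if i == 0 {
			series = ix.labelToSeries[m]
			continue
		}
		series = intersect(series, ix.labelToSeries[m])
	}
	var chunks []string
	for _, s := range series {
		chunks = append(chunks, ix.seriesToChunk[s]...)
	}
	sort.Strings(chunks)
	return chunks
}

func main() {
	ix := index{
		labelToSeries: map[string][]string{
			"job=api":  {"s1", "s2"},
			"env=prod": {"s2", "s3"},
		},
		seriesToChunk: map[string][]string{"s2": {"c1", "c2"}},
	}
	fmt.Println(ix.chunksFor([]string{"job=api", "env=prod"})) // [c1 c2]
}
```

The intersection happens on the (much smaller) series set, which is where the 12x read reduction comes from; the per-series chunk lookups are the 2x give-back.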
Add heapCache, a cache that uses a heap for evictions.
heapCache is a simple string -> interface{} cache which uses a heap to manage evictions. O(log N) inserts and updates, O(1) gets.
Skip index queries for high cardinality labels.
Firstly, cache the length of index rows we query (by the hash and range key). Secondly, fail for rows with > 100k, either because the cache told us so, or because we read them. Finally, allow matchers to fail on cardinality errors but proceed with the query (as long as at least one matcher succeeds), and then filter results.
Notably, after this change, queries on two high-cardinality labels that would result in a small number of series will fail.
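The matcher-tolerance step above can be sketched as follows (a hedged illustration with hypothetical names, not the PR's code): each matcher lookup may fail with a cardinality error; the query proceeds so long as at least one matcher resolved, and the skipped matchers must be re-applied as filters on the fetched results.

```go
package main

import (
	"errors"
	"fmt"
)

// errCardinality stands in for the "row too big" error described above.
var errCardinality = errors.New("row exceeds cardinality limit")

// lookupFn stands in for one matcher's index lookup.
type lookupFn func() ([]string, error)

// intersect returns the elements of b that also appear in a.
func intersect(a, b []string) []string {
	set := map[string]bool{}
	for _, s := range a {
		set[s] = true
	}
	var out []string
	for _, s := range b {
		if set[s] {
			out = append(out, s)
		}
	}
	return out
}

// resolve runs all matcher lookups, tolerating cardinality errors as
// long as at least one lookup succeeds. It returns the intersected
// series and how many matchers were skipped (and so must be applied
// as post-fetch filters). If every matcher is too big, the query fails.
func resolve(lookups []lookupFn) (series []string, skipped int, err error) {
	ok := 0
	for _, l := range lookups {
		ids, lerr := l()
		if errors.Is(lerr, errCardinality) {
			skipped++
			continue
		}
		if lerr != nil {
			return nil, 0, lerr
		}
		if ok == 0 {
			series = ids
		} else {
			series = intersect(series, ids)
		}
		ok++
	}
	if ok == 0 {
		return nil, 0, errCardinality
	}
	return series, skipped, nil
}

func main() {
	lookups := []lookupFn{
		func() ([]string, error) { return nil, errCardinality }, // too big
		func() ([]string, error) { return []string{"s1", "s2"}, nil },
	}
	series, skipped, err := resolve(lookups)
	fmt.Println(series, skipped, err)
}
```

This also makes the stated limitation concrete: if every matcher in a query is high-cardinality, `ok` stays zero and the whole query fails, even when their intersection would have been small.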