Cache index writes (and change some flag names) #1024
Conversation
pkg/chunk/chunk_store.go
Outdated
@@ -67,6 +79,8 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
	f.IntVar(&cfg.CardinalityCacheSize, "store.cardinality-cache-size", 0, "Size of in-memory cardinality cache, 0 to disable.")
	f.DurationVar(&cfg.CardinalityCacheValidity, "store.cardinality-cache-validity", 1*time.Hour, "Period for which entries in the cardinality cache are valid.")
	f.IntVar(&cfg.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.")

	f.IntVar(&cfg.IndexEntryCacheSize, "store.index-entry-cache", 0, "The number of index entries to cache so we don't write duplicates.")
No need for the extra line here & above.
s/The number of index entries to cache so we don't write duplicates./Size of index entry cache used to deduplicate writes./
pkg/chunk/chunk_store.go
Outdated
key := fmt.Sprintf("%s:%s:%x", entry.TableName, entry.HashValue, entry.RangeValue)
if _, ok := seenIndexEntries[key]; !ok {
	seenIndexEntries[key] = struct{}{}
	rowWrites.Observe(entry.HashValue, 1)
	result.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value)
}
}
c.entryCache.Store(context.Background(), cacheKeys, make([][]byte, len(cacheKeys)))
I'm a little worried that if the write fails we won't try again, as you've added it to the cache too early.
This will lose data, at least on DynamoDB where the whole write can fail and be retried.
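A minimal sketch of the ordering being asked for here, with hypothetical helper names (dedupeEntries, writeBackCache) rather than the PR's actual code: write the batch first, and only add the keys to the dedupe cache once the store write has succeeded, so a failed batch gets retried.

```go
// Sketch only: nothing is cached until the storage write succeeds.
func (c *store) writeIndexEntries(ctx context.Context, entries []IndexEntry) error {
	// dedupeEntries is a hypothetical helper: it drops entries whose keys are
	// already in the cache and returns the remaining batch plus its cache keys.
	batch, cacheKeys := c.dedupeEntries(ctx, entries)

	if err := c.storage.BatchWrite(ctx, batch); err != nil {
		return err // keys were not cached, so these entries are retried next time
	}

	// Only now mark the entries as written; a crash before this point just
	// means a harmless duplicate write later.
	c.writeBackCache(ctx, cacheKeys)
	return nil
}
```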
pkg/chunk/chunk_store.go
Outdated
	}

	return keys, keyMap
}
I'd be tempted to inline this in dedupeEntriesFromCache, as it's only used there.
First round of review; we should also make this use the tiered cache so dedupes work across ingester restarts. And some tests please.
Force-pushed from 447df20 to 3139b89.
pkg/chunk/chunk_store.go
Outdated
keys := make([]string, 0, len(entries))
keyMap := make(map[string]IndexEntry, len(entries))
for _, entry := range entries {
	key := strings.Join([]string{
This is basically repeating the Sprintf("%s:%s:%x", ...) from earlier in a different way. Be more DRY.
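One way to do that, sketched here on the assumption that both call sites want the same key (the PR does later grow a dedupeKey helper, which this mirrors):

```go
// dedupeKey builds the cache key for an index entry in exactly one place, so
// the "%s:%s:%x" format isn't re-implemented with strings.Join elsewhere.
func dedupeKey(entry IndexEntry) string {
	return fmt.Sprintf("%s:%s:%x", entry.TableName, entry.HashValue, entry.RangeValue)
}
```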
What sort of memory growth do you see from this?
… per bucket 😞
There are heuristics which can improve matters. E.g. … That has to cover a significant percentage.
Hmm, how much improvement can these heuristics bring? We'll need to store about …
So I pushed 2 commits which move the flags for making a tiered cache to a single place instead of having custom ones everywhere, but I'm not a fan of the change. This breaks some flags and adds additional flags where not needed. /cc @tomwilkie
Seriously? What's your calculation for the size of each item?
Here's my calculation: … So that's 173 bytes, plus cache overhead … Times 30 million entries is 8 GB, plus Go heap expansion, that's 16 GB of extra RAM I need per ingester. Having thought about it some more, it would be better to have the cache know that all entries for a (v9) series go together, so we remove the label name and value from the key and reduce the number of entries 15x.
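Restating that back-of-envelope arithmetic as a sketch (the ~100 B per-entry cache overhead is my assumption; it is what makes the quoted totals line up):

```go
// Rough sizing check for the numbers quoted above.
const (
	entryBytes    = 173        // per-entry key + value, from the calculation above
	overheadBytes = 100        // assumed cache/map overhead per entry
	numEntries    = 30_000_000 // entries held per ingester
)
// (173 + 100) * 30e6 ≈ 8.2e9 bytes ≈ 8 GB resident,
// and roughly double that after Go heap expansion ≈ 16 GB extra RAM.
```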
Oh Yes! This is indeed much, much better!
Force-pushed from 03709c7 to ea5c6fb.
This is now ready for review. I agree that in-mem might be too much, but throwing everything into memcache will help. Further, my calculations above are wrong: we cut into the next row every day, hence the max reduction of writes is at most …. Finally, I've made it so that we now have a tiered cache (in-mem --> disk --> memcache) for everything. This makes it extremely confusing because the flag descriptions for all of them are the same; I don't have good ideas on how to fix it.
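For reference, the layering being described looks roughly like this; it is a sketch, and diskCache / memcachedCache stand in for whatever the disk and memcached backends construct rather than their exact constructors:

```go
// Hypothetical wiring of the in-mem --> disk --> memcache tiers.
var diskCache, memcachedCache cache.Cache // built from their own configs elsewhere

tiered := cache.NewTiered([]cache.Cache{
	cache.NewFifoCache("index-writes", cache.FifoCacheConfig{100000, 5 * time.Minute}),
	diskCache,
	memcachedCache,
})
// Fetches try each tier in order; Stores write through, so dedupe state kept
// in memcached survives an ingester restart.
```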
I'm seeing some flakiness in the test: …
Force-pushed from 5e56dcc to dd69601.
Those 32s are base64-encoded, so actually ~43. I don't want this merged without sorting out the question of memory usage. What are you seeing in trials?
pkg/chunk/chunk_store.go
Outdated
	return entries, nil
}

found, missing = c.entryCache.Fetch(context.Background(), entries)
Please propagate the context so the traces work properly.
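In other words (a sketch of the one-line change being requested): take the ctx the caller already has and use it for the cache call, instead of context.Background().

```go
// ctx comes from the calling request, so cache spans attach to its trace.
found, missing = c.entryCache.Fetch(ctx, entries)
```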
pkg/chunk/chunk_store.go
Outdated
	return
}

c.entryCache.Store(context.Background(), entries)
Propagate the context.
pkg/chunk/chunk_store.go
Outdated

for _, entry := range entries {
	key := dedupeKey(entry)
	out, err := proto.Marshal(&entry)
Why do we write the index entry to the cache? Isn't the key enough to check equality?
We moved to memcache, so we now need to hash the keys as they might contain non-UTF-8 chars, which don't work with memcache. Now that we have hashes, we need to store the actual entry to dedupe.
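A minimal sketch of that scheme, with an assumed helper name (note the PR later switches from hashing to encoding the keys, per the discussion further down): hash the raw key into something memcached will accept, and keep the marshalled entry as the value so a hit can be verified rather than trusted blindly.

```go
// hashedCacheKey turns an arbitrary (possibly non-UTF-8) dedupe key into a
// short, printable key that is safe to use with memcached.
func hashedCacheKey(rawKey string) string {
	sum := sha256.Sum256([]byte(rawKey))
	return base64.RawURLEncoding.EncodeToString(sum[:]) // fixed-length, ASCII-safe
}
```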
pkg/chunk/series_store.go
Outdated
@@ -58,17 +58,23 @@ type seriesStore struct {
func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) {
	fetcher, err := NewChunkFetcher(cfg.CacheConfig, storage)
	if err != nil {
		return nil, err
		return nil, errors.Wrap(err, "create chunk fetcher")
I generally put errors.Wrap at the very leaf, where the error is first returned, in our code. Adding it halfway down the stack isn't that useful.
Hmm, I disagree. Usually adding it here lets us track whether it's from the chunk fetcher or from cache creation (below). Why do you think it isn't that useful?
Clarified what I meant f2f.
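To illustrate the convention being discussed with hypothetical functions (not the PR's code): wrap once at the leaf, where the error first enters our code, and let callers higher up simply return it.

```go
// Leaf: the error comes from outside our code, so attach context here.
func openCache(cfg cache.Config) (cache.Cache, error) {
	c, err := cache.New(cfg)
	if err != nil {
		return nil, errors.Wrap(err, "create cache")
	}
	return c, nil
}

// Halfway down the stack: the error is already wrapped, so just return it.
func newStore(cfg cache.Config) (cache.Cache, error) {
	return openCache(cfg)
}
```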
pkg/chunk/storage/factory.go
Outdated
f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Size of in-memory index cache, 0 to disable.")
f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Period for which entries in the index cache are valid. Should be no higher than -ingester.max-chunk-idle.")
cfg.memcacheClient.RegisterFlagsWithPrefix("index", f)
cfg.indexCache.RegisterFlagsWithPrefix("store.index-cache-read", f)
Doesn't this change the flags? Won't this break backwards compatibility?
Yes, it does. But I couldn't come up with a descriptive one that doesn't confuse users of the two caches now.
We can't break backward compat like this. At least leave the old flags in with a deprecated notice.
Done
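Roughly what keeping backwards compatibility looks like (a sketch; the new flag name referenced in the help text is a placeholder, not necessarily what the PR settles on): keep the old flag registered and mark it as deprecated.

```go
// The old flag keeps working for existing deployments; its help text now
// points at the replacement flags instead of being removed outright.
f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0,
	"Deprecated: use the -store.index-cache-read.* flags. Size of in-memory index cache, 0 to disable.")
```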
@bboreham your memory consumption calculation looks about right to me. What's more, there is very little point in storing this in process, as another 2x of the writes come from other ingesters. Therefore we shouldn't use the FIFO cache, and should only use memcached for this. Considering that, I make it: …, which would double the memory usage of Cortex on the write path. I'm inclined to believe this is too much too. I chatted with @gouthamve, and believe that moving this caching to the write path in the series store would allow us to only cache (userid, day, series ID) and use that to avoid writing out the label index, without too much of a layering violation. This would result in some code duplication between the chunk store and series store, as their write paths are currently shared, but I think that will be fine. WDYT?
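Roughly what that proposal amounts to (names and shapes here are illustrative, not the final code): the series store's write path keeps one small dedupe key per (userID, day bucket, seriesID), and skips writing the per-series label index entries whenever that key is already cached.

```go
// One cache entry per series per day, instead of one per index entry, which
// is what shrinks the cache by roughly the number of labels per series.
func seriesDedupeKey(userID, dayBucket string, seriesID []byte) string {
	return fmt.Sprintf("%s:%s:%x", userID, dayBucket, seriesID)
}
```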
Sure; get something that caches just once per series, then we can see if the layering/duplication can be improved.
I've got something working here. Will deploy to dev and let you know.
pkg/chunk/series_store.go
Outdated
}

bufs := make([][]byte, len(keysToCache))
c.entryCache.Store(context.Background(), keysToCache, bufs)
This call is too early again, no?
How the prefixed flags look now: …
@gouthamve give this a rebase into 2 changes: one that updates the flags and caches, and one that adds the write caching. Then I'll give it what I hope is a final review.
We're writing the series label index to bigtable for every chunk. We now cache the series-id and write only if we didn't write it before.

Signed-off-by: Goutham Veeramachaneni <[email protected]>

--------

This is a squashed commit, but only including Tom's commits' descriptions for attribution:

Review feedback.

Signed-off-by: Tom Wilkie <[email protected]>

Write back cache keys after they have been written to store.

Signed-off-by: Tom Wilkie <[email protected]>
Signed-off-by: Goutham Veeramachaneni <[email protected]>
Force-pushed from bf23019 to 6c37c87.
Signed-off-by: Goutham Veeramachaneni <[email protected]>
Force-pushed from 6c37c87 to baf626e.
Split it into 2 commits @tomwilkie
Been super detailed and nit picky - sorry! Generally looking really good.
pkg/chunk/cache/cache.go
Outdated

	// For tests to inject specific implementations.
	Cache Cache
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.EnableDiskcache, "cache.enable-diskcache", false, "Enable on-disk cache")
	cfg.RegisterFlagsWithPrefix("", "", f)
}
RegisterFlags is only used in two places now: frontend and chunk store. Can we remove the function and call RegisterFlagsWithPrefix from there?
pkg/chunk/cache/background.go
Outdated
f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.")
f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.")
cfg.RegisterFlagsWithPrefix("", "", f)
}
I don't see this function being used anymore - can we remove it?
pkg/chunk/cache/cache.go
Outdated

if prefix != "" {
	prefix += "."
}
This pattern is repeated quite a lot - perhaps we could do it here once?
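For example, a small helper could own the separator logic (the helper name is made up for illustration):

```go
// withPrefix joins an optional prefix and a flag name, adding the "." in one
// place instead of repeating the check at every call site.
func withPrefix(prefix, name string) string {
	if prefix == "" {
		return name
	}
	return prefix + "." + name
}
```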
pkg/chunk/cache/cache.go
Outdated
prefix := ""
if cfg.prefix != "" {
	prefix = cfg.prefix
}
Why is this needed? Can we use cfg.prefix below?
@@ -17,7 +17,7 @@ type fixture struct {
func (f fixture) Name() string { return "caching-store" }
func (f fixture) Clients() (chunk.StorageClient, chunk.TableClient, chunk.SchemaConfig, error) {
	storageClient, tableClient, schemaConfig, err := f.fixture.Clients()
	client := newCachingStorageClient(storageClient, cache.NewFifoCache("index-fifo", 500, 5*time.Minute), 5*time.Minute)
	client := newCachingStorageClient(storageClient, cache.NewFifoCache("index-fifo", cache.FifoCacheConfig{500, 5 * time.Minute}), 5*time.Minute)
Brittle use of FifoCacheConfig, use util.DefaultValues.
@@ -34,7 +34,7 @@ func TestCachingStorageClientBasic(t *testing.T) {
		}},
	},
}
cache := cache.NewFifoCache("test", 10, 10*time.Second)
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{10, 10 * time.Second})
Brittle use of FifoCacheConfig, use util.DefaultValues.
@@ -63,7 +63,7 @@ func TestCachingStorageClient(t *testing.T) {
		}},
	},
}
cache := cache.NewFifoCache("test", 10, 10*time.Second)
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{10, 10 * time.Second})
Brittle use of FifoCacheConfig, use util.DefaultValues.
@@ -113,7 +113,7 @@ func TestCachingStorageClientCollision(t *testing.T) {
		},
	},
}
cache := cache.NewFifoCache("test", 10, 10*time.Second)
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{10, 10 * time.Second})
Brittle use of FifoCacheConfig, use util.DefaultValues.
var tieredCache cache.Cache
var err error

// Building up from deprecated flags.
var caches []cache.Cache
if cfg.IndexCacheSize > 0 {
This seems to replicate the logic in cache.NewCache - shouldn't we be using that?
It complicates things by making it harder to detect if the deprecated flags were used or not. cache.New always returns a non-nil cache, even with an empty config. Though I should probably fix that.
Oh, last thing - as we hash the write keys, we need to write the full key to the cache too, and check for collisions.
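Sketch of that collision check under my own assumptions about the shapes involved: the hashed key maps to the full, un-hashed key (or the marshalled entry), and a hit only counts as a duplicate when the stored value matches exactly.

```go
// A hit on the hashed key is only a real duplicate if the stored full key
// matches byte for byte; otherwise two different keys merely collided.
func isDuplicateWrite(cachedValue []byte, fullKey string) bool {
	return string(cachedValue) == fullKey
}
```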
pkg/chunk/storage/factory.go
Outdated
	opts[i].Client = newCachingStorageClient(opts[i].Client, tieredCache, cfg.IndexCacheValidity)
}
for i := range opts {
	opts[i].Client = newCachingStorageClient(opts[i].Client, tieredCache, cfg.indexCache.DefaultValidity)
Note to self: cfg.indexCache.DefaultValidity should be set to cfg.IndexCacheValidity for backwards compat.
Force-pushed from 84e47fc to d305f0e.
Signed-off-by: Goutham Veeramachaneni <[email protected]>
Force-pushed from d305f0e to 11b7b47.
So, I've fixed everything except using …. Also, we're not hashing the keys, rather encoding them. I've noticed that the key length is …
Could you say that more loudly next time? E.g. in the PR title, in the Slack channel, in the PR description at the top.
Depends on #1011 (The cache interface is changing there)
Fixes #957
Now, we've seen that we write an average of 11 index entries per chunk. In the v9 schema, 10 of those entries are series-dependent while one is the series-id ---> chunkID mapping. Essentially we're doing 10x repeated writes!

Now, this PR lets you cache and dedupe the entries, letting you reduce the write load on the database by 10x. But you need to make sure the cache size is 11 x numSeries (depends on your setup), else you'll end up evicting the entries before the series can hit them.

Still needs to be tested.
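As a purely hypothetical example of that sizing rule: with 1,000,000 active series, the cache needs room for roughly 11 × 1,000,000 ≈ 11 million entries, otherwise entries are evicted before the next chunk for the same series can hit them.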