From 2f00a6209cfcce1a77c52bd9e33e77d0d68baf31 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 21 Sep 2018 16:47:54 +0530 Subject: [PATCH 1/3] store: cache the series-id written to store to dedupe writes We're writing the series label index to bigtable for every chunk. We now cache the series-id and write only if we didn't write it before. Signed-off-by: Goutham Veeramachaneni -------- This is a squashed commit but only including Tom's commits' description for attribution. Review feedback. Signed-off-by: Tom Wilkie Write back cache keys after they have been written to store. Signed-off-by: Tom Wilkie --- pkg/chunk/cache/cache.go | 34 ++- pkg/chunk/cache/cache_test.go | 2 +- pkg/chunk/cache/fifo_cache.go | 26 +- pkg/chunk/cache/fifo_cache_test.go | 4 +- pkg/chunk/cache/memcached_client.go | 6 +- pkg/chunk/chunk_store.go | 13 +- pkg/chunk/chunk_store_test.go | 228 ++++++++++++------ pkg/chunk/inmemory_storage_client.go | 4 + pkg/chunk/schema.go | 129 +++++++++- pkg/chunk/series_store.go | 106 +++++++- pkg/chunk/storage/caching_fixtures.go | 2 +- .../storage/caching_storage_client_test.go | 6 +- pkg/chunk/storage/factory.go | 32 ++- 13 files changed, 466 insertions(+), 126 deletions(-) diff --git a/pkg/chunk/cache/cache.go b/pkg/chunk/cache/cache.go index a21f29cdf9..6f625ed589 100644 --- a/pkg/chunk/cache/cache.go +++ b/pkg/chunk/cache/cache.go @@ -3,6 +3,7 @@ package cache import ( "context" "flag" + "time" ) // Cache byte arrays by key. @@ -15,11 +16,17 @@ type Cache interface { // Config for building Caches. type Config struct { EnableDiskcache bool + EnableFifoCache bool + + DefaultValidity time.Duration background BackgroundConfig memcache MemcachedConfig memcacheClient MemcachedClientConfig diskcache DiskcacheConfig + fifocache FifoCacheConfig + + prefix string // For tests to inject specific implementations. Cache Cache @@ -28,11 +35,14 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet. 
func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnableDiskcache, "cache.enable-diskcache", false, "Enable on-disk cache") + f.BoolVar(&cfg.EnableFifoCache, "cache.enable-fifocache", false, "Enable in-mem cache") + f.DurationVar(&cfg.DefaultValidity, "cache.default-validity", 0, "The default validity of entries for caches unless overridden.") cfg.background.RegisterFlags(f) cfg.memcache.RegisterFlags(f) cfg.memcacheClient.RegisterFlags(f) cfg.diskcache.RegisterFlags(f) + cfg.fifocache.RegisterFlags(f) } // New creates a new Cache using Config. @@ -43,23 +53,41 @@ func New(cfg Config) (Cache, error) { caches := []Cache{} + if cfg.EnableFifoCache { + prefix := "" + if cfg.prefix != "" { + prefix = cfg.prefix + } + + if cfg.fifocache.Validity == 0 && cfg.DefaultValidity != 0 { + cfg.fifocache.Validity = cfg.DefaultValidity + } + + cache := NewFifoCache(prefix, cfg.fifocache) + caches = append(caches, Instrument(cfg.prefix+"fifocache", cache)) + } + if cfg.EnableDiskcache { cache, err := NewDiskcache(cfg.diskcache) if err != nil { return nil, err } - caches = append(caches, Instrument("diskcache", cache)) + caches = append(caches, Instrument(cfg.prefix+"diskcache", cache)) } if cfg.memcacheClient.Host != "" { + if cfg.memcache.Expiration == 0 && cfg.DefaultValidity != 0 { + cfg.memcache.Expiration = cfg.DefaultValidity + } + client := NewMemcachedClient(cfg.memcacheClient) cache := NewMemcached(cfg.memcache, client) - caches = append(caches, Instrument("memcache", cache)) + caches = append(caches, Instrument(cfg.prefix+"memcache", cache)) } cache := NewTiered(caches) if len(caches) > 1 { - cache = Instrument("tiered", cache) + cache = Instrument(cfg.prefix+"tiered", cache) } cache = NewBackground(cfg.background, cache) diff --git a/pkg/chunk/cache/cache_test.go b/pkg/chunk/cache/cache_test.go index a950192234..cd82e1d515 100644 --- a/pkg/chunk/cache/cache_test.go +++ b/pkg/chunk/cache/cache_test.go @@ -167,7 +167,7 @@ func TestDiskcache(t *testing.T) 
{ } func TestFifoCache(t *testing.T) { - cache := cache.NewFifoCache("test", 1e3, 1*time.Hour) + cache := cache.NewFifoCache("test", cache.FifoCacheConfig{1e3, 1 * time.Hour}) testCache(t, cache) } diff --git a/pkg/chunk/cache/fifo_cache.go b/pkg/chunk/cache/fifo_cache.go index 74a6d666ee..6d333fd686 100644 --- a/pkg/chunk/cache/fifo_cache.go +++ b/pkg/chunk/cache/fifo_cache.go @@ -2,6 +2,7 @@ package cache import ( "context" + "flag" "sync" "time" @@ -53,6 +54,18 @@ var ( }, []string{"cache"}) ) +// FifoCacheConfig holds config for the FifoCache. +type FifoCacheConfig struct { + Size int + Validity time.Duration +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *FifoCacheConfig) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.Size, "fifocache.size", 0, "The number of entries to cache.") + f.DurationVar(&cfg.Validity, "fifocache.duration", 0, "The expiry duration for the cache.") +} + // FifoCache is a simple string -> interface{} cache which uses a fifo slide to // manage evictions. O(1) inserts and updates, O(1) gets. type FifoCache struct { @@ -82,12 +95,12 @@ type cacheEntry struct { } // NewFifoCache returns a new initialised FifoCache of size. 
-func NewFifoCache(name string, size int, validity time.Duration) *FifoCache { +func NewFifoCache(name string, cfg FifoCacheConfig) *FifoCache { return &FifoCache{ - size: size, - validity: validity, - entries: make([]cacheEntry, 0, size), - index: make(map[string]int, size), + size: cfg.Size, + validity: cfg.Validity, + entries: make([]cacheEntry, 0, cfg.Size), + index: make(map[string]int, cfg.Size), name: name, entriesAdded: cacheEntriesAdded.WithLabelValues(name), @@ -216,8 +229,7 @@ func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) { index, ok := c.index[key] if ok { updated := c.entries[index].updated - if time.Now().Sub(updated) < c.validity { - + if c.validity == 0 || time.Now().Sub(updated) < c.validity { return c.entries[index].value, true } diff --git a/pkg/chunk/cache/fifo_cache_test.go b/pkg/chunk/cache/fifo_cache_test.go index 72e2e5cf4a..230107e112 100644 --- a/pkg/chunk/cache/fifo_cache_test.go +++ b/pkg/chunk/cache/fifo_cache_test.go @@ -14,7 +14,7 @@ const size = 10 const overwrite = 5 func TestFifoCache(t *testing.T) { - c := NewFifoCache("test", size, 1*time.Minute) + c := NewFifoCache("test", FifoCacheConfig{size, 1 * time.Minute}) ctx := context.Background() // Check put / get works @@ -74,7 +74,7 @@ func TestFifoCache(t *testing.T) { } func TestFifoCacheExpiry(t *testing.T) { - c := NewFifoCache("test", size, 5*time.Millisecond) + c := NewFifoCache("test", FifoCacheConfig{size, 5 * time.Millisecond}) ctx := context.Background() c.Put(ctx, []string{"0"}, []interface{}{0}) diff --git a/pkg/chunk/cache/memcached_client.go b/pkg/chunk/cache/memcached_client.go index eec25e5529..944e26454c 100644 --- a/pkg/chunk/cache/memcached_client.go +++ b/pkg/chunk/cache/memcached_client.go @@ -41,15 +41,11 @@ type MemcachedClientConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *MemcachedClientConfig) RegisterFlags(f *flag.FlagSet) { - cfg.registerFlagsWithPrefix("", f) + 
cfg.RegisterFlagsWithPrefix("", f) } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - cfg.registerFlagsWithPrefix(prefix, f) -} - -func (cfg *MemcachedClientConfig) registerFlagsWithPrefix(prefix string, f *flag.FlagSet) { if prefix != "" { prefix = prefix + "." } diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index d4266a9a91..28594cff94 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" @@ -21,7 +22,7 @@ import ( ) var ( - indexEntriesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{ + indexEntriesPerChunk = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: "cortex", Name: "chunk_store_index_entries_per_chunk", Help: "Number of entries written to storage per chunk.", @@ -36,7 +37,7 @@ var ( }, HashBuckets: 1024, }) - cacheCorrupt = prometheus.NewCounter(prometheus.CounterOpts{ + cacheCorrupt = promauto.NewCounter(prometheus.CounterOpts{ Namespace: "cortex", Name: "cache_corrupt_chunks_total", Help: "Total count of corrupt chunks found in cache.", @@ -44,9 +45,7 @@ var ( ) func init() { - prometheus.MustRegister(indexEntriesPerChunk) prometheus.MustRegister(rowWrites) - prometheus.MustRegister(cacheCorrupt) } // StoreConfig specifies config for a ChunkStore @@ -58,11 +57,15 @@ type StoreConfig struct { CardinalityCacheSize int CardinalityCacheValidity time.Duration CardinalityLimit int + + EntryCache cache.Config } // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.CacheConfig.RegisterFlags(f) + 
cfg.EntryCache.RegisterFlags(f) + f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.") f.IntVar(&cfg.QueryChunkLimit, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") f.IntVar(&cfg.CardinalityCacheSize, "store.cardinality-cache-size", 0, "Size of in-memory cardinality cache, 0 to disable.") @@ -77,6 +80,8 @@ type store struct { storage StorageClient schema Schema *Fetcher + + entryCache cache.Cache } func newStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) { diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 62e7e63292..bbaceb749b 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -17,6 +17,7 @@ import ( "github.com/weaveworks/common/test" "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/chunk/cache" "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" "github.com/weaveworks/cortex/pkg/util" "github.com/weaveworks/cortex/pkg/util/extract" @@ -24,6 +25,7 @@ import ( type schemaFactory func(cfg SchemaConfig) Schema type storeFactory func(StoreConfig, Schema, StorageClient) (Store, error) +type configFactory func() (StoreConfig, SchemaConfig) var schemas = []struct { name string @@ -40,6 +42,39 @@ var schemas = []struct { {"v9 schema", v9Schema, newSeriesStore, true}, } +var stores = []struct { + name string + configFn configFactory +}{ + { + name: "store", + configFn: func() (StoreConfig, SchemaConfig) { + var ( + storeCfg StoreConfig + schemaCfg SchemaConfig + ) + util.DefaultValues(&storeCfg, &schemaCfg) + return storeCfg, schemaCfg + }, + }, + { + name: "cached_store", + configFn: func() (StoreConfig, SchemaConfig) { + var ( + storeCfg StoreConfig + schemaCfg SchemaConfig + ) + util.DefaultValues(&storeCfg, &schemaCfg) + + storeCfg.EntryCache.Cache = cache.NewFifoCache("test", cache.FifoCacheConfig{ + Size: 500, + }) + + return 
storeCfg, schemaCfg + }, + }, +} + // newTestStore creates a new Store for testing. func newTestChunkStore(t *testing.T, schemaFactory schemaFactory, storeFactory storeFactory) Store { var ( @@ -48,6 +83,10 @@ func newTestChunkStore(t *testing.T, schemaFactory schemaFactory, storeFactory s ) util.DefaultValues(&storeCfg, &schemaCfg) + return newTestChunkStoreConfig(t, storeCfg, schemaCfg, schemaFactory, storeFactory) +} + +func newTestChunkStoreConfig(t *testing.T, storeCfg StoreConfig, schemaCfg SchemaConfig, schemaFactory schemaFactory, storeFactory storeFactory) Store { storage := NewMockStorage() tableManager, err := NewTableManager(schemaCfg, maxChunkAge, storage) require.NoError(t, err) @@ -202,62 +241,65 @@ func TestChunkStore_Get(t *testing.T) { }, } { for _, schema := range schemas { - t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { - t.Log("========= Running query", tc.query, "with schema", schema.name) - store := newTestChunkStore(t, schema.schemaFn, schema.storeFn) - defer store.Stop() - - if err := store.Put(ctx, []Chunk{ - fooChunk1, - fooChunk2, - barChunk1, - barChunk2, - }); err != nil { - t.Fatal(err) - } - - matchers, err := promql.ParseMetricSelector(tc.query) - if err != nil { - t.Fatal(err) - } - - metricNameMatcher, _, ok := extract.MetricNameMatcherFromMatchers(matchers) - if schema.requireMetricName && (!ok || metricNameMatcher.Type != labels.MatchEqual) { - return - } - - // Query with ordinary time-range - chunks1, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...) 
- require.NoError(t, err) - - matrix1, err := ChunksToMatrix(ctx, chunks1, now.Add(-time.Hour), now) - require.NoError(t, err) - - sort.Sort(ByFingerprint(matrix1)) - if !reflect.DeepEqual(tc.expect, matrix1) { - t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, matrix1)) - } - - // Pushing end of time-range into future should yield exact same resultset - chunks2, err := store.Get(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*30), matchers...) - require.NoError(t, err) - - matrix2, err := ChunksToMatrix(ctx, chunks2, now.Add(-time.Hour), now) - require.NoError(t, err) - - sort.Sort(ByFingerprint(matrix2)) - if !reflect.DeepEqual(tc.expect, matrix2) { - t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, matrix2)) - } - - // Query with both begin & end of time-range in future should yield empty resultset - matrix3, err := store.Get(ctx, now.Add(time.Hour), now.Add(time.Hour*2), matchers...) - require.NoError(t, err) - if len(matrix3) != 0 { - t.Fatalf("%s: future query should yield empty resultset ... actually got %v chunks: %#v", - tc.query, len(matrix3), matrix3) - } - }) + for _, storeCase := range stores { + t.Run(fmt.Sprintf("%s / %s / %s", tc.query, schema.name, storeCase.name), func(t *testing.T) { + t.Log("========= Running query", tc.query, "with schema", schema.name) + storeCfg, schemaCfg := storeCase.configFn() + store := newTestChunkStoreConfig(t, storeCfg, schemaCfg, schema.schemaFn, schema.storeFn) + defer store.Stop() + + if err := store.Put(ctx, []Chunk{ + fooChunk1, + fooChunk2, + barChunk1, + barChunk2, + }); err != nil { + t.Fatal(err) + } + + matchers, err := promql.ParseMetricSelector(tc.query) + if err != nil { + t.Fatal(err) + } + + metricNameMatcher, _, ok := extract.MetricNameMatcherFromMatchers(matchers) + if schema.requireMetricName && (!ok || metricNameMatcher.Type != labels.MatchEqual) { + return + } + + // Query with ordinary time-range + chunks1, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...) 
+ require.NoError(t, err) + + matrix1, err := ChunksToMatrix(ctx, chunks1, now.Add(-time.Hour), now) + require.NoError(t, err) + + sort.Sort(ByFingerprint(matrix1)) + if !reflect.DeepEqual(tc.expect, matrix1) { + t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, matrix1)) + } + + // Pushing end of time-range into future should yield exact same resultset + chunks2, err := store.Get(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*30), matchers...) + require.NoError(t, err) + + matrix2, err := ChunksToMatrix(ctx, chunks2, now.Add(-time.Hour), now) + require.NoError(t, err) + + sort.Sort(ByFingerprint(matrix2)) + if !reflect.DeepEqual(tc.expect, matrix2) { + t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, matrix2)) + } + + // Query with both begin & end of time-range in future should yield empty resultset + matrix3, err := store.Get(ctx, now.Add(time.Hour), now.Add(time.Hour*2), matchers...) + require.NoError(t, err) + if len(matrix3) != 0 { + t.Fatalf("%s: future query should yield empty resultset ... actually got %v chunks: %#v", + tc.query, len(matrix3), matrix3) + } + }) + } } } } @@ -320,27 +362,30 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { }, } { for _, schema := range schemas { - t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { - t.Log("========= Running query", tc.query, "with schema", schema.name) - store := newTestChunkStore(t, schema.schemaFn, schema.storeFn) - defer store.Stop() - - if err := store.Put(ctx, []Chunk{chunk1, chunk2}); err != nil { - t.Fatal(err) - } - - matchers, err := promql.ParseMetricSelector(tc.query) - if err != nil { - t.Fatal(err) - } - - chunks, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...) 
- require.NoError(t, err) - - if !reflect.DeepEqual(tc.expect, chunks) { - t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, chunks)) - } - }) + for _, storeCase := range stores { + t.Run(fmt.Sprintf("%s / %s / %s", tc.query, schema.name, storeCase.name), func(t *testing.T) { + t.Log("========= Running query", tc.query, "with schema", schema.name) + storeCfg, schemaCfg := storeCase.configFn() + store := newTestChunkStoreConfig(t, storeCfg, schemaCfg, schema.schemaFn, schema.storeFn) + defer store.Stop() + + if err := store.Put(ctx, []Chunk{chunk1, chunk2}); err != nil { + t.Fatal(err) + } + + matchers, err := promql.ParseMetricSelector(tc.query) + if err != nil { + t.Fatal(err) + } + + chunks, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...) + require.NoError(t, err) + + if !reflect.DeepEqual(tc.expect, chunks) { + t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, chunks)) + } + }) + } } } } @@ -479,3 +524,32 @@ func TestChunkStoreLeastRead(t *testing.T) { assert.Equal(t, int(numChunks), len(chunks)) } } + +func TestIndexCachingWorks(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), userID) + now := model.Now() + metric := model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + } + + storeMaker := stores[1] + storeCfg, schemaCfg := storeMaker.configFn() + + store := newTestChunkStoreConfig(t, storeCfg, schemaCfg, v9Schema, newSeriesStore) + defer store.Stop() + + storage := store.(*seriesStore).storage.(*MockStorage) + + fooChunk1 := dummyChunkFor(now, metric) + fooChunk2 := dummyChunkFor(now.Add(1*time.Millisecond), metric) + + err := store.Put(ctx, []Chunk{fooChunk1}) + require.NoError(t, err) + n := storage.numWrites + + // Only one extra entry for the new chunk of same series. 
+ err = store.Put(ctx, []Chunk{fooChunk2}) + require.NoError(t, err) + require.Equal(t, n+1, storage.numWrites) +} diff --git a/pkg/chunk/inmemory_storage_client.go b/pkg/chunk/inmemory_storage_client.go index 4978dede57..8d8888c95f 100644 --- a/pkg/chunk/inmemory_storage_client.go +++ b/pkg/chunk/inmemory_storage_client.go @@ -17,6 +17,8 @@ type MockStorage struct { mtx sync.RWMutex tables map[string]*mockTable objects map[string][]byte + + numWrites int } type mockTable struct { @@ -125,6 +127,8 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error { mockBatch := *batch.(*mockWriteBatch) seenWrites := map[string]bool{} + m.numWrites += len(mockBatch) + for _, req := range mockBatch { table, ok := m.tables[req.tableName] if !ok { diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go index 00d09e9f89..6a12a05474 100644 --- a/pkg/chunk/schema.go +++ b/pkg/chunk/schema.go @@ -1,6 +1,8 @@ package chunk import ( + "bytes" + "encoding/hex" "errors" "fmt" "strings" @@ -30,6 +32,10 @@ type Schema interface { // When doing a write, use this method to return the list of entries you should write to. 
GetWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) + GetLabelWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) + GetChunkWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) + GetLabelEntryCacheKey(from, through model.Time, userID string, labels model.Metric) []string + // When doing a read, use these methods to return the list of entries you should query GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) @@ -153,6 +159,50 @@ func (s schema) GetWriteEntries(from, through model.Time, userID string, metricN return result, nil } +func (s schema) GetLabelWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + var result []IndexEntry + + for _, bucket := range s.buckets(from, through, userID) { + entries, err := s.entries.GetLabelWriteEntries(bucket, metricName, labels, chunkID) + if err != nil { + return nil, err + } + result = append(result, entries...) + } + return result, nil +} +func (s schema) GetChunkWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + var result []IndexEntry + + for _, bucket := range s.buckets(from, through, userID) { + entries, err := s.entries.GetChunkWriteEntries(bucket, metricName, labels, chunkID) + if err != nil { + return nil, err + } + result = append(result, entries...) 
+ } + return result, nil + +} + +// Should only be used for v9Schema +func (s schema) GetLabelEntryCacheKey(from, through model.Time, userID string, labels model.Metric) []string { + var result []string + for _, bucket := range s.buckets(from, through, userID) { + key := bytes.Join([][]byte{ + []byte(bucket.tableName), + []byte(bucket.hashKey), + sha256bytes(labels.String()), + }, + []byte("-"), + ) + + result = append(result, hex.EncodeToString(key)) + } + + return result +} + func (s schema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) { var result []IndexQuery @@ -211,6 +261,9 @@ func (s schema) GetChunksForSeries(from, through model.Time, userID string, seri type entries interface { GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) + GetLabelWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) + GetChunkWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) + GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) GetReadMetricLabelQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) @@ -238,6 +291,13 @@ func (originalEntries) GetWriteEntries(bucket Bucket, metricName model.LabelValu return result, nil } +func (originalEntries) GetLabelWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + return nil, ErrNotSupported +} +func (originalEntries) GetChunkWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + return nil, ErrNotSupported +} + func 
(originalEntries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) { return []IndexQuery{ { @@ -297,6 +357,13 @@ func (base64Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, return result, nil } +func (base64Entries) GetLabelWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + return nil, ErrNotSupported +} +func (base64Entries) GetChunkWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + return nil, ErrNotSupported +} + func (base64Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) { encodedBytes := encodeBase64Value(labelValue) return []IndexQuery{ @@ -335,6 +402,13 @@ func (labelNameInHashKeyEntries) GetWriteEntries(bucket Bucket, metricName model return entries, nil } +func (labelNameInHashKeyEntries) GetLabelWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + return nil, ErrNotSupported +} +func (labelNameInHashKeyEntries) GetChunkWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + return nil, ErrNotSupported +} + func (labelNameInHashKeyEntries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) { return []IndexQuery{ { @@ -398,6 +472,13 @@ func (v5Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, lab return entries, nil } +func (v5Entries) GetLabelWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + return nil, ErrNotSupported +} +func (v5Entries) GetChunkWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + return nil, ErrNotSupported +} + 
func (v5Entries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) { return []IndexQuery{ { @@ -460,6 +541,13 @@ func (v6Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, lab return entries, nil } +func (v6Entries) GetLabelWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + return nil, ErrNotSupported +} +func (v6Entries) GetChunkWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + return nil, ErrNotSupported +} + func (v6Entries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) { encodedFromBytes := encodeTime(bucket.from) return []IndexQuery{ @@ -502,9 +590,24 @@ func (v6Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) { type v9Entries struct { } -func (v9Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { +func (e v9Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + labelEntries, err := e.GetLabelWriteEntries(bucket, metricName, labels, chunkID) + if err != nil { + return nil, err + } + + chunkEntries, err := e.GetChunkWriteEntries(bucket, metricName, labels, chunkID) + if err != nil { + return nil, err + } + + entries := append(labelEntries, chunkEntries...) 
+ + return entries, nil +} + +func (v9Entries) GetLabelWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { seriesID := sha256bytes(labels.String()) - encodedThroughBytes := encodeTime(bucket.through) entries := []IndexEntry{ // Entry for metricName -> seriesID @@ -513,12 +616,6 @@ func (v9Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, lab HashValue: bucket.hashKey + ":" + string(metricName), RangeValue: encodeRangeKey(seriesID, nil, nil, seriesRangeKeyV1), }, - // Entry for seriesID -> chunkID - { - TableName: bucket.tableName, - HashValue: bucket.hashKey + ":" + string(seriesID), - RangeValue: encodeRangeKey(encodedThroughBytes, nil, []byte(chunkID), chunkTimeRangeKeyV3), - }, } // Entries for metricName:labelName -> hash(value):seriesID @@ -539,6 +636,22 @@ func (v9Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, lab return entries, nil } +func (v9Entries) GetChunkWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + seriesID := sha256bytes(labels.String()) + encodedThroughBytes := encodeTime(bucket.through) + + entries := []IndexEntry{ + // Entry for seriesID -> chunkID + { + TableName: bucket.tableName, + HashValue: bucket.hashKey + ":" + string(seriesID), + RangeValue: encodeRangeKey(encodedThroughBytes, nil, []byte(chunkID), chunkTimeRangeKeyV3), + }, + } + + return entries, nil +} + func (v9Entries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) { return []IndexQuery{ { diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index 1ac1c23ee1..372f8d8a10 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -2,10 +2,10 @@ package chunk import ( "context" - "errors" "fmt" "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" 
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -61,14 +61,20 @@ func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Stor return nil, err } + entryCache, err := cache.New(cfg.EntryCache) + if err != nil { + return nil, err + } + return &seriesStore{ store: store{ - cfg: cfg, - storage: storage, - schema: schema, - Fetcher: fetcher, + cfg: cfg, + storage: storage, + schema: schema, + Fetcher: fetcher, + entryCache: entryCache, }, - cardinalityCache: cache.NewFifoCache("cardinality", cfg.CardinalityCacheSize, cfg.CardinalityCacheValidity), + cardinalityCache: cache.NewFifoCache("cardinality", cache.FifoCacheConfig{cfg.CardinalityCacheSize, cfg.CardinalityCacheValidity}), }, nil } @@ -291,3 +297,91 @@ func (c *seriesStore) lookupChunksBySeries(ctx context.Context, from, through mo result, err := c.parseIndexEntries(ctx, entries, nil) return result, err } + +// Put implements ChunkStore +func (c *seriesStore) Put(ctx context.Context, chunks []Chunk) error { + for _, chunk := range chunks { + if err := c.PutOne(ctx, chunk.From, chunk.Through, chunk); err != nil { + return err + } + } + return nil +} + +// PutOne implements ChunkStore +func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error { + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return err + } + + // Horribly, PutChunks mutates the chunk by setting its checksum. By putting + // the chunk in a slice we are in fact passing by reference, so below we + // need to make sure we pick the chunk back out the slice. 
+ chunks := []Chunk{chunk} + + err = c.storage.PutChunks(ctx, chunks) + if err != nil { + return err + } + + c.writeBackCache(ctx, chunks) + + writeReqs, _, keysToCache, err := c.calculateIndexEntries(userID, from, through, chunks[0]) + if err != nil { + return err + } + + if err := c.storage.BatchWrite(ctx, writeReqs); err != nil { + return err + } + + bufs := make([][]byte, len(keysToCache)) + c.entryCache.Store(ctx, keysToCache, bufs) + return nil +} + +// calculateIndexEntries creates a set of batched WriteRequests for all the chunks it is given. +func (c *seriesStore) calculateIndexEntries(userID string, from, through model.Time, chunk Chunk) (WriteBatch, []IndexEntry, []string, error) { + seenIndexEntries := map[string]struct{}{} + entries := []IndexEntry{} + keysToCache := []string{} + + metricName, err := extract.MetricNameFromMetric(chunk.Metric) + if err != nil { + return nil, nil, nil, err + } + + keys := c.schema.GetLabelEntryCacheKey(from, through, userID, chunk.Metric) + _, _, missing := c.entryCache.Fetch(context.Background(), keys) + if len(missing) != 0 { + labelEntries, err := c.schema.GetLabelWriteEntries(from, through, userID, metricName, chunk.Metric, chunk.ExternalKey()) + if err != nil { + return nil, nil, nil, err + } + + entries = append(entries, labelEntries...) + keysToCache = missing + } + + chunkEntries, err := c.schema.GetChunkWriteEntries(from, through, userID, metricName, chunk.Metric, chunk.ExternalKey()) + if err != nil { + return nil, nil, nil, err + } + entries = append(entries, chunkEntries...) 
+ + indexEntriesPerChunk.Observe(float64(len(entries))) + + // Remove duplicate entries based on tableName:hashValue:rangeValue + result := c.storage.NewWriteBatch() + for _, entry := range entries { + key := fmt.Sprintf("%s:%s:%x", entry.TableName, entry.HashValue, entry.RangeValue) + if _, ok := seenIndexEntries[key]; !ok { + seenIndexEntries[key] = struct{}{} + rowWrites.Observe(entry.HashValue, 1) + result.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value) + } + } + + return result, entries, keysToCache, nil +} diff --git a/pkg/chunk/storage/caching_fixtures.go b/pkg/chunk/storage/caching_fixtures.go index 36e46bfaf0..eff4d446a6 100644 --- a/pkg/chunk/storage/caching_fixtures.go +++ b/pkg/chunk/storage/caching_fixtures.go @@ -17,7 +17,7 @@ type fixture struct { func (f fixture) Name() string { return "caching-store" } func (f fixture) Clients() (chunk.StorageClient, chunk.TableClient, chunk.SchemaConfig, error) { storageClient, tableClient, schemaConfig, err := f.fixture.Clients() - client := newCachingStorageClient(storageClient, cache.NewFifoCache("index-fifo", 500, 5*time.Minute), 5*time.Minute) + client := newCachingStorageClient(storageClient, cache.NewFifoCache("index-fifo", cache.FifoCacheConfig{500, 5 * time.Minute}), 5*time.Minute) return client, tableClient, schemaConfig, err } func (f fixture) Teardown() error { return f.fixture.Teardown() } diff --git a/pkg/chunk/storage/caching_storage_client_test.go b/pkg/chunk/storage/caching_storage_client_test.go index 1cc1f1391e..c91dcf31a8 100644 --- a/pkg/chunk/storage/caching_storage_client_test.go +++ b/pkg/chunk/storage/caching_storage_client_test.go @@ -34,7 +34,7 @@ func TestCachingStorageClientBasic(t *testing.T) { }}, }, } - cache := cache.NewFifoCache("test", 10, 10*time.Second) + cache := cache.NewFifoCache("test", cache.FifoCacheConfig{10, 10 * time.Second}) client := newCachingStorageClient(store, cache, 1*time.Second) queries := []chunk.IndexQuery{{ TableName: "table", @@ -63,7 
+63,7 @@ func TestCachingStorageClient(t *testing.T) { }}, }, } - cache := cache.NewFifoCache("test", 10, 10*time.Second) + cache := cache.NewFifoCache("test", cache.FifoCacheConfig{10, 10 * time.Second}) client := newCachingStorageClient(store, cache, 1*time.Second) queries := []chunk.IndexQuery{ {TableName: "table", HashValue: "foo"}, @@ -113,7 +113,7 @@ func TestCachingStorageClientCollision(t *testing.T) { }, }, } - cache := cache.NewFifoCache("test", 10, 10*time.Second) + cache := cache.NewFifoCache("test", cache.FifoCacheConfig{10, 10 * time.Second}) client := newCachingStorageClient(store, cache, 1*time.Second) queries := []chunk.IndexQuery{ {TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("bar")}, diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index 4b6704e0f9..05f7afcb14 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -27,6 +27,8 @@ type Config struct { IndexCacheSize int IndexCacheValidity time.Duration memcacheClient cache.MemcachedClientConfig + + indexCache cache.Config } // RegisterFlags adds the flags required to configure this flag set. @@ -36,19 +38,25 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.GCPStorageConfig.RegisterFlags(f) cfg.CassandraStorageConfig.RegisterFlags(f) - f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Size of in-memory index cache, 0 to disable.") - f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Period for which entries in the index cache are valid. Should be no higher than -ingester.max-chunk-idle.") + // Deprecated flags!! + f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Size of in-memory index cache, 0 to disable. DEPRECATED: Use -store.index-cache-read.*") + f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Period for which entries in the index cache are valid. Should be no higher than -ingester.max-chunk-idle. 
DEPRECATED: Use -store.index-cache-read.*") cfg.memcacheClient.RegisterFlagsWithPrefix("index", f) + + cfg.indexCache.RegisterFlags(f) } // Opts makes the storage clients based on the configuration. func Opts(cfg Config, schemaCfg chunk.SchemaConfig) ([]chunk.StorageOpt, error) { + var tieredCache cache.Cache + var err error + + // Building up from deprecated flags. var caches []cache.Cache if cfg.IndexCacheSize > 0 { - fifocache := cache.Instrument("fifo-index", cache.NewFifoCache("index", cfg.IndexCacheSize, cfg.IndexCacheValidity)) + fifocache := cache.Instrument("fifo-index", cache.NewFifoCache("index", cache.FifoCacheConfig{cfg.IndexCacheSize, cfg.IndexCacheValidity})) caches = append(caches, fifocache) } - if cfg.memcacheClient.Host != "" { client := cache.NewMemcachedClient(cfg.memcacheClient) memcache := cache.Instrument("memcache-index", cache.NewMemcached(cache.MemcachedConfig{ @@ -60,16 +68,22 @@ func Opts(cfg Config, schemaCfg chunk.SchemaConfig) ([]chunk.StorageOpt, error) }, memcache)) } + if len(caches) > 0 { + tieredCache = cache.NewTiered(caches) + } else { + tieredCache, err = cache.New(cfg.indexCache) + if err != nil { + return nil, err + } + } + opts, err := newStorageOpts(cfg, schemaCfg) if err != nil { return nil, errors.Wrap(err, "error creating storage client") } - if len(caches) > 0 { - tieredCache := cache.Instrument("tiered-index", cache.NewTiered(caches)) - for i := range opts { - opts[i].Client = newCachingStorageClient(opts[i].Client, tieredCache, cfg.IndexCacheValidity) - } + for i := range opts { + opts[i].Client = newCachingStorageClient(opts[i].Client, tieredCache, cfg.indexCache.DefaultValidity) } return opts, nil From df519ab70f4bd78ab0ff4790280fce3e4056ae12 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 2 Oct 2018 14:25:52 +0530 Subject: [PATCH 2/3] flags: Add prefixes and descriptions to flags of cache Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/cache/background.go | 13 +++++++++++-- 
pkg/chunk/cache/cache.go | 29 ++++++++++++++++++++--------- pkg/chunk/cache/diskcache.go | 13 +++++++++++-- pkg/chunk/cache/fifo_cache.go | 13 +++++++++++-- pkg/chunk/cache/memcached.go | 15 ++++++++++++--- pkg/chunk/cache/memcached_client.go | 12 ++++++------ pkg/chunk/chunk_store.go | 3 ++- pkg/chunk/storage/factory.go | 8 ++++---- 8 files changed, 77 insertions(+), 29 deletions(-) diff --git a/pkg/chunk/cache/background.go b/pkg/chunk/cache/background.go index 47c2f89232..4b5226bef9 100644 --- a/pkg/chunk/cache/background.go +++ b/pkg/chunk/cache/background.go @@ -36,8 +36,17 @@ type BackgroundConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *BackgroundConfig) RegisterFlags(f *flag.FlagSet) { - f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.") - f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.") + cfg.RegisterFlagsWithPrefix("", "", f) +} + +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +func (cfg *BackgroundConfig) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) { + if prefix != "" { + prefix = prefix + "." + } + + f.IntVar(&cfg.WriteBackGoroutines, prefix+"memcache.write-back-goroutines", 10, description+"How many goroutines to use to write back to memcache.") + f.IntVar(&cfg.WriteBackBuffer, prefix+"memcache.write-back-buffer", 10000, description+"How many chunks to buffer for background write back.") } type backgroundCache struct { diff --git a/pkg/chunk/cache/cache.go b/pkg/chunk/cache/cache.go index 6f625ed589..0af219bd11 100644 --- a/pkg/chunk/cache/cache.go +++ b/pkg/chunk/cache/cache.go @@ -34,15 +34,26 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet. 
func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - f.BoolVar(&cfg.EnableDiskcache, "cache.enable-diskcache", false, "Enable on-disk cache") - f.BoolVar(&cfg.EnableFifoCache, "cache.enable-fifocache", false, "Enable in-mem cache") - f.DurationVar(&cfg.DefaultValidity, "cache.default-validity", 0, "The default validity of entries for caches unless overridden.") - - cfg.background.RegisterFlags(f) - cfg.memcache.RegisterFlags(f) - cfg.memcacheClient.RegisterFlags(f) - cfg.diskcache.RegisterFlags(f) - cfg.fifocache.RegisterFlags(f) + cfg.RegisterFlagsWithPrefix("", "", f) +} + +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) { + cfg.background.RegisterFlagsWithPrefix(prefix, description, f) + cfg.memcache.RegisterFlagsWithPrefix(prefix, description, f) + cfg.memcacheClient.RegisterFlagsWithPrefix(prefix, description, f) + cfg.diskcache.RegisterFlagsWithPrefix(prefix, description, f) + cfg.fifocache.RegisterFlagsWithPrefix(prefix, description, f) + + if prefix != "" { + prefix += "." + } + + f.BoolVar(&cfg.EnableDiskcache, prefix+"cache.enable-diskcache", false, description+"Enable on-disk cache.") + f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache.") + f.DurationVar(&cfg.DefaultValidity, prefix+"cache.default-validity", 0, description+"The default validity of entries for caches unless overridden.") + + cfg.prefix = prefix } // New creates a new Cache using Config. 
diff --git a/pkg/chunk/cache/diskcache.go b/pkg/chunk/cache/diskcache.go index 7dbf72c808..3f32facf3b 100644 --- a/pkg/chunk/cache/diskcache.go +++ b/pkg/chunk/cache/diskcache.go @@ -32,8 +32,17 @@ type DiskcacheConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *DiskcacheConfig) RegisterFlags(f *flag.FlagSet) { - f.StringVar(&cfg.Path, "diskcache.path", "/var/run/chunks", "Path to file used to cache chunks.") - f.IntVar(&cfg.Size, "diskcache.size", 1024*1024*1024, "Size of file (bytes)") + cfg.RegisterFlagsWithPrefix("", "", f) +} + +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +func (cfg *DiskcacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { + if prefix != "" { + prefix += "." + } + + f.StringVar(&cfg.Path, prefix+"diskcache.path", "/var/run/chunks", description+"Path to file used to cache chunks.") + f.IntVar(&cfg.Size, prefix+"diskcache.size", 1024*1024*1024, description+"Size of file (bytes)") } // Diskcache is an on-disk chunk cache. diff --git a/pkg/chunk/cache/fifo_cache.go b/pkg/chunk/cache/fifo_cache.go index 6d333fd686..a008f8f016 100644 --- a/pkg/chunk/cache/fifo_cache.go +++ b/pkg/chunk/cache/fifo_cache.go @@ -62,8 +62,17 @@ type FifoCacheConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *FifoCacheConfig) RegisterFlags(f *flag.FlagSet) { - f.IntVar(&cfg.Size, "fifocache.size", 0, "The number of entries to cache.") - f.DurationVar(&cfg.Validity, "fifocache.duration", 0, "The expiry duration for the cache.") + cfg.RegisterFlagsWithPrefix("", "", f) +} + +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +func (cfg *FifoCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { + if prefix != "" { + prefix += "." 
+ } + + f.IntVar(&cfg.Size, prefix+"fifocache.size", 0, description+"The number of entries to cache.") + f.DurationVar(&cfg.Validity, prefix+"fifocache.duration", 0, description+"The expiry duration for the cache.") } // FifoCache is a simple string -> interface{} cache which uses a fifo slide to diff --git a/pkg/chunk/cache/memcached.go b/pkg/chunk/cache/memcached.go index 331b7e2058..77d643d590 100644 --- a/pkg/chunk/cache/memcached.go +++ b/pkg/chunk/cache/memcached.go @@ -41,9 +41,18 @@ type MemcachedConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *MemcachedConfig) RegisterFlags(f *flag.FlagSet) { - f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long keys stay in the memcache.") - f.IntVar(&cfg.BatchSize, "memcached.batchsize", 0, "How many keys to fetch in each batch.") - f.IntVar(&cfg.Parallelism, "memcached.parallelism", 100, "Maximum active requests to memcache.") + cfg.RegisterFlagsWithPrefix("", "", f) +} + +// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet +func (cfg *MemcachedConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { + if prefix != "" { + prefix += "." 
+ } + + f.DurationVar(&cfg.Expiration, prefix+"memcached.expiration", 0, description+"How long keys stay in the memcache.") + f.IntVar(&cfg.BatchSize, prefix+"memcached.batchsize", 0, description+"How many keys to fetch in each batch.") + f.IntVar(&cfg.Parallelism, prefix+"memcached.parallelism", 100, description+"Maximum active requests to memcache.") } // Memcached type caches chunks in memcached diff --git a/pkg/chunk/cache/memcached_client.go b/pkg/chunk/cache/memcached_client.go index 944e26454c..89bef4b85f 100644 --- a/pkg/chunk/cache/memcached_client.go +++ b/pkg/chunk/cache/memcached_client.go @@ -41,19 +41,19 @@ type MemcachedClientConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *MemcachedClientConfig) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("", f) + cfg.RegisterFlagsWithPrefix("", "", f) } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet -func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { +func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { if prefix != "" { prefix = prefix + "." } - f.StringVar(&cfg.Host, prefix+"memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.") - f.StringVar(&cfg.Service, prefix+"memcached.service", "memcached", "SRV service used to discover memcache servers.") - f.DurationVar(&cfg.Timeout, prefix+"memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.") - f.DurationVar(&cfg.UpdateInterval, prefix+"memcached.update-interval", 1*time.Minute, "Period with which to poll DNS for memcache servers.") + f.StringVar(&cfg.Host, prefix+"memcached.hostname", "", description+"Hostname for memcached service to use when caching chunks. 
If empty, no memcached will be used.") + f.StringVar(&cfg.Service, prefix+"memcached.service", "memcached", description+"SRV service used to discover memcache servers.") + f.DurationVar(&cfg.Timeout, prefix+"memcached.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on memcached requests.") + f.DurationVar(&cfg.UpdateInterval, prefix+"memcached.update-interval", 1*time.Minute, description+"Period with which to poll DNS for memcache servers.") } // NewMemcachedClient creates a new MemcacheClient that gets its server list diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 28594cff94..282465a317 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -64,7 +64,8 @@ type StoreConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.CacheConfig.RegisterFlags(f) - cfg.EntryCache.RegisterFlags(f) + + cfg.EntryCache.RegisterFlagsWithPrefix("store.index-cache-write", "Cache config for index entry writing. ", f) f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.") f.IntVar(&cfg.QueryChunkLimit, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index 05f7afcb14..395c8c769c 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -39,11 +39,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.CassandraStorageConfig.RegisterFlags(f) // Deprecated flags!! - f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Size of in-memory index cache, 0 to disable. DEPRECATED: Use -store.index-cache-read.*") - f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Period for which entries in the index cache are valid. Should be no higher than -ingester.max-chunk-idle. 
DEPRECATED: Use -store.index-cache-read.*") - cfg.memcacheClient.RegisterFlagsWithPrefix("index", f) + f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Deprecated: Use -store.index-cache-read.*; Size of in-memory index cache, 0 to disable.") + f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Deprecated: Use -store.index-cache-read.*; Period for which entries in the index cache are valid. Should be no higher than -ingester.max-chunk-idle.") + cfg.memcacheClient.RegisterFlagsWithPrefix("index", "Deprecated: Use -store.index-cache-read.*;", f) - cfg.indexCache.RegisterFlags(f) + cfg.indexCache.RegisterFlagsWithPrefix("store.index-cache-read", "Cache config for index entry reading. ", f) } // Opts makes the storage clients based on the configuration. From 11b7b47aa5ac2d28e6de40fcb03ee729c4fd94e5 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Thu, 4 Oct 2018 12:34:06 +0530 Subject: [PATCH 3/3] Review feedback. Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/cache/background.go | 9 ---- pkg/chunk/cache/cache.go | 19 ++------ pkg/chunk/cache/cache_test.go | 2 +- pkg/chunk/cache/diskcache.go | 4 -- pkg/chunk/cache/fifo_cache.go | 9 ---- pkg/chunk/cache/fifo_cache_test.go | 4 +- pkg/chunk/cache/memcached.go | 9 ---- pkg/chunk/cache/memcached_client.go | 9 ---- pkg/chunk/chunk_store.go | 12 ++--- pkg/chunk/chunk_store_test.go | 2 +- pkg/chunk/composite_store.go | 4 +- pkg/chunk/schema.go | 34 +++++-------- pkg/chunk/series_store.go | 48 ++++++++++++------- pkg/chunk/storage/caching_fixtures.go | 5 +- .../storage/caching_storage_client_test.go | 6 +-- pkg/chunk/storage/factory.go | 13 ++--- pkg/querier/frontend/results_cache.go | 2 +- 17 files changed, 71 insertions(+), 120 deletions(-) diff --git a/pkg/chunk/cache/background.go b/pkg/chunk/cache/background.go index bbdd5799a0..648cc03461 100644 --- a/pkg/chunk/cache/background.go +++ b/pkg/chunk/cache/background.go @@ -30,17 +30,8 @@ type BackgroundConfig struct 
{ WriteBackBuffer int } -// RegisterFlags adds the flags required to config this to the given FlagSet. -func (cfg *BackgroundConfig) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("", "", f) -} - // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *BackgroundConfig) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) { - if prefix != "" { - prefix = prefix + "." - } - f.IntVar(&cfg.WriteBackGoroutines, prefix+"memcache.write-back-goroutines", 10, description+"How many goroutines to use to write back to memcache.") f.IntVar(&cfg.WriteBackBuffer, prefix+"memcache.write-back-buffer", 10000, description+"How many chunks to buffer for background write back.") } diff --git a/pkg/chunk/cache/cache.go b/pkg/chunk/cache/cache.go index a202da37e4..1f230c515a 100644 --- a/pkg/chunk/cache/cache.go +++ b/pkg/chunk/cache/cache.go @@ -26,17 +26,13 @@ type Config struct { diskcache DiskcacheConfig fifocache FifoCacheConfig + // This is to name the cache metrics properly. prefix string // For tests to inject specific implementations. Cache Cache } -// RegisterFlags adds the flags required to config this to the given FlagSet. -func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("", "", f) -} - // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) { cfg.background.RegisterFlagsWithPrefix(prefix, description, f) @@ -45,13 +41,9 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, description string, f cfg.diskcache.RegisterFlagsWithPrefix(prefix, description, f) cfg.fifocache.RegisterFlagsWithPrefix(prefix, description, f) - if prefix != "" { - prefix += "." 
- } - f.BoolVar(&cfg.EnableDiskcache, prefix+"cache.enable-diskcache", false, description+"Enable on-disk cache.") f.BoolVar(&cfg.EnableFifoCache, prefix+"cache.enable-fifocache", false, description+"Enable in-memory cache.") - f.DurationVar(&cfg.DefaultValidity, prefix+"cache.default-validity", 0, description+"The default validity of entries for caches unless overridden.") + f.DurationVar(&cfg.DefaultValidity, prefix+"default-validity", 0, description+"The default validity of entries for caches unless overridden.") cfg.prefix = prefix } @@ -65,16 +57,11 @@ func New(cfg Config) (Cache, error) { caches := []Cache{} if cfg.EnableFifoCache { - prefix := "" - if cfg.prefix != "" { - prefix = cfg.prefix - } - if cfg.fifocache.Validity == 0 && cfg.DefaultValidity != 0 { cfg.fifocache.Validity = cfg.DefaultValidity } - cache := NewFifoCache(prefix, cfg.fifocache) + cache := NewFifoCache(cfg.prefix+"fifocache", cfg.fifocache) caches = append(caches, Instrument(cfg.prefix+"fifocache", cache)) } diff --git a/pkg/chunk/cache/cache_test.go b/pkg/chunk/cache/cache_test.go index d96da9e5c5..706d340a68 100644 --- a/pkg/chunk/cache/cache_test.go +++ b/pkg/chunk/cache/cache_test.go @@ -167,7 +167,7 @@ func TestDiskcache(t *testing.T) { } func TestFifoCache(t *testing.T) { - cache := cache.NewFifoCache("test", cache.FifoCacheConfig{1e3, 1 * time.Hour}) + cache := cache.NewFifoCache("test", cache.FifoCacheConfig{Size: 1e3, Validity: 1 * time.Hour}) testCache(t, cache) } diff --git a/pkg/chunk/cache/diskcache.go b/pkg/chunk/cache/diskcache.go index 7414d2f412..2b84cb136a 100644 --- a/pkg/chunk/cache/diskcache.go +++ b/pkg/chunk/cache/diskcache.go @@ -65,10 +65,6 @@ func (cfg *DiskcacheConfig) RegisterFlags(f *flag.FlagSet) { // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *DiskcacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { - if prefix != "" { - prefix += "." 
- } - f.StringVar(&cfg.Path, prefix+"diskcache.path", "/var/run/chunks", description+"Path to file used to cache chunks.") f.IntVar(&cfg.Size, prefix+"diskcache.size", 1024*1024*1024, description+"Size of file (bytes)") } diff --git a/pkg/chunk/cache/fifo_cache.go b/pkg/chunk/cache/fifo_cache.go index a008f8f016..59c7d35b71 100644 --- a/pkg/chunk/cache/fifo_cache.go +++ b/pkg/chunk/cache/fifo_cache.go @@ -60,17 +60,8 @@ type FifoCacheConfig struct { Validity time.Duration } -// RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *FifoCacheConfig) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("", "", f) -} - // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *FifoCacheConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { - if prefix != "" { - prefix += "." - } - f.IntVar(&cfg.Size, prefix+"fifocache.size", 0, description+"The number of entries to cache.") f.DurationVar(&cfg.Validity, prefix+"fifocache.duration", 0, description+"The expiry duration for the cache.") } diff --git a/pkg/chunk/cache/fifo_cache_test.go b/pkg/chunk/cache/fifo_cache_test.go index 230107e112..b3461689d2 100644 --- a/pkg/chunk/cache/fifo_cache_test.go +++ b/pkg/chunk/cache/fifo_cache_test.go @@ -14,7 +14,7 @@ const size = 10 const overwrite = 5 func TestFifoCache(t *testing.T) { - c := NewFifoCache("test", FifoCacheConfig{size, 1 * time.Minute}) + c := NewFifoCache("test", FifoCacheConfig{Size: size, Validity: 1 * time.Minute}) ctx := context.Background() // Check put / get works @@ -74,7 +74,7 @@ func TestFifoCache(t *testing.T) { } func TestFifoCacheExpiry(t *testing.T) { - c := NewFifoCache("test", FifoCacheConfig{size, 5 * time.Millisecond}) + c := NewFifoCache("test", FifoCacheConfig{Size: size, Validity: 5 * time.Millisecond}) ctx := context.Background() c.Put(ctx, []string{"0"}, []interface{}{0}) diff --git a/pkg/chunk/cache/memcached.go 
b/pkg/chunk/cache/memcached.go index 807e32663c..0fd1752a2f 100644 --- a/pkg/chunk/cache/memcached.go +++ b/pkg/chunk/cache/memcached.go @@ -39,17 +39,8 @@ type MemcachedConfig struct { Parallelism int } -// RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *MemcachedConfig) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("", "", f) -} - // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *MemcachedConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { - if prefix != "" { - prefix += "." - } - f.DurationVar(&cfg.Expiration, prefix+"memcached.expiration", 0, description+"How long keys stay in the memcache.") f.IntVar(&cfg.BatchSize, prefix+"memcached.batchsize", 0, description+"How many keys to fetch in each batch.") f.IntVar(&cfg.Parallelism, prefix+"memcached.parallelism", 100, description+"Maximum active requests to memcache.") diff --git a/pkg/chunk/cache/memcached_client.go b/pkg/chunk/cache/memcached_client.go index 74e47430b8..8679330c01 100644 --- a/pkg/chunk/cache/memcached_client.go +++ b/pkg/chunk/cache/memcached_client.go @@ -39,17 +39,8 @@ type MemcachedClientConfig struct { UpdateInterval time.Duration } -// RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *MemcachedClientConfig) RegisterFlags(f *flag.FlagSet) { - cfg.RegisterFlagsWithPrefix("", "", f) -} - // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet func (cfg *MemcachedClientConfig) RegisterFlagsWithPrefix(prefix, description string, f *flag.FlagSet) { - if prefix != "" { - prefix = prefix + "." - } - f.StringVar(&cfg.Host, prefix+"memcached.hostname", "", description+"Hostname for memcached service to use when caching chunks. 
If empty, no memcached will be used.") f.StringVar(&cfg.Service, prefix+"memcached.service", "memcached", description+"SRV service used to discover memcache servers.") f.DurationVar(&cfg.Timeout, prefix+"memcached.timeout", 100*time.Millisecond, description+"Maximum time to wait before giving up on memcached requests.") diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 7603503c59..f2ea07edce 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -51,7 +51,7 @@ func init() { // StoreConfig specifies config for a ChunkStore type StoreConfig struct { - CacheConfig cache.Config + ChunkCacheConfig cache.Config MinChunkAge time.Duration QueryChunkLimit int @@ -59,14 +59,14 @@ type StoreConfig struct { CardinalityCacheValidity time.Duration CardinalityLimit int - EntryCache cache.Config + WriteDedupeCacheConfig cache.Config } // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { - cfg.CacheConfig.RegisterFlags(f) + cfg.ChunkCacheConfig.RegisterFlagsWithPrefix("", "Cache config for chunks. ", f) - cfg.EntryCache.RegisterFlagsWithPrefix("store.index-cache-write", "Cache config for index entry writing. ", f) + cfg.WriteDedupeCacheConfig.RegisterFlagsWithPrefix("store.index-cache-write.", "Cache config for index entry writing. 
", f) f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.") f.IntVar(&cfg.QueryChunkLimit, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") @@ -82,12 +82,10 @@ type store struct { storage StorageClient schema Schema *Fetcher - - entryCache cache.Cache } func newStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) { - fetcher, err := NewChunkFetcher(cfg.CacheConfig, storage) + fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, storage) if err != nil { return nil, err } diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 006ba7ab27..d04589b4a2 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -66,7 +66,7 @@ var stores = []struct { ) util.DefaultValues(&storeCfg, &schemaCfg) - storeCfg.EntryCache.Cache = cache.NewFifoCache("test", cache.FifoCacheConfig{ + storeCfg.WriteDedupeCacheConfig.Cache = cache.NewFifoCache("test", cache.FifoCacheConfig{ Size: 500, }) diff --git a/pkg/chunk/composite_store.go b/pkg/chunk/composite_store.go index 516d8e8f45..d4a2a5a3fa 100644 --- a/pkg/chunk/composite_store.go +++ b/pkg/chunk/composite_store.go @@ -123,11 +123,11 @@ func latest(a, b model.Time) model.Time { // NewStore creates a new Store which delegates to different stores depending // on time. 
func NewStore(cfg StoreConfig, schemaCfg SchemaConfig, storageOpts []StorageOpt) (Store, error) { - cache, err := cache.New(cfg.CacheConfig) + cache, err := cache.New(cfg.ChunkCacheConfig) if err != nil { return nil, err } - cfg.CacheConfig.Cache = cache + cfg.ChunkCacheConfig.Cache = cache schemaOpts := SchemaOpts(cfg, schemaCfg) diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go index 6a12a05474..d04ca94cba 100644 --- a/pkg/chunk/schema.go +++ b/pkg/chunk/schema.go @@ -1,8 +1,6 @@ package chunk import ( - "bytes" - "encoding/hex" "errors" "fmt" "strings" @@ -32,9 +30,10 @@ type Schema interface { // When doing a write, use this method to return the list of entries you should write to. GetWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) + // Should only be used with the seriesStore. TODO: Make seriesStore implement a different interface altogether. GetLabelWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) GetChunkWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) - GetLabelEntryCacheKey(from, through model.Time, userID string, labels model.Metric) []string + GetLabelEntryCacheKeys(from, through model.Time, userID string, labels model.Metric) []string // When doing a read, use these methods to return the list of entries you should query GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) @@ -171,6 +170,7 @@ func (s schema) GetLabelWriteEntries(from, through model.Time, userID string, me } return result, nil } + func (s schema) GetChunkWriteEntries(from, through model.Time, userID string, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { var result []IndexEntry @@ -186,18 +186,18 @@ func 
(s schema) GetChunkWriteEntries(from, through model.Time, userID string, me } // Should only used for v9Schema -func (s schema) GetLabelEntryCacheKey(from, through model.Time, userID string, labels model.Metric) []string { +func (s schema) GetLabelEntryCacheKeys(from, through model.Time, userID string, labels model.Metric) []string { var result []string for _, bucket := range s.buckets(from, through, userID) { - key := bytes.Join([][]byte{ - []byte(bucket.tableName), - []byte(bucket.hashKey), - sha256bytes(labels.String()), + key := strings.Join([]string{ + bucket.tableName, + bucket.hashKey, + string(sha256bytes(labels.String())), }, - []byte("-"), + "-", ) - result = append(result, hex.EncodeToString(key)) + result = append(result, key) } return result @@ -591,19 +591,7 @@ type v9Entries struct { } func (e v9Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { - labelEntries, err := e.GetLabelWriteEntries(bucket, metricName, labels, chunkID) - if err != nil { - return nil, err - } - - chunkEntries, err := e.GetChunkWriteEntries(bucket, metricName, labels, chunkID) - if err != nil { - return nil, err - } - - entries := append(labelEntries, chunkEntries...) 
- - return entries, nil + return nil, ErrNotSupported } func (v9Entries) GetLabelWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index b65311a012..718f0f11d4 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -2,6 +2,7 @@ package chunk import ( "context" + "encoding/hex" "fmt" "github.com/go-kit/kit/log/level" @@ -54,28 +55,33 @@ var ( type seriesStore struct { store cardinalityCache *cache.FifoCache + + writeDedupeCache cache.Cache } func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) { - fetcher, err := NewChunkFetcher(cfg.CacheConfig, storage) + fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, storage) if err != nil { return nil, err } - entryCache, err := cache.New(cfg.EntryCache) + writeDedupeCache, err := cache.New(cfg.WriteDedupeCacheConfig) if err != nil { return nil, err } return &seriesStore{ store: store{ - cfg: cfg, - storage: storage, - schema: schema, - Fetcher: fetcher, - entryCache: entryCache, + cfg: cfg, + storage: storage, + schema: schema, + Fetcher: fetcher, }, - cardinalityCache: cache.NewFifoCache("cardinality", cache.FifoCacheConfig{cfg.CardinalityCacheSize, cfg.CardinalityCacheValidity}), + cardinalityCache: cache.NewFifoCache("cardinality", cache.FifoCacheConfig{ + Size: cfg.CardinalityCacheSize, + Validity: cfg.CardinalityCacheValidity, + }), + writeDedupeCache: writeDedupeCache, }, nil } @@ -328,7 +334,7 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun c.writeBackCache(ctx, chunks) - writeReqs, _, keysToCache, err := c.calculateIndexEntries(userID, from, through, chunks[0]) + writeReqs, keysToCache, err := c.calculateIndexEntries(userID, from, through, chunks[0]) if err != nil { return err } @@ -338,27 +344,35 @@ func (c *seriesStore) PutOne(ctx context.Context, from, through model.Time, chun } bufs := 
make([][]byte, len(keysToCache)) - c.entryCache.Store(ctx, keysToCache, bufs) + c.writeDedupeCache.Store(ctx, keysToCache, bufs) return nil } // calculateIndexEntries creates a set of batched WriteRequests for all the chunks it is given. -func (c *seriesStore) calculateIndexEntries(userID string, from, through model.Time, chunk Chunk) (WriteBatch, []IndexEntry, []string, error) { +func (c *seriesStore) calculateIndexEntries(userID string, from, through model.Time, chunk Chunk) (WriteBatch, []string, error) { seenIndexEntries := map[string]struct{}{} entries := []IndexEntry{} keysToCache := []string{} metricName, err := extract.MetricNameFromMetric(chunk.Metric) if err != nil { - return nil, nil, nil, err + return nil, nil, err + } + + keys := c.schema.GetLabelEntryCacheKeys(from, through, userID, chunk.Metric) + + cacheKeys := make([]string, 0, len(keys)) // Keys which translate to the strings stored in the cache. + for _, key := range keys { + // This is just encoding to remove invalid characters so that we can put them in memcache. + // We're not hashing them as the length of the key is well within memcache bounds. tableName + userid + day + 32Byte(seriesID) + cacheKeys = append(cacheKeys, hex.EncodeToString([]byte(key))) } - keys := c.schema.GetLabelEntryCacheKey(from, through, userID, chunk.Metric) - _, _, missing := c.entryCache.Fetch(context.Background(), keys) + _, _, missing := c.writeDedupeCache.Fetch(context.Background(), cacheKeys) if len(missing) != 0 { labelEntries, err := c.schema.GetLabelWriteEntries(from, through, userID, metricName, chunk.Metric, chunk.ExternalKey()) if err != nil { - return nil, nil, nil, err + return nil, nil, err } entries = append(entries, labelEntries...) 
@@ -367,7 +381,7 @@ func (c *seriesStore) calculateIndexEntries(userID string, from, through model.T chunkEntries, err := c.schema.GetChunkWriteEntries(from, through, userID, metricName, chunk.Metric, chunk.ExternalKey()) if err != nil { - return nil, nil, nil, err + return nil, nil, err } entries = append(entries, chunkEntries...) @@ -384,5 +398,5 @@ func (c *seriesStore) calculateIndexEntries(userID string, from, through model.T } } - return result, entries, keysToCache, nil + return result, keysToCache, nil } diff --git a/pkg/chunk/storage/caching_fixtures.go b/pkg/chunk/storage/caching_fixtures.go index 0519d856b6..b4d490ec1e 100644 --- a/pkg/chunk/storage/caching_fixtures.go +++ b/pkg/chunk/storage/caching_fixtures.go @@ -17,7 +17,10 @@ type fixture struct { func (f fixture) Name() string { return "caching-store" } func (f fixture) Clients() (chunk.StorageClient, chunk.TableClient, chunk.SchemaConfig, error) { storageClient, tableClient, schemaConfig, err := f.fixture.Clients() - client := newCachingStorageClient(storageClient, cache.NewFifoCache("index-fifo", cache.FifoCacheConfig{500, 5 * time.Minute}), 5*time.Minute) + client := newCachingStorageClient(storageClient, cache.NewFifoCache("index-fifo", cache.FifoCacheConfig{ + Size: 500, + Validity: 5 * time.Minute, + }), 5*time.Minute) return client, tableClient, schemaConfig, err } func (f fixture) Teardown() error { return f.fixture.Teardown() } diff --git a/pkg/chunk/storage/caching_storage_client_test.go b/pkg/chunk/storage/caching_storage_client_test.go index 8665acfb09..53612a9fa0 100644 --- a/pkg/chunk/storage/caching_storage_client_test.go +++ b/pkg/chunk/storage/caching_storage_client_test.go @@ -34,7 +34,7 @@ func TestCachingStorageClientBasic(t *testing.T) { }}, }, } - cache := cache.NewFifoCache("test", cache.FifoCacheConfig{10, 10 * time.Second}) + cache := cache.NewFifoCache("test", cache.FifoCacheConfig{Size: 10, Validity: 10 * time.Second}) client := newCachingStorageClient(store, cache, 
1*time.Second) queries := []chunk.IndexQuery{{ TableName: "table", @@ -63,7 +63,7 @@ func TestCachingStorageClient(t *testing.T) { }}, }, } - cache := cache.NewFifoCache("test", cache.FifoCacheConfig{10, 10 * time.Second}) + cache := cache.NewFifoCache("test", cache.FifoCacheConfig{Size: 10, Validity: 10 * time.Second}) client := newCachingStorageClient(store, cache, 1*time.Second) queries := []chunk.IndexQuery{ {TableName: "table", HashValue: "foo"}, @@ -113,7 +113,7 @@ func TestCachingStorageClientCollision(t *testing.T) { }, }, } - cache := cache.NewFifoCache("test", cache.FifoCacheConfig{10, 10 * time.Second}) + cache := cache.NewFifoCache("test", cache.FifoCacheConfig{Size: 10, Validity: 10 * time.Second}) client := newCachingStorageClient(store, cache, 1*time.Second) queries := []chunk.IndexQuery{ {TableName: "table", HashValue: "foo", RangeValuePrefix: []byte("bar")}, diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index 24174325b8..bcc83d60a8 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -28,7 +28,7 @@ type Config struct { IndexCacheValidity time.Duration memcacheClient cache.MemcachedClientConfig - indexCache cache.Config + indexQueriesCacheConfig cache.Config } // RegisterFlags adds the flags required to configure this flag set. @@ -41,9 +41,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // Deprecated flags!! f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Deprecated: Use -store.index-cache-read.*; Size of in-memory index cache, 0 to disable.") f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Deprecated: Use -store.index-cache-read.*; Period for which entries in the index cache are valid. 
Should be no higher than -ingester.max-chunk-idle.") - cfg.memcacheClient.RegisterFlagsWithPrefix("index", "Deprecated: Use -store.index-cache-read.*;", f) + cfg.memcacheClient.RegisterFlagsWithPrefix("index.", "Deprecated: Use -store.index-cache-read.*;", f) - cfg.indexCache.RegisterFlagsWithPrefix("store.index-cache-read", "Cache config for index entry reading. ", f) + cfg.indexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading. ", f) } // Opts makes the storage clients based on the configuration. @@ -54,7 +54,7 @@ func Opts(cfg Config, schemaCfg chunk.SchemaConfig) ([]chunk.StorageOpt, error) // Building up from deprecated flags. var caches []cache.Cache if cfg.IndexCacheSize > 0 { - fifocache := cache.Instrument("fifo-index", cache.NewFifoCache("index", cache.FifoCacheConfig{cfg.IndexCacheSize, cfg.IndexCacheValidity})) + fifocache := cache.Instrument("fifo-index", cache.NewFifoCache("index", cache.FifoCacheConfig{Size: cfg.IndexCacheSize, Validity: cfg.IndexCacheValidity})) caches = append(caches, fifocache) } if cfg.memcacheClient.Host != "" { @@ -70,8 +70,9 @@ func Opts(cfg Config, schemaCfg chunk.SchemaConfig) ([]chunk.StorageOpt, error) if len(caches) > 0 { tieredCache = cache.NewTiered(caches) + cfg.indexQueriesCacheConfig.DefaultValidity = cfg.IndexCacheValidity } else { - tieredCache, err = cache.New(cfg.indexCache) + tieredCache, err = cache.New(cfg.indexQueriesCacheConfig) if err != nil { return nil, err } @@ -83,7 +84,7 @@ func Opts(cfg Config, schemaCfg chunk.SchemaConfig) ([]chunk.StorageOpt, error) } for i := range opts { - opts[i].Client = newCachingStorageClient(opts[i].Client, tieredCache, cfg.indexCache.DefaultValidity) + opts[i].Client = newCachingStorageClient(opts[i].Client, tieredCache, cfg.indexQueriesCacheConfig.DefaultValidity) } return opts, nil diff --git a/pkg/querier/frontend/results_cache.go b/pkg/querier/frontend/results_cache.go index ee902bcbec..7d73ff6de8 100644 --- 
a/pkg/querier/frontend/results_cache.go +++ b/pkg/querier/frontend/results_cache.go @@ -20,7 +20,7 @@ type resultsCacheConfig struct { } func (cfg *resultsCacheConfig) RegisterFlags(f *flag.FlagSet) { - cfg.cacheConfig.RegisterFlags(f) + cfg.cacheConfig.RegisterFlagsWithPrefix("", "", f) f.DurationVar(&cfg.MaxCacheFreshness, "frontend.max-cache-freshness", 1*time.Minute, "Most recent allowed cacheable result, to prevent caching very recent results that might still be in flux.") }