From a34e39f0c286e2061b1616a4054bec441f59136a Mon Sep 17 00:00:00 2001
From: Goutham Veeramachaneni
Date: Mon, 26 Nov 2018 16:06:42 +0530
Subject: [PATCH 1/3] Cache older index entries

Signed-off-by: Goutham Veeramachaneni
---
 pkg/chunk/chunk_store.go                    |   3 +
 pkg/chunk/schema.go                         |   3 +
 pkg/chunk/schema_caching.go                 | 112 ++++++++++++++++++++
 pkg/chunk/series_store.go                   |  10 ++
 pkg/chunk/storage/caching_storage_client.go |  12 ++-
 pkg/chunk/storage/factory.go                |   5 +-
 6 files changed, 140 insertions(+), 5 deletions(-)
 create mode 100644 pkg/chunk/schema_caching.go

diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go
index 3290bfcf41..f526deeef7 100644
--- a/pkg/chunk/chunk_store.go
+++ b/pkg/chunk/chunk_store.go
@@ -61,6 +61,8 @@ type StoreConfig struct {
 	CardinalityCacheSize     int
 	CardinalityCacheValidity time.Duration
 	CardinalityLimit         int
+
+	CacheLookupsOlderThan time.Duration
 }
 
 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -73,6 +75,7 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.CardinalityCacheSize, "store.cardinality-cache-size", 0, "Size of in-memory cardinality cache, 0 to disable.")
 	f.DurationVar(&cfg.CardinalityCacheValidity, "store.cardinality-cache-validity", 1*time.Hour, "Period for which entries in the cardinality cache are valid.")
 	f.IntVar(&cfg.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.")
+	f.DurationVar(&cfg.CacheLookupsOlderThan, "store.cache-lookups-older-than", 0, "Cache index entries older than this period. 0 to disable.")
 }
 
 // store implements Store
diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go
index c326137b23..a0ad0804b7 100644
--- a/pkg/chunk/schema.go
+++ b/pkg/chunk/schema.go
@@ -58,6 +58,9 @@ type IndexQuery struct {
 
 	// Filters for querying
 	ValueEqual []byte
+
+	// If the result of this lookup can be cached or not.
+	Cacheable bool
 }
 
 // IndexEntry describes an entry in the chunk index
diff --git a/pkg/chunk/schema_caching.go b/pkg/chunk/schema_caching.go
new file mode 100644
index 0000000000..d0851c2667
--- /dev/null
+++ b/pkg/chunk/schema_caching.go
@@ -0,0 +1,112 @@
+package chunk
+
+import (
+	"time"
+
+	"github.com/prometheus/common/model"
+)
+
+type cachingSchema struct {
+	Schema
+
+	cacheOlderThan time.Duration
+}
+
+func (s *cachingSchema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) {
+	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.Now().Add(-s.cacheOlderThan))
+
+	cacheableQueries, err := s.Schema.GetReadQueriesForMetric(cFrom, cThrough, userID, metricName)
+	if err != nil {
+		return nil, err
+	}
+
+	activeQueries, err := s.Schema.GetReadQueriesForMetric(from, through, userID, metricName)
+	if err != nil {
+		return nil, err
+	}
+
+	return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
+}
+
+func (s *cachingSchema) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) {
+	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.Now().Add(-s.cacheOlderThan))
+
+	cacheableQueries, err := s.Schema.GetReadQueriesForMetricLabel(cFrom, cThrough, userID, metricName, labelName)
+	if err != nil {
+		return nil, err
+	}
+
+	activeQueries, err := s.Schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
+	if err != nil {
+		return nil, err
+	}
+
+	return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
+}
+
+func (s *cachingSchema) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) {
+	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.Now().Add(-s.cacheOlderThan))
+
+	cacheableQueries, err := s.Schema.GetReadQueriesForMetricLabelValue(cFrom, cThrough, userID, metricName, labelName, labelValue)
+	if err != nil {
+		return nil, err
+	}
+
+	activeQueries, err := s.Schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
+	if err != nil {
+		return nil, err
+	}
+
+	return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
+}
+
+// If the query resulted in series IDs, use this method to find chunks.
+func (s *cachingSchema) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
+	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.Now().Add(-s.cacheOlderThan))
+
+	cacheableQueries, err := s.Schema.GetChunksForSeries(cFrom, cThrough, userID, seriesID)
+	if err != nil {
+		return nil, err
+	}
+
+	activeQueries, err := s.Schema.GetChunksForSeries(from, through, userID, seriesID)
+	if err != nil {
+		return nil, err
+	}
+
+	return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
+}
+
+func splitTimesByCacheability(from, through model.Time, cacheBefore model.Time) (model.Time, model.Time, model.Time, model.Time) {
+	if from.After(cacheBefore) {
+		return 0, 0, from, through
+	}
+
+	if through.Before(cacheBefore) {
+		return from, through, 0, 0
+	}
+
+	return from, cacheBefore, cacheBefore, through
+}
+
+func mergeCacheableAndActiveQueries(cacheableQueries []IndexQuery, activeQueries []IndexQuery) []IndexQuery {
+	finalQueries := make([]IndexQuery, 0, len(cacheableQueries)+len(activeQueries))
+
+Outer:
+	for _, cq := range cacheableQueries {
+		for _, aq := range activeQueries {
+			// When deduping, the bucket values only influence TableName and HashValue
+			// and just checking those is enough.
+			if cq.TableName == aq.TableName && cq.HashValue == aq.HashValue {
+				continue Outer
+			}
+		}
+
+		cq.Cacheable = true
+		finalQueries = append(finalQueries, cq)
+	}
+
+	finalQueries = append(finalQueries, activeQueries...)
+
+	return finalQueries
+}
diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go
index ac0612af2a..6ee08104a8 100644
--- a/pkg/chunk/series_store.go
+++ b/pkg/chunk/series_store.go
@@ -5,6 +5,7 @@ import (
 	"encoding/hex"
 	"fmt"
 	"net/http"
+	"time"
 
 	"github.com/go-kit/kit/log/level"
 	"github.com/pkg/errors"
@@ -60,6 +61,8 @@ type seriesStore struct {
 	cardinalityCache *cache.FifoCache
 	writeDedupeCache cache.Cache
+
+	cacheLookupsOlderThan time.Duration
 }
 
 func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient, limits *validation.Overrides) (Store, error) {
@@ -73,6 +76,13 @@ func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient, limit
 		return nil, err
 	}
 
+	if cfg.CacheLookupsOlderThan != 0 {
+		schema = &cachingSchema{
+			Schema:         schema,
+			cacheOlderThan: cfg.CacheLookupsOlderThan,
+		}
+	}
+
 	return &seriesStore{
 		store: store{
 			cfg: cfg,
diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go
index cc09945b81..742fa798eb 100644
--- a/pkg/chunk/storage/caching_storage_client.go
+++ b/pkg/chunk/storage/caching_storage_client.go
@@ -100,10 +100,18 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I
 			TableName: queries[0].TableName,
 			HashValue: queries[0].HashValue,
 		})
-		results[key] = ReadBatch{
+
+		rb := ReadBatch{
 			Key:    key,
 			Expiry: expiryTime.UnixNano(),
 		}
+
+		// If the query is cacheable forever, nil the expiry.
+		if queries[0].Cacheable {
+			rb.Expiry = 0
+		}
+
+		results[key] = rb
 	}
 
 	err := s.StorageClient.QueryPages(ctx, cacheableMissed, func(cacheableQuery chunk.IndexQuery, r chunk.ReadBatch) bool {
@@ -232,7 +240,7 @@ func (s *cachingStorageClient) cacheFetch(ctx context.Context, keys []string) (b
 		// Make sure the hash(key) is not a collision in the cache by looking at the
 		// key in the value.
-		if key != readBatch.Key || time.Now().After(time.Unix(0, readBatch.Expiry)) {
+		if key != readBatch.Key || (readBatch.Expiry != 0 && time.Now().After(time.Unix(0, readBatch.Expiry))) {
 			cacheCorruptErrs.Inc()
 			continue
 		}
diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go
index 499bdc9caa..e45bc0105d 100644
--- a/pkg/chunk/storage/factory.go
+++ b/pkg/chunk/storage/factory.go
@@ -39,10 +39,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	// Deprecated flags!!
 	f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Deprecated: Use -store.index-cache-read.*; Size of in-memory index cache, 0 to disable.")
-	f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Deprecated: Use -store.index-cache-read.*; Period for which entries in the index cache are valid. Should be no higher than -ingester.max-chunk-idle.")
 	cfg.memcacheClient.RegisterFlagsWithPrefix("index.", "Deprecated: Use -store.index-cache-read.*;", f)
 
 	cfg.indexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading. ", f)
+	f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.")
 }
 
 // NewStore makes the storage clients based on the configuration.
@@ -52,7 +52,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
 	// Building up from deprecated flags.
 	var caches []cache.Cache
 	if cfg.IndexCacheSize > 0 {
-		fifocache := cache.Instrument("fifo-index", cache.NewFifoCache("index", cache.FifoCacheConfig{Size: cfg.IndexCacheSize, Validity: cfg.IndexCacheValidity}))
+		fifocache := cache.Instrument("fifo-index", cache.NewFifoCache("index", cache.FifoCacheConfig{Size: cfg.IndexCacheSize}))
 		caches = append(caches, fifocache)
 	}
 	if cfg.memcacheClient.Host != "" {
@@ -69,7 +69,6 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
 	var tieredCache cache.Cache
 	if len(caches) > 0 {
 		tieredCache = cache.NewTiered(caches)
-		cfg.indexQueriesCacheConfig.DefaultValidity = cfg.IndexCacheValidity
 	} else {
 		tieredCache, err = cache.New(cfg.indexQueriesCacheConfig)
 		if err != nil {

From 150ca6f554f7979d01f1107fc773f123e1d30ac3 Mon Sep 17 00:00:00 2001
From: Goutham Veeramachaneni
Date: Mon, 26 Nov 2018 21:54:55 +0530
Subject: [PATCH 2/3] Add tests for the caching of old entries

Signed-off-by: Goutham Veeramachaneni
---
 pkg/chunk/schema_caching.go                |  9 ++-
 pkg/chunk/schema_caching_test.go           | 75 +++++++++++++++++++
 .../storage/caching_storage_client_test.go | 74 +++++++++++++++++-
 3 files changed, 152 insertions(+), 6 deletions(-)
 create mode 100644 pkg/chunk/schema_caching_test.go

diff --git a/pkg/chunk/schema_caching.go b/pkg/chunk/schema_caching.go
index d0851c2667..e5f181d334 100644
--- a/pkg/chunk/schema_caching.go
+++ b/pkg/chunk/schema_caching.go
@@ -4,6 +4,7 @@ import (
 	"time"
 
 	"github.com/prometheus/common/model"
+	"github.com/weaveworks/common/mtime"
 )
 
 type cachingSchema struct {
@@ -13,7 +14,7 @@ type cachingSchema struct {
 }
 
 func (s *cachingSchema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) {
-	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.Now().Add(-s.cacheOlderThan))
+	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))
 
 	cacheableQueries, err := s.Schema.GetReadQueriesForMetric(cFrom, cThrough, userID, metricName)
 	if err != nil {
@@ -29,7 +30,7 @@ func (s *cachingSchema) GetReadQueriesForMetric(from, through model.Time, userID
 }
 
 func (s *cachingSchema) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) {
-	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.Now().Add(-s.cacheOlderThan))
+	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))
 
 	cacheableQueries, err := s.Schema.GetReadQueriesForMetricLabel(cFrom, cThrough, userID, metricName, labelName)
 	if err != nil {
@@ -45,7 +46,7 @@ func (s *cachingSchema) GetReadQueriesForMetricLabel(from, through model.Time, u
 }
 
 func (s *cachingSchema) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) {
-	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.Now().Add(-s.cacheOlderThan))
+	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))
 
 	cacheableQueries, err := s.Schema.GetReadQueriesForMetricLabelValue(cFrom, cThrough, userID, metricName, labelName, labelValue)
 	if err != nil {
@@ -62,7 +63,7 @@ func (s *cachingSchema) GetReadQueriesForMetricLabelValue(from, through model.Ti
 }
 
 // If the query resulted in series IDs, use this method to find chunks.
 func (s *cachingSchema) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
-	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.Now().Add(-s.cacheOlderThan))
+	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))
 
 	cacheableQueries, err := s.Schema.GetChunksForSeries(cFrom, cThrough, userID, seriesID)
 	if err != nil {
diff --git a/pkg/chunk/schema_caching_test.go b/pkg/chunk/schema_caching_test.go
new file mode 100644
index 0000000000..08f23ac421
--- /dev/null
+++ b/pkg/chunk/schema_caching_test.go
@@ -0,0 +1,75 @@
+package chunk
+
+import (
+	"testing"
+	"time"
+
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+	"github.com/weaveworks/common/mtime"
+)
+
+func TestCachingSchema(t *testing.T) {
+	const (
+		userID         = "userid"
+		periodicPrefix = "periodicPrefix"
+	)
+
+	dailyBuckets := makeSchema("v3")
+	schema := &cachingSchema{
+		Schema:         dailyBuckets,
+		cacheOlderThan: 24 * time.Hour,
+	}
+
+	baseTime := time.Unix(0, 0)
+	baseTime = baseTime.Add(30*24*time.Hour - 1)
+
+	mtime.NowForce(baseTime)
+
+	for _, tc := range []struct {
+		from, through time.Time
+
+		cacheableIdx int
+	}{
+		{
+			// Completely cacheable.
+			baseTime.Add(-36 * time.Hour),
+			baseTime.Add(-25 * time.Hour),
+			0,
+		},
+		{
+			// Completely active.
+			baseTime.Add(-23 * time.Hour),
+			baseTime.Add(-2 * time.Hour),
+			-1,
+		},
+		{
+			// Mix of both but the cacheable entry is also active.
+			baseTime.Add(-36 * time.Hour),
+			baseTime.Add(-2 * time.Hour),
+			-1,
+		},
+		{
+			// Mix of both.
+			baseTime.Add(-50 * time.Hour),
+			baseTime.Add(-2 * time.Hour),
+			0,
+		},
+	} {
+		have, err := schema.GetReadQueriesForMetric(
+			model.TimeFromUnix(tc.from.Unix()), model.TimeFromUnix(tc.through.Unix()),
+			userID, model.LabelValue("foo"),
+		)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		for i := range have {
+			if i <= tc.cacheableIdx {
+				require.True(t, have[i].Cacheable)
+			} else {
+				require.False(t, have[i].Cacheable)
+			}
+		}
+	}
+}
diff --git a/pkg/chunk/storage/caching_storage_client_test.go b/pkg/chunk/storage/caching_storage_client_test.go
index a87e9910ff..71d81ba4d9 100644
--- a/pkg/chunk/storage/caching_storage_client_test.go
+++ b/pkg/chunk/storage/caching_storage_client_test.go
@@ -54,7 +54,7 @@ func TestCachingStorageClientBasic(t *testing.T) {
 	assert.EqualValues(t, 1, store.queries)
 }
 
-func TestCachingStorageClient(t *testing.T) {
+func TestTempCachingStorageClient(t *testing.T) {
 	store := &mockStore{
 		results: ReadBatch{
 			Entries: []Entry{{
@@ -64,7 +64,7 @@ func TestCachingStorageClient(t *testing.T) {
 		},
 	}
 	cache := cache.NewFifoCache("test", cache.FifoCacheConfig{Size: 10, Validity: 10 * time.Second})
-	client := newCachingStorageClient(store, cache, 1*time.Second)
+	client := newCachingStorageClient(store, cache, 100*time.Millisecond)
 	queries := []chunk.IndexQuery{
 		{TableName: "table", HashValue: "foo"},
 		{TableName: "table", HashValue: "bar"},
@@ -94,6 +94,76 @@ func TestCachingStorageClient(t *testing.T) {
 	require.NoError(t, err)
 	assert.EqualValues(t, len(queries), store.queries)
 	assert.EqualValues(t, len(queries), results)
+
+	// If we do the query after validity, it should see the queries.
+	time.Sleep(100 * time.Millisecond)
+	results = 0
+	err = client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
+		iter := batch.Iterator()
+		for iter.Next() {
+			results++
+		}
+		return true
+	})
+	require.NoError(t, err)
+	assert.EqualValues(t, 2*len(queries), store.queries)
+	assert.EqualValues(t, len(queries), results)
+}
+
+func TestPermCachingStorageClient(t *testing.T) {
+	store := &mockStore{
+		results: ReadBatch{
+			Entries: []Entry{{
+				Column: []byte("foo"),
+				Value:  []byte("bar"),
+			}},
+		},
+	}
+	cache := cache.NewFifoCache("test", cache.FifoCacheConfig{Size: 10, Validity: 10 * time.Second})
+	client := newCachingStorageClient(store, cache, 100*time.Millisecond)
+	queries := []chunk.IndexQuery{
+		{TableName: "table", HashValue: "foo", Cacheable: true},
+		{TableName: "table", HashValue: "bar", Cacheable: true},
+		{TableName: "table", HashValue: "baz", Cacheable: true},
+	}
+	results := 0
+	err := client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
+		iter := batch.Iterator()
+		for iter.Next() {
+			results++
+		}
+		return true
+	})
+	require.NoError(t, err)
+	assert.EqualValues(t, len(queries), store.queries)
+	assert.EqualValues(t, len(queries), results)
+
+	// If we do the query to the cache again, the underlying store shouldn't see it.
+	results = 0
+	err = client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
+		iter := batch.Iterator()
+		for iter.Next() {
+			results++
+		}
+		return true
+	})
+	require.NoError(t, err)
+	assert.EqualValues(t, len(queries), store.queries)
+	assert.EqualValues(t, len(queries), results)
+
+	// If we do the query after validity, it still shouldn't see the queries.
+	time.Sleep(200 * time.Millisecond)
+	results = 0
+	err = client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
+		iter := batch.Iterator()
+		for iter.Next() {
+			results++
+		}
+		return true
+	})
+	require.NoError(t, err)
+	assert.EqualValues(t, len(queries), store.queries)
+	assert.EqualValues(t, len(queries), results)
 }
 
 func TestCachingStorageClientEmptyResponse(t *testing.T) {

From b254144a63dbb2150463319235c172869766405c Mon Sep 17 00:00:00 2001
From: Goutham Veeramachaneni
Date: Mon, 7 Jan 2019 17:54:04 +0530
Subject: [PATCH 3/3] Address review feedback

Signed-off-by: Goutham Veeramachaneni
---
 pkg/chunk/schema.go                              |  4 ++--
 pkg/chunk/schema_caching.go                      | 12 ++++++------
 pkg/chunk/schema_caching_test.go                 |  6 +++---
 pkg/chunk/series_store.go                        |  5 +----
 pkg/chunk/storage/caching_storage_client.go      |  2 +-
 pkg/chunk/storage/caching_storage_client_test.go |  6 +++---
 6 files changed, 16 insertions(+), 19 deletions(-)

diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go
index a0ad0804b7..37033156fd 100644
--- a/pkg/chunk/schema.go
+++ b/pkg/chunk/schema.go
@@ -59,8 +59,8 @@ type IndexQuery struct {
 	// Filters for querying
 	ValueEqual []byte
 
-	// If the result of this lookup can be cached or not.
-	Cacheable bool
+	// If the result of this lookup is immutable or not (for caching).
+	Immutable bool
 }
 
 // IndexEntry describes an entry in the chunk index
diff --git a/pkg/chunk/schema_caching.go b/pkg/chunk/schema_caching.go
index e5f181d334..2444b0c458 100644
--- a/pkg/chunk/schema_caching.go
+++ b/pkg/chunk/schema_caching.go
@@ -7,13 +7,13 @@ import (
 	"github.com/weaveworks/common/mtime"
 )
 
-type cachingSchema struct {
+type schemaCaching struct {
 	Schema
 
 	cacheOlderThan time.Duration
 }
 
-func (s *cachingSchema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) {
+func (s *schemaCaching) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) {
 	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))
 
 	cacheableQueries, err := s.Schema.GetReadQueriesForMetric(cFrom, cThrough, userID, metricName)
@@ -29,7 +29,7 @@ func (s *cachingSchema) GetReadQueriesForMetric(from, through model.Time, userID
 	return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
 }
 
-func (s *cachingSchema) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) {
+func (s *schemaCaching) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) {
 	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))
 
 	cacheableQueries, err := s.Schema.GetReadQueriesForMetricLabel(cFrom, cThrough, userID, metricName, labelName)
@@ -45,7 +45,7 @@ func (s *cachingSchema) GetReadQueriesForMetricLabel(from, through model.Time, u
 	return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
 }
 
-func (s *cachingSchema) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) {
+func (s *schemaCaching) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) {
 	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))
 
 	cacheableQueries, err := s.Schema.GetReadQueriesForMetricLabelValue(cFrom, cThrough, userID, metricName, labelName, labelValue)
@@ -62,7 +62,7 @@ func (s *cachingSchema) GetReadQueriesForMetricLabelValue(from, through model.Ti
 }
 
 // If the query resulted in series IDs, use this method to find chunks.
-func (s *cachingSchema) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
+func (s *schemaCaching) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
 	cFrom, cThrough, from, through := splitTimesByCacheability(from, through, model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix()))
 
 	cacheableQueries, err := s.Schema.GetChunksForSeries(cFrom, cThrough, userID, seriesID)
@@ -103,7 +103,7 @@ Outer:
 			}
 		}
 
-		cq.Cacheable = true
+		cq.Immutable = true
 		finalQueries = append(finalQueries, cq)
 	}
 
diff --git a/pkg/chunk/schema_caching_test.go b/pkg/chunk/schema_caching_test.go
index 08f23ac421..f197ba4208 100644
--- a/pkg/chunk/schema_caching_test.go
+++ b/pkg/chunk/schema_caching_test.go
@@ -16,7 +16,7 @@ func TestCachingSchema(t *testing.T) {
 	)
 
 	dailyBuckets := makeSchema("v3")
-	schema := &cachingSchema{
+	schema := &schemaCaching{
 		Schema:         dailyBuckets,
 		cacheOlderThan: 24 * time.Hour,
 	}
@@ -66,9 +66,9 @@ func TestCachingSchema(t *testing.T) {
 
 		for i := range have {
 			if i <= tc.cacheableIdx {
-				require.True(t, have[i].Cacheable)
+				require.True(t, have[i].Immutable)
 			} else {
-				require.False(t, have[i].Cacheable)
+				require.False(t, have[i].Immutable)
 			}
 		}
 	}
diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go
index 6ee08104a8..49aeb9a915 100644
--- a/pkg/chunk/series_store.go
+++ b/pkg/chunk/series_store.go
@@ -5,7 +5,6 @@ import (
 	"encoding/hex"
 	"fmt"
 	"net/http"
-	"time"
 
 	"github.com/go-kit/kit/log/level"
 	"github.com/pkg/errors"
@@ -60,8 +60,6 @@ type seriesStore struct {
 	cardinalityCache *cache.FifoCache
 	writeDedupeCache cache.Cache
-
-	cacheLookupsOlderThan time.Duration
 }
 
 func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient, limits *validation.Overrides) (Store, error) {
@@ -77,7 +74,7 @@ func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient, limit
 	}
 
 	if cfg.CacheLookupsOlderThan != 0 {
-		schema = &cachingSchema{
+		schema = &schemaCaching{
 			Schema:         schema,
 			cacheOlderThan: cfg.CacheLookupsOlderThan,
 		}
diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go
index 742fa798eb..410a94e63a 100644
--- a/pkg/chunk/storage/caching_storage_client.go
+++ b/pkg/chunk/storage/caching_storage_client.go
@@ -107,7 +107,7 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, queries []chunk.I
 		}
 
 		// If the query is cacheable forever, nil the expiry.
-		if queries[0].Cacheable {
+		if queries[0].Immutable {
 			rb.Expiry = 0
 		}
 
diff --git a/pkg/chunk/storage/caching_storage_client_test.go b/pkg/chunk/storage/caching_storage_client_test.go
index 71d81ba4d9..6d3689ba53 100644
--- a/pkg/chunk/storage/caching_storage_client_test.go
+++ b/pkg/chunk/storage/caching_storage_client_test.go
@@ -122,9 +122,9 @@ func TestPermCachingStorageClient(t *testing.T) {
 	cache := cache.NewFifoCache("test", cache.FifoCacheConfig{Size: 10, Validity: 10 * time.Second})
 	client := newCachingStorageClient(store, cache, 100*time.Millisecond)
 	queries := []chunk.IndexQuery{
-		{TableName: "table", HashValue: "foo", Cacheable: true},
-		{TableName: "table", HashValue: "bar", Cacheable: true},
-		{TableName: "table", HashValue: "baz", Cacheable: true},
+		{TableName: "table", HashValue: "foo", Immutable: true},
+		{TableName: "table", HashValue: "bar", Immutable: true},
+		{TableName: "table", HashValue: "baz", Immutable: true},
 	}
 	results := 0
 	err := client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
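For readers skimming the series: the core of PATCH 1 is the three-way split in splitTimesByCacheability. Anything older than -store.cache-lookups-older-than is treated as immutable and cached without an expiry, while the recent part of the range keeps the normal cache validity. The sketch below is not part of the patches; it mirrors that splitting behaviour on plain time.Time values under stated assumptions (the real code works on model.Time, and the splitByCacheability name and the 24h cut-off here are illustrative only).

package main

import (
	"fmt"
	"time"
)

// splitByCacheability mirrors splitTimesByCacheability from PATCH 1:
// the part of [from, through] before cacheBefore is "cacheable" (immutable),
// the part after it is "active". Zero times mean "no such sub-range".
func splitByCacheability(from, through, cacheBefore time.Time) (cFrom, cThrough, aFrom, aThrough time.Time) {
	var zero time.Time
	if from.After(cacheBefore) {
		// Entire range is recent: nothing can be cached forever.
		return zero, zero, from, through
	}
	if through.Before(cacheBefore) {
		// Entire range is old: everything can be cached forever.
		return from, through, zero, zero
	}
	// Range straddles the boundary: split it at cacheBefore.
	return from, cacheBefore, cacheBefore, through
}

func main() {
	now := time.Now()
	cacheBefore := now.Add(-24 * time.Hour) // e.g. -store.cache-lookups-older-than=24h

	from := now.Add(-50 * time.Hour)
	through := now.Add(-2 * time.Hour)

	cFrom, cThrough, aFrom, aThrough := splitByCacheability(from, through, cacheBefore)
	fmt.Println("cacheable:", cFrom, "->", cThrough)
	fmt.Println("active:   ", aFrom, "->", aThrough)
}

Running this prints a cacheable sub-range covering hours -50 to -24 and an active sub-range covering hours -24 to -2, which is exactly the "Mix of both" case exercised in TestCachingSchema above.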