diff --git a/pkg/chunk/cache/fifo_cache.go b/pkg/chunk/cache/fifo_cache.go
new file mode 100644
index 00000000000..1eb5099ece8
--- /dev/null
+++ b/pkg/chunk/cache/fifo_cache.go
@@ -0,0 +1,204 @@
+package cache
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	ot "github.com/opentracing/opentracing-go"
+	otlog "github.com/opentracing/opentracing-go/log"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var (
+	cacheEntriesAdded = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "querier",
+		Subsystem: "cache",
+		Name:      "added_total",
+		Help:      "The total number of Put calls on the cache",
+	}, []string{"cache"})
+
+	cacheEntriesAddedNew = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "querier",
+		Subsystem: "cache",
+		Name:      "added_new_total",
+		Help:      "The total number of new entries added to the cache",
+	}, []string{"cache"})
+
+	cacheEntriesEvicted = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "querier",
+		Subsystem: "cache",
+		Name:      "evicted_total",
+		Help:      "The total number of evicted entries",
+	}, []string{"cache"})
+
+	cacheTotalGets = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "querier",
+		Subsystem: "cache",
+		Name:      "gets_total",
+		Help:      "The total number of Get calls",
+	}, []string{"cache"})
+
+	cacheTotalMisses = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "querier",
+		Subsystem: "cache",
+		Name:      "misses_total",
+		Help:      "The total number of Get calls that had no valid entry",
+	}, []string{"cache"})
+
+	cacheStaleGets = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "querier",
+		Subsystem: "cache",
+		Name:      "stale_gets_total",
+		Help:      "The total number of Get calls that had an entry which expired",
+	}, []string{"cache"})
+)
+
+// FifoCache is a simple string -> interface{} cache which uses a FIFO slice to
+// manage evictions. O(1) inserts and updates, O(1) gets.
+type FifoCache struct {
+	lock     sync.RWMutex
+	size     int
+	validity time.Duration
+	entries  []cacheEntry
+	index    map[string]int
+
+	// indexes into entries to identify the most recent and least recent entry.
+	first, last int
+
+	name            string
+	entriesAdded    prometheus.Counter
+	entriesAddedNew prometheus.Counter
+	entriesEvicted  prometheus.Counter
+	totalGets       prometheus.Counter
+	totalMisses     prometheus.Counter
+	staleGets       prometheus.Counter
+}
+
+type cacheEntry struct {
+	updated    time.Time
+	key        string
+	value      interface{}
+	prev, next int
+}
+
+// NewFifoCache returns a new initialised FifoCache of size.
+func NewFifoCache(name string, size int, validity time.Duration) *FifoCache {
+	return &FifoCache{
+		size:     size,
+		validity: validity,
+		entries:  make([]cacheEntry, 0, size),
+		index:    make(map[string]int, size),
+
+		name:            name,
+		entriesAdded:    cacheEntriesAdded.WithLabelValues(name),
+		entriesAddedNew: cacheEntriesAddedNew.WithLabelValues(name),
+		entriesEvicted:  cacheEntriesEvicted.WithLabelValues(name),
+		totalGets:       cacheTotalGets.WithLabelValues(name),
+		totalMisses:     cacheTotalMisses.WithLabelValues(name),
+		staleGets:       cacheStaleGets.WithLabelValues(name),
+	}
+}
+
+// Put stores the value against the key.
+func (c *FifoCache) Put(ctx context.Context, key string, value interface{}) {
+	span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-put")
+	defer span.Finish()
+
+	c.entriesAdded.Inc()
+	if c.size == 0 {
+		return
+	}
+
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	// See if we already have the entry
+	index, ok := c.index[key]
+	if ok {
+		entry := c.entries[index]
+
+		entry.updated = time.Now()
+		entry.value = value
+
+		// Remove this entry from the FIFO linked-list.
+		c.entries[entry.prev].next = entry.next
+		c.entries[entry.next].prev = entry.prev
+
+		// Insert it at the beginning
+		entry.next = c.first
+		entry.prev = c.last
+		c.entries[entry.next].prev = index
+		c.entries[entry.prev].next = index
+		c.first = index
+
+		c.entries[index] = entry
+		return
+	}
+	c.entriesAddedNew.Inc()
+
+	// Otherwise, see if we need to evict an entry.
+	if len(c.entries) >= c.size {
+		c.entriesEvicted.Inc()
+		index = c.last
+		entry := c.entries[index]
+
+		c.last = entry.prev
+		c.first = index
+		delete(c.index, entry.key)
+		c.index[key] = index
+
+		entry.updated = time.Now()
+		entry.value = value
+		entry.key = key
+		c.entries[index] = entry
+		return
+	}
+
+	// Finally, no hit and we have space.
+	index = len(c.entries)
+	c.entries = append(c.entries, cacheEntry{
+		updated: time.Now(),
+		key:     key,
+		value:   value,
+		prev:    c.last,
+		next:    c.first,
+	})
+	c.entries[c.first].prev = index
+	c.entries[c.last].next = index
+	c.first = index
+	c.index[key] = index
+}
+
+// Get returns the stored value against the key, if it is present and has not expired.
+func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) {
+	span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-get")
+	defer span.Finish()
+
+	c.totalGets.Inc()
+	if c.size == 0 {
+		return nil, false
+	}
+
+	c.lock.RLock()
+	defer c.lock.RUnlock()
+
+	index, ok := c.index[key]
+	if ok {
+		updated := c.entries[index].updated
+		if time.Now().Sub(updated) < c.validity {
+			span.LogFields(otlog.Bool("hit", true))
+			return c.entries[index].value, true
+		}
+
+		c.totalMisses.Inc()
+		c.staleGets.Inc()
+		span.LogFields(otlog.Bool("hit", false), otlog.Bool("stale", true))
+		return nil, false
+	}
+
+	span.LogFields(otlog.Bool("hit", false), otlog.Bool("stale", false))
+	c.totalMisses.Inc()
+	return nil, false
+}
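Not part of the patch itself — a minimal usage sketch of the API added above; the cache name, size, validity, key and value are illustrative. Values round-trip through interface{}, so callers type-assert on the way out, and an entry is reported as a miss (and counted as a stale get) once its validity has elapsed.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/weaveworks/cortex/pkg/chunk/cache"
)

func main() {
	// A cache labelled "example" in the metrics, holding up to 100 entries,
	// each valid for 50ms (kept short here just to show expiry).
	c := cache.NewFifoCache("example", 100, 50*time.Millisecond)
	ctx := context.Background()

	c.Put(ctx, "user:1", 42)

	// Values come back as interface{}, so callers type-assert.
	if value, ok := c.Get(ctx, "user:1"); ok {
		fmt.Println(value.(int)) // 42
	}

	// Once the validity period has passed, Get reports a miss (and a stale
	// get), even though the entry still occupies its slot until evicted.
	time.Sleep(100 * time.Millisecond)
	if _, ok := c.Get(ctx, "user:1"); !ok {
		fmt.Println("expired")
	}
}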
diff --git a/pkg/chunk/fifo_cache_test.go b/pkg/chunk/cache/fifo_cache_test.go
similarity index 60%
rename from pkg/chunk/fifo_cache_test.go
rename to pkg/chunk/cache/fifo_cache_test.go
index 1beae800a5d..038103deebf 100644
--- a/pkg/chunk/fifo_cache_test.go
+++ b/pkg/chunk/cache/fifo_cache_test.go
@@ -1,9 +1,11 @@
-package chunk
+package cache
 
 import (
+	"context"
 	"fmt"
 	"strconv"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 )
@@ -12,56 +14,73 @@ const size = 10
 const overwrite = 5
 
 func TestFifoCache(t *testing.T) {
-	c := newFifoCache(size)
+	c := NewFifoCache("test", size, 1*time.Minute)
+	ctx := context.Background()
 
 	// Check put / get works
 	for i := 0; i < size; i++ {
-		c.put(strconv.Itoa(i), i)
+		c.Put(ctx, strconv.Itoa(i), i)
 		//c.print()
 	}
 	require.Len(t, c.index, size)
 	require.Len(t, c.entries, size)
 
 	for i := 0; i < size; i++ {
-		value, _, ok := c.get(strconv.Itoa(i))
+		value, ok := c.Get(ctx, strconv.Itoa(i))
 		require.True(t, ok)
 		require.Equal(t, i, value.(int))
 	}
 
 	// Check evictions
 	for i := size; i < size+overwrite; i++ {
-		c.put(strconv.Itoa(i), i)
+		c.Put(ctx, strconv.Itoa(i), i)
 		//c.print()
 	}
 	require.Len(t, c.index, size)
 	require.Len(t, c.entries, size)
 
 	for i := 0; i < size-overwrite; i++ {
-		_, _, ok := c.get(strconv.Itoa(i))
+		_, ok := c.Get(ctx, strconv.Itoa(i))
 		require.False(t, ok)
 	}
 	for i := size; i < size+overwrite; i++ {
-		value, _, ok := c.get(strconv.Itoa(i))
+		value, ok := c.Get(ctx, strconv.Itoa(i))
 		require.True(t, ok)
 		require.Equal(t, i, value.(int))
 	}
 
 	// Check updates work
 	for i := size; i < size+overwrite; i++ {
-		c.put(strconv.Itoa(i), i*2)
+		c.Put(ctx, strconv.Itoa(i), i*2)
 		//c.print()
 	}
 	require.Len(t, c.index, size)
 	require.Len(t, c.entries, size)
 
 	for i := size; i < size+overwrite; i++ {
-		value, _, ok := c.get(strconv.Itoa(i))
+		value, ok := c.Get(ctx, strconv.Itoa(i))
 		require.True(t, ok)
 		require.Equal(t, i*2, value.(int))
 	}
 }
 
-func (c *fifoCache) print() {
+func TestFifoCacheExpiry(t *testing.T) {
+	c := NewFifoCache("test", size, 5*time.Millisecond)
+	ctx := context.Background()
+
+	c.Put(ctx, "0", 0)
+
+	value, ok := c.Get(ctx, "0")
+	require.True(t, ok)
+	require.Equal(t, 0, value.(int))
+
+	// Expire the entry.
+	time.Sleep(5 * time.Millisecond)
+	_, ok = c.Get(ctx, strconv.Itoa(0))
+	require.False(t, ok)
+}
+
+func (c *FifoCache) print() {
 	fmt.Println("first", c.first, "last", c.last)
 	for i, entry := range c.entries {
 		fmt.Printf("  %d -> key: %s, value: %v, next: %d, prev: %d\n", i, entry.key, entry.value, entry.next, entry.prev)
diff --git a/pkg/chunk/fifo_cache.go b/pkg/chunk/fifo_cache.go
deleted file mode 100644
index 3a0bb8526f5..00000000000
--- a/pkg/chunk/fifo_cache.go
+++ /dev/null
@@ -1,113 +0,0 @@
-package chunk
-
-import (
-	"sync"
-	"time"
-)
-
-// fifoCache is a simple string -> interface{} cache which uses a fifo slide to
-// manage evictions. O(1) inserts and updates, O(1) gets.
-type fifoCache struct {
-	lock    sync.RWMutex
-	size    int
-	entries []cacheEntry
-	index   map[string]int
-
-	// indexes into entries to identify the most recent and least recent entry.
-	first, last int
-}
-
-type cacheEntry struct {
-	updated    time.Time
-	key        string
-	value      interface{}
-	prev, next int
-}
-
-func newFifoCache(size int) *fifoCache {
-	return &fifoCache{
-		size:    size,
-		entries: make([]cacheEntry, 0, size),
-		index:   make(map[string]int, size),
-	}
-}
-
-func (c *fifoCache) put(key string, value interface{}) {
-	if c.size == 0 {
-		return
-	}
-
-	c.lock.Lock()
-	defer c.lock.Unlock()
-
-	// See if we already have the entry
-	index, ok := c.index[key]
-	if ok {
-		entry := c.entries[index]
-
-		entry.updated = time.Now()
-		entry.value = value
-
-		// Remove this entry from the FIFO linked-list.
-		c.entries[entry.prev].next = entry.next
-		c.entries[entry.next].prev = entry.prev
-
-		// Insert it at the beginning
-		entry.next = c.first
-		entry.prev = c.last
-		c.entries[entry.next].prev = index
-		c.entries[entry.prev].next = index
-		c.first = index
-
-		c.entries[index] = entry
-		return
-	}
-
-	// Otherwise, see if we need to evict an entry.
-	if len(c.entries) >= c.size {
-		index = c.last
-		entry := c.entries[index]
-
-		c.last = entry.prev
-		c.first = index
-		delete(c.index, entry.key)
-		c.index[key] = index
-
-		entry.updated = time.Now()
-		entry.value = value
-		entry.key = key
-		c.entries[index] = entry
-		return
-	}
-
-	// Finally, no hit and we have space.
-	index = len(c.entries)
-	c.entries = append(c.entries, cacheEntry{
-		updated: time.Now(),
-		key:     key,
-		value:   value,
-		prev:    c.last,
-		next:    c.first,
-	})
-	c.entries[c.first].prev = index
-	c.entries[c.last].next = index
-	c.first = index
-	c.index[key] = index
-}
-
-func (c *fifoCache) get(key string) (value interface{}, updated time.Time, ok bool) {
-	if c.size == 0 {
-		return
-	}
-
-	c.lock.RLock()
-	defer c.lock.RUnlock()
-
-	var index int
-	index, ok = c.index[key]
-	if ok {
-		value = c.entries[index].value
-		updated = c.entries[index].updated
-	}
-	return
-}
diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go
index 3f007844330..77e7fe14504 100644
--- a/pkg/chunk/series_store.go
+++ b/pkg/chunk/series_store.go
@@ -4,13 +4,13 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"time"
 
 	"github.com/go-kit/kit/log/level"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/weaveworks/common/user"
 
+	"github.com/weaveworks/cortex/pkg/chunk/cache"
 	"github.com/weaveworks/cortex/pkg/util"
 	"github.com/weaveworks/cortex/pkg/util/extract"
 )
@@ -22,7 +22,7 @@ var (
 // seriesStore implements Store
 type seriesStore struct {
 	store
-	cardinalityCache *fifoCache
+	cardinalityCache *cache.FifoCache
 }
 
 func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) {
@@ -38,7 +38,7 @@ func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Stor
 			schema:       schema,
 			chunkFetcher: fetcher,
 		},
-		cardinalityCache: newFifoCache(cfg.CardinalityCacheSize),
+		cardinalityCache: cache.NewFifoCache("cardinality", cfg.CardinalityCacheSize, cfg.CardinalityCacheValidity),
 	}, nil
 }
 
@@ -182,13 +182,12 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from,
 	level.Debug(log).Log("queries", len(queries))
 
 	for _, query := range queries {
-		value, updated, ok := c.cardinalityCache.get(query.HashValue)
+		value, ok := c.cardinalityCache.Get(ctx, query.HashValue)
 		if !ok {
 			continue
 		}
-		entryAge := time.Now().Sub(updated)
 		cardinality := value.(int)
-		if entryAge < c.cfg.CardinalityCacheValidity && cardinality > c.cfg.CardinalityLimit {
+		if cardinality > c.cfg.CardinalityLimit {
 			return nil, errCardinalityExceeded
 		}
 	}
@@ -201,7 +200,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from,
 
 	// TODO This is not correct, will overcount for queries > 24hrs
 	for _, query := range queries {
-		c.cardinalityCache.put(query.HashValue, len(entries))
+		c.cardinalityCache.Put(ctx, query.HashValue, len(entries))
 	}
 	if len(entries) > c.cfg.CardinalityLimit {
 		return nil, errCardinalityExceeded
diff --git a/pkg/chunk/storage/caching_fixtures.go b/pkg/chunk/storage/caching_fixtures.go
new file mode 100644
index 00000000000..35b6bb0c469
--- /dev/null
+++ b/pkg/chunk/storage/caching_fixtures.go
@@ -0,0 +1,27 @@
+package storage
+
+import (
+	"time"
+
+	"github.com/weaveworks/cortex/pkg/chunk/gcp"
+
+	"github.com/weaveworks/cortex/pkg/chunk"
+	"github.com/weaveworks/cortex/pkg/chunk/testutils"
+)
+
+type fixture struct {
+	fixture testutils.Fixture
+}
+
+func (f fixture) Name() string { return "caching-store" }
+func (f fixture) Clients() (chunk.StorageClient, chunk.TableClient, chunk.SchemaConfig, error) {
+	storageClient, tableClient, schemaConfig, err := f.fixture.Clients()
+	client := newCachingStorageClient(storageClient, 500, 5*time.Minute)
+	return client, tableClient, schemaConfig, err
+}
+func (f fixture) Teardown() error { return f.fixture.Teardown() }
+
+// Fixtures for unit testing the caching storage.
+var Fixtures = []testutils.Fixture{
+	fixture{gcp.Fixtures[0]},
+}
diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go
new file mode 100644
index 00000000000..5b58078b2af
--- /dev/null
+++ b/pkg/chunk/storage/caching_storage_client.go
@@ -0,0 +1,113 @@
+package storage
+
+import (
+	"bytes"
+	"context"
+	"strings"
+	"time"
+
+	"github.com/weaveworks/cortex/pkg/chunk"
+	"github.com/weaveworks/cortex/pkg/chunk/cache"
+)
+
+type cachingStorageClient struct {
+	chunk.StorageClient
+	cache    *cache.FifoCache
+	validity time.Duration
+}
+
+func newCachingStorageClient(client chunk.StorageClient, size int, validity time.Duration) chunk.StorageClient {
+	if size == 0 {
+		return client
+	}
+
+	return &cachingStorageClient{
+		StorageClient: client,
+		cache:         cache.NewFifoCache("index", size, validity),
+	}
+}
+
+func (s *cachingStorageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
+	value, ok := s.cache.Get(ctx, queryKey(query))
+	if ok {
+		batches := value.([]chunk.ReadBatch)
+		filteredBatch := filterBatchByQuery(query, batches)
+		callback(filteredBatch)
+
+		return nil
+	}
+
+	batches := []chunk.ReadBatch{}
+	cacheableQuery := chunk.IndexQuery{
+		TableName: query.TableName,
+		HashValue: query.HashValue,
+	} // Just reads the entire row and caches it.
+
+	err := s.StorageClient.QueryPages(ctx, cacheableQuery, copyingCallback(&batches))
+	if err != nil {
+		return err
+	}
+
+	filteredBatch := filterBatchByQuery(query, batches)
+	callback(filteredBatch)
+
+	s.cache.Put(ctx, queryKey(query), batches)
+
+	return nil
+}
+
+type readBatch []cell
+
+func (b readBatch) Len() int                { return len(b) }
+func (b readBatch) RangeValue(i int) []byte { return b[i].column }
+func (b readBatch) Value(i int) []byte      { return b[i].value }
+
+type cell struct {
+	column []byte
+	value  []byte
+}
+
+func copyingCallback(readBatches *[]chunk.ReadBatch) func(chunk.ReadBatch) bool {
+	return func(result chunk.ReadBatch) bool {
+		*readBatches = append(*readBatches, result)
+		return true
+	}
+}
+
+func queryKey(q chunk.IndexQuery) string {
+	const sep = "\xff"
+	return q.TableName + sep + q.HashValue
+}
+
+func filterBatchByQuery(query chunk.IndexQuery, batches []chunk.ReadBatch) readBatch {
+	filter := func([]byte, []byte) bool { return true }
+
+	if len(query.RangeValuePrefix) != 0 {
+		filter = func(rangeValue []byte, value []byte) bool {
+			return strings.HasPrefix(string(rangeValue), string(query.RangeValuePrefix))
+		}
+	}
+	if len(query.RangeValueStart) != 0 {
+		filter = func(rangeValue []byte, value []byte) bool {
+			return string(rangeValue) >= string(query.RangeValueStart)
+		}
+	}
+	if len(query.ValueEqual) != 0 {
+		// This is on top of the existing filters.
+		existingFilter := filter
+		filter = func(rangeValue []byte, value []byte) bool {
+			return existingFilter(rangeValue, value) && bytes.Equal(value, query.ValueEqual)
+		}
+	}
+
+	finalBatch := make(readBatch, 0, len(batches)) // On the higher side for most queries. On the lower side for column key schema.
+	for _, batch := range batches {
+		for i := 0; i < batch.Len(); i++ {
+			if filter(batch.RangeValue(i), batch.Value(i)) {
+				finalBatch = append(finalBatch, cell{column: batch.RangeValue(i), value: batch.Value(i)})
+			}
+		}
+	}
+
+	return finalBatch
+}
diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go
index b48b0d2f502..eddd7f2055d 100644
--- a/pkg/chunk/storage/factory.go
+++ b/pkg/chunk/storage/factory.go
@@ -5,6 +5,7 @@ import (
 	"flag"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/go-kit/kit/log/level"
 	"github.com/weaveworks/cortex/pkg/chunk"
@@ -20,6 +21,9 @@ type Config struct {
 	AWSStorageConfig       aws.StorageConfig
 	GCPStorageConfig       gcp.Config
 	CassandraStorageConfig cassandra.Config
+
+	IndexCacheSize     int
+	IndexCacheValidity time.Duration
 }
 
 // RegisterFlags adds the flags required to configure this flag set.
@@ -28,26 +32,32 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.AWSStorageConfig.RegisterFlags(f)
 	cfg.GCPStorageConfig.RegisterFlags(f)
 	cfg.CassandraStorageConfig.RegisterFlags(f)
+
+	f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Size of in-memory index cache, 0 to disable.")
+	f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Period for which entries in the index cache are valid. Should be no higher than -ingester.max-chunk-idle.")
 }
 
 // NewStorageClient makes a storage client based on the configuration.
-func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) {
+func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (client chunk.StorageClient, err error) {
 	switch cfg.StorageClient {
 	case "inmemory":
-		return chunk.NewMockStorage(), nil
+		client, err = chunk.NewMockStorage(), nil
 	case "aws":
 		path := strings.TrimPrefix(cfg.AWSStorageConfig.DynamoDB.URL.Path, "/")
 		if len(path) > 0 {
 			level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path)
 		}
-		return aws.NewStorageClient(cfg.AWSStorageConfig, schemaCfg)
+		client, err = aws.NewStorageClient(cfg.AWSStorageConfig, schemaCfg)
 	case "gcp":
-		return gcp.NewStorageClient(context.Background(), cfg.GCPStorageConfig, schemaCfg)
+		client, err = gcp.NewStorageClient(context.Background(), cfg.GCPStorageConfig, schemaCfg)
 	case "cassandra":
-		return cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg)
+		client, err = cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg)
 	default:
 		return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: aws, gcp, cassandra, inmemory", cfg.StorageClient)
 	}
+
+	client = newCachingStorageClient(client, cfg.IndexCacheSize, cfg.IndexCacheValidity)
+	return
 }
 
 // NewTableClient makes a new table client based on the configuration.
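Not part of the patch — a sketch of how the new settings could be wired from code rather than flags. It assumes the zero values of the other backend configs are acceptable for the "inmemory" client; the size is an arbitrary non-zero value and the validity simply mirrors the flag default.

package main

import (
	"time"

	"github.com/weaveworks/cortex/pkg/chunk"
	"github.com/weaveworks/cortex/pkg/chunk/storage"
)

func main() {
	cfg := storage.Config{
		StorageClient:      "inmemory",
		IndexCacheSize:     500,             // 0 would leave the backend client unwrapped.
		IndexCacheValidity: 5 * time.Minute, // matches the flag default.
	}

	// NewStorageClient now returns the chosen backend wrapped by
	// newCachingStorageClient whenever IndexCacheSize > 0.
	client, err := storage.NewStorageClient(cfg, chunk.SchemaConfig{})
	if err != nil {
		panic(err)
	}
	_ = client
}

On a running binary the equivalent is -store.index-cache-size and -store.index-cache-validity; with the default size of 0 the wrapper is skipped entirely.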
diff --git a/pkg/chunk/storage/index_test.go b/pkg/chunk/storage/index_test.go
index b6cdc0f172c..91d8532bb96 100644
--- a/pkg/chunk/storage/index_test.go
+++ b/pkg/chunk/storage/index_test.go
@@ -104,10 +104,10 @@ func TestQueryPages(t *testing.T) {
 		require.NoError(t, err)
 
 		tests := []struct {
-			name           string
-			query          chunk.IndexQuery
-			provisionedErr int
-			want           []chunk.IndexEntry
+			name   string
+			query  chunk.IndexQuery
+			repeat bool
+			want   []chunk.IndexEntry
 		}{
 			{
 				"check HashValue only",
@@ -115,7 +115,7 @@ func TestQueryPages(t *testing.T) {
 					TableName: tableName,
 					HashValue: "flip",
 				},
-				0,
+				false,
 				[]chunk.IndexEntry{entries[5], entries[6], entries[7]},
 			},
 			{
@@ -125,7 +125,7 @@ func TestQueryPages(t *testing.T) {
 					HashValue:       "foo",
 					RangeValueStart: []byte("bar:2"),
 				},
-				0,
+				false,
 				[]chunk.IndexEntry{entries[1], entries[2], entries[3], entries[4]},
 			},
 			{
@@ -135,7 +135,7 @@ func TestQueryPages(t *testing.T) {
 					HashValue:        "foo",
 					RangeValuePrefix: []byte("baz:"),
 				},
-				0,
+				false,
 				[]chunk.IndexEntry{entries[3], entries[4]},
 			},
 			{
@@ -146,7 +146,7 @@ func TestQueryPages(t *testing.T) {
 					RangeValuePrefix: []byte("bar"),
 					ValueEqual:       []byte("20"),
 				},
-				0,
+				false,
 				[]chunk.IndexEntry{entries[1]},
 			},
 			{
@@ -157,27 +157,36 @@ func TestQueryPages(t *testing.T) {
 					RangeValuePrefix: []byte("bar"),
 					ValueEqual:       []byte("20"),
 				},
-				2,
+				true,
 				[]chunk.IndexEntry{entries[1]},
 			},
 		}
 
 		for _, tt := range tests {
 			t.Run(tt.name, func(t *testing.T) {
-				var have []chunk.IndexEntry
-				err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch) bool {
-					for i := 0; i < read.Len(); i++ {
-						have = append(have, chunk.IndexEntry{
-							TableName:  tt.query.TableName,
-							HashValue:  tt.query.HashValue,
-							RangeValue: read.RangeValue(i),
-							Value:      read.Value(i),
-						})
+				run := true
+				for run {
+					var have []chunk.IndexEntry
+					err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch) bool {
+						for i := 0; i < read.Len(); i++ {
+							have = append(have, chunk.IndexEntry{
+								TableName:  tt.query.TableName,
+								HashValue:  tt.query.HashValue,
+								RangeValue: read.RangeValue(i),
+								Value:      read.Value(i),
+							})
+						}
+						return true
+					})
+					require.NoError(t, err)
+					require.Equal(t, tt.want, have)
+
+					if tt.repeat {
+						tt.repeat = false
+					} else {
+						run = false
 					}
-					return true
-				})
-				require.NoError(t, err)
-				require.Equal(t, tt.want, have)
+				}
 			})
 		}
 	})
diff --git a/pkg/chunk/storage/utils_test.go b/pkg/chunk/storage/utils_test.go
index 0dda70ab42b..c1500f655c9 100644
--- a/pkg/chunk/storage/utils_test.go
+++ b/pkg/chunk/storage/utils_test.go
@@ -20,6 +20,7 @@ type storageClientTest func(*testing.T, chunk.StorageClient)
 
 func forAllFixtures(t *testing.T, storageClientTest storageClientTest) {
 	fixtures := append(aws.Fixtures, gcp.Fixtures...)
+	fixtures = append(fixtures, Fixtures...)
 
 	cassandraFixtures, err := cassandra.Fixtures()
 	require.NoError(t, err)
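Not part of the patch — a sketch, written as if inside package storage, of the read path the wrapper provides. The backing client is supplied by the caller, and the table name, hash value and prefixes are placeholders; the point is that the row cache key only covers (TableName, HashValue), so a second query against the same row is served from the FIFO cache and filtered client-side, which is exactly what the repeated queries in the test above exercise.

package storage

import (
	"context"
	"time"

	"github.com/weaveworks/cortex/pkg/chunk"
)

// queryTwiceSketch is illustrative only; backing is whatever concrete
// StorageClient the caller already has configured.
func queryTwiceSketch(ctx context.Context, backing chunk.StorageClient) error {
	client := newCachingStorageClient(backing, 500, 5*time.Minute)

	query := chunk.IndexQuery{
		TableName:        "index_123",
		HashValue:        "fake:metric",
		RangeValuePrefix: []byte("bar:"),
	}
	callback := func(read chunk.ReadBatch) bool { return true }

	// First call reads the entire (TableName, HashValue) row from the backing
	// store and caches it under queryKey(query).
	if err := client.QueryPages(ctx, query, callback); err != nil {
		return err
	}

	// A second query against the same row, even with a different range filter,
	// is answered from the FIFO cache and only filtered client-side.
	query.RangeValuePrefix = []byte("baz:")
	return client.QueryPages(ctx, query, callback)
}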