Skip to content

Cache older index entries #1130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ type StoreConfig struct {
CardinalityCacheSize int
CardinalityCacheValidity time.Duration
CardinalityLimit int

CacheLookupsOlderThan time.Duration
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -73,6 +75,7 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.CardinalityCacheSize, "store.cardinality-cache-size", 0, "Size of in-memory cardinality cache, 0 to disable.")
f.DurationVar(&cfg.CardinalityCacheValidity, "store.cardinality-cache-validity", 1*time.Hour, "Period for which entries in the cardinality cache are valid.")
f.IntVar(&cfg.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.")
f.DurationVar(&cfg.CacheLookupsOlderThan, "store.cache-lookups-older-than", 0, "Cache index entries older than this period. 0 to disable.")
}

// store implements Store
Expand Down
3 changes: 3 additions & 0 deletions pkg/chunk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type IndexQuery struct {

// Filters for querying
ValueEqual []byte

// If the result of this lookup is immutable or not (for caching).
Immutable bool
}

// IndexEntry describes an entry in the chunk index
Expand Down
113 changes: 113 additions & 0 deletions pkg/chunk/schema_caching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package chunk

import (
"time"

"github.com/prometheus/common/model"
"github.com/weaveworks/common/mtime"
)

// schemaCaching wraps a Schema and splits every index lookup at a cut-off
// (mtime.Now() - cacheOlderThan). Queries that cover only data older than
// the cut-off are flagged as Immutable so the caching index client can keep
// their results without an expiry.
type schemaCaching struct {
	Schema

	// cacheOlderThan is the age beyond which index lookups are treated as
	// immutable (cacheable forever).
	cacheOlderThan time.Duration
}

// GetReadQueriesForMetric splits the requested range at the cacheability
// cut-off, asks the wrapped Schema for each half, and merges the results so
// that queries over old data are marked Immutable.
func (s *schemaCaching) GetReadQueriesForMetric(from, through model.Time, userID string, metricName model.LabelValue) ([]IndexQuery, error) {
	cutoff := model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())
	cacheFrom, cacheThrough, activeFrom, activeThrough := splitTimesByCacheability(from, through, cutoff)

	cacheable, err := s.Schema.GetReadQueriesForMetric(cacheFrom, cacheThrough, userID, metricName)
	if err != nil {
		return nil, err
	}

	active, err := s.Schema.GetReadQueriesForMetric(activeFrom, activeThrough, userID, metricName)
	if err != nil {
		return nil, err
	}

	return mergeCacheableAndActiveQueries(cacheable, active), nil
}

// GetReadQueriesForMetricLabel splits the requested range at the
// cacheability cut-off, delegates both halves to the wrapped Schema, and
// merges the results, marking queries over old data as Immutable.
func (s *schemaCaching) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) {
	cutoff := model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())
	cacheFrom, cacheThrough, activeFrom, activeThrough := splitTimesByCacheability(from, through, cutoff)

	cacheable, err := s.Schema.GetReadQueriesForMetricLabel(cacheFrom, cacheThrough, userID, metricName, labelName)
	if err != nil {
		return nil, err
	}

	active, err := s.Schema.GetReadQueriesForMetricLabel(activeFrom, activeThrough, userID, metricName, labelName)
	if err != nil {
		return nil, err
	}

	return mergeCacheableAndActiveQueries(cacheable, active), nil
}

// GetReadQueriesForMetricLabelValue splits the requested range at the
// cacheability cut-off, delegates both halves to the wrapped Schema, and
// merges the results, marking queries over old data as Immutable.
func (s *schemaCaching) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) {
	cutoff := model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())
	cacheFrom, cacheThrough, activeFrom, activeThrough := splitTimesByCacheability(from, through, cutoff)

	cacheable, err := s.Schema.GetReadQueriesForMetricLabelValue(cacheFrom, cacheThrough, userID, metricName, labelName, labelValue)
	if err != nil {
		return nil, err
	}

	active, err := s.Schema.GetReadQueriesForMetricLabelValue(activeFrom, activeThrough, userID, metricName, labelName, labelValue)
	if err != nil {
		return nil, err
	}

	return mergeCacheableAndActiveQueries(cacheable, active), nil
}

// GetChunksForSeries finds chunks for previously-resolved series IDs. Like
// the other read paths, the range is split at the cacheability cut-off and
// queries over old data come back marked Immutable.
func (s *schemaCaching) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
	cutoff := model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())
	cacheFrom, cacheThrough, activeFrom, activeThrough := splitTimesByCacheability(from, through, cutoff)

	cacheable, err := s.Schema.GetChunksForSeries(cacheFrom, cacheThrough, userID, seriesID)
	if err != nil {
		return nil, err
	}

	active, err := s.Schema.GetChunksForSeries(activeFrom, activeThrough, userID, seriesID)
	if err != nil {
		return nil, err
	}

	return mergeCacheableAndActiveQueries(cacheable, active), nil
}

func splitTimesByCacheability(from, through model.Time, cacheBefore model.Time) (model.Time, model.Time, model.Time, model.Time) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this really valuable in the presence of the caching front-end which will shard by day?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, if using the frontend, but the frontend is an optional component, I guess?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Plus I'd say yes it is - the caching frontend only matches exact querier, this will match individual labels, which is useful across multiple different queries.

if from.After(cacheBefore) {
return 0, 0, from, through
}

if through.Before(cacheBefore) {
return from, through, 0, 0
}

return from, cacheBefore, cacheBefore, through
}

// mergeCacheableAndActiveQueries combines both query sets into one slice:
// every cacheable query is marked Immutable unless the same query also
// appears in the active set, in which case only the mutable (active) copy is
// kept. Cacheable queries come first, then all active queries, preserving
// the original ordering within each set.
func mergeCacheableAndActiveQueries(cacheableQueries []IndexQuery, activeQueries []IndexQuery) []IndexQuery {
	finalQueries := make([]IndexQuery, 0, len(cacheableQueries)+len(activeQueries))

	// When deduping, the bucket values only influence TableName and HashValue
	// and just checking those is enough. Index the active queries once so the
	// merge is O(n+m) instead of the quadratic nested-loop scan.
	type tableHash struct {
		table, hash string
	}
	active := make(map[tableHash]struct{}, len(activeQueries))
	for _, aq := range activeQueries {
		active[tableHash{aq.TableName, aq.HashValue}] = struct{}{}
	}

	for _, cq := range cacheableQueries {
		if _, ok := active[tableHash{cq.TableName, cq.HashValue}]; ok {
			// Also queried as active; drop the cacheable duplicate.
			continue
		}
		cq.Immutable = true
		finalQueries = append(finalQueries, cq)
	}

	return append(finalQueries, activeQueries...)
}
75 changes: 75 additions & 0 deletions pkg/chunk/schema_caching_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package chunk

import (
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/mtime"
)

// TestCachingSchema verifies that schemaCaching marks index queries covering
// data older than cacheOlderThan as Immutable while leaving queries over
// recent data mutable.
func TestCachingSchema(t *testing.T) {
	const (
		userID         = "userid"
		periodicPrefix = "periodicPrefix"
	)

	dailyBuckets := makeSchema("v3")
	schema := &schemaCaching{
		Schema:         dailyBuckets,
		cacheOlderThan: 24 * time.Hour,
	}

	baseTime := time.Unix(0, 0)
	baseTime = baseTime.Add(30*24*time.Hour - 1)

	mtime.NowForce(baseTime)
	// Restore the real clock so the forced time does not leak into other tests.
	defer mtime.NowReset()

	for _, tc := range []struct {
		from, through time.Time

		// Index of the last query expected to be Immutable; -1 when none are.
		cacheableIdx int
	}{
		{
			// Completely cacheable.
			baseTime.Add(-36 * time.Hour),
			baseTime.Add(-25 * time.Hour),
			0,
		},
		{
			// Completely active.
			baseTime.Add(-23 * time.Hour),
			baseTime.Add(-2 * time.Hour),
			-1,
		},
		{
			// Mix of both but the cacheable entry is also active.
			baseTime.Add(-36 * time.Hour),
			baseTime.Add(-2 * time.Hour),
			-1,
		},
		{
			// Mix of both.
			baseTime.Add(-50 * time.Hour),
			baseTime.Add(-2 * time.Hour),
			0,
		},
	} {
		have, err := schema.GetReadQueriesForMetric(
			model.TimeFromUnix(tc.from.Unix()), model.TimeFromUnix(tc.through.Unix()),
			userID, model.LabelValue("foo"),
		)
		require.NoError(t, err)

		for i := range have {
			if i <= tc.cacheableIdx {
				require.True(t, have[i].Immutable)
			} else {
				require.False(t, have[i].Immutable)
			}
		}
	}
}
7 changes: 7 additions & 0 deletions pkg/chunk/series_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks Ob
return nil, err
}

if cfg.CacheLookupsOlderThan != 0 {
schema = &schemaCaching{
Schema: schema,
cacheOlderThan: cfg.CacheLookupsOlderThan,
}
}

return &seriesStore{
store: store{
cfg: cfg,
Expand Down
12 changes: 10 additions & 2 deletions pkg/chunk/storage/caching_index_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,18 @@ func (s *cachingIndexClient) QueryPages(ctx context.Context, queries []chunk.Ind
TableName: queries[0].TableName,
HashValue: queries[0].HashValue,
})
results[key] = ReadBatch{

rb := ReadBatch{
Key: key,
Expiry: expiryTime.UnixNano(),
}

// If the query is cacheable forever, nil the expiry.
if queries[0].Immutable {
rb.Expiry = 0
}

results[key] = rb
}

err := s.IndexClient.QueryPages(ctx, cacheableMissed, func(cacheableQuery chunk.IndexQuery, r chunk.ReadBatch) bool {
Expand Down Expand Up @@ -232,7 +240,7 @@ func (s *cachingIndexClient) cacheFetch(ctx context.Context, keys []string) (bat

// Make sure the hash(key) is not a collision in the cache by looking at the
// key in the value.
if key != readBatch.Key || time.Now().After(time.Unix(0, readBatch.Expiry)) {
if key != readBatch.Key || (readBatch.Expiry != 0 && time.Now().After(time.Unix(0, readBatch.Expiry))) {
cacheCorruptErrs.Inc()
continue
}
Expand Down
74 changes: 72 additions & 2 deletions pkg/chunk/storage/caching_index_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestCachingStorageClientBasic(t *testing.T) {
assert.EqualValues(t, 1, store.queries)
}

func TestCachingStorageClient(t *testing.T) {
func TestTempCachingStorageClient(t *testing.T) {
store := &mockStore{
results: ReadBatch{
Entries: []Entry{{
Expand All @@ -64,7 +64,7 @@ func TestCachingStorageClient(t *testing.T) {
},
}
cache := cache.NewFifoCache("test", cache.FifoCacheConfig{Size: 10, Validity: 10 * time.Second})
client := newCachingIndexClient(store, cache, 1*time.Second)
client := newCachingIndexClient(store, cache, 100*time.Millisecond)
queries := []chunk.IndexQuery{
{TableName: "table", HashValue: "foo"},
{TableName: "table", HashValue: "bar"},
Expand Down Expand Up @@ -94,6 +94,76 @@ func TestCachingStorageClient(t *testing.T) {
require.NoError(t, err)
assert.EqualValues(t, len(queries), store.queries)
assert.EqualValues(t, len(queries), results)

// If we do the query after validity, it should see the queries.
time.Sleep(100 * time.Millisecond)
results = 0
err = client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
iter := batch.Iterator()
for iter.Next() {
results++
}
return true
})
require.NoError(t, err)
assert.EqualValues(t, 2*len(queries), store.queries)
assert.EqualValues(t, len(queries), results)
}

// TestPermCachingStorageClient checks that results for Immutable queries are
// cached without expiry: the underlying store must be hit exactly once no
// matter how often, or how late, the same queries are re-issued.
func TestPermCachingStorageClient(t *testing.T) {
	store := &mockStore{
		results: ReadBatch{
			Entries: []Entry{{
				Column: []byte("foo"),
				Value:  []byte("bar"),
			}},
		},
	}
	cache := cache.NewFifoCache("test", cache.FifoCacheConfig{Size: 10, Validity: 10 * time.Second})
	client := newCachingIndexClient(store, cache, 100*time.Millisecond)
	queries := []chunk.IndexQuery{
		{TableName: "table", HashValue: "foo", Immutable: true},
		{TableName: "table", HashValue: "bar", Immutable: true},
		{TableName: "table", HashValue: "baz", Immutable: true},
	}

	// runQueries issues all queries through the client and returns the total
	// number of result entries seen by the callback.
	runQueries := func() int {
		results := 0
		err := client.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
			iter := batch.Iterator()
			for iter.Next() {
				results++
			}
			return true
		})
		require.NoError(t, err)
		return results
	}

	// First round populates the cache from the underlying store.
	assert.EqualValues(t, len(queries), runQueries())
	assert.EqualValues(t, len(queries), store.queries)

	// Second round is served from cache; the store sees no new queries.
	assert.EqualValues(t, len(queries), runQueries())
	assert.EqualValues(t, len(queries), store.queries)

	// Even after the normal validity period, immutable entries stay cached.
	time.Sleep(200 * time.Millisecond)
	assert.EqualValues(t, len(queries), runQueries())
	assert.EqualValues(t, len(queries), store.queries)
}

func TestCachingStorageClientEmptyResponse(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {

// Deprecated flags!!
f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Deprecated: Use -store.index-cache-read.*; Size of in-memory index cache, 0 to disable.")
f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Deprecated: Use -store.index-cache-read.*; Period for which entries in the index cache are valid. Should be no higher than -ingester.max-chunk-idle.")
cfg.memcacheClient.RegisterFlagsWithPrefix("index.", "Deprecated: Use -store.index-cache-read.*;", f)

cfg.indexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "Cache config for index entry reading. ", f)
f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this is no longer used?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is how long we want to cache the active entries.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't see where it is used either

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is being used here: https://github.com/cortexproject/cortex/pull/1130/files#diff-d479a87a51735dca31797a0bc4af42caL95 to set the valid duration for caching mutable entries.

}

// NewStore makes the storage clients based on the configuration.
Expand All @@ -59,7 +59,7 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
// Building up from deprecated flags.
var caches []cache.Cache
if cfg.IndexCacheSize > 0 {
fifocache := cache.Instrument("fifo-index", cache.NewFifoCache("index", cache.FifoCacheConfig{Size: cfg.IndexCacheSize, Validity: cfg.IndexCacheValidity}))
fifocache := cache.Instrument("fifo-index", cache.NewFifoCache("index", cache.FifoCacheConfig{Size: cfg.IndexCacheSize}))
caches = append(caches, fifocache)
}
if cfg.memcacheClient.Host != "" {
Expand All @@ -76,7 +76,6 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
var tieredCache cache.Cache
if len(caches) > 0 {
tieredCache = cache.NewTiered(caches)
cfg.indexQueriesCacheConfig.DefaultValidity = cfg.IndexCacheValidity
} else {
tieredCache, err = cache.New(cfg.indexQueriesCacheConfig)
if err != nil {
Expand Down