Skip to content

Commit 17da42e

Browse files
committed
Skip index queries for high cardinality labels.
Firstly, cache the length of index rows we query (by the hash and range key). Secondly, fail for rows with > 100k, either because the cache told us so, or because we read them. Finally, allow matchers to fail on cardinality errors but proceed with the query (as long as at least one matcher succeeds), and then filter results. Notably, after this change, queries on two high-cardinality labels that would have results in a small number of series will fail. Signed-off-by: Tom Wilkie <[email protected]>
1 parent 5d50cd2 commit 17da42e

File tree

2 files changed

+48
-7
lines changed

2 files changed

+48
-7
lines changed

pkg/chunk/chunk_store.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,21 @@ func init() {
5252
type StoreConfig struct {
5353
CacheConfig cache.Config
5454

55-
MinChunkAge time.Duration
56-
QueryChunkLimit int
55+
MinChunkAge time.Duration
56+
QueryChunkLimit int
57+
CardinalityCacheSize int
58+
CardinalityCacheValidity time.Duration
59+
CardinalityLimit int
5760
}
5861

5962
// RegisterFlags adds the flags required to config this to the given FlagSet
6063
func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
6164
cfg.CacheConfig.RegisterFlags(f)
6265
f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.")
6366
f.IntVar(&cfg.QueryChunkLimit, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.")
67+
f.IntVar(&cfg.CardinalityCacheSize, "store.cardinality-cache-size", 0, "Size of in-memory cardinality cache, 0 to disable.")
68+
f.DurationVar(&cfg.CardinalityCacheValidity, "store.cardinality-cache-validity", 1*time.Hour, "Period for which entries in the cardinality cache are valid.")
69+
f.IntVar(&cfg.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.")
6470
}
6571

6672
// store implements Store

pkg/chunk/series_store.go

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package chunk
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"time"
68

79
"github.com/go-kit/kit/log/level"
810
"github.com/prometheus/common/model"
@@ -13,9 +15,14 @@ import (
1315
"github.com/weaveworks/cortex/pkg/util/extract"
1416
)
1517

18+
var (
19+
errCardinalityExceeded = errors.New("cardinality limit exceeded")
20+
)
21+
1622
// seriesStore implements Store
1723
type seriesStore struct {
1824
store
25+
cardinalityCache *fifoCache
1926
}
2027

2128
func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) {
@@ -31,6 +38,7 @@ func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Stor
3138
schema: schema,
3239
chunkFetcher: fetcher,
3340
},
41+
cardinalityCache: newFifoCache(cfg.CardinalityCacheSize),
3442
}, nil
3543
}
3644

@@ -49,15 +57,15 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc
4957
}
5058

5159
// Ensure this query includes a metric name.
52-
metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers)
60+
metricNameMatcher, allMatchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers)
5361
if !ok || metricNameMatcher.Type != labels.MatchEqual {
5462
return nil, fmt.Errorf("query must contain metric name")
5563
}
5664
level.Debug(log).Log("metric", metricNameMatcher.Value)
5765

5866
// Fetch the series IDs from the index, based on non-empty matchers from
5967
// the query.
60-
filters, matchers := util.SplitFiltersAndMatchers(matchers)
68+
_, matchers := util.SplitFiltersAndMatchers(allMatchers)
6169
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, metricNameMatcher.Value, matchers)
6270
if err != nil {
6371
return nil, err
@@ -94,7 +102,7 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc
94102
}
95103

96104
// Filter out chunks based on the empty matchers in the query.
97-
filteredChunks := filterChunksByMatchers(allChunks, filters)
105+
filteredChunks := filterChunksByMatchers(allChunks, allMatchers)
98106
return filteredChunks, nil
99107
}
100108

@@ -124,6 +132,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
124132
// Receive chunkSets from all matchers
125133
var ids []string
126134
var lastErr error
135+
var cardinalityExceededErrors int
127136
for i := 0; i < len(matchers); i++ {
128137
select {
129138
case incoming := <-incomingIDs:
@@ -133,10 +142,16 @@ func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from
133142
ids = intersectStrings(ids, incoming)
134143
}
135144
case err := <-incomingErrors:
136-
lastErr = err
145+
if err == errCardinalityExceeded {
146+
cardinalityExceededErrors++
147+
} else {
148+
lastErr = err
149+
}
137150
}
138151
}
139-
if lastErr != nil {
152+
if cardinalityExceededErrors == len(matchers) {
153+
return nil, errCardinalityExceeded
154+
} else if lastErr != nil {
140155
return nil, lastErr
141156
}
142157

@@ -166,12 +181,32 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from,
166181
}
167182
level.Debug(log).Log("queries", len(queries))
168183

184+
for _, query := range queries {
185+
value, updated, ok := c.cardinalityCache.get(query.HashValue)
186+
if !ok {
187+
continue
188+
}
189+
entryAge := time.Now().Sub(updated)
190+
cardinality := value.(int)
191+
if entryAge < c.cfg.CardinalityCacheValidity && cardinality > c.cfg.CardinalityLimit {
192+
return nil, errCardinalityExceeded
193+
}
194+
}
195+
169196
entries, err := c.lookupEntriesByQueries(ctx, queries)
170197
if err != nil {
171198
return nil, err
172199
}
173200
level.Debug(log).Log("entries", len(entries))
174201

202+
// TODO This is not correct, will overcount for queries > 24hrs
203+
for _, query := range queries {
204+
c.cardinalityCache.put(query.HashValue, len(entries))
205+
}
206+
if len(entries) > c.cfg.CardinalityLimit {
207+
return nil, errCardinalityExceeded
208+
}
209+
175210
ids, err := c.parseIndexEntries(ctx, entries, matcher)
176211
if err != nil {
177212
return nil, err

0 commit comments

Comments
 (0)