Skip to content

Commit 00566f6

Browse files
pracuccigouthamve
authored andcommitted
Add global limit to the max series per user and metric (#1760)
* Add global limit to the max series per user and metric: - `-ingester.max-global-series-per-user` - `-ingester.max-global-series-per-metric` Signed-off-by: Marco Pracucci <[email protected]> * Commented ReadRingMock struct and functions Signed-off-by: Marco Pracucci <[email protected]> * Vendored new testify dependencies used for mocking Signed-off-by: Marco Pracucci <[email protected]> * Fixed data race in SeriesLimiter test Signed-off-by: Marco Pracucci <[email protected]> * Add HealthyIngestersCount() to the ring lifecycler, to avoid having to create a new ring client for the ingesters Signed-off-by: Marco Pracucci <[email protected]> * Converted string error const into error instance var in limits.go Signed-off-by: Marco Pracucci <[email protected]> * Added PR number reference to changelog Signed-off-by: Marco Pracucci <[email protected]> * Do not update ring lifecycler counters unless the CAS operation succeeded Signed-off-by: Marco Pracucci <[email protected]> * Fixed nil pointer dereference in the ring lifecycler when updating counters and the ring desc is nil Signed-off-by: Marco Pracucci <[email protected]> * Reduce to the minimum the block of code wrapped by mutex in the ring's lifecycler updateCounters() Signed-off-by: Marco Pracucci <[email protected]> * Added a comment to explain what userStates struct is Signed-off-by: Marco Pracucci <[email protected]> * Renamed ring's HealthyIngestersCount() to HealthyInstancesCount() to make it more generic Signed-off-by: Marco Pracucci <[email protected]> * Fixes after rebasing with master Signed-off-by: Marco Pracucci <[email protected]>
1 parent 9e9a183 commit 00566f6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+5561
-67
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
* [CHANGE] The frontend component has been refactored to be easier to re-use. When upgrading the frontend, cache entries will be discarded and re-created with the new protobuf schema. #1734
66
* [CHANGE] Remove direct DB/API access from the ruler
77
* [CHANGE] Removed `Delta` encoding. Any old chunks with `Delta` encoding cannot be read anymore. If `ingester.chunk-encoding` is set to `Delta` the ingester will fail to start. #1706
8+
* [FEATURE] Global limit on the max series per user and metric #1760
9+
* `-ingester.max-global-series-per-user`
10+
* `-ingester.max-global-series-per-metric`
811
* [ENHANCEMENT] Allocation improvements in adding samples to Chunk. #1706
912
* [ENHANCEMENT] Consul client now follows recommended practices for blocking queries wrt returned Index value. #1708
1013
* [ENHANCEMENT] Consul client can optionally rate-limit itself during Watch (used e.g. by ring watchers) and WatchPrefix (used by HA feature) operations. Rate limiting is disabled by default. New flags added: `--consul.watch-rate-limit`, and `--consul.watch-burst-size`. #1708

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,7 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
582582
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
583583
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
584584
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
585+
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
585586
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
586587
github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
587588
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=

pkg/cortex/cortex.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,14 +125,15 @@ func (c *Config) Validate() error {
125125
if err := c.Encoding.Validate(); err != nil {
126126
return errors.Wrap(err, "invalid encoding config")
127127
}
128-
129128
if err := c.Storage.Validate(); err != nil {
130129
return errors.Wrap(err, "invalid storage config")
131130
}
132-
133131
if err := c.TSDB.Validate(); err != nil {
134132
return errors.Wrap(err, "invalid TSDB config")
135133
}
134+
if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels); err != nil {
135+
return errors.Wrap(err, "invalid limits config")
136+
}
136137
return nil
137138
}
138139

pkg/cortex/modules.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ func (t *Cortex) initIngester(cfg *Config) (err error) {
250250
cfg.Ingester.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort
251251
cfg.Ingester.TSDBEnabled = cfg.Storage.Engine == storage.StorageEngineTSDB
252252
cfg.Ingester.TSDBConfig = cfg.TSDB
253+
cfg.Ingester.ShardByAllLabels = cfg.Distributor.ShardByAllLabels
253254

254255
t.ingester, err = ingester.New(cfg.Ingester, cfg.IngesterClient, t.overrides, t.store, prometheus.DefaultRegisterer)
255256
if err != nil {

pkg/ingester/ingester.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,10 @@ type Config struct {
121121
TSDBEnabled bool `yaml:"-"`
122122
TSDBConfig tsdb.Config `yaml:"-"`
123123

124+
// Injected at runtime and read from the distributor config, required
125+
// to accurately apply global limits.
126+
ShardByAllLabels bool `yaml:"-"`
127+
124128
// For testing, you can override the address and ID of this ingester.
125129
ingesterClientFactory func(addr string, cfg client.Config) (client.HealthAndIngesterClient, error)
126130
}
@@ -152,6 +156,7 @@ type Ingester struct {
152156
chunkStore ChunkStore
153157
lifecycler *ring.Lifecycler
154158
limits *validation.Overrides
159+
limiter *SeriesLimiter
155160

156161
quit chan struct{}
157162
done sync.WaitGroup
@@ -190,15 +195,11 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
190195
i := &Ingester{
191196
cfg: cfg,
192197
clientConfig: clientConfig,
193-
194-
metrics: newIngesterMetrics(registerer),
195-
196-
limits: limits,
197-
chunkStore: chunkStore,
198-
userStates: newUserStates(limits, cfg),
199-
200-
quit: make(chan struct{}),
201-
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),
198+
metrics: newIngesterMetrics(registerer),
199+
limits: limits,
200+
chunkStore: chunkStore,
201+
quit: make(chan struct{}),
202+
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),
202203
}
203204

204205
var err error
@@ -207,6 +208,13 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
207208
return nil, err
208209
}
209210

211+
// Init the limiter and instantiate the user states which depend on it
212+
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
213+
i.userStates = newUserStates(i.limiter, cfg)
214+
215+
// Now that user states have been created, we can start the lifecycler
216+
i.lifecycler.Start()
217+
210218
i.flushQueuesDone.Add(cfg.ConcurrentFlushes)
211219
for j := 0; j < cfg.ConcurrentFlushes; j++ {
212220
i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength)

pkg/ingester/ingester_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ func TestIngesterAppendBlankLabel(t *testing.T) {
357357

358358
func TestIngesterUserSeriesLimitExceeded(t *testing.T) {
359359
limits := defaultLimitsTestConfig()
360-
limits.MaxSeriesPerUser = 1
360+
limits.MaxLocalSeriesPerUser = 1
361361

362362
_, ing := newTestStore(t, defaultIngesterTestConfig(), defaultClientTestConfig(), limits)
363363
defer ing.Shutdown()
@@ -414,7 +414,7 @@ func TestIngesterUserSeriesLimitExceeded(t *testing.T) {
414414

415415
func TestIngesterMetricSeriesLimitExceeded(t *testing.T) {
416416
limits := defaultLimitsTestConfig()
417-
limits.MaxSeriesPerMetric = 1
417+
limits.MaxLocalSeriesPerMetric = 1
418418

419419
_, ing := newTestStore(t, defaultIngesterTestConfig(), defaultClientTestConfig(), limits)
420420
defer ing.Shutdown()

pkg/ingester/ingester_v2.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,10 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
4949
i := &Ingester{
5050
cfg: cfg,
5151
clientConfig: clientConfig,
52-
53-
metrics: newIngesterMetrics(registerer),
54-
55-
limits: limits,
56-
chunkStore: chunkStore,
57-
userStates: newUserStates(limits, cfg),
58-
quit: make(chan struct{}),
52+
metrics: newIngesterMetrics(registerer),
53+
limits: limits,
54+
chunkStore: chunkStore,
55+
quit: make(chan struct{}),
5956

6057
TSDBState: TSDBState{
6158
dbs: make(map[string]*tsdb.DB),
@@ -68,6 +65,13 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
6865
return nil, err
6966
}
7067

68+
// Init the limiter and instantiate the user states which depend on it
69+
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
70+
i.userStates = newUserStates(i.limiter, cfg)
71+
72+
// Now that user states have been created, we can start the lifecycler
73+
i.lifecycler.Start()
74+
7175
return i, nil
7276
}
7377

pkg/ingester/limiter.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package ingester
2+
3+
import (
4+
"fmt"
5+
"math"
6+
7+
"github.com/cortexproject/cortex/pkg/util/validation"
8+
)
9+
10+
const (
	// Format strings for the errors returned when a series limit is hit.
	// Arguments, in order: configured local limit, configured global limit,
	// actual local limit applied by the limiter (0 means disabled).
	// NOTE: these are runtime error messages; changing them changes behavior
	// observed by clients and tests.
	errMaxSeriesPerMetricLimitExceeded = "per-metric series limit (local: %d global: %d actual local: %d) exceeded"
	errMaxSeriesPerUserLimitExceeded   = "per-user series limit (local: %d global: %d actual local: %d) exceeded"
)
14+
15+
// RingCount is the interface exposed by a ring implementation which allows
// counting its healthy members.
type RingCount interface {
	// HealthyInstancesCount returns the number of healthy instances
	// currently registered in the ring.
	HealthyInstancesCount() int
}
20+
21+
// SeriesLimiter implements primitives to get the maximum number of series
// an ingester can handle for a specific tenant.
type SeriesLimiter struct {
	limits            *validation.Overrides // per-tenant local and global limits
	ring              RingCount             // counts healthy ingesters, used to convert global limits into local ones
	replicationFactor int                   // ring replication factor, taken from the lifecycler ring config
	shardByAllLabels  bool                  // injected from the distributor config; controls how global limits are applied
}
29+
30+
// NewSeriesLimiter makes a new in-memory series limiter
31+
func NewSeriesLimiter(limits *validation.Overrides, ring RingCount, replicationFactor int, shardByAllLabels bool) *SeriesLimiter {
32+
return &SeriesLimiter{
33+
limits: limits,
34+
ring: ring,
35+
replicationFactor: replicationFactor,
36+
shardByAllLabels: shardByAllLabels,
37+
}
38+
}
39+
40+
// AssertMaxSeriesPerMetric limit has not been reached compared to the current
41+
// number of series in input and returns an error if so.
42+
func (l *SeriesLimiter) AssertMaxSeriesPerMetric(userID string, series int) error {
43+
actualLimit := l.maxSeriesPerMetric(userID)
44+
if series < actualLimit {
45+
return nil
46+
}
47+
48+
localLimit := l.limits.MaxLocalSeriesPerMetric(userID)
49+
globalLimit := l.limits.MaxGlobalSeriesPerMetric(userID)
50+
51+
return fmt.Errorf(errMaxSeriesPerMetricLimitExceeded, localLimit, globalLimit, actualLimit)
52+
}
53+
54+
// AssertMaxSeriesPerUser limit has not been reached compared to the current
55+
// number of series in input and returns an error if so.
56+
func (l *SeriesLimiter) AssertMaxSeriesPerUser(userID string, series int) error {
57+
actualLimit := l.maxSeriesPerUser(userID)
58+
if series < actualLimit {
59+
return nil
60+
}
61+
62+
localLimit := l.limits.MaxLocalSeriesPerUser(userID)
63+
globalLimit := l.limits.MaxGlobalSeriesPerUser(userID)
64+
65+
return fmt.Errorf(errMaxSeriesPerUserLimitExceeded, localLimit, globalLimit, actualLimit)
66+
}
67+
68+
// MaxSeriesPerQuery returns the maximum number of series a query is allowed
// to hit for the given tenant. It simply delegates to the per-tenant
// overrides (no local/global conversion is involved).
func (l *SeriesLimiter) MaxSeriesPerQuery(userID string) int {
	return l.limits.MaxSeriesPerQuery(userID)
}
72+
73+
func (l *SeriesLimiter) maxSeriesPerMetric(userID string) int {
74+
localLimit := l.limits.MaxLocalSeriesPerMetric(userID)
75+
globalLimit := l.limits.MaxGlobalSeriesPerMetric(userID)
76+
77+
if globalLimit > 0 {
78+
if l.shardByAllLabels {
79+
// We can assume that series are evenly distributed across ingesters
80+
// so we do convert the global limit into a local limit
81+
localLimit = l.minNonZero(localLimit, l.convertGlobalToLocalLimit(globalLimit))
82+
} else {
83+
// Given a metric is always pushed to the same set of ingesters (based on
84+
// the replication factor), we can configure the per-ingester local limit
85+
// equal to the global limit.
86+
localLimit = l.minNonZero(localLimit, globalLimit)
87+
}
88+
}
89+
90+
// If both the local and global limits are disabled, we just
91+
// use the largest int value
92+
if localLimit == 0 {
93+
localLimit = math.MaxInt32
94+
}
95+
96+
return localLimit
97+
}
98+
99+
func (l *SeriesLimiter) maxSeriesPerUser(userID string) int {
100+
localLimit := l.limits.MaxLocalSeriesPerUser(userID)
101+
102+
// The global limit is supported only when shard-by-all-labels is enabled,
103+
// otherwise we wouldn't get an even split of series across ingesters and
104+
// can't take a "local decision" without any centralized coordination.
105+
if l.shardByAllLabels {
106+
// We can assume that series are evenly distributed across ingesters
107+
// so we do convert the global limit into a local limit
108+
globalLimit := l.limits.MaxGlobalSeriesPerUser(userID)
109+
localLimit = l.minNonZero(localLimit, l.convertGlobalToLocalLimit(globalLimit))
110+
}
111+
112+
// If both the local and global limits are disabled, we just
113+
// use the largest int value
114+
if localLimit == 0 {
115+
localLimit = math.MaxInt32
116+
}
117+
118+
return localLimit
119+
}
120+
121+
func (l *SeriesLimiter) convertGlobalToLocalLimit(globalLimit int) int {
122+
if globalLimit == 0 {
123+
return 0
124+
}
125+
126+
// Given we don't need a super accurate count (ie. when the ingesters
127+
// topology changes) and we prefer to always be in favor of the tenant,
128+
// we can use a per-ingester limit equal to:
129+
// (global limit / number of ingesters) * replication factor
130+
numIngesters := l.ring.HealthyInstancesCount()
131+
132+
// May happen because the number of ingesters is asynchronously updated.
133+
// If happens, we just temporarily ignore the global limit.
134+
if numIngesters > 0 {
135+
return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
136+
}
137+
138+
return 0
139+
}
140+
141+
func (l *SeriesLimiter) minNonZero(first, second int) int {
142+
if first == 0 || (second != 0 && first > second) {
143+
return second
144+
}
145+
146+
return first
147+
}

0 commit comments

Comments
 (0)