Add global limit to the max series per user and metric #1760

Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,9 @@
* [CHANGE] The frontend component has been refactored to be easier to re-use. When upgrading the frontend, cache entries will be discarded and re-created with the new protobuf schema. #1734
* [CHANGE] Remove direct DB/API access from the ruler
* [CHANGE] Removed `Delta` encoding. Any old chunks with `Delta` encoding cannot be read anymore. If `ingester.chunk-encoding` is set to `Delta` the ingester will fail to start. #1706
* [FEATURE] Global limit on the max series per user and metric #1760
* `-ingester.max-global-series-per-user`
* `-ingester.max-global-series-per-metric`
* [ENHANCEMENT] Allocation improvements in adding samples to Chunk. #1706
* [ENHANCEMENT] Consul client now follows recommended practices for blocking queries wrt returned Index value. #1708
* [ENHANCEMENT] Consul client can optionally rate-limit itself during Watch (used e.g. by ring watchers) and WatchPrefix (used by HA feature) operations. Rate limiting is disabled by default. New flags added: `--consul.watch-rate-limit`, and `--consul.watch-burst-size`. #1708
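For context on the two [FEATURE] flags listed above, here is a minimal standalone sketch of how they could be registered with Go's flag package. This is illustrative only: the actual registration lives in Cortex's limits configuration, and the defaults are assumptions (0 disables a limit, matching the limiter code later in this PR).

package main

import (
	"flag"
	"fmt"
)

func main() {
	// Hypothetical registration of the two new flags; 0 keeps the
	// corresponding limit disabled.
	maxGlobalSeriesPerUser := flag.Int("ingester.max-global-series-per-user", 0,
		"Maximum number of in-memory series per user, across the cluster. 0 to disable.")
	maxGlobalSeriesPerMetric := flag.Int("ingester.max-global-series-per-metric", 0,
		"Maximum number of in-memory series per metric name, across the cluster. 0 to disable.")
	flag.Parse()

	fmt.Println("per-user:", *maxGlobalSeriesPerUser, "per-metric:", *maxGlobalSeriesPerMetric)
}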
1 change: 1 addition & 0 deletions go.sum
@@ -582,6 +582,7 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
5 changes: 3 additions & 2 deletions pkg/cortex/cortex.go
@@ -125,14 +125,15 @@ func (c *Config) Validate() error {
if err := c.Encoding.Validate(); err != nil {
return errors.Wrap(err, "invalid encoding config")
}

if err := c.Storage.Validate(); err != nil {
return errors.Wrap(err, "invalid storage config")
}

if err := c.TSDB.Validate(); err != nil {
return errors.Wrap(err, "invalid TSDB config")
}
if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels); err != nil {
return errors.Wrap(err, "invalid limits config")
}
return nil
}

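The Validate change above threads the distributor's shard-by-all-labels setting into the limits validation. Based on the limiter code later in this PR, a hedged sketch of what that check plausibly looks like; the Limits type and field name are assumptions, and the real implementation lives in pkg/util/validation.

package validation

import "errors"

// Limits is a trimmed-down stand-in for the per-tenant limits struct.
type Limits struct {
	MaxGlobalSeriesPerUser int `yaml:"max_global_series_per_user"`
}

// Validate rejects configurations the ingesters cannot enforce: the global
// per-user series limit assumes series are evenly sharded across ingesters,
// which only holds when shard-by-all-labels is enabled.
func (l *Limits) Validate(shardByAllLabels bool) error {
	if l.MaxGlobalSeriesPerUser > 0 && !shardByAllLabels {
		return errors.New("max-global-series-per-user requires distributor.shard-by-all-labels to be enabled")
	}
	return nil
}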
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
@@ -250,6 +250,7 @@ func (t *Cortex) initIngester(cfg *Config) (err error) {
cfg.Ingester.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort
cfg.Ingester.TSDBEnabled = cfg.Storage.Engine == storage.StorageEngineTSDB
cfg.Ingester.TSDBConfig = cfg.TSDB
cfg.Ingester.ShardByAllLabels = cfg.Distributor.ShardByAllLabels

t.ingester, err = ingester.New(cfg.Ingester, cfg.IngesterClient, t.overrides, t.store, prometheus.DefaultRegisterer)
if err != nil {
26 changes: 17 additions & 9 deletions pkg/ingester/ingester.go
@@ -121,6 +121,10 @@ type Config struct {
TSDBEnabled bool `yaml:"-"`
TSDBConfig tsdb.Config `yaml:"-"`

// Injected at runtime and read from the distributor config, required
// to accurately apply global limits.
ShardByAllLabels bool `yaml:"-"`

// For testing, you can override the address and ID of this ingester.
ingesterClientFactory func(addr string, cfg client.Config) (client.HealthAndIngesterClient, error)
}
@@ -152,6 +156,7 @@ type Ingester struct {
chunkStore ChunkStore
lifecycler *ring.Lifecycler
limits *validation.Overrides
limiter *SeriesLimiter

quit chan struct{}
done sync.WaitGroup
@@ -190,15 +195,11 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,

metrics: newIngesterMetrics(registerer),

limits: limits,
chunkStore: chunkStore,
userStates: newUserStates(limits, cfg),

quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),
metrics: newIngesterMetrics(registerer),
limits: limits,
chunkStore: chunkStore,
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),
}

var err error
@@ -207,6 +208,13 @@
return nil, err
}

// Init the limiter and instantiate the user states, which depend on it
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
i.userStates = newUserStates(i.limiter, cfg)

// Now that user states have been created, we can start the lifecycler
i.lifecycler.Start()

i.flushQueuesDone.Add(cfg.ConcurrentFlushes)
for j := 0; j < cfg.ConcurrentFlushes; j++ {
i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength)
4 changes: 2 additions & 2 deletions pkg/ingester/ingester_test.go
@@ -357,7 +357,7 @@ func TestIngesterAppendBlankLabel(t *testing.T) {

func TestIngesterUserSeriesLimitExceeded(t *testing.T) {
limits := defaultLimitsTestConfig()
limits.MaxSeriesPerUser = 1
limits.MaxLocalSeriesPerUser = 1

_, ing := newTestStore(t, defaultIngesterTestConfig(), defaultClientTestConfig(), limits)
defer ing.Shutdown()
@@ -414,7 +414,7 @@

func TestIngesterMetricSeriesLimitExceeded(t *testing.T) {
limits := defaultLimitsTestConfig()
limits.MaxSeriesPerMetric = 1
limits.MaxLocalSeriesPerMetric = 1

_, ing := newTestStore(t, defaultIngesterTestConfig(), defaultClientTestConfig(), limits)
defer ing.Shutdown()
18 changes: 11 additions & 7 deletions pkg/ingester/ingester_v2.go
@@ -49,13 +49,10 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,

metrics: newIngesterMetrics(registerer),

limits: limits,
chunkStore: chunkStore,
userStates: newUserStates(limits, cfg),
quit: make(chan struct{}),
metrics: newIngesterMetrics(registerer),
limits: limits,
chunkStore: chunkStore,
quit: make(chan struct{}),

TSDBState: TSDBState{
dbs: make(map[string]*tsdb.DB),
@@ -68,6 +65,13 @@
return nil, err
}

// Init the limiter and instantiate the user states, which depend on it
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
i.userStates = newUserStates(i.limiter, cfg)

// Now that user states have been created, we can start the lifecycler
i.lifecycler.Start()

return i, nil
}

147 changes: 147 additions & 0 deletions pkg/ingester/limiter.go
@@ -0,0 +1,147 @@
package ingester

import (
"fmt"
"math"

"github.com/cortexproject/cortex/pkg/util/validation"
)

const (
errMaxSeriesPerMetricLimitExceeded = "per-metric series limit (local: %d global: %d actual local: %d) exceeded"
errMaxSeriesPerUserLimitExceeded = "per-user series limit (local: %d global: %d actual local: %d) exceeded"
)
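// Illustrative rendering of the per-user message, with hypothetical numbers:
// no local limit, a global limit of 600000, 10 healthy ingesters and
// replication factor 3 would produce:
//   "per-user series limit (local: 0 global: 600000 actual local: 180000) exceeded"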

// RingCount is the interface exposed by a ring implementation which allows
// to count members
type RingCount interface {
HealthyInstancesCount() int
}

// SeriesLimiter implements primitives to get the maximum number of series
// an ingester can handle for a specific tenant
type SeriesLimiter struct {
limits *validation.Overrides
ring RingCount
replicationFactor int
shardByAllLabels bool
}

// NewSeriesLimiter makes a new in-memory series limiter
func NewSeriesLimiter(limits *validation.Overrides, ring RingCount, replicationFactor int, shardByAllLabels bool) *SeriesLimiter {
return &SeriesLimiter{
limits: limits,
ring: ring,
replicationFactor: replicationFactor,
shardByAllLabels: shardByAllLabels,
}
}

// AssertMaxSeriesPerMetric checks that the per-metric series limit has not been
// reached by the current number of series in input, and returns an error if it has.
func (l *SeriesLimiter) AssertMaxSeriesPerMetric(userID string, series int) error {
actualLimit := l.maxSeriesPerMetric(userID)
if series < actualLimit {
return nil
}

localLimit := l.limits.MaxLocalSeriesPerMetric(userID)
globalLimit := l.limits.MaxGlobalSeriesPerMetric(userID)

return fmt.Errorf(errMaxSeriesPerMetricLimitExceeded, localLimit, globalLimit, actualLimit)
}

// AssertMaxSeriesPerUser checks that the per-user series limit has not been
// reached by the current number of series in input, and returns an error if it has.
func (l *SeriesLimiter) AssertMaxSeriesPerUser(userID string, series int) error {
actualLimit := l.maxSeriesPerUser(userID)
if series < actualLimit {
return nil
}

localLimit := l.limits.MaxLocalSeriesPerUser(userID)
globalLimit := l.limits.MaxGlobalSeriesPerUser(userID)

return fmt.Errorf(errMaxSeriesPerUserLimitExceeded, localLimit, globalLimit, actualLimit)
}

// MaxSeriesPerQuery returns the maximum number of series a query is allowed to hit.
func (l *SeriesLimiter) MaxSeriesPerQuery(userID string) int {
return l.limits.MaxSeriesPerQuery(userID)
}

func (l *SeriesLimiter) maxSeriesPerMetric(userID string) int {
localLimit := l.limits.MaxLocalSeriesPerMetric(userID)
globalLimit := l.limits.MaxGlobalSeriesPerMetric(userID)

if globalLimit > 0 {
if l.shardByAllLabels {
// We can assume that series are evenly distributed across ingesters,
// so we convert the global limit into a local limit
localLimit = l.minNonZero(localLimit, l.convertGlobalToLocalLimit(globalLimit))
} else {
// Given a metric is always pushed to the same set of ingesters (based on
// the replication factor), we can configure the per-ingester local limit
// equal to the global limit.
localLimit = l.minNonZero(localLimit, globalLimit)
}
}

// If both the local and global limits are disabled, we just
// use the largest int32 value (effectively no limit)
if localLimit == 0 {
localLimit = math.MaxInt32
}

return localLimit
}

func (l *SeriesLimiter) maxSeriesPerUser(userID string) int {
localLimit := l.limits.MaxLocalSeriesPerUser(userID)

// The global limit is supported only when shard-by-all-labels is enabled,
// otherwise we wouldn't get an even split of series across ingesters and
// can't take a "local decision" without any centralized coordination.
if l.shardByAllLabels {
// We can assume that series are evenly distributed across ingesters,
// so we convert the global limit into a local limit
globalLimit := l.limits.MaxGlobalSeriesPerUser(userID)
localLimit = l.minNonZero(localLimit, l.convertGlobalToLocalLimit(globalLimit))
}

// If both the local and global limits are disabled, we just
// use the largest int32 value (effectively no limit)
if localLimit == 0 {
localLimit = math.MaxInt32
}

return localLimit
}

func (l *SeriesLimiter) convertGlobalToLocalLimit(globalLimit int) int {
if globalLimit == 0 {
return 0
}

// Given we don't need a perfectly accurate count (e.g. while the ingester
// topology is changing) and we prefer to err in favor of the tenant,
// we can use a per-ingester limit equal to:
// (global limit / number of ingesters) * replication factor
numIngesters := l.ring.HealthyInstancesCount()

// The number of healthy ingesters may be zero because the ring is updated
// asynchronously. If that happens, we temporarily ignore the global limit.
if numIngesters > 0 {
return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
}

return 0
}
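// Worked example with illustrative numbers: a global limit of 600000 series,
// 10 healthy ingesters and replication factor 3 give a local limit of
// (600000 / 10) * 3 = 180000 series per ingester. Since each series is
// replicated to 3 ingesters, the cluster as a whole still saturates at about
// 10 * 180000 / 3 = 600000 active series, as intended.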

func (l *SeriesLimiter) minNonZero(first, second int) int {
if first == 0 || (second != 0 && first > second) {
return second
}

return first
}
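
To make the wiring concrete, a hedged usage sketch in the same package; staticRing and canAppendSeries are hypothetical helpers for illustration, not part of this PR (the real ring is the lifecycler, and the real call sites live in the ingester's user state handling).

// staticRing is a trivial RingCount stand-in, handy for tests.
type staticRing struct{ healthy int }

func (r staticRing) HealthyInstancesCount() int { return r.healthy }

// canAppendSeries shows how an ingester might consult the limiter before
// accepting a new series from a push request.
func canAppendSeries(limiter *SeriesLimiter, userID string, userSeries, metricSeries int) error {
	if err := limiter.AssertMaxSeriesPerUser(userID, userSeries); err != nil {
		return err // per-user limit (local or converted global) reached
	}
	// The per-metric limit applies even without shard-by-all-labels, since
	// a metric always hashes to the same replication set.
	return limiter.AssertMaxSeriesPerMetric(userID, metricSeries)
}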