
Commit d1529ff

pracucci authored and gouthamve committed
Add global ingestion rate limiter to distributors (#1766)
* Upgraded golang.org/x/time introducing rate.Limit.SetBurst()
* Added a global ingestion rate limiter
* Commented ReadRegistry mock to fix linter errors
* Added PR number to changelog entry
* Replaced distributors service discovery with the ring
* Fixed linter errors
* Fixed TSDB ingester initialization after rebasing with master
* Tested distributor.RingConfig
* Enriched ring.Lifecycler tests
* Renamed distributor's internalDependency param into canJoinRing to make it more clear
* Removed 'consul' and 'ingester' from ring logs, because it's not necessarily a ring of ingesters backed by consul
* Fixes after rebasing with master
* Changed ring key reference in the ingester, to keep code consistency
* Moved ReadLifecycler interface definition at the place of consumption
* Re-introduced -distributor.limiter-reload-period but deprecated
* Removed spurious bmizerany/assert dependency
* Fixed issues after rebasing master
* Fixed issues after rebasing master
* Updated changelog with a CHANGE entry too
* Updated doc
* Renamed variable in distributor.New()
* Fix distributor tests when running on a system without eth0 and en0

All commits signed off by Marco Pracucci <[email protected]>.
1 parent 2556418 commit d1529ff

31 files changed, +1013 −206 lines changed (diffs for 6 of the 31 files follow)

CHANGELOG.md

Lines changed: 2 additions & 0 deletions

```diff
@@ -10,7 +10,9 @@
 * `ruler.storage.type` has been added to specify the rule store backend type, currently only the configdb.
 * `ruler.poll-interval` has been added to specify the interval in which to poll new rule groups.
 * [CHANGE] Use relative links from /ring page to make it work when used behind reverse proxy. #1896
+* [CHANGE] Deprecated `-distributor.limiter-reload-period` flag. #1766
 * [FEATURE] The distributor can now drop labels from samples (similar to the removal of the replica label for HA ingestion) per user via the `distributor.drop-label` flag. #1726
+* [FEATURE] Added `global` ingestion rate limiter strategy. Deprecated `-distributor.limiter-reload-period` flag. #1766
 * [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861

 ## 0.4.0 / 2019-12-02
```

docs/arguments.md

Lines changed: 7 additions & 2 deletions

```diff
@@ -291,12 +291,17 @@ When running Cortex on Kubernetes, store this file in a config map and mount it

 Valid fields are (with their corresponding flags for default values):

+- `ingestion_rate_strategy` / `-distributor.ingestion-rate-limit-strategy`
 - `ingestion_rate` / `-distributor.ingestion-rate-limit`
 - `ingestion_burst_size` / `-distributor.ingestion-burst-size`

-  The per-tenant rate limit (and burst size), in samples per second. Enforced on a per distributor basis, actual effective rate limit will be N times higher, where N is the number of distributor replicas.
+  The per-tenant rate limit (and burst size), in samples per second. It supports two strategies: `local` (default) and `global`.

-  **NB** Limits are reset every `-distributor.limiter-reload-period`, as such if you set a very high burst limit it will never be hit.
+  The `local` strategy enforces the limit on a per-distributor basis; the actual effective rate limit will be N times higher, where N is the number of distributor replicas.
+
+  The `global` strategy enforces the limit globally, configuring a per-distributor local rate limiter as `ingestion_rate / N`, where N is the number of distributor replicas (automatically adjusted if the number of replicas changes). The `ingestion_burst_size` refers to the per-distributor local rate limiter (even in the case of the `global` strategy) and should be set at least to the maximum number of samples expected in a single push request.
+
+  The `global` strategy requires the distributors to form their own ring, which is used to keep track of the current number of healthy distributor replicas. The ring is configured by `distributor: { ring: {}}` / `-distributor.ring.*`.

 - `max_label_name_length` / `-validation.max-length-label-name`
 - `max_label_value_length` / `-validation.max-length-label-value`
```
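To make the `global` strategy arithmetic concrete, here is a small illustrative sketch (not code from this commit): the per-tenant global limit is divided by the number of healthy distributor replicas, which the distributors ring keeps track of.

```go
package main

import "fmt"

// localIngestionRate sketches the arithmetic described above: the
// per-tenant global limit divided by the number of healthy distributor
// replicas (tracked via the distributors ring).
func localIngestionRate(globalRate float64, healthyReplicas int) float64 {
	if healthyReplicas <= 0 {
		// Assumption for this sketch: with no known replicas, enforce
		// the full global rate locally.
		return globalRate
	}
	return globalRate / float64(healthyReplicas)
}

func main() {
	// ingestion_rate = 100000 samples/sec with 5 healthy distributors:
	// each local limiter enforces 20000 samples/sec.
	fmt.Println(localIngestionRate(100000, 5))
}
```

With `ingestion_rate = 100000` and 5 healthy distributors, each local limiter enforces 20,000 samples/sec; if one replica leaves the ring, the remaining four are retuned to 25,000 samples/sec each.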

go.mod

Lines changed: 1 addition & 1 deletion

```diff
@@ -67,7 +67,7 @@ require (
 	go.etcd.io/bbolt v1.3.3
 	go.etcd.io/etcd v0.0.0-20190709142735-eb7dd97135a5
 	golang.org/x/net v0.0.0-20190923162816-aa69164e4478
-	golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
+	golang.org/x/time v0.0.0-20191024005414-555d28b269f0
 	google.golang.org/api v0.11.0
 	google.golang.org/grpc v1.25.1
 	gopkg.in/yaml.v2 v2.2.2
```

go.sum

Lines changed: 2 additions & 0 deletions

```diff
@@ -794,6 +794,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
+golang.org/x/time v0.0.0-20191024005414-555d28b269f0 h1:/5xXl8Y5W96D+TtHSlonuFqGHIWVuyCkGJLwGh9JJFs=
+golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
```

pkg/cortex/modules.go

Lines changed: 9 additions & 2 deletions

```diff
@@ -151,7 +151,7 @@ func (t *Cortex) stopServer() (err error) {
 }

 func (t *Cortex) initRing(cfg *Config) (err error) {
-	t.ring, err = ring.New(cfg.Ingester.LifecyclerConfig.RingConfig, "ingester")
+	t.ring, err = ring.New(cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey)
 	if err != nil {
 		return
 	}
@@ -171,7 +171,14 @@ func (t *Cortex) stopOverrides() error {
 }

 func (t *Cortex) initDistributor(cfg *Config) (err error) {
-	t.distributor, err = distributor.New(cfg.Distributor, cfg.IngesterClient, t.overrides, t.ring)
+	cfg.Distributor.DistributorRing.ListenPort = cfg.Server.GRPCListenPort
+
+	// Check whether the distributor can join the distributors ring, which is
+	// whenever it's not running as an internal dependency (ie. querier or
+	// ruler's dependency)
+	canJoinDistributorsRing := (cfg.Target == All || cfg.Target == Distributor)
+
+	t.distributor, err = distributor.New(cfg.Distributor, cfg.IngesterClient, t.overrides, t.ring, canJoinDistributorsRing)
 	if err != nil {
 		return
 	}
```

pkg/distributor/distributor.go

Lines changed: 60 additions & 71 deletions

```diff
@@ -7,10 +7,8 @@ import (
 	"net/http"
 	"sort"
 	"strings"
-	"sync"
 	"time"

-	"golang.org/x/time/rate"
 	"google.golang.org/grpc/health/grpc_health_v1"

 	opentracing "github.com/opentracing/opentracing-go"
@@ -25,6 +23,8 @@ import (
 	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/extract"
+	"github.com/cortexproject/cortex/pkg/util/flagext"
+	"github.com/cortexproject/cortex/pkg/util/limiter"
 	"github.com/cortexproject/cortex/pkg/util/validation"
 	billing "github.com/weaveworks/billing-client"
 	"github.com/weaveworks/common/httpgrpc"
@@ -97,18 +97,20 @@
 // forwards appends and queries to individual ingesters.
 type Distributor struct {
 	cfg           Config
-	ring          ring.ReadRing
+	ingestersRing ring.ReadRing
 	ingesterPool  *ingester_client.Pool
 	limits        *validation.Overrides
 	billingClient *billing.Client

+	// The global rate limiter requires a distributors ring to count
+	// the number of healthy instances
+	distributorsRing *ring.Lifecycler
+
 	// For handling HA replicas.
 	Replicas *haTracker

-	// Per-user rate limiters.
-	ingestLimitersMtx sync.RWMutex
-	ingestLimiters    map[string]*rate.Limiter
-	quit              chan struct{}
+	// Per-user rate limiter.
+	ingestionRateLimiter *limiter.RateLimiter
 }

 // Config contains the configuration require to
@@ -120,28 +122,31 @@ type Config struct {

 	HATrackerConfig HATrackerConfig `yaml:"ha_tracker,omitempty"`

-	MaxRecvMsgSize      int           `yaml:"max_recv_msg_size"`
-	RemoteTimeout       time.Duration `yaml:"remote_timeout,omitempty"`
-	ExtraQueryDelay     time.Duration `yaml:"extra_queue_delay,omitempty"`
-	LimiterReloadPeriod time.Duration `yaml:"limiter_reload_period,omitempty"`
+	MaxRecvMsgSize  int           `yaml:"max_recv_msg_size"`
+	RemoteTimeout   time.Duration `yaml:"remote_timeout,omitempty"`
+	ExtraQueryDelay time.Duration `yaml:"extra_queue_delay,omitempty"`

 	ShardByAllLabels bool `yaml:"shard_by_all_labels,omitempty"`

+	// Distributors ring
+	DistributorRing RingConfig `yaml:"ring,omitempty"`
+
 	// for testing
-	ingesterClientFactory client.Factory
+	ingesterClientFactory client.Factory `yaml:"-"`
 }

 // RegisterFlags adds the flags required to config this to the given FlagSet
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	cfg.BillingConfig.RegisterFlags(f)
 	cfg.PoolConfig.RegisterFlags(f)
 	cfg.HATrackerConfig.RegisterFlags(f)
+	cfg.DistributorRing.RegisterFlags(f)

 	f.BoolVar(&cfg.EnableBilling, "distributor.enable-billing", false, "Report number of ingested samples to billing system.")
 	f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "remote_write API max receive message size (bytes).")
 	f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
 	f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
-	f.DurationVar(&cfg.LimiterReloadPeriod, "distributor.limiter-reload-period", 5*time.Minute, "Period at which to reload user ingestion limits.")
+	flagext.DeprecatedFlag(f, "distributor.limiter-reload-period", "DEPRECATED. No more required because the local limiter is reconfigured as soon as the overrides change.")
 	f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
 }

```
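The deprecation message above works because the upgraded `golang.org/x/time` (pinned to `555d28b269f0` in this commit's `go.mod` change) adds `rate.Limiter.SetBurst()`, so a live limiter can be retuned when overrides change instead of being rebuilt on a timer. A minimal sketch of that reconfiguration, independent of the Cortex code:

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// Start with 100 samples/sec and a burst of 10.
	lim := rate.NewLimiter(rate.Limit(100), 10)

	// When overrides change, retune the same limiter in place.
	// SetBurst is available as of golang.org/x/time@555d28b269f0,
	// the revision this commit pins in go.mod.
	lim.SetLimit(rate.Limit(200))
	lim.SetBurst(50)

	fmt.Println(lim.Limit(), lim.Burst()) // 200 50
}
```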
```diff
@@ -151,7 +156,7 @@ func (cfg *Config) Validate() error {
 }

 // New constructs a new Distributor
-func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ring ring.ReadRing) (*Distributor, error) {
+func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ingestersRing ring.ReadRing, canJoinDistributorsRing bool) (*Distributor, error) {
 	if cfg.ingesterClientFactory == nil {
 		cfg.ingesterClientFactory = func(addr string) (grpc_health_v1.HealthClient, error) {
 			return ingester_client.MakeIngesterClient(addr, clientConfig)
@@ -167,55 +172,57 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 		}
 	}

-	replicationFactor.Set(float64(ring.ReplicationFactor()))
+	replicationFactor.Set(float64(ingestersRing.ReplicationFactor()))
 	cfg.PoolConfig.RemoteTimeout = cfg.RemoteTimeout

 	replicas, err := newClusterTracker(cfg.HATrackerConfig)
 	if err != nil {
 		return nil, err
 	}

-	d := &Distributor{
-		cfg:            cfg,
-		ring:           ring,
-		ingesterPool:   ingester_client.NewPool(cfg.PoolConfig, ring, cfg.ingesterClientFactory, util.Logger),
-		billingClient:  billingClient,
-		limits:         limits,
-		ingestLimiters: map[string]*rate.Limiter{},
-		quit:           make(chan struct{}),
-		Replicas:       replicas,
-	}
-	go d.loop()
-
-	return d, nil
-}
+	// Create the configured ingestion rate limit strategy (local or global). In case
+	// it's an internal dependency and can't join the distributors ring, we skip rate
+	// limiting.
+	var ingestionRateStrategy limiter.RateLimiterStrategy
+	var distributorsRing *ring.Lifecycler

-func (d *Distributor) loop() {
-	if d.cfg.LimiterReloadPeriod == 0 {
-		return
-	}
+	if !canJoinDistributorsRing {
+		ingestionRateStrategy = newInfiniteIngestionRateStrategy()
+	} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
+		distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey)
+		if err != nil {
+			return nil, err
+		}

-	ticker := time.NewTicker(d.cfg.LimiterReloadPeriod)
-	defer ticker.Stop()
+		distributorsRing.Start()

-	for {
-		select {
-		case <-ticker.C:
-			d.ingestLimitersMtx.Lock()
-			d.ingestLimiters = make(map[string]*rate.Limiter, len(d.ingestLimiters))
-			d.ingestLimitersMtx.Unlock()
+		ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsRing)
+	} else {
+		ingestionRateStrategy = newLocalIngestionRateStrategy(limits)
+	}

-		case <-d.quit:
-			return
-		}
+	d := &Distributor{
+		cfg:                  cfg,
+		ingestersRing:        ingestersRing,
+		ingesterPool:         ingester_client.NewPool(cfg.PoolConfig, ingestersRing, cfg.ingesterClientFactory, util.Logger),
+		billingClient:        billingClient,
+		distributorsRing:     distributorsRing,
+		limits:               limits,
+		ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
+		Replicas:             replicas,
 	}
+
+	return d, nil
 }

 // Stop stops the distributor's maintenance loop.
 func (d *Distributor) Stop() {
-	close(d.quit)
 	d.ingesterPool.Stop()
 	d.Replicas.stop()
+
+	if d.distributorsRing != nil {
+		d.distributorsRing.Shutdown()
+	}
 }

 func (d *Distributor) tokenForLabels(userID string, labels []client.LabelAdapter) (uint32, error) {
```
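The strategy constructors used above (`newInfiniteIngestionRateStrategy`, `newGlobalIngestionRateStrategy`, `newLocalIngestionRateStrategy`) and the `limiter.RateLimiterStrategy` interface live outside this diff. The sketch below shows one plausible shape for them; the interface methods, the `ReadLifecycler` stand-in, and `HealthyInstancesCount()` are assumptions inferred from the commit messages, not code from this commit:

```go
package limitersketch

// Assumed stand-ins for pkg/util/validation and ring types that are
// outside this diff; names are illustrative, not the real Cortex ones.
type Overrides interface {
	IngestionRate(tenantID string) float64
	IngestionBurstSize(tenantID string) int
}

type ReadLifecycler interface {
	HealthyInstancesCount() int
}

// RateLimiterStrategy sketches the interface presumably consumed by
// limiter.NewRateLimiter: a per-tenant rate and burst.
type RateLimiterStrategy interface {
	Limit(tenantID string) float64 // samples per second
	Burst(tenantID string) int     // burst size, always enforced locally
}

// globalStrategy divides the per-tenant global rate across the healthy
// distributors counted through the distributors ring.
type globalStrategy struct {
	limits Overrides
	ring   ReadLifecycler
}

func (s *globalStrategy) Limit(tenantID string) float64 {
	if n := s.ring.HealthyInstancesCount(); n > 0 {
		return s.limits.IngestionRate(tenantID) / float64(n)
	}
	return s.limits.IngestionRate(tenantID)
}

func (s *globalStrategy) Burst(tenantID string) int {
	return s.limits.IngestionBurstSize(tenantID)
}
```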
```diff
@@ -410,18 +417,18 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
 		return &client.WriteResponse{}, lastPartialErr
 	}

-	limiter := d.getOrCreateIngestLimiter(userID)
-	if !limiter.AllowN(time.Now(), validatedSamples) {
+	now := time.Now()
+	if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamples) {
 		// Ensure the request slice is reused if the request is rate limited.
 		client.ReuseSlice(req.Timeseries)

 		// Return a 4xx here to have the client discard the data and not retry. If a client
 		// is sending too much data consistently we will unlikely ever catch up otherwise.
 		validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamples))
-		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples", limiter.Limit(), numSamples)
+		return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples", d.ingestionRateLimiter.Limit(now, userID), numSamples)
 	}

-	err = ring.DoBatch(ctx, d.ring, keys, func(ingester ring.IngesterDesc, indexes []int) error {
+	err = ring.DoBatch(ctx, d.ingestersRing, keys, func(ingester ring.IngesterDesc, indexes []int) error {
 		timeseries := make([]client.PreallocTimeseries, 0, len(indexes))
 		for _, i := range indexes {
 			timeseries = append(timeseries, validatedTimeseries[i])
```
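`d.ingestionRateLimiter.AllowN(now, userID, n)` replaces the per-user `rate.Limiter` map that `getOrCreateIngestLimiter` managed (removed below). A simplified, single-goroutine sketch of such a per-tenant limiter that periodically rechecks its strategy, reusing the assumed `RateLimiterStrategy` from the previous sketch (the real `pkg/util/limiter.RateLimiter` is concurrency-safe and not shown in this diff):

```go
package limitersketch

import (
	"time"

	"golang.org/x/time/rate"
)

// tenantLimiter is a simplified, single-goroutine sketch of a per-tenant
// limiter that re-reads its strategy at most once per recheck period.
type tenantLimiter struct {
	strategy      RateLimiterStrategy
	recheckPeriod time.Duration
	limiters      map[string]*rate.Limiter
	nextRecheck   map[string]time.Time
}

func newTenantLimiter(s RateLimiterStrategy, recheck time.Duration) *tenantLimiter {
	return &tenantLimiter{
		strategy:      s,
		recheckPeriod: recheck,
		limiters:      map[string]*rate.Limiter{},
		nextRecheck:   map[string]time.Time{},
	}
}

// AllowN reports whether n samples may be ingested for tenant at time now,
// refreshing limit and burst from the strategy once the recheck period has
// elapsed, instead of discarding all limiters on a reload timer (which is
// what the deprecated -distributor.limiter-reload-period used to do).
func (l *tenantLimiter) AllowN(now time.Time, tenant string, n int) bool {
	lim, ok := l.limiters[tenant]
	if !ok {
		lim = rate.NewLimiter(rate.Limit(l.strategy.Limit(tenant)), l.strategy.Burst(tenant))
		l.limiters[tenant] = lim
		l.nextRecheck[tenant] = now.Add(l.recheckPeriod)
	} else if now.After(l.nextRecheck[tenant]) {
		// Retune in place so override changes take effect without
		// resetting the accumulated token-bucket state.
		lim.SetLimit(rate.Limit(l.strategy.Limit(tenant)))
		lim.SetBurst(l.strategy.Burst(tenant))
		l.nextRecheck[tenant] = now.Add(l.recheckPeriod)
	}
	return lim.AllowN(now, n)
}
```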
```diff
@@ -442,24 +449,6 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
 	return &client.WriteResponse{}, lastPartialErr
 }

-func (d *Distributor) getOrCreateIngestLimiter(userID string) *rate.Limiter {
-	d.ingestLimitersMtx.RLock()
-	limiter, ok := d.ingestLimiters[userID]
-	d.ingestLimitersMtx.RUnlock()
-
-	if ok {
-		return limiter
-	}
-
-	limiter = rate.NewLimiter(rate.Limit(d.limits.IngestionRate(userID)), d.limits.IngestionBurstSize(userID))
-
-	d.ingestLimitersMtx.Lock()
-	d.ingestLimiters[userID] = limiter
-	d.ingestLimitersMtx.Unlock()
-
-	return limiter
-}
-
 func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDesc, timeseries []client.PreallocTimeseries) error {
 	h, err := d.ingesterPool.GetClientFor(ingester.Addr)
 	if err != nil {
@@ -481,7 +470,7 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDes

 // forAllIngesters runs f, in parallel, for all ingesters
 func (d *Distributor) forAllIngesters(ctx context.Context, reallyAll bool, f func(client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
-	replicationSet, err := d.ring.GetAll()
+	replicationSet, err := d.ingestersRing.GetAll()
 	if err != nil {
 		return nil, err
 	}
@@ -603,8 +592,8 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
 		totalStats.NumSeries += r.NumSeries
 	}

-	totalStats.IngestionRate /= float64(d.ring.ReplicationFactor())
-	totalStats.NumSeries /= uint64(d.ring.ReplicationFactor())
+	totalStats.IngestionRate /= float64(d.ingestersRing.ReplicationFactor())
+	totalStats.NumSeries /= uint64(d.ingestersRing.ReplicationFactor())

 	return totalStats, nil
 }
@@ -624,7 +613,7 @@ func (d *Distributor) AllUserStats(ctx context.Context) ([]UserIDStats, error) {
 	req := &client.UserStatsRequest{}
 	ctx = user.InjectOrgID(ctx, "1") // fake: ingester insists on having an org ID
 	// Not using d.forAllIngesters(), so we can fail after first error.
-	replicationSet, err := d.ring.GetAll()
+	replicationSet, err := d.ingestersRing.GetAll()
 	if err != nil {
 		return nil, err
 	}
```
