diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 2088f28a4f..01ed49109e 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -11,7 +11,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/limiter" "github.com/grafana/dskit/services" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -34,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/extract" + "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" util_math "github.com/cortexproject/cortex/pkg/util/math" "github.com/cortexproject/cortex/pkg/util/validation" diff --git a/pkg/distributor/ingestion_rate_strategy.go b/pkg/distributor/ingestion_rate_strategy.go index 65ef7686ae..cc3e5dd240 100644 --- a/pkg/distributor/ingestion_rate_strategy.go +++ b/pkg/distributor/ingestion_rate_strategy.go @@ -1,9 +1,9 @@ package distributor import ( - "github.com/grafana/dskit/limiter" "golang.org/x/time/rate" + "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/validation" ) diff --git a/pkg/distributor/ingestion_rate_strategy_test.go b/pkg/distributor/ingestion_rate_strategy_test.go index f8ed6579ab..8a5e0d577f 100644 --- a/pkg/distributor/ingestion_rate_strategy_test.go +++ b/pkg/distributor/ingestion_rate_strategy_test.go @@ -3,12 +3,12 @@ package distributor import ( "testing" - "github.com/grafana/dskit/limiter" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "golang.org/x/time/rate" + "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/validation" ) diff --git a/vendor/github.com/grafana/dskit/limiter/rate_limiter.go b/pkg/util/limiter/rate_limiter.go similarity index 100% rename from vendor/github.com/grafana/dskit/limiter/rate_limiter.go rename to pkg/util/limiter/rate_limiter.go diff --git a/pkg/util/limiter/rate_limiter_test.go b/pkg/util/limiter/rate_limiter_test.go new file mode 100644 index 0000000000..907624c10c --- /dev/null +++ b/pkg/util/limiter/rate_limiter_test.go @@ -0,0 +1,129 @@ +package limiter + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "golang.org/x/time/rate" +) + +func TestRateLimiter_RecheckPeriod(t *testing.T) { + strategy := &increasingLimitStrategy{} + limiter := NewRateLimiter(strategy, 10*time.Second) + now := time.Now() + + // Since the strategy increases the limit and burst value each time + // the strategy functions are called, we do assert if the recheck + // period is honored increasing the input time + assert.Equal(t, float64(1), limiter.Limit(now, "test")) + assert.Equal(t, 1, limiter.Burst(now, "test")) + + assert.Equal(t, float64(1), limiter.Limit(now.Add(9*time.Second), "test")) + assert.Equal(t, 1, limiter.Burst(now.Add(9*time.Second), "test")) + + assert.Equal(t, float64(2), limiter.Limit(now.Add(10*time.Second), "test")) + assert.Equal(t, 2, limiter.Burst(now.Add(10*time.Second), "test")) + + assert.Equal(t, float64(2), limiter.Limit(now.Add(19*time.Second), "test")) + assert.Equal(t, 2, limiter.Burst(now.Add(19*time.Second), "test")) + + assert.Equal(t, float64(3), limiter.Limit(now.Add(20*time.Second), "test")) + assert.Equal(t, 3, limiter.Burst(now.Add(20*time.Second), "test")) +} + +func TestRateLimiter_AllowN(t *testing.T) { + strategy := &staticLimitStrategy{tenants: map[string]struct { + limit float64 + burst int + }{ + "tenant-1": {limit: 10, burst: 20}, + "tenant-2": {limit: 20, burst: 40}, + }} + + limiter := NewRateLimiter(strategy, 10*time.Second) + now := time.Now() + + // Tenant #1 + assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 8)) + assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 10)) + assert.Equal(t, false, limiter.AllowN(now, "tenant-1", 3)) + assert.Equal(t, true, limiter.AllowN(now, "tenant-1", 2)) + + assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 8)) + assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-1", 3)) + assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-1", 2)) + + // Tenant #2 + assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 18)) + assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 20)) + assert.Equal(t, false, limiter.AllowN(now, "tenant-2", 3)) + assert.Equal(t, true, limiter.AllowN(now, "tenant-2", 2)) + + assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 18)) + assert.Equal(t, false, limiter.AllowN(now.Add(time.Second), "tenant-2", 3)) + assert.Equal(t, true, limiter.AllowN(now.Add(time.Second), "tenant-2", 2)) +} + +func BenchmarkRateLimiter_CustomMultiTenant(b *testing.B) { + strategy := &increasingLimitStrategy{} + limiter := NewRateLimiter(strategy, 10*time.Second) + now := time.Now() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + limiter.AllowN(now, "test", 1) + } +} + +func BenchmarkRateLimiter_OriginalSingleTenant(b *testing.B) { + limiter := rate.NewLimiter(rate.Limit(1), 1) + now := time.Now() + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + limiter.AllowN(now, 1) + } +} + +type increasingLimitStrategy struct { + limit float64 + burst int +} + +func (s *increasingLimitStrategy) Limit(tenantID string) float64 { + s.limit++ + return s.limit +} + +func (s *increasingLimitStrategy) Burst(tenantID string) int { + s.burst++ + return s.burst +} + +type staticLimitStrategy struct { + tenants map[string]struct { + limit float64 + burst int + } +} + +func (s *staticLimitStrategy) Limit(tenantID string) float64 { + tenant, ok := s.tenants[tenantID] + if !ok { + return 0 + } + + return tenant.limit +} + +func (s *staticLimitStrategy) Burst(tenantID string) int { + tenant, ok := s.tenants[tenantID] + if !ok { + return 0 + } + + return tenant.burst +} diff --git a/vendor/modules.txt b/vendor/modules.txt index ab2920dd48..f7bab1ef6e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -367,7 +367,6 @@ github.com/grafana/dskit/kv/codec github.com/grafana/dskit/kv/consul github.com/grafana/dskit/kv/etcd github.com/grafana/dskit/kv/memberlist -github.com/grafana/dskit/limiter github.com/grafana/dskit/middleware github.com/grafana/dskit/modules github.com/grafana/dskit/multierror