Skip to content

Use backoff functionality from github.com/grafana/dskit #4426

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ require (
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15
github.com/alicebob/miniredis/v2 v2.14.3
github.com/armon/go-metrics v0.3.6
github.com/aws/aws-sdk-go v1.38.60
github.com/aws/aws-sdk-go v1.38.68
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cespare/xxhash v1.1.0
github.com/dustin/go-humanize v1.0.0
github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb
github.com/felixge/fgprof v0.9.1
github.com/fsouza/fake-gcs-server v1.7.0
github.com/go-kit/kit v0.10.0
github.com/go-kit/kit v0.11.0
github.com/go-openapi/strfmt v0.20.1
github.com/go-openapi/swag v0.19.15
github.com/go-redis/redis/v8 v8.9.0
Expand All @@ -30,6 +30,7 @@ require (
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.3
github.com/gorilla/mux v1.7.3
github.com/grafana/dskit v0.0.0-20210817085554-1b69d2de136f
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2
github.com/hashicorp/consul/api v1.8.1
github.com/hashicorp/go-cleanhttp v0.5.1
Expand Down Expand Up @@ -59,9 +60,9 @@ require (
github.com/weaveworks/common v0.0.0-20210419092856-009d1eebd624
go.etcd.io/bbolt v1.3.5
go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489
go.etcd.io/etcd/client/v3 v3.5.0-alpha.0.0.20210225194612-fa82d11a958a
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/server/v3 v3.5.0-alpha.0.0.20210225194612-fa82d11a958a
go.uber.org/atomic v1.8.0
go.uber.org/atomic v1.9.0
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
Expand Down
61 changes: 47 additions & 14 deletions go.sum

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions integration/e2e/composite_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (
"fmt"
"time"

"github.com/grafana/dskit/backoff"
"github.com/pkg/errors"

"github.com/cortexproject/cortex/pkg/util"
)

// CompositeHTTPService abstract an higher-level service composed, under the hood,
Expand All @@ -16,13 +15,13 @@ type CompositeHTTPService struct {
services []*HTTPService

// Generic retry backoff.
retryBackoff *util.Backoff
retryBackoff *backoff.Backoff
}

func NewCompositeHTTPService(services ...*HTTPService) *CompositeHTTPService {
return &CompositeHTTPService{
services: services,
retryBackoff: util.NewBackoff(context.Background(), util.BackoffConfig{
retryBackoff: backoff.New(context.Background(), backoff.Config{
MinBackoff: 300 * time.Millisecond,
MaxBackoff: 600 * time.Millisecond,
MaxRetries: 50, // Sometimes the CI is slow ¯\_(ツ)_/¯
Expand Down
11 changes: 5 additions & 6 deletions integration/e2e/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import (
"time"

"github.com/go-kit/kit/log"
"github.com/grafana/dskit/backoff"
"github.com/pkg/errors"
"github.com/prometheus/common/expfmt"
"github.com/thanos-io/thanos/pkg/runutil"

"github.com/cortexproject/cortex/pkg/util"
)

var (
Expand All @@ -43,7 +42,7 @@ type ConcreteService struct {
networkPortsContainerToLocal map[int]int

// Generic retry backoff.
retryBackoff *util.Backoff
retryBackoff *backoff.Backoff

// docker NetworkName used to start this container.
// If empty it means service is stopped.
Expand All @@ -64,7 +63,7 @@ func NewConcreteService(
command: command,
networkPortsContainerToLocal: map[int]int{},
readiness: readiness,
retryBackoff: util.NewBackoff(context.Background(), util.BackoffConfig{
retryBackoff: backoff.New(context.Background(), backoff.Config{
MinBackoff: 300 * time.Millisecond,
MaxBackoff: 600 * time.Millisecond,
MaxRetries: 50, // Sometimes the CI is slow ¯\_(ツ)_/¯
Expand All @@ -80,8 +79,8 @@ func (s *ConcreteService) Name() string { return s.name }

// Less often used options.

func (s *ConcreteService) SetBackoff(cfg util.BackoffConfig) {
s.retryBackoff = util.NewBackoff(context.Background(), cfg)
func (s *ConcreteService) SetBackoff(cfg backoff.Config) {
s.retryBackoff = backoff.New(context.Background(), cfg)
}

func (s *ConcreteService) SetEnvVars(env map[string]string) {
Expand Down
9 changes: 4 additions & 5 deletions integration/e2e/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import (
"testing"
"time"

"github.com/grafana/dskit/backoff"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/util"
)

func TestWaitSumMetric(t *testing.T) {
Expand Down Expand Up @@ -80,15 +79,15 @@ metric_b_summary_count 1
},
}

s.SetBackoff(util.BackoffConfig{
s.SetBackoff(backoff.Config{
MinBackoff: 300 * time.Millisecond,
MaxBackoff: 600 * time.Millisecond,
MaxRetries: 50,
})
require.NoError(t, s.WaitSumMetrics(Equals(221), "metric_a"))

// No retry.
s.SetBackoff(util.BackoffConfig{
s.SetBackoff(backoff.Config{
MinBackoff: 0,
MaxBackoff: 0,
MaxRetries: 1,
Expand Down Expand Up @@ -164,7 +163,7 @@ metric_b 1000
},
}

s.SetBackoff(util.BackoffConfig{
s.SetBackoff(backoff.Config{
MinBackoff: 300 * time.Millisecond,
MaxBackoff: 600 * time.Millisecond,
MaxRetries: 50,
Expand Down
4 changes: 2 additions & 2 deletions integration/integration_memberlist_single_binary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
"testing"
"time"

"github.com/grafana/dskit/backoff"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/cortexproject/cortex/integration/ca"
"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/util"
)

func TestSingleBinaryWithMemberlist(t *testing.T) {
Expand Down Expand Up @@ -145,7 +145,7 @@ func newSingleBinary(name string, servername string, join string, testFlags map[
8000,
)

backOff := util.BackoffConfig{
backOff := backoff.Config{
MinBackoff: 200 * time.Millisecond,
MaxBackoff: 500 * time.Millisecond, // Bump max backoff... things take little longer with memberlist.
MaxRetries: 100,
Expand Down
7 changes: 4 additions & 3 deletions pkg/chunk/aws/dynamodb_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/go-kit/kit/log/level"
"github.com/grafana/dskit/backoff"
ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"golang.org/x/time/rate"
Expand Down Expand Up @@ -60,7 +61,7 @@ type DynamoDBConfig struct {
Metrics MetricsAutoScalingConfig `yaml:"metrics"`
ChunkGangSize int `yaml:"chunk_gang_size"`
ChunkGetMaxParallelism int `yaml:"chunk_get_max_parallelism"`
BackoffConfig util.BackoffConfig `yaml:"backoff_config"`
BackoffConfig backoff.Config `yaml:"backoff_config"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand Down Expand Up @@ -177,7 +178,7 @@ func (a dynamoDBStorageClient) BatchWrite(ctx context.Context, input chunk.Write
outstanding := input.(dynamoDBWriteBatch)
unprocessed := dynamoDBWriteBatch{}

backoff := util.NewBackoff(ctx, a.cfg.BackoffConfig)
backoff := backoff.New(ctx, a.cfg.BackoffConfig)

for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() {
requests := dynamoDBWriteBatch{}
Expand Down Expand Up @@ -433,7 +434,7 @@ func (a dynamoDBStorageClient) getDynamoDBChunks(ctx context.Context, chunks []c

result := []chunk.Chunk{}
unprocessed := dynamoDBReadRequest{}
backoff := util.NewBackoff(ctx, a.cfg.BackoffConfig)
backoff := backoff.New(ctx, a.cfg.BackoffConfig)

for outstanding.Len()+unprocessed.Len() > 0 && backoff.Ongoing() {
requests := dynamoDBReadRequest{}
Expand Down
6 changes: 3 additions & 3 deletions pkg/chunk/aws/dynamodb_table_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/go-kit/kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"
"golang.org/x/time/rate"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/log"
)

Expand All @@ -30,7 +30,7 @@ type autoscale interface {

type callManager struct {
limiter *rate.Limiter
backoffConfig util.BackoffConfig
backoffConfig backoff.Config
}

type dynamoTableClient struct {
Expand Down Expand Up @@ -80,7 +80,7 @@ func (d callManager) backoffAndRetry(ctx context.Context, fn func(context.Contex
_ = d.limiter.Wait(ctx)
}

backoff := util.NewBackoff(ctx, d.backoffConfig)
backoff := backoff.New(ctx, d.backoffConfig)
for backoff.Ongoing() {
if err := fn(ctx); err != nil {
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == "ThrottlingException" {
Expand Down
4 changes: 2 additions & 2 deletions pkg/chunk/aws/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
"io"
"time"

"github.com/grafana/dskit/backoff"
"golang.org/x/time/rate"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/objectclient"
"github.com/cortexproject/cortex/pkg/chunk/testutils"
"github.com/cortexproject/cortex/pkg/util"
)

type fixture struct {
Expand Down Expand Up @@ -73,7 +73,7 @@ func dynamoDBFixture(provisionedErr, gangsize, maxParallelism int) testutils.Fix
cfg: DynamoDBConfig{
ChunkGangSize: gangsize,
ChunkGetMaxParallelism: maxParallelism,
BackoffConfig: util.BackoffConfig{
BackoffConfig: backoff.Config{
MinBackoff: 1 * time.Millisecond,
MaxBackoff: 5 * time.Millisecond,
MaxRetries: 20,
Expand Down
9 changes: 4 additions & 5 deletions pkg/chunk/aws/retryer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,22 @@ import (
"time"

"github.com/aws/aws-sdk-go/aws/request"
"github.com/grafana/dskit/backoff"
ot "github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"

"github.com/cortexproject/cortex/pkg/util"
)

// Map Cortex Backoff into AWS Retryer interface
type retryer struct {
*util.Backoff
*backoff.Backoff
maxRetries int
}

var _ request.Retryer = &retryer{}

func newRetryer(ctx context.Context, cfg util.BackoffConfig) *retryer {
func newRetryer(ctx context.Context, cfg backoff.Config) *retryer {
return &retryer{
Backoff: util.NewBackoff(ctx, cfg),
Backoff: backoff.New(ctx, cfg),
maxRetries: cfg.MaxRetries,
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -573,7 +574,7 @@ func (c *Compactor) compactUsers(ctx context.Context) {
func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) error {
var lastErr error

retries := util.NewBackoff(ctx, util.BackoffConfig{
retries := backoff.New(ctx, backoff.Config{
MinBackoff: c.compactorCfg.retryMinBackoff,
MaxBackoff: c.compactorCfg.retryMaxBackoff,
MaxRetries: c.compactorCfg.CompactionRetries,
Expand Down Expand Up @@ -670,7 +671,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
func (c *Compactor) discoverUsersWithRetries(ctx context.Context) ([]string, error) {
var lastErr error

retries := util.NewBackoff(ctx, util.BackoffConfig{
retries := backoff.New(ctx, backoff.Config{
MinBackoff: c.compactorCfg.retryMinBackoff,
MaxBackoff: c.compactorCfg.retryMaxBackoff,
MaxRetries: c.compactorCfg.CompactionRetries,
Expand Down
5 changes: 3 additions & 2 deletions pkg/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/pkg/errors"
"github.com/weaveworks/common/httpgrpc"
"google.golang.org/grpc"
Expand Down Expand Up @@ -197,12 +198,12 @@ func (w *frontendSchedulerWorker) stop() {
}

func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb.SchedulerForFrontendClient) {
backoffConfig := util.BackoffConfig{
backoffConfig := backoff.Config{
MinBackoff: 50 * time.Millisecond,
MaxBackoff: 1 * time.Second,
}

backoff := util.NewBackoff(ctx, backoffConfig)
backoff := backoff.New(ctx, backoffConfig)
for backoff.Ongoing() {
loop, loopErr := client.FrontendLoop(ctx)
if loopErr != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/go-kit/kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"
Expand All @@ -17,7 +18,6 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
)

var (
Expand Down Expand Up @@ -275,7 +275,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
if i.cfg.MaxTransferRetries <= 0 {
return ring.ErrTransferDisabled
}
backoff := util.NewBackoff(ctx, util.BackoffConfig{
backoff := backoff.New(ctx, backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 5 * time.Second,
MaxRetries: i.cfg.MaxTransferRetries,
Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/blocks_finder_bucket_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -270,7 +271,7 @@ pushJobsLoop:
// scanUserBlocksWithRetries runs scanUserBlocks() retrying multiple times
// in case of error.
func (d *BucketScanBlocksFinder) scanUserBlocksWithRetries(ctx context.Context, userID string) (metas bucketindex.Blocks, deletionMarks map[ulid.ULID]*bucketindex.BlockDeletionMark, err error) {
retries := util.NewBackoff(ctx, util.BackoffConfig{
retries := backoff.New(ctx, backoff.Config{
MinBackoff: time.Second,
MaxBackoff: 30 * time.Second,
MaxRetries: 3,
Expand Down
Loading