From 1f32badc64efd6490e5f5d639e8aad7b548da63d Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Wed, 5 Jan 2022 16:42:01 +0100 Subject: [PATCH] Upgrade to dskit@01ce9286d7d5 Signed-off-by: Arve Knudsen --- CHANGELOG.md | 6 +- go.mod | 2 +- go.sum | 3 +- pkg/compactor/compactor.go | 7 +- pkg/compactor/compactor_test.go | 1 + pkg/cortex/modules.go | 2 +- pkg/distributor/distributor.go | 7 +- pkg/distributor/distributor_test.go | 9 +- pkg/ingester/ingester.go | 2 +- pkg/ingester/ingester_v2.go | 7 +- pkg/ingester/lifecycle_test.go | 10 +- pkg/ruler/lifecycle_test.go | 8 +- pkg/ruler/ruler.go | 7 +- pkg/ruler/ruler_test.go | 4 +- pkg/util/dns_watcher.go | 2 +- .../github.com/grafana/dskit/closer/closer.go | 9 -- .../grafana/dskit/concurrency/runner.go | 2 +- .../grafana/dskit/crypto/tls/tls.go | 4 +- .../github.com/grafana/dskit/flagext/day.go | 6 +- .../grafana/dskit/flagext/register.go | 39 +++++++- .../grafana/dskit/grpcutil/dns_resolver.go | 80 +++++++--------- .../grafana/dskit/grpcutil/naming.go | 8 +- .../grafana/dskit/internal/math/math.go | 9 ++ .../grafana/dskit/kv/consul/client.go | 2 +- .../grafana/dskit/kv/consul/mock.go | 9 +- .../github.com/grafana/dskit/kv/etcd/etcd.go | 2 +- .../dskit/kv/memberlist/memberlist_client.go | 32 ++++--- .../grafana/dskit/kv/memberlist/mergeable.go | 5 + .../dskit/kv/memberlist/tcp_transport.go | 83 +++++++++------- vendor/github.com/grafana/dskit/math/math.go | 33 ------- vendor/github.com/grafana/dskit/math/rate.go | 59 ------------ .../grafana/dskit/middleware/grpc.go | 32 +++++-- .../grafana/dskit/ring/basic_lifecycler.go | 23 +++-- vendor/github.com/grafana/dskit/ring/http.go | 42 ++++---- .../grafana/dskit/ring/lifecycler.go | 6 +- vendor/github.com/grafana/dskit/ring/model.go | 69 ++++++++----- .../grafana/dskit/ring/replication_set.go | 23 ++++- .../dskit/ring/replication_strategy.go | 21 +++- vendor/github.com/grafana/dskit/ring/ring.go | 70 +++++++------- .../github.com/grafana/dskit/ring/ticker.go | 14 +++ .../github.com/grafana/dskit/ring/tokens.go | 3 +- vendor/github.com/grafana/dskit/ring/util.go | 15 ++- .../grafana/dskit/runtimeconfig/manager.go | 4 +- vendor/github.com/grafana/dskit/time/time.go | 96 ------------------- vendor/modules.txt | 6 +- 45 files changed, 428 insertions(+), 455 deletions(-) delete mode 100644 vendor/github.com/grafana/dskit/closer/closer.go create mode 100644 vendor/github.com/grafana/dskit/internal/math/math.go delete mode 100644 vendor/github.com/grafana/dskit/math/math.go delete mode 100644 vendor/github.com/grafana/dskit/math/rate.go create mode 100644 vendor/github.com/grafana/dskit/ring/ticker.go delete mode 100644 vendor/github.com/grafana/dskit/time/time.go diff --git a/CHANGELOG.md b/CHANGELOG.md index eefcaea669..54d2b9fce8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ * [CHANGE] Changed default for `-ingester.min-ready-duration` from 1 minute to 15 seconds. #4539 * [CHANGE] query-frontend: Do not print anything in the logs of `query-frontend` if a in-progress query has been canceled (context canceled). #4562 * [CHANGE] Compactor block deletion mark migration, needed when upgrading from v1.7, is now disabled by default. 
#4597
+* [CHANGE] The `status_code` label on gRPC client metrics has changed from '200' and '500' to '2xx', '5xx', '4xx', 'cancel' or 'error'. #4601
+* [CHANGE] Memberlist: changed probe interval from `1s` to `5s` and probe timeout from `500ms` to `2s`. #4601
 * [ENHANCEMENT] Update Go version to 1.17.5. #4602 #4604
 * [ENHANCEMENT] Keep track of discarded samples due to relabel configuration in `cortex_discarded_samples_total`. #4503
 * [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. #4571
@@ -15,9 +17,12 @@
 * [ENHANCEMENT] Updated Prometheus to latest. Includes changes from prometheus#9239, adding 15 new functions. Multiple TSDB bugfixes prometheus#9438 & prometheus#9381. #4524
 * [ENHANCEMENT] Query Frontend: Add setting `-frontend.forward-headers-list` in frontend to configure the set of headers from the requests to be forwarded to downstream requests. #4486
 * [ENHANCEMENT] Blocks storage: Add `-blocks-storage.azure.http.*`, `-alertmanager-storage.azure.http.*`, and `-ruler-storage.azure.http.*` to configure the Azure storage client. #4581
+* [ENHANCEMENT] Optimise memberlist receive path when used as a backing store for rings with a large number of members. #4601
 * [BUGFIX] AlertManager: remove stale template files. #4495
 * [BUGFIX] Distributor: fix bug in query-exemplar where some results would get dropped. #4582
 * [BUGFIX] Update Thanos dependency: compactor tracing support, azure blocks storage memory fix. #4585
+* [BUGFIX] Querier: Disable query scheduler SRV DNS lookup, which removes noisy log messages about "failed DNS SRV record lookup". #4601
+* [BUGFIX] Memberlist: fixed corrupted packets when sending compound messages with more than 255 messages or messages bigger than 64KB. #4601

 ## 1.11.0 2021-11-25

@@ -79,7 +84,6 @@
 * [BUGFIX] Compactor: fixed panic while collecting Prometheus metrics. #4483
 * [BUGFIX] Update go-kit package to fix spurious log messages #4544
-

 ## 1.10.0 / 2021-08-03

 * [CHANGE] Prevent path traversal attack from users able to control the HTTP header `X-Scope-OrgID`.
#4375 (CVE-2021-36157) diff --git a/go.mod b/go.mod index c6b0c08977..bd0c6af67c 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/golang/snappy v0.0.4 github.com/gorilla/mux v1.8.0 - github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1 + github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 github.com/json-iterator/go v1.1.12 github.com/lib/pq v1.3.0 github.com/minio/minio-go/v7 v7.0.10 diff --git a/go.sum b/go.sum index c1a79dc5a8..b54d4510ed 100644 --- a/go.sum +++ b/go.sum @@ -860,8 +860,9 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7 github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1 h1:Qf+/W3Tup0nO21tgJmO14WJK0yyrm4L2UJipZP+Zoow= github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM= +github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 h1:IXo/V2+KKLYLD724qh3uRaZgAy3BV3HdtXuSs7lb3jU= +github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5/go.mod h1:M0/dlftwBvH7+hdNNpjMa/CUXD7gsew67mbkCuDlFXE= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 6bd4d75a60..337ec76c68 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -37,6 +37,9 @@ import ( ) const ( + // ringKey is the key under which we store the compactors ring in the KVStore. + ringKey = "compactor" + blocksMarkedForDeletionName = "cortex_compactor_blocks_marked_for_deletion_total" blocksMarkedForDeletionHelp = "Total number of blocks marked for deletion in compactor." ) @@ -367,12 +370,12 @@ func (c *Compactor) starting(ctx context.Context) error { // Initialize the compactors ring if sharding is enabled. 
if c.compactorCfg.ShardingEnabled { lifecyclerCfg := c.compactorCfg.ShardingRing.ToLifecyclerConfig() - c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ring.CompactorRingKey, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer)) + c.ringLifecycler, err = ring.NewLifecycler(lifecyclerCfg, ring.NewNoopFlushTransferer(), "compactor", ringKey, false, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer)) if err != nil { return errors.Wrap(err, "unable to initialize compactor ring lifecycler") } - c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ring.CompactorRingKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer)) + c.ring, err = ring.New(lifecyclerCfg.RingConfig, "compactor", ringKey, c.logger, prometheus.WrapRegistererWithPrefix("cortex_", c.registerer)) if err != nil { return errors.Wrap(err, "unable to initialize compactor ring") } diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index b1923a625b..ebb2f956b8 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1086,6 +1086,7 @@ func removeIgnoredLogs(input []string) []string { `level=debug component=compactor msg="unregistering instance from ring" ring=compactor`: {}, `level=info component=compactor msg="instance removed from the KV store" ring=compactor`: {}, `level=info component=compactor msg="observing tokens before going ACTIVE" ring=compactor`: {}, + `level=info component=compactor msg="lifecycler entering final sleep before shutdown" final_sleep=0s`: {}, } out := make([]string, 0, len(input)) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index d19e964634..ffe4f01ea8 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -137,7 +137,7 @@ func (t *Cortex) initServer() (services.Service, error) { func (t *Cortex) initRing() (serv services.Service, err error) { t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig) - t.Ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey, util_log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", prometheus.DefaultRegisterer)) + t.Ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ingester.RingKey, util_log.Logger, prometheus.WrapRegistererWithPrefix("cortex_", prometheus.DefaultRegisterer)) if err != nil { return nil, err } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index e9d2b590f2..2620256080 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -54,6 +54,9 @@ var ( ) const ( + // ringKey is the key under which we store the distributors ring in the KVStore. 
+ ringKey = "distributor" + typeSamples = "samples" typeMetadata = "metadata" @@ -211,12 +214,12 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove if !canJoinDistributorsRing { ingestionRateStrategy = newInfiniteIngestionRateStrategy() } else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy { - distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg)) + distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg)) if err != nil { return nil, err } - distributorsRing, err = ring.New(cfg.DistributorRing.ToRingConfig(), "distributor", ring.DistributorRingKey, log, prometheus.WrapRegistererWithPrefix("cortex_", reg)) + distributorsRing, err = ring.New(cfg.DistributorRing.ToRingConfig(), "distributor", ringKey, log, prometheus.WrapRegistererWithPrefix("cortex_", reg)) if err != nil { return nil, errors.Wrap(err, "failed to initialize distributors' ring client") } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 1487db7aa2..8957f754b5 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -36,6 +36,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/prom1/storage/metric" "github.com/cortexproject/cortex/pkg/tenant" @@ -1626,7 +1627,7 @@ func BenchmarkDistributor_Push(b *testing.B) { kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) b.Cleanup(func() { assert.NoError(b, closer.Close()) }) - err := kvStore.CAS(context.Background(), ring.IngesterRingKey, + err := kvStore.CAS(context.Background(), ingester.RingKey, func(_ interface{}) (interface{}, bool, error) { d := &ring.Desc{} d.AddIngester("ingester-1", "127.0.0.1", "", ring.GenerateTokens(128, nil), ring.ACTIVE, time.Now()) @@ -1639,7 +1640,7 @@ func BenchmarkDistributor_Push(b *testing.B) { KVStore: kv.Config{Mock: kvStore}, HeartbeatTimeout: 60 * time.Minute, ReplicationFactor: 1, - }, ring.IngesterRingKey, ring.IngesterRingKey, nil, nil) + }, ingester.RingKey, ingester.RingKey, nil, nil) require.NoError(b, err) require.NoError(b, services.StartAndAwaitRunning(context.Background(), ingestersRing)) b.Cleanup(func() { @@ -1983,7 +1984,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) - err := kvStore.CAS(context.Background(), ring.IngesterRingKey, + err := kvStore.CAS(context.Background(), ingester.RingKey, func(_ interface{}) (interface{}, bool, error) { return &ring.Desc{ Ingesters: ingesterDescs, @@ -2004,7 +2005,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*p }, HeartbeatTimeout: 60 * time.Minute, ReplicationFactor: rf, - }, ring.IngesterRingKey, ring.IngesterRingKey, nil, nil) + }, ingester.RingKey, ingester.RingKey, nil, nil) require.NoError(t, err) require.NoError(t, 
services.StartAndAwaitRunning(context.Background(), ingestersRing)) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 3295e0e8a1..10275e0827 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -267,7 +267,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c // During WAL recovery, it will create new user states which requires the limiter. // Hence initialise the limiter before creating the WAL. // The '!cfg.WALConfig.WALEnabled' argument says don't flush on shutdown if the WAL is enabled. - i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled || cfg.WALConfig.FlushOnShutdown, logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer)) + i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", RingKey, !cfg.WALConfig.WALEnabled || cfg.WALConfig.FlushOnShutdown, logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer)) if err != nil { return nil, err } diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index fa906578f0..887459df58 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -47,6 +47,11 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) +const ( + // RingKey is the key under which we store the ingesters ring in the KVStore. + RingKey = "ring" +) + const ( errTSDBCreateIncompatibleState = "cannot create a new TSDB while the ingester is not in active state (current state: %s)" errTSDBIngest = "err: %v. timestamp=%s, series=%s" // Using error.Wrap puts the message before the error and if the series is too long, its truncated. @@ -511,7 +516,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, }, i.getOldestUnshippedBlockMetric) } - i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown, logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer)) + i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", RingKey, cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown, logger, prometheus.WrapRegistererWithPrefix("cortex_", registerer)) if err != nil { return nil, err } diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index f1b0af7937..ed3bf53a34 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -83,7 +83,7 @@ func TestIngesterRestart(t *testing.T) { } test.Poll(t, 100*time.Millisecond, 1, func() interface{} { - return numTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", ring.IngesterRingKey) + return numTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", RingKey) }) { @@ -96,7 +96,7 @@ func TestIngesterRestart(t *testing.T) { time.Sleep(200 * time.Millisecond) test.Poll(t, 100*time.Millisecond, 1, func() interface{} { - return numTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", ring.IngesterRingKey) + return numTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", RingKey) }) } @@ -111,7 +111,7 @@ func TestIngester_ShutdownHandler(t *testing.T) { // Make sure the ingester has been added to the ring. 
test.Poll(t, 100*time.Millisecond, 1, func() interface{} { - return numTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", ring.IngesterRingKey) + return numTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", RingKey) }) recorder := httptest.NewRecorder() @@ -120,7 +120,7 @@ func TestIngester_ShutdownHandler(t *testing.T) { // Make sure the ingester has been removed from the ring even when UnregisterFromRing is false. test.Poll(t, 100*time.Millisecond, 0, func() interface{} { - return numTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", ring.IngesterRingKey) + return numTokens(config.LifecyclerConfig.RingConfig.KVStore.Mock, "localhost", RingKey) }) }) } @@ -341,7 +341,7 @@ func TestIngesterFlush(t *testing.T) { // the ring, the data is in the chunk store. require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing.lifecycler)) test.Poll(t, 200*time.Millisecond, 0, func() interface{} { - r, err := ing.lifecycler.KVStore.Get(context.Background(), ring.IngesterRingKey) + r, err := ing.lifecycler.KVStore.Get(context.Background(), RingKey) if err != nil { return -1 } diff --git a/pkg/ruler/lifecycle_test.go b/pkg/ruler/lifecycle_test.go index c39923c327..358525a2d3 100644 --- a/pkg/ruler/lifecycle_test.go +++ b/pkg/ruler/lifecycle_test.go @@ -39,7 +39,7 @@ func TestRulerShutdown(t *testing.T) { // Wait until the tokens are registered in the ring test.Poll(t, 100*time.Millisecond, config.Ring.NumTokens, func() interface{} { - return numTokens(ringStore, "localhost", ring.RulerRingKey) + return numTokens(ringStore, "localhost", ringKey) }) require.Equal(t, ring.ACTIVE, r.lifecycler.GetState()) @@ -48,7 +48,7 @@ func TestRulerShutdown(t *testing.T) { // Wait until the tokens are unregistered from the ring test.Poll(t, 100*time.Millisecond, 0, func() interface{} { - return numTokens(ringStore, "localhost", ring.RulerRingKey) + return numTokens(ringStore, "localhost", ringKey) }) } @@ -75,7 +75,7 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) { defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck // Add an unhealthy instance to the ring. - require.NoError(t, ringStore.CAS(ctx, ring.RulerRingKey, func(in interface{}) (interface{}, bool, error) { + require.NoError(t, ringStore.CAS(ctx, ringKey, func(in interface{}) (interface{}, bool, error) { ringDesc := ring.GetOrCreateRingDesc(in) instance := ringDesc.AddIngester(unhealthyInstanceID, "1.1.1.1", "", generateSortedTokens(config.Ring.NumTokens), ring.ACTIVE, time.Now()) @@ -87,7 +87,7 @@ func TestRuler_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) { // Ensure the unhealthy instance is removed from the ring. test.Poll(t, time.Second*5, false, func() interface{} { - d, err := ringStore.Get(ctx, ring.RulerRingKey) + d, err := ringStore.Get(ctx, ringKey) if err != nil { return err } diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index edf4342e44..9c33eba2fb 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -48,6 +48,9 @@ var ( ) const ( + // ringKey is the key under which we store the rulers ring in the KVStore. + ringKey = "ring" + // Number of concurrent group list and group loads operations. 
loadRulesConcurrency = 10 fetchRulesConcurrency = 16 @@ -316,12 +319,12 @@ func enableSharding(r *Ruler, ringStore kv.Client) error { delegate = ring.NewAutoForgetDelegate(r.cfg.Ring.HeartbeatTimeout*ringAutoForgetUnhealthyPeriods, delegate, r.logger) rulerRingName := "ruler" - r.lifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, rulerRingName, ring.RulerRingKey, ringStore, delegate, r.logger, prometheus.WrapRegistererWithPrefix("cortex_", r.registry)) + r.lifecycler, err = ring.NewBasicLifecycler(lifecyclerCfg, rulerRingName, ringKey, ringStore, delegate, r.logger, prometheus.WrapRegistererWithPrefix("cortex_", r.registry)) if err != nil { return errors.Wrap(err, "failed to initialize ruler's lifecycler") } - r.ring, err = ring.NewWithStoreClientAndStrategy(r.cfg.Ring.ToRingConfig(), rulerRingName, ring.RulerRingKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("cortex_", r.registry), r.logger) + r.ring, err = ring.NewWithStoreClientAndStrategy(r.cfg.Ring.ToRingConfig(), rulerRingName, ringKey, ringStore, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), prometheus.WrapRegistererWithPrefix("cortex_", r.registry), r.logger) if err != nil { return errors.Wrap(err, "failed to initialize ruler's ring") } diff --git a/pkg/ruler/ruler_test.go b/pkg/ruler/ruler_test.go index 2c7eb2c0d5..97a578c684 100644 --- a/pkg/ruler/ruler_test.go +++ b/pkg/ruler/ruler_test.go @@ -436,7 +436,7 @@ func TestGetRules(t *testing.T) { } if tc.sharding { - err := kvStore.CAS(context.Background(), ring.RulerRingKey, func(in interface{}) (out interface{}, retry bool, err error) { + err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) { d, _ := in.(*ring.Desc) if d == nil { d = ring.NewDesc() @@ -946,7 +946,7 @@ func TestSharding(t *testing.T) { } if tc.setupRing != nil { - err := kvStore.CAS(context.Background(), ring.RulerRingKey, func(in interface{}) (out interface{}, retry bool, err error) { + err := kvStore.CAS(context.Background(), ringKey, func(in interface{}) (out interface{}, retry bool, err error) { d, _ := in.(*ring.Desc) if d == nil { d = ring.NewDesc() diff --git a/pkg/util/dns_watcher.go b/pkg/util/dns_watcher.go index 584d91c79c..4b37852e44 100644 --- a/pkg/util/dns_watcher.go +++ b/pkg/util/dns_watcher.go @@ -33,7 +33,7 @@ func NewDNSWatcher(address string, dnsLookupPeriod time.Duration, notifications return nil, err } - watcher, err := resolver.Resolve(address) + watcher, err := resolver.Resolve(address, "") if err != nil { return nil, err } diff --git a/vendor/github.com/grafana/dskit/closer/closer.go b/vendor/github.com/grafana/dskit/closer/closer.go deleted file mode 100644 index 3402f5d579..0000000000 --- a/vendor/github.com/grafana/dskit/closer/closer.go +++ /dev/null @@ -1,9 +0,0 @@ -package closer - -// Func is like http.HandlerFunc but for io.Closers. -type Func func() error - -// Close implements io.Closer. 
-func (f Func) Close() error { - return f() -} diff --git a/vendor/github.com/grafana/dskit/concurrency/runner.go b/vendor/github.com/grafana/dskit/concurrency/runner.go index 846b136cf5..a6740f3ac9 100644 --- a/vendor/github.com/grafana/dskit/concurrency/runner.go +++ b/vendor/github.com/grafana/dskit/concurrency/runner.go @@ -6,7 +6,7 @@ import ( "golang.org/x/sync/errgroup" - "github.com/grafana/dskit/math" + "github.com/grafana/dskit/internal/math" "github.com/grafana/dskit/multierror" ) diff --git a/vendor/github.com/grafana/dskit/crypto/tls/tls.go b/vendor/github.com/grafana/dskit/crypto/tls/tls.go index 9886b208dd..a6fa46f073 100644 --- a/vendor/github.com/grafana/dskit/crypto/tls/tls.go +++ b/vendor/github.com/grafana/dskit/crypto/tls/tls.go @@ -4,7 +4,7 @@ import ( "crypto/tls" "crypto/x509" "flag" - "io/ioutil" + "os" "github.com/pkg/errors" "google.golang.org/grpc" @@ -44,7 +44,7 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) { // read ca certificates if cfg.CAPath != "" { var caCertPool *x509.CertPool - caCert, err := ioutil.ReadFile(cfg.CAPath) + caCert, err := os.ReadFile(cfg.CAPath) if err != nil { return nil, errors.Wrapf(err, "error loading ca cert: %s", cfg.CAPath) } diff --git a/vendor/github.com/grafana/dskit/flagext/day.go b/vendor/github.com/grafana/dskit/flagext/day.go index 8370ac0d57..9db695c832 100644 --- a/vendor/github.com/grafana/dskit/flagext/day.go +++ b/vendor/github.com/grafana/dskit/flagext/day.go @@ -25,12 +25,12 @@ func NewDayValue(t model.Time) DayValue { // String implements flag.Value func (v DayValue) String() string { - return v.Time.Time().Format(time.RFC3339) + return v.Time.Time().UTC().Format(time.RFC3339) } // Set implements flag.Value func (v *DayValue) Set(s string) error { - t, err := time.Parse("2006-01-02", s) + t, err := time.ParseInLocation("2006-01-02", s, time.UTC) if err != nil { return err } @@ -55,5 +55,5 @@ func (v *DayValue) UnmarshalYAML(unmarshal func(interface{}) error) error { // MarshalYAML implements yaml.Marshaler. 
func (v DayValue) MarshalYAML() (interface{}, error) {
-	return v.Time.Time().Format("2006-01-02"), nil
+	return v.Time.Time().UTC().Format("2006-01-02"), nil
 }
diff --git a/vendor/github.com/grafana/dskit/flagext/register.go b/vendor/github.com/grafana/dskit/flagext/register.go
index 1140843e04..1004e1ba45 100644
--- a/vendor/github.com/grafana/dskit/flagext/register.go
+++ b/vendor/github.com/grafana/dskit/flagext/register.go
@@ -1,12 +1,21 @@
 package flagext

-import "flag"
+import (
+	"flag"
+
+	"github.com/go-kit/log"
+)

 // Registerer is a thing that can RegisterFlags
 type Registerer interface {
 	RegisterFlags(*flag.FlagSet)
 }

+// RegistererWithLogger is a thing that can RegisterFlags with a Logger
+type RegistererWithLogger interface {
+	RegisterFlags(*flag.FlagSet, log.Logger)
+}
+
 // RegisterFlags registers flags with the provided Registerers
 func RegisterFlags(rs ...Registerer) {
 	for _, r := range rs {
@@ -14,11 +23,33 @@
 	}
 }

+// RegisterFlagsWithLogger registers flags with the provided Registerers
+func RegisterFlagsWithLogger(logger log.Logger, rs ...interface{}) {
+	for _, v := range rs {
+		switch r := v.(type) {
+		case Registerer:
+			r.RegisterFlags(flag.CommandLine)
+		case RegistererWithLogger:
+			r.RegisterFlags(flag.CommandLine, logger)
+		default:
+			panic("RegisterFlagsWithLogger must be passed a Registerer or RegistererWithLogger")
+		}
+	}
+}
+
 // DefaultValues initiates a set of configs (Registerers) with their defaults.
-func DefaultValues(rs ...Registerer) {
+func DefaultValues(rs ...interface{}) {
 	fs := flag.NewFlagSet("", flag.PanicOnError)
-	for _, r := range rs {
-		r.RegisterFlags(fs)
+	logger := log.NewNopLogger()
+	for _, v := range rs {
+		switch r := v.(type) {
+		case Registerer:
+			r.RegisterFlags(fs)
+		case RegistererWithLogger:
+			r.RegisterFlags(fs, logger)
+		default:
+			panic("DefaultValues must be passed a Registerer or RegistererWithLogger")
+		}
 	}
 	_ = fs.Parse([]string{})
 }
diff --git a/vendor/github.com/grafana/dskit/grpcutil/dns_resolver.go b/vendor/github.com/grafana/dskit/grpcutil/dns_resolver.go
index ad9f17c782..ef9c639894 100644
--- a/vendor/github.com/grafana/dskit/grpcutil/dns_resolver.go
+++ b/vendor/github.com/grafana/dskit/grpcutil/dns_resolver.go
@@ -29,8 +29,8 @@ var (

 // NewDNSResolverWithFreq creates a DNS Resolver that can resolve DNS names, and
 // create watchers that poll the DNS server using the frequency set by freq.
-func NewDNSResolverWithFreq(freq time.Duration, logger log.Logger) (Resolver, error) {
-	return &dnsResolver{
+func NewDNSResolverWithFreq(freq time.Duration, logger log.Logger) (*Resolver, error) {
+	return &Resolver{
 		logger: logger,
 		freq:   freq,
 	}, nil
@@ -38,12 +38,12 @@ func NewDNSResolverWithFreq(freq time.Duration, logger log.Logger) (Resolver, er

 // NewDNSResolver creates a DNS Resolver that can resolve DNS names, and create
 // watchers that poll the DNS server using the default frequency defined by defaultFreq.
-func NewDNSResolver(logger log.Logger) (Resolver, error) {
+func NewDNSResolver(logger log.Logger) (*Resolver, error) {
 	return NewDNSResolverWithFreq(defaultFreq, logger)
 }

-// dnsResolver handles name resolution for names following the DNS scheme
-type dnsResolver struct {
+// Resolver handles name resolution for names following the DNS scheme.
+type Resolver struct { logger log.Logger // frequency of polling the DNS server that the watchers created by this resolver will use. freq time.Duration @@ -101,8 +101,12 @@ func parseTarget(target string) (host, port string, err error) { return "", "", fmt.Errorf("invalid target address %v", target) } -// Resolve creates a watcher that watches the name resolution of the target. -func (r *dnsResolver) Resolve(target string) (Watcher, error) { +// Resolve creates a watcher that watches the SRV/hostname record resolution of the target. +// +// If service is not empty, the watcher will first attempt to resolve an SRV record. +// If that fails, or service is empty, hostname record resolution is attempted instead. +// If target can be parsed as an IP address, the watcher will return it, and will not send any more updates afterwards. +func (r *Resolver) Resolve(target, service string) (Watcher, error) { host, port, err := parseTarget(target) if err != nil { return nil, err @@ -119,22 +123,24 @@ func (r *dnsResolver) Resolve(target string) (Watcher, error) { ctx, cancel := context.WithCancel(context.Background()) return &dnsWatcher{ - r: r, - logger: r.logger, - host: host, - port: port, - ctx: ctx, - cancel: cancel, - t: time.NewTimer(0), + r: r, + logger: r.logger, + host: host, + port: port, + service: service, + ctx: ctx, + cancel: cancel, + t: time.NewTimer(0), }, nil } // dnsWatcher watches for the name resolution update for a specific target type dnsWatcher struct { - r *dnsResolver - logger log.Logger - host string - port string + r *Resolver + logger log.Logger + host string + port string + service string // The latest resolved address set curAddrs map[string]*Update ctx context.Context @@ -164,26 +170,6 @@ func (i *ipWatcher) Close() { close(i.updateChan) } -// AddressType indicates the address type returned by name resolution. -type AddressType uint8 - -const ( - // Backend indicates the server is a backend server. - Backend AddressType = iota - // GRPCLB indicates the server is a grpclb load balancer. - GRPCLB -) - -// AddrMetadataGRPCLB contains the information the name resolver for grpclb should provide. The -// name resolver used by the grpclb balancer is required to provide this type of metadata in -// its address updates. -type AddrMetadataGRPCLB struct { - // AddrType is the type of server (grpc load balancer or backend). - AddrType AddressType - // ServerName is the name of the grpc load balancer. Used for authentication. 
- ServerName string -} - // compileUpdate compares the old resolved addresses and newly resolved addresses, // and generates an update list func (w *dnsWatcher) compileUpdate(newAddrs map[string]*Update) []*Update { @@ -203,27 +189,30 @@ func (w *dnsWatcher) compileUpdate(newAddrs map[string]*Update) []*Update { } func (w *dnsWatcher) lookupSRV() map[string]*Update { + if w.service == "" { + return nil + } + newAddrs := make(map[string]*Update) - _, srvs, err := lookupSRV(w.ctx, "grpclb", "tcp", w.host) + _, srvs, err := lookupSRV(w.ctx, w.service, "tcp", w.host) if err != nil { level.Info(w.logger).Log("msg", "failed DNS SRV record lookup", "err", err) return nil } for _, s := range srvs { - lbAddrs, err := lookupHost(w.ctx, s.Target) + addrs, err := lookupHost(w.ctx, s.Target) if err != nil { - level.Warn(w.logger).Log("msg", "failed load balancer address DNS lookup", "err", err) + level.Warn(w.logger).Log("msg", "failed SRV target DNS lookup", "target", s.Target, "err", err) continue } - for _, a := range lbAddrs { + for _, a := range addrs { a, ok := formatIP(a) if !ok { level.Error(w.logger).Log("failed IP parsing", "err", err) continue } addr := a + ":" + strconv.Itoa(int(s.Port)) - newAddrs[addr] = &Update{Addr: addr, - Metadata: AddrMetadataGRPCLB{AddrType: GRPCLB, ServerName: s.Target}} + newAddrs[addr] = &Update{Addr: addr} } } return newAddrs @@ -251,8 +240,7 @@ func (w *dnsWatcher) lookupHost() map[string]*Update { func (w *dnsWatcher) lookup() []*Update { newAddrs := w.lookupSRV() if newAddrs == nil { - // If failed to get any balancer address (either no corresponding SRV for the - // target, or caused by failure during resolution/parsing of the balancer target), + // If we failed to get any valid addresses from SRV record lookup, // return any A record info available. newAddrs = w.lookupHost() } diff --git a/vendor/github.com/grafana/dskit/grpcutil/naming.go b/vendor/github.com/grafana/dskit/grpcutil/naming.go index 441b3ad9fc..8029324406 100644 --- a/vendor/github.com/grafana/dskit/grpcutil/naming.go +++ b/vendor/github.com/grafana/dskit/grpcutil/naming.go @@ -24,13 +24,7 @@ type Update struct { Metadata interface{} } -// Resolver creates a Watcher for a target to track its resolution changes. -type Resolver interface { - // Resolve creates a Watcher for target. - Resolve(target string) (Watcher, error) -} - -// Watcher watches for the updates on the specified target. +// Watcher watches for SRV updates on the specified target. type Watcher interface { // Next blocks until an update or error happens. It may return one or more // updates. The first call should get the full set of the results. It should diff --git a/vendor/github.com/grafana/dskit/internal/math/math.go b/vendor/github.com/grafana/dskit/internal/math/math.go new file mode 100644 index 0000000000..9d6422e50e --- /dev/null +++ b/vendor/github.com/grafana/dskit/internal/math/math.go @@ -0,0 +1,9 @@ +package math + +// Min returns the minimum of two ints. 
+func Min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/vendor/github.com/grafana/dskit/kv/consul/client.go b/vendor/github.com/grafana/dskit/kv/consul/client.go index b3391c9cda..69219cf748 100644 --- a/vendor/github.com/grafana/dskit/kv/consul/client.go +++ b/vendor/github.com/grafana/dskit/kv/consul/client.go @@ -59,7 +59,7 @@ type kv interface { Put(p *consul.KVPair, q *consul.WriteOptions) (*consul.WriteMeta, error) } -// Client is a KV.Client for Consul. +// Client is a kv.Client for Consul. type Client struct { kv codec codec.Codec diff --git a/vendor/github.com/grafana/dskit/kv/consul/mock.go b/vendor/github.com/grafana/dskit/kv/consul/mock.go index ae9c768f9f..f1f6937f08 100644 --- a/vendor/github.com/grafana/dskit/kv/consul/mock.go +++ b/vendor/github.com/grafana/dskit/kv/consul/mock.go @@ -12,7 +12,6 @@ import ( consul "github.com/hashicorp/consul/api" "github.com/prometheus/client_golang/prometheus" - "github.com/grafana/dskit/closer" "github.com/grafana/dskit/kv/codec" ) @@ -48,7 +47,7 @@ func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logge // Create a closer function used to close the main loop and wait until it's done. // We need to wait until done, otherwise the goroutine leak finder used in tests // may still report it as leaked. - closer := closer.Func(func() error { + closer := closerFunc(func() error { close(m.close) m.closeWG.Wait() return nil @@ -67,6 +66,12 @@ func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config, logger log.Logge }, closer } +type closerFunc func() error + +func (c closerFunc) Close() error { + return c() +} + func copyKVPair(in *consul.KVPair) *consul.KVPair { out := *in out.Value = make([]byte, len(in.Value)) diff --git a/vendor/github.com/grafana/dskit/kv/etcd/etcd.go b/vendor/github.com/grafana/dskit/kv/etcd/etcd.go index b08fb6a9dd..fa6944d4f5 100644 --- a/vendor/github.com/grafana/dskit/kv/etcd/etcd.go +++ b/vendor/github.com/grafana/dskit/kv/etcd/etcd.go @@ -38,7 +38,7 @@ type Clientv3Facade interface { clientv3.Watcher } -// Client implements ring.KVClient for etcd. +// Client implements kv.Client for etcd. type Client struct { cfg Config codec codec.Codec diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index 1b21fa5c42..d7ad176d0e 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -193,7 +193,7 @@ func (cfg *KVConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { f.StringVar(&cfg.AdvertiseAddr, prefix+"memberlist.advertise-addr", mlDefaults.AdvertiseAddr, "Gossip address to advertise to other members in the cluster. Used for NAT traversal.") f.IntVar(&cfg.AdvertisePort, prefix+"memberlist.advertise-port", mlDefaults.AdvertisePort, "Gossip port to advertise to other members in the cluster. 
Used for NAT traversal.")
-	cfg.TCPTransport.RegisterFlags(f, prefix)
+	cfg.TCPTransport.RegisterFlagsWithPrefix(f, prefix)
 }

 func (cfg *KVConfig) RegisterFlags(f *flag.FlagSet) {
@@ -400,6 +400,13 @@ func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
 	// Memberlist uses UDPBufferSize to figure out how many messages it can put into single "packet".
 	// As we don't use UDP for sending packets, we can use higher value here.
 	mlCfg.UDPBufferSize = 10 * 1024 * 1024
+
+	// For our use cases, we don't need a very fast detection of dead nodes. Since we use a TCP transport
+	// and we open a new TCP connection for each packet, we prefer to reduce the probe frequency and increase
+	// the timeout compared to defaults.
+	mlCfg.ProbeInterval = 5 * time.Second // Probe a random node at this interval. This setting is also the total timeout for the direct + indirect probes.
+	mlCfg.ProbeTimeout = 2 * time.Second  // Timeout for the direct probe.
+
 	return mlCfg, nil
 }

@@ -902,18 +909,6 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec
 		return
 	}

-	if len(pairData) > 65535 {
-		// Unfortunately, memberlist will happily let us send bigger messages via gossip,
-		// but then it will fail to parse them properly, because its own size field is 2-bytes only.
-		// (github.com/hashicorp/memberlist@v0.1.4/util.go:167, makeCompoundMessage function)
-		//
-		// Typically messages are smaller (when dealing with couple of updates only), but can get bigger
-		// when broadcasting result of push/pull update.
-		level.Debug(m.logger).Log("msg", "broadcast message too big, not broadcasting", "key", key, "version", version, "len", len(pairData))
-		m.numberOfBroadcastMessagesDropped.Inc()
-		return
-	}
-
 	m.addSentMessage(message{
 		Time: time.Now(),
 		Size: len(pairData),
@@ -1184,7 +1179,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
 	m.storeMu.Lock()
 	defer m.storeMu.Unlock()

-	curr := m.store[key].Clone()
+	// Note that we do not take a deep copy of curr.value here, it is modified in-place.
+	// This is safe because the entire function runs under the store lock; we do not return
+	// the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS).
+	curr := m.store[key]
 	// if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set.
 	if casVersion > 0 && curr.version != casVersion {
 		return nil, 0, errVersionMismatch
@@ -1206,7 +1204,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
 	m.storeRemovedTombstones.WithLabelValues(key).Add(float64(removed))

 	// Remove tombstones from change too. If change turns out to be empty after this,
-	// we don't need to change local value either!
+	// we don't need to gossip the change. However, the local value will always be updated.
 	//
 	// Note that "result" and "change" may actually be the same Mergeable. That is why we
 	// call RemoveTombstones on "result" first, so that we get the correct metrics. Calling
@@ -1224,6 +1222,10 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
 		codecID: codec.CodecID(),
 	}

+	// The "changes" returned by Merge() can contain references to the "result"
+	// state. Therefore, make sure we clone it before releasing the lock.
+ change = change.Clone() + return change, newVersion, nil } diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/mergeable.go b/vendor/github.com/grafana/dskit/kv/memberlist/mergeable.go index a013e34988..2c02acfa46 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/mergeable.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/mergeable.go @@ -8,11 +8,16 @@ type Mergeable interface { // Merge with other value in place. Returns change, that can be sent to other clients. // If merge doesn't result in any change, returns nil. // Error can be returned if merging with given 'other' value is not possible. + // Implementors of this method are permitted to modify the other parameter, as the + // memberlist-based KV store will not use the same "other" parameter in multiple Merge calls. // // In order for state merging to work correctly, Merge function must have some properties. When talking about the // result of the merge in the following text, we don't mean the return value ("change"), but the // end-state of receiver. That means Result of A.Merge(B) is end-state of A. // + // Memberlist-based KV store will keep the result even if Merge returned no change. Implementations should + // be careful about not changing logical value when returning empty change. + // // Idempotency: // Result of applying the same state "B" to state "A" (A.Merge(B)) multiple times has the same effect as // applying it only once. Only first Merge will return non-empty change. diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go index dc6f9a0ac1..4265a3b223 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/tcp_transport.go @@ -7,8 +7,8 @@ import ( "flag" "fmt" "io" - "io/ioutil" "net" + "strings" "sync" "time" @@ -61,8 +61,12 @@ type TCPTransportConfig struct { TLS dstls.ClientConfig `yaml:",inline"` } -// RegisterFlags registers flags. -func (cfg *TCPTransportConfig) RegisterFlags(f *flag.FlagSet, prefix string) { +func (cfg *TCPTransportConfig) RegisterFlags(f *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix(f, "") +} + +// RegisterFlagsWithPrefix registers flags with prefix. +func (cfg *TCPTransportConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { // "Defaults to hostname" -- memberlist sets it to hostname by default. f.Var(&cfg.BindAddrs, prefix+"memberlist.bind-addr", "IP address to listen on for gossip messages. Multiple addresses may be specified. 
Defaults to 0.0.0.0") f.IntVar(&cfg.BindPort, prefix+"memberlist.bind-port", 7946, "Port to listen on for gossip messages.") @@ -116,7 +120,7 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor var ok bool t := TCPTransport{ cfg: config, - logger: logger, + logger: log.With(logger, "component", "memberlist TCPTransport"), packetCh: make(chan *memberlist.Packet), connCh: make(chan net.Conn), } @@ -211,7 +215,7 @@ func (t *TCPTransport) tcpListen(tcpLn net.Listener) { loopDelay = maxDelay } - level.Error(t.logger).Log("msg", "TCPTransport: Error accepting TCP connection", "err", err) + level.Error(t.logger).Log("msg", "Error accepting TCP connection", "err", err) time.Sleep(loopDelay) continue } @@ -232,7 +236,7 @@ func (t *TCPTransport) debugLog() log.Logger { } func (t *TCPTransport) handleConnection(conn net.Conn) { - t.debugLog().Log("msg", "TCPTransport: New connection", "addr", conn.RemoteAddr()) + t.debugLog().Log("msg", "New connection", "addr", conn.RemoteAddr()) closeConn := true defer func() { @@ -245,7 +249,7 @@ func (t *TCPTransport) handleConnection(conn net.Conn) { msgType := []byte{0} _, err := io.ReadFull(conn, msgType) if err != nil { - level.Error(t.logger).Log("msg", "TCPTransport: failed to read message type", "err", err) + level.Warn(t.logger).Log("msg", "failed to read message type", "err", err, "remote", conn.RemoteAddr()) return } @@ -260,33 +264,33 @@ func (t *TCPTransport) handleConnection(conn net.Conn) { t.receivedPackets.Inc() // before reading packet, read the address - b := []byte{0} - _, err := io.ReadFull(conn, b) + addrLengthBuf := []byte{0} + _, err := io.ReadFull(conn, addrLengthBuf) if err != nil { t.receivedPacketsErrors.Inc() - level.Error(t.logger).Log("msg", "TCPTransport: error while reading address:", "err", err) + level.Warn(t.logger).Log("msg", "error while reading node address length from packet", "err", err, "remote", conn.RemoteAddr()) return } - addrBuf := make([]byte, b[0]) + addrBuf := make([]byte, addrLengthBuf[0]) _, err = io.ReadFull(conn, addrBuf) if err != nil { t.receivedPacketsErrors.Inc() - level.Error(t.logger).Log("msg", "TCPTransport: error while reading address:", "err", err) + level.Warn(t.logger).Log("msg", "error while reading node address from packet", "err", err, "remote", conn.RemoteAddr()) return } // read the rest to buffer -- this is the "packet" itself - buf, err := ioutil.ReadAll(conn) + buf, err := io.ReadAll(conn) if err != nil { t.receivedPacketsErrors.Inc() - level.Error(t.logger).Log("msg", "TCPTransport: error while reading packet data:", "err", err) + level.Warn(t.logger).Log("msg", "error while reading packet data", "err", err, "remote", conn.RemoteAddr()) return } if len(buf) < md5.Size { t.receivedPacketsErrors.Inc() - level.Error(t.logger).Log("msg", "TCPTransport: not enough data received", "length", len(buf)) + level.Warn(t.logger).Log("msg", "not enough data received", "data_length", len(buf), "remote", conn.RemoteAddr()) return } @@ -297,10 +301,10 @@ func (t *TCPTransport) handleConnection(conn net.Conn) { if !bytes.Equal(receivedDigest, expectedDigest[:]) { t.receivedPacketsErrors.Inc() - level.Warn(t.logger).Log("msg", "TCPTransport: packet digest mismatch", "expected", fmt.Sprintf("%x", expectedDigest), "received", fmt.Sprintf("%x", receivedDigest)) + level.Warn(t.logger).Log("msg", "packet digest mismatch", "expected", fmt.Sprintf("%x", expectedDigest), "received", fmt.Sprintf("%x", receivedDigest), "data_length", len(buf), "remote", conn.RemoteAddr()) } - 
t.debugLog().Log("msg", "TCPTransport: Received packet", "addr", addr(addrBuf), "size", len(buf), "hash", fmt.Sprintf("%x", receivedDigest))
+	t.debugLog().Log("msg", "Received packet", "addr", addr(addrBuf), "size", len(buf), "hash", fmt.Sprintf("%x", receivedDigest))

 	t.receivedPacketsBytes.Add(float64(len(buf)))

@@ -311,7 +315,7 @@
 		}
 	} else {
 		t.unknownConnections.Inc()
-		level.Error(t.logger).Log("msg", "TCPTransport: unknown message type", "msgType", msgType)
+		level.Error(t.logger).Log("msg", "unknown message type", "msgType", msgType, "remote", conn.RemoteAddr())
 	}
 }

@@ -415,7 +419,13 @@ func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) {
 	if err != nil {
 		t.sentPacketsErrors.Inc()
-		level.Warn(t.logger).Log("msg", "TCPTransport: WriteTo failed", "addr", addr, "err", err)
+		logLevel := level.Warn(t.logger)
+		if strings.Contains(err.Error(), "connection refused") {
+			// "Connection refused" is a common error that could happen during normal operations when a node
+			// shuts down (or crashes). It shouldn't be considered a warning condition on the sender side.
+			logLevel = t.debugLog()
+		}
+		logLevel.Log("msg", "WriteTo failed", "addr", addr, "err", err)

 		// WriteTo is used to send "UDP" packets. Since we use TCP, we can detect more errors,
 		// but memberlist library doesn't seem to cope with that very well. That is why we return nil instead.
@@ -440,16 +450,15 @@ func (t *TCPTransport) writeTo(b []byte, addr string) error {
 		}
 	}()

-	if t.cfg.PacketWriteTimeout > 0 {
-		deadline := time.Now().Add(t.cfg.PacketWriteTimeout)
-		err := c.SetDeadline(deadline)
-		if err != nil {
-			return fmt.Errorf("setting deadline: %v", err)
-		}
-	}
+	// Compute the digest *before* setting the deadline on the connection (so that the time
+	// it takes to compute the digest is not taken into account).
+	// We use md5 as a quick and relatively short hash, not in cryptographic context.
+	// It's also used to detect if the whole packet has been received on the receiver side.
+	digest := md5.Sum(b)

-	buf := bytes.Buffer{}
-	buf.WriteByte(byte(packet))
+	// Prepare the header *before* setting the deadline on the connection.
+	headerBuf := bytes.Buffer{}
+	headerBuf.WriteByte(byte(packet))

 	// We need to send our address to the other side, otherwise other side can only see IP and port from TCP header.
 	// But that doesn't match our node address (new TCP connection has new random port), which confuses memberlist.
@@ -460,10 +469,18 @@
 		return fmt.Errorf("local address too long")
 	}

-	buf.WriteByte(byte(len(ourAddr)))
-	buf.WriteString(ourAddr)
+	headerBuf.WriteByte(byte(len(ourAddr)))
+	headerBuf.WriteString(ourAddr)
+
+	if t.cfg.PacketWriteTimeout > 0 {
+		deadline := time.Now().Add(t.cfg.PacketWriteTimeout)
+		err := c.SetDeadline(deadline)
+		if err != nil {
+			return fmt.Errorf("setting deadline: %v", err)
+		}
+	}

-	_, err = c.Write(buf.Bytes())
+	_, err = c.Write(headerBuf.Bytes())
 	if err != nil {
 		return fmt.Errorf("sending local address: %v", err)
 	}
@@ -476,9 +493,7 @@
 		return fmt.Errorf("sending data: short write")
 	}

-	// Append digest. We use md5 as quick and relatively short hash, not in cryptographic context.
-	// This helped to find some bugs, so let's keep it.
-	digest := md5.Sum(b)
+	// Append digest.
n, err = c.Write(digest[:]) if err != nil { return fmt.Errorf("digest: %v", err) diff --git a/vendor/github.com/grafana/dskit/math/math.go b/vendor/github.com/grafana/dskit/math/math.go deleted file mode 100644 index 01e544384a..0000000000 --- a/vendor/github.com/grafana/dskit/math/math.go +++ /dev/null @@ -1,33 +0,0 @@ -package math - -// Max returns the maximum of two ints -func Max(a, b int) int { - if a > b { - return a - } - return b -} - -// Min returns the minimum of two ints -func Min(a, b int) int { - if a < b { - return a - } - return b -} - -// Max64 returns the maximum of two int64s -func Max64(a, b int64) int64 { - if a > b { - return a - } - return b -} - -// Min64 returns the minimum of two int64s -func Min64(a, b int64) int64 { - if a < b { - return a - } - return b -} diff --git a/vendor/github.com/grafana/dskit/math/rate.go b/vendor/github.com/grafana/dskit/math/rate.go deleted file mode 100644 index 19bbe64289..0000000000 --- a/vendor/github.com/grafana/dskit/math/rate.go +++ /dev/null @@ -1,59 +0,0 @@ -package math - -import ( - "sync" - "time" - - "go.uber.org/atomic" -) - -// EwmaRate tracks an exponentially weighted moving average of a per-second rate. -type EwmaRate struct { - newEvents atomic.Int64 - - alpha float64 - interval time.Duration - - mutex sync.RWMutex - lastRate float64 - init bool -} - -func NewEWMARate(alpha float64, interval time.Duration) *EwmaRate { - return &EwmaRate{ - alpha: alpha, - interval: interval, - } -} - -// Rate returns the per-second rate. -func (r *EwmaRate) Rate() float64 { - r.mutex.RLock() - defer r.mutex.RUnlock() - return r.lastRate -} - -// Tick assumes to be called every r.interval. -func (r *EwmaRate) Tick() { - newEvents := r.newEvents.Swap(0) - instantRate := float64(newEvents) / r.interval.Seconds() - - r.mutex.Lock() - defer r.mutex.Unlock() - - if r.init { - r.lastRate += r.alpha * (instantRate - r.lastRate) - } else { - r.init = true - r.lastRate = instantRate - } -} - -// Inc counts one event. -func (r *EwmaRate) Inc() { - r.newEvents.Inc() -} - -func (r *EwmaRate) Add(delta int64) { - r.newEvents.Add(delta) -} diff --git a/vendor/github.com/grafana/dskit/middleware/grpc.go b/vendor/github.com/grafana/dskit/middleware/grpc.go index b0d9d31ba7..66f0d37660 100644 --- a/vendor/github.com/grafana/dskit/middleware/grpc.go +++ b/vendor/github.com/grafana/dskit/middleware/grpc.go @@ -3,10 +3,12 @@ package middleware import ( "context" "io" + "strconv" "time" "github.com/prometheus/client_golang/prometheus" - "github.com/weaveworks/common/instrument" + grpcUtils "github.com/weaveworks/common/grpc" + "github.com/weaveworks/common/httpgrpc" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -16,7 +18,7 @@ func PrometheusGRPCUnaryInstrumentation(metric *prometheus.HistogramVec) grpc.Un return func(ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { start := time.Now() err := invoker(ctx, method, req, resp, cc, opts...) 
- metric.WithLabelValues(method, instrument.ErrorCode(err)).Observe(time.Since(start).Seconds()) + metric.WithLabelValues(method, errorCode(err)).Observe(time.Since(start).Seconds()) return err } } @@ -51,9 +53,9 @@ func (s *instrumentedClientStream) SendMsg(m interface{}) error { } if err == io.EOF { - s.metric.WithLabelValues(s.method, instrument.ErrorCode(nil)).Observe(time.Since(s.start).Seconds()) + s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(s.start).Seconds()) } else { - s.metric.WithLabelValues(s.method, instrument.ErrorCode(err)).Observe(time.Since(s.start).Seconds()) + s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds()) } return err @@ -66,9 +68,9 @@ func (s *instrumentedClientStream) RecvMsg(m interface{}) error { } if err == io.EOF { - s.metric.WithLabelValues(s.method, instrument.ErrorCode(nil)).Observe(time.Since(s.start).Seconds()) + s.metric.WithLabelValues(s.method, errorCode(nil)).Observe(time.Since(s.start).Seconds()) } else { - s.metric.WithLabelValues(s.method, instrument.ErrorCode(err)).Observe(time.Since(s.start).Seconds()) + s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds()) } return err @@ -77,7 +79,23 @@ func (s *instrumentedClientStream) RecvMsg(m interface{}) error { func (s *instrumentedClientStream) Header() (metadata.MD, error) { md, err := s.ClientStream.Header() if err != nil { - s.metric.WithLabelValues(s.method, instrument.ErrorCode(err)).Observe(time.Since(s.start).Seconds()) + s.metric.WithLabelValues(s.method, errorCode(err)).Observe(time.Since(s.start).Seconds()) } return md, err } + +func errorCode(err error) string { + respStatus := "2xx" + if err != nil { + if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok { + statusFamily := int(errResp.Code / 100) + respStatus = strconv.Itoa(statusFamily) + "xx" + } else if grpcUtils.IsCanceled(err) { + respStatus = "cancel" + } else { + respStatus = "error" + } + } + + return respStatus +} diff --git a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go index 237bf49c6f..726a85430d 100644 --- a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go @@ -14,7 +14,6 @@ import ( "github.com/grafana/dskit/kv" "github.com/grafana/dskit/services" - dstime "github.com/grafana/dskit/time" ) type BasicLifecyclerDelegate interface { @@ -51,6 +50,10 @@ type BasicLifecyclerConfig struct { HeartbeatPeriod time.Duration TokensObservePeriod time.Duration NumTokens int + + // If true lifecycler doesn't unregister instance from the ring when it's stopping. Default value is false, + // which means unregistering. + KeepInstanceInTheRingOnShutdown bool } // BasicLifecycler is a basic ring lifecycler which allows to hook custom @@ -182,7 +185,7 @@ func (l *BasicLifecycler) starting(ctx context.Context) error { } func (l *BasicLifecycler) running(ctx context.Context) error { - heartbeatTickerStop, heartbeatTickerChan := dstime.NewDisableableTicker(l.cfg.HeartbeatPeriod) + heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(l.cfg.HeartbeatPeriod) defer heartbeatTickerStop() for { @@ -214,7 +217,7 @@ func (l *BasicLifecycler) stopping(runningError error) error { }() // Heartbeat while the stopping delegate function is running. 
-	heartbeatTickerStop, heartbeatTickerChan := dstime.NewDisableableTicker(l.cfg.HeartbeatPeriod)
+	heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(l.cfg.HeartbeatPeriod)
 	defer heartbeatTickerStop()
 
 heartbeatLoop:
@@ -227,11 +230,15 @@ heartbeatLoop:
 		}
 	}
 
-	// Remove the instance from the ring.
-	if err := l.unregisterInstance(context.Background()); err != nil {
-		return errors.Wrapf(err, "failed to unregister instance from the ring (ring: %s)", l.ringName)
+	if l.cfg.KeepInstanceInTheRingOnShutdown {
+		level.Info(l.logger).Log("msg", "keeping instance in the ring", "ring", l.ringName)
+	} else {
+		// Remove the instance from the ring.
+		if err := l.unregisterInstance(context.Background()); err != nil {
+			return errors.Wrapf(err, "failed to unregister instance from the ring (ring: %s)", l.ringName)
+		}
+		level.Info(l.logger).Log("msg", "instance removed from the ring", "ring", l.ringName)
 	}
-	level.Info(l.logger).Log("msg", "instance removed from the ring", "ring", l.ringName)
 
 	return nil
 }
@@ -292,7 +299,7 @@ func (l *BasicLifecycler) registerInstance(ctx context.Context) error {
 }
 
 func (l *BasicLifecycler) waitStableTokens(ctx context.Context, period time.Duration) error {
-	heartbeatTickerStop, heartbeatTickerChan := dstime.NewDisableableTicker(l.cfg.HeartbeatPeriod)
+	heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(l.cfg.HeartbeatPeriod)
 	defer heartbeatTickerStop()
 
 	// The first observation will occur after the specified period.
diff --git a/vendor/github.com/grafana/dskit/ring/http.go b/vendor/github.com/grafana/dskit/ring/http.go
index 9f961cde30..f23f08b812 100644
--- a/vendor/github.com/grafana/dskit/ring/http.go
+++ b/vendor/github.com/grafana/dskit/ring/http.go
@@ -103,6 +103,24 @@ func (r *Ring) forget(ctx context.Context, id string) error {
 	return r.KVClient.CAS(ctx, r.key, unregister)
 }
 
+type ingesterDesc struct {
+	ID                  string   `json:"id"`
+	State               string   `json:"state"`
+	Address             string   `json:"address"`
+	HeartbeatTimestamp  string   `json:"timestamp"`
+	RegisteredTimestamp string   `json:"registered_timestamp"`
+	Zone                string   `json:"zone"`
+	Tokens              []uint32 `json:"tokens"`
+	NumTokens           int      `json:"-"`
+	Ownership           float64  `json:"-"`
+}
+
+type httpResponse struct {
+	Ingesters  []ingesterDesc `json:"shards"`
+	Now        time.Time      `json:"now"`
+	ShowTokens bool           `json:"-"`
+}
+
 func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 	if req.Method == http.MethodPost {
 		ingesterID := req.FormValue("forget")
@@ -132,7 +150,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 	sort.Strings(ingesterIDs)
 
 	now := time.Now()
-	ingesters := []interface{}{}
+	var ingesters []ingesterDesc
 	_, owned := r.countTokens()
 	for _, id := range ingesterIDs {
 		ing := r.ringDesc.Ingesters[id]
@@ -148,17 +166,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 			registeredTimestamp = ing.GetRegisteredAt().String()
 		}
 
-		ingesters = append(ingesters, struct {
-			ID                  string   `json:"id"`
-			State               string   `json:"state"`
-			Address             string   `json:"address"`
-			HeartbeatTimestamp  string   `json:"timestamp"`
-			RegisteredTimestamp string   `json:"registered_timestamp"`
-			Zone                string   `json:"zone"`
-			Tokens              []uint32 `json:"tokens"`
-			NumTokens           int      `json:"-"`
-			Ownership           float64  `json:"-"`
-		}{
+		ingesters = append(ingesters, ingesterDesc{
 			ID:                  id,
 			State:               state,
 			Address:             ing.Addr,
@@ -173,11 +181,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 
 	tokensParam :=
req.URL.Query().Get("tokens") - renderHTTPResponse(w, struct { - Ingesters []interface{} `json:"shards"` - Now time.Time `json:"now"` - ShowTokens bool `json:"-"` - }{ + renderHTTPResponse(w, httpResponse{ Ingesters: ingesters, Now: now, ShowTokens: tokensParam == "true", @@ -186,7 +190,7 @@ func (r *Ring) ServeHTTP(w http.ResponseWriter, req *http.Request) { // RenderHTTPResponse either responds with json or a rendered html page using the passed in template // by checking the Accepts header -func renderHTTPResponse(w http.ResponseWriter, v interface{}, t *template.Template, r *http.Request) { +func renderHTTPResponse(w http.ResponseWriter, v httpResponse, t *template.Template, r *http.Request) { accept := r.Header.Get("Accept") if strings.Contains(accept, "application/json") { writeJSONResponse(w, v) @@ -200,7 +204,7 @@ func renderHTTPResponse(w http.ResponseWriter, v interface{}, t *template.Templa } // WriteJSONResponse writes some JSON as a HTTP response. -func writeJSONResponse(w http.ResponseWriter, v interface{}) { +func writeJSONResponse(w http.ResponseWriter, v httpResponse) { w.Header().Set("Content-Type", "application/json") data, err := json.Marshal(v) diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index b13776042f..be103e1fba 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -19,7 +19,6 @@ import ( "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/services" - dstime "github.com/grafana/dskit/time" ) // LifecyclerConfig is the config to build a Lifecycler. @@ -403,7 +402,7 @@ func (i *Lifecycler) loop(ctx context.Context) error { autoJoinAfter := time.After(i.cfg.JoinAfter) var observeChan <-chan time.Time - heartbeatTickerStop, heartbeatTickerChan := dstime.NewDisableableTicker(i.cfg.HeartbeatPeriod) + heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod) defer heartbeatTickerStop() for { @@ -480,7 +479,7 @@ func (i *Lifecycler) stopping(runningError error) error { return nil } - heartbeatTickerStop, heartbeatTickerChan := dstime.NewDisableableTicker(i.cfg.HeartbeatPeriod) + heartbeatTickerStop, heartbeatTickerChan := newDisableableTicker(i.cfg.HeartbeatPeriod) defer heartbeatTickerStop() // Mark ourselved as Leaving so no more samples are send to us. @@ -846,6 +845,7 @@ func (i *Lifecycler) processShutdown(ctx context.Context) { } // Sleep so the shutdownDuration metric can be collected. + level.Info(i.logger).Log("msg", "lifecycler entering final sleep before shutdown", "final_sleep", i.cfg.FinalSleep) time.Sleep(i.cfg.FinalSleep) } diff --git a/vendor/github.com/grafana/dskit/ring/model.go b/vendor/github.com/grafana/dskit/ring/model.go index cb2d7c7870..4166d9e6f8 100644 --- a/vendor/github.com/grafana/dskit/ring/model.go +++ b/vendor/github.com/grafana/dskit/ring/model.go @@ -4,6 +4,7 @@ import ( "container/heap" "fmt" "sort" + "sync" "time" "github.com/gogo/protobuf/proto" @@ -173,6 +174,14 @@ func (i *InstanceDesc) IsReady(now time.Time, heartbeatTimeout time.Duration) er // (see resolveConflicts). // // This method is part of memberlist.Mergeable interface, and is only used by gossiping ring. 
+//
+// The receiver must be normalised, that is, the token lists must be sorted and not contain
+// duplicates. The function guarantees that the receiver will be left in this normalised state,
+// so multiple subsequent Merge calls are valid usage.
+//
+// The Mergeable passed as the parameter does not need to be normalised.
+//
+// Note: This method modifies d and mergeable to reduce allocations and copies.
 func (d *Desc) Merge(mergeable memberlist.Mergeable, localCAS bool) (memberlist.Mergeable, error) {
 	return d.mergeWithTime(mergeable, localCAS, time.Now())
 }
@@ -192,15 +201,21 @@ func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now
 		return nil, nil
 	}
 
-	thisIngesterMap := buildNormalizedIngestersMap(d)
-	otherIngesterMap := buildNormalizedIngestersMap(other)
+	normalizeIngestersMap(other)
+
+	thisIngesterMap := d.Ingesters
+	otherIngesterMap := other.Ingesters
 
 	var updated []string
+	tokensChanged := false
 
 	for name, oing := range otherIngesterMap {
 		ting := thisIngesterMap[name]
 		// ting.Timestamp will be 0, if there was no such ingester in our version
 		if oing.Timestamp > ting.Timestamp {
+			if !tokensEqual(ting.Tokens, oing.Tokens) {
+				tokensChanged = true
+			}
 			oing.Tokens = append([]uint32(nil), oing.Tokens...) // make a copy of tokens
 			thisIngesterMap[name] = oing
 			updated = append(updated, name)
@@ -235,7 +250,7 @@ func (d *Desc) mergeWithTime(mergeable memberlist.Mergeable, localCAS bool, now
 	}
 
 	// resolveConflicts allocates lot of memory, so if we can avoid it, do that.
-	if conflictingTokensExist(thisIngesterMap) {
+	if tokensChanged && conflictingTokensExist(thisIngesterMap) {
 		resolveConflicts(thisIngesterMap)
 	}
 
@@ -261,22 +276,18 @@ func (d *Desc) MergeContent() []string {
 	return result
 }
 
-// buildNormalizedIngestersMap will do the following:
+// normalizeIngestersMap will do the following:
 // - sorts tokens and removes duplicates (only within single ingester)
-// - it doesn't modify input ring
-func buildNormalizedIngestersMap(inputRing *Desc) map[string]InstanceDesc {
-	out := map[string]InstanceDesc{}
-
+// - modifies the input ring
+func normalizeIngestersMap(inputRing *Desc) {
 	// Make sure LEFT ingesters have no tokens
 	for n, ing := range inputRing.Ingesters {
 		if ing.State == LEFT {
 			ing.Tokens = nil
+			inputRing.Ingesters[n] = ing
 		}
-		out[n] = ing
-	}
 
-	// Sort tokens, and remove duplicates
-	for name, ing := range out {
+		// Sort tokens, and remove duplicates
 		if len(ing.Tokens) == 0 {
 			continue
 		}
@@ -297,25 +308,39 @@
 		}
 
 		// write updated value back to map
-		out[name] = ing
+		inputRing.Ingesters[n] = ing
 	}
-
-	return out
 }
 
-func conflictingTokensExist(normalizedIngesters map[string]InstanceDesc) bool {
-	count := 0
-	for _, ing := range normalizedIngesters {
-		count += len(ing.Tokens)
+// tokensEqual checks for equality of two slices. Assumes the slices are sorted.
+func tokensEqual(lhs, rhs []uint32) bool { + if len(lhs) != len(rhs) { + return false + } + for i := 0; i < len(lhs); i++ { + if lhs[i] != rhs[i] { + return false + } } + return true +} + +var tokenMapPool = sync.Pool{New: func() interface{} { return make(map[uint32]struct{}) }} - tokensMap := make(map[uint32]bool, count) +func conflictingTokensExist(normalizedIngesters map[string]InstanceDesc) bool { + tokensMap := tokenMapPool.Get().(map[uint32]struct{}) + defer func() { + for k := range tokensMap { + delete(tokensMap, k) + } + tokenMapPool.Put(tokensMap) + }() for _, ing := range normalizedIngesters { for _, t := range ing.Tokens { - if tokensMap[t] { + if _, contains := tokensMap[t]; contains { return true } - tokensMap[t] = true + tokensMap[t] = struct{}{} } } return false diff --git a/vendor/github.com/grafana/dskit/ring/replication_set.go b/vendor/github.com/grafana/dskit/ring/replication_set.go index 47485895a5..461429d6fa 100644 --- a/vendor/github.com/grafana/dskit/ring/replication_set.go +++ b/vendor/github.com/grafana/dskit/ring/replication_set.go @@ -130,6 +130,24 @@ func (r ReplicationSet) GetAddressesWithout(exclude string) []string { // HasReplicationSetChanged returns true if two replications sets are the same (with possibly different timestamps), // false if they differ in any way (number of instances, instance states, tokens, zones, ...). func HasReplicationSetChanged(before, after ReplicationSet) bool { + return hasReplicationSetChangedExcluding(before, after, func(i *InstanceDesc) { + i.Timestamp = 0 + }) +} + +// HasReplicationSetChangedWithoutState returns true if two replications sets +// are the same (with possibly different timestamps and instance states), +// false if they differ in any other way (number of instances, tokens, zones, ...). +func HasReplicationSetChangedWithoutState(before, after ReplicationSet) bool { + return hasReplicationSetChangedExcluding(before, after, func(i *InstanceDesc) { + i.Timestamp = 0 + i.State = PENDING + }) +} + +// Do comparison of replicasets, but apply a function first +// to be able to exclude (reset) some values +func hasReplicationSetChangedExcluding(before, after ReplicationSet, exclude func(*InstanceDesc)) bool { beforeInstances := before.Instances afterInstances := after.Instances @@ -144,9 +162,8 @@ func HasReplicationSetChanged(before, after ReplicationSet) bool { b := beforeInstances[i] a := afterInstances[i] - // Exclude the heartbeat timestamp from the comparison. - b.Timestamp = 0 - a.Timestamp = 0 + exclude(&a) + exclude(&b) if !b.Equal(a) { return true diff --git a/vendor/github.com/grafana/dskit/ring/replication_strategy.go b/vendor/github.com/grafana/dskit/ring/replication_strategy.go index e572cb77a4..44e05a5383 100644 --- a/vendor/github.com/grafana/dskit/ring/replication_strategy.go +++ b/vendor/github.com/grafana/dskit/ring/replication_strategy.go @@ -2,9 +2,8 @@ package ring import ( "fmt" + "strings" "time" - - "github.com/pkg/errors" ) type ReplicationStrategy interface { @@ -40,10 +39,12 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati // Skip those that have not heartbeated in a while. NB these are still // included in the calculation of minSuccess, so if too many failed instances // will cause the whole write to fail. 
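An illustrative aside on the conflictingTokensExist rewrite above (not part of the patch): reusing one scratch map through sync.Pool avoids allocating a fresh map on every incoming gossip merge, and a pooled map must be emptied before being returned, hence the deferred cleanup. A minimal sketch of the same pattern, with setPool and hasDuplicate as hypothetical stand-ins:

package main

import (
	"fmt"
	"sync"
)

// setPool hands out reusable scratch maps, mirroring tokenMapPool above.
var setPool = sync.Pool{New: func() interface{} { return make(map[uint32]struct{}) }}

// hasDuplicate reports whether xs contains a repeated value, reusing a
// pooled map instead of allocating a new one per call.
func hasDuplicate(xs []uint32) bool {
	seen := setPool.Get().(map[uint32]struct{})
	defer func() {
		for k := range seen { // empty the map before putting it back
			delete(seen, k)
		}
		setPool.Put(seen)
	}()
	for _, x := range xs {
		if _, ok := seen[x]; ok {
			return true
		}
		seen[x] = struct{}{}
	}
	return false
}

func main() {
	fmt.Println(hasDuplicate([]uint32{1, 2, 3})) // false
	fmt.Println(hasDuplicate([]uint32{1, 2, 1})) // true
}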
+ var unhealthy []string for i := 0; i < len(instances); { if instances[i].IsHealthy(op, heartbeatTimeout, now) { i++ } else { + unhealthy = append(unhealthy, instances[i].Addr) instances = append(instances[:i], instances[i+1:]...) } } @@ -52,11 +53,15 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati // after filtering out dead ones, don't even bother trying. if len(instances) < minSuccess { var err error + var unhealthyStr string + if len(unhealthy) > 0 { + unhealthyStr = fmt.Sprintf(" - unhealthy instances: %s", strings.Join(unhealthy, ",")) + } if zoneAwarenessEnabled { - err = fmt.Errorf("at least %d live replicas required across different availability zones, could only find %d", minSuccess, len(instances)) + err = fmt.Errorf("at least %d live replicas required across different availability zones, could only find %d%s", minSuccess, len(instances), unhealthyStr) } else { - err = fmt.Errorf("at least %d live replicas required, could only find %d", minSuccess, len(instances)) + err = fmt.Errorf("at least %d live replicas required, could only find %d%s", minSuccess, len(instances), unhealthyStr) } return nil, 0, err @@ -74,17 +79,23 @@ func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy { func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []InstanceDesc, op Operation, _ int, heartbeatTimeout time.Duration, _ bool) (healthy []InstanceDesc, maxFailures int, err error) { now := time.Now() // Filter out unhealthy instances. + var unhealthy []string for i := 0; i < len(instances); { if instances[i].IsHealthy(op, heartbeatTimeout, now) { i++ } else { + unhealthy = append(unhealthy, instances[i].Addr) instances = append(instances[:i], instances[i+1:]...) } } // We need at least 1 healthy instance no matter what is the replication factor set to. if len(instances) == 0 { - return nil, 0, errors.New("at least 1 healthy replica required, could only find 0") + var unhealthyStr string + if len(unhealthy) > 0 { + unhealthyStr = fmt.Sprintf(" - unhealthy instances: %s", strings.Join(unhealthy, ",")) + } + return nil, 0, fmt.Errorf("at least 1 healthy replica required, could only find 0%s", unhealthyStr) } return instances, len(instances) - 1, nil diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index 63e3a547c4..6aaf165bf9 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -23,24 +23,12 @@ import ( "github.com/grafana/dskit/services" "github.com/grafana/dskit/flagext" - dsmath "github.com/grafana/dskit/math" + dsmath "github.com/grafana/dskit/internal/math" ) const ( unhealthy = "Unhealthy" - // IngesterRingKey is the key under which we store the ingesters ring in the KVStore. - IngesterRingKey = "ring" - - // RulerRingKey is the key under which we store the rulers ring in the KVStore. - RulerRingKey = "ring" - - // DistributorRingKey is the key under which we store the distributors ring in the KVStore. - DistributorRingKey = "distributor" - - // CompactorRingKey is the key under which we store the compactors ring in the KVStore. - CompactorRingKey = "compactor" - // GetBufferSize is the suggested size of buffers passed to Ring.Get(). It's based on // a typical replication factor 3, plus extra room for a JOINING + LEAVING instance. 
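An illustrative aside on the Filter changes above (not part of the patch): both replication strategies now record the addresses they drop, so the "live replicas required" errors can name the unhealthy instances. A minimal standalone sketch of the in-place filter-and-collect idiom used there, with filterHealthy as a hypothetical stand-in:

package main

import (
	"fmt"
	"strings"
)

// filterHealthy removes unhealthy addresses in place and returns both the
// surviving addresses and the ones that were dropped.
func filterHealthy(addrs []string, healthy func(string) bool) (live, unhealthy []string) {
	for i := 0; i < len(addrs); {
		if healthy(addrs[i]) {
			i++
		} else {
			unhealthy = append(unhealthy, addrs[i])
			addrs = append(addrs[:i], addrs[i+1:]...) // delete element i, keep order
		}
	}
	return addrs, unhealthy
}

func main() {
	live, dead := filterHealthy([]string{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
		func(addr string) bool { return addr != "10.0.0.2" })
	fmt.Printf("%v - unhealthy instances: %s\n", live, strings.Join(dead, ","))
}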
GetBufferSize = 5 @@ -198,6 +186,7 @@ type Ring struct { totalTokensGauge prometheus.Gauge numTokensGaugeVec *prometheus.GaugeVec oldestTimestampGaugeVec *prometheus.GaugeVec + reportedOwners map[string]struct{} logger log.Logger } @@ -285,21 +274,9 @@ func (r *Ring) starting(ctx context.Context) error { func (r *Ring) loop(ctx context.Context) error { // Update the ring metrics at start of the main loop. - r.updateRingMetrics() - go func() { - // Start metrics update ticker to update the ring metrics. - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - r.updateRingMetrics() - case <-ctx.Done(): - return - } - } - }() + r.mtx.Lock() + r.updateRingMetrics(Different) + r.mtx.Unlock() r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool { if value == nil { @@ -334,6 +311,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { // when watching the ring for updates). r.mtx.Lock() r.ringDesc = ringDesc + r.updateRingMetrics(rc) r.mtx.Unlock() return } @@ -356,6 +334,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) { // Invalidate all cached subrings. r.shuffledSubringCache = make(map[subringCacheKey]*Ring) } + r.updateRingMetrics(rc) } // Get returns n (or more) instances which form the replicas for the given key. @@ -564,21 +543,16 @@ func (r *Ring) countTokens() (map[string]uint32, map[string]uint32) { return numTokens, owned } -// updateRingMetrics updates ring metrics. -func (r *Ring) updateRingMetrics() { - r.mtx.RLock() - defer r.mtx.RUnlock() - - numTokens, ownedRange := r.countTokens() - for id, totalOwned := range ownedRange { - r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) - r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) +// updateRingMetrics updates ring metrics. Caller must be holding the Write lock! 
+func (r *Ring) updateRingMetrics(compareResult CompareResult) { + if compareResult == Equal { + return } numByState := map[string]int{} oldestTimestampByState := map[string]int64{} - // Initialised to zero so we emit zero-metrics (instead of not emitting anything) + // Initialized to zero so we emit zero-metrics (instead of not emitting anything) for _, s := range []string{unhealthy, ACTIVE.String(), LEAVING.String(), PENDING.String(), JOINING.String()} { numByState[s] = 0 oldestTimestampByState[s] = 0 @@ -601,6 +575,26 @@ func (r *Ring) updateRingMetrics() { for state, timestamp := range oldestTimestampByState { r.oldestTimestampGaugeVec.WithLabelValues(state).Set(float64(timestamp)) } + + if compareResult == EqualButStatesAndTimestamps { + return + } + + prevOwners := r.reportedOwners + r.reportedOwners = make(map[string]struct{}) + numTokens, ownedRange := r.countTokens() + for id, totalOwned := range ownedRange { + r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) + r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) + delete(prevOwners, id) + r.reportedOwners[id] = struct{}{} + } + + for k := range prevOwners { + r.memberOwnershipGaugeVec.DeleteLabelValues(k) + r.numTokensGaugeVec.DeleteLabelValues(k) + } + r.totalTokensGauge.Set(float64(len(r.ringTokens))) } diff --git a/vendor/github.com/grafana/dskit/ring/ticker.go b/vendor/github.com/grafana/dskit/ring/ticker.go new file mode 100644 index 0000000000..1d854472b8 --- /dev/null +++ b/vendor/github.com/grafana/dskit/ring/ticker.go @@ -0,0 +1,14 @@ +package ring + +import "time" + +// newDisableableTicker essentially wraps NewTicker but allows the ticker to be disabled by passing +// zero duration as the interval. Returns a function for stopping the ticker, and the ticker channel. +func newDisableableTicker(interval time.Duration) (func(), <-chan time.Time) { + if interval == 0 { + return func() {}, nil + } + + tick := time.NewTicker(interval) + return func() { tick.Stop() }, tick.C +} diff --git a/vendor/github.com/grafana/dskit/ring/tokens.go b/vendor/github.com/grafana/dskit/ring/tokens.go index 51b7d83091..cf4999ff5d 100644 --- a/vendor/github.com/grafana/dskit/ring/tokens.go +++ b/vendor/github.com/grafana/dskit/ring/tokens.go @@ -3,7 +3,6 @@ package ring import ( "encoding/json" "errors" - "io/ioutil" "os" "sort" ) @@ -72,7 +71,7 @@ func (t Tokens) StoreToFile(tokenFilePath string) error { // LoadTokensFromFile loads tokens from given file path. func LoadTokensFromFile(tokenFilePath string) (Tokens, error) { - b, err := ioutil.ReadFile(tokenFilePath) + b, err := os.ReadFile(tokenFilePath) if err != nil { return nil, err } diff --git a/vendor/github.com/grafana/dskit/ring/util.go b/vendor/github.com/grafana/dskit/ring/util.go index a836aa2fca..b39f2f26e7 100644 --- a/vendor/github.com/grafana/dskit/ring/util.go +++ b/vendor/github.com/grafana/dskit/ring/util.go @@ -96,6 +96,19 @@ func WaitInstanceState(ctx context.Context, r ReadRing, instanceID string, state // WaitRingStability monitors the ring topology for the provided operation and waits until it // keeps stable for at least minStability. 
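An illustrative aside on the new ring-local ticker above (not part of the patch): returning a nil channel for a zero interval effectively disables the ticker, because a receive from a nil channel blocks forever, so the corresponding select case never fires. A minimal standalone demonstration; the helper is reproduced from the vendored code, the main function is only a demo:

package main

import (
	"fmt"
	"time"
)

// newDisableableTicker reproduces the helper above: a zero interval yields a
// nil channel, which makes the tick case below unreachable.
func newDisableableTicker(interval time.Duration) (func(), <-chan time.Time) {
	if interval == 0 {
		return func() {}, nil
	}
	t := time.NewTicker(interval)
	return func() { t.Stop() }, t.C
}

func main() {
	stop, tick := newDisableableTicker(0) // disabled: tick is nil
	defer stop()
	select {
	case <-tick:
		fmt.Println("tick") // never happens for a nil channel
	case <-time.After(50 * time.Millisecond):
		fmt.Println("disabled ticker never fired")
	}
}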
func WaitRingStability(ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration) error { + return waitStability(ctx, r, op, minStability, maxWaiting, HasReplicationSetChanged) +} + +// WaitRingTokensStability waits for the Ring to be unchanged at +// least for minStability time period, excluding transitioning between +// allowed states (e.g. JOINING->ACTIVE if allowed by op). +// This can be used to avoid wasting resources on moving data around +// due to multiple changes in the Ring. +func WaitRingTokensStability(ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration) error { + return waitStability(ctx, r, op, minStability, maxWaiting, HasReplicationSetChangedWithoutState) +} + +func waitStability(ctx context.Context, r *Ring, op Operation, minStability, maxWaiting time.Duration, isChanged func(ReplicationSet, ReplicationSet) bool) error { // Configure the max waiting time as a context deadline. ctx, cancel := context.WithTimeout(ctx, maxWaiting) defer cancel() @@ -117,7 +130,7 @@ func WaitRingStability(ctx context.Context, r *Ring, op Operation, minStability, // replication set which we use to compare with the previous state. currRingState, _ := r.GetAllHealthy(op) // nolint:errcheck - if HasReplicationSetChanged(ringLastState, currRingState) { + if isChanged(ringLastState, currRingState) { ringLastState = currRingState ringLastStateTs = time.Now() } else if time.Since(ringLastStateTs) >= minStability { diff --git a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go index dbe5643526..e5da50bc7a 100644 --- a/vendor/github.com/grafana/dskit/runtimeconfig/manager.go +++ b/vendor/github.com/grafana/dskit/runtimeconfig/manager.go @@ -7,7 +7,7 @@ import ( "flag" "fmt" "io" - "io/ioutil" + "os" "sync" "time" @@ -145,7 +145,7 @@ func (om *Manager) loop(ctx context.Context) error { // loadConfig loads configuration using the loader function, and if successful, // stores it as current configuration and notifies listeners. func (om *Manager) loadConfig() error { - buf, err := ioutil.ReadFile(om.cfg.LoadPath) + buf, err := os.ReadFile(om.cfg.LoadPath) if err != nil { om.configLoadSuccess.Set(0) return errors.Wrap(err, "read file") diff --git a/vendor/github.com/grafana/dskit/time/time.go b/vendor/github.com/grafana/dskit/time/time.go deleted file mode 100644 index 84c698a18e..0000000000 --- a/vendor/github.com/grafana/dskit/time/time.go +++ /dev/null @@ -1,96 +0,0 @@ -package time - -import ( - "math" - "math/rand" - "net/http" - "strconv" - "time" - - "github.com/prometheus/common/model" - "github.com/weaveworks/common/httpgrpc" -) - -const ( - nanosecondsInMillisecond = int64(time.Millisecond / time.Nanosecond) -) - -func ToMillis(t time.Time) int64 { - return t.UnixNano() / nanosecondsInMillisecond -} - -// FromMillis is a helper to turn milliseconds -> time.Time -func FromMillis(ms int64) time.Time { - return time.Unix(0, ms*nanosecondsInMillisecond) -} - -// FormatTimeMillis returns a human readable version of the input time (in milliseconds). -func FormatTimeMillis(ms int64) string { - return FromMillis(ms).String() -} - -// FormatTimeModel returns a human readable version of the input time. 
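An illustrative aside on the waitStability refactor above (not part of the patch): WaitRingStability and the new WaitRingTokensStability share one polling loop, parameterised only by the comparison function. A minimal standalone sketch of the underlying "unchanged for at least minStability" pattern; waitStable and its integer snapshots are hypothetical simplifications of the ring state:

package main

import (
	"context"
	"fmt"
	"time"
)

// waitStable returns nil once consecutive snapshots have been "unchanged"
// (per the injected comparison) for at least minStability.
func waitStable(ctx context.Context, snapshot func() int, changed func(a, b int) bool,
	minStability, poll time.Duration) error {
	last := snapshot()
	lastTs := time.Now()
	ticker := time.NewTicker(poll)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			cur := snapshot()
			if changed(last, cur) {
				last, lastTs = cur, time.Now() // state moved: restart the stability window
			} else if time.Since(lastTs) >= minStability {
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	err := waitStable(ctx,
		func() int { return 42 },              // a constant, already-stable "ring state"
		func(a, b int) bool { return a != b }, // injected comparison
		100*time.Millisecond, 10*time.Millisecond)
	fmt.Println("stable:", err == nil) // stable: true
}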
-func FormatTimeModel(t model.Time) string { - return FromMillis(int64(t)).String() -} - -// ParseTime parses the string into an int64, milliseconds since epoch. -func ParseTime(s string) (int64, error) { - if t, err := strconv.ParseFloat(s, 64); err == nil { - s, ns := math.Modf(t) - ns = math.Round(ns*1000) / 1000 - tm := time.Unix(int64(s), int64(ns*float64(time.Second))) - return ToMillis(tm), nil - } - if t, err := time.Parse(time.RFC3339Nano, s); err == nil { - return ToMillis(t), nil - } - return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid timestamp", s) -} - -// DurationWithJitter returns random duration from "input - input*variance" to "input + input*variance" interval. -func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration { - // No duration? No jitter. - if input == 0 { - return 0 - } - - variance := int64(float64(input) * variancePerc) - if variance == 0 { - // Values too low - return input - } - - jitter := rand.Int63n(variance*2) - variance - - return input + time.Duration(jitter) -} - -// DurationWithPositiveJitter returns random duration from "input" to "input + input*variance" interval. -func DurationWithPositiveJitter(input time.Duration, variancePerc float64) time.Duration { - // No duration? No jitter. - if input == 0 { - return 0 - } - - variance := int64(float64(input) * variancePerc) - if variance == 0 { - // Values too low - return input - } - - jitter := rand.Int63n(variance) - - return input + time.Duration(jitter) -} - -// NewDisableableTicker essentially wraps NewTicker but allows the ticker to be disabled by passing -// zero duration as the interval. Returns a function for stopping the ticker, and the ticker channel. -func NewDisableableTicker(interval time.Duration) (func(), <-chan time.Time) { - if interval == 0 { - return func() {}, nil - } - - tick := time.NewTicker(interval) - return func() { tick.Stop() }, tick.C -} diff --git a/vendor/modules.txt b/vendor/modules.txt index a2e5488392..061c43b088 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -355,23 +355,22 @@ github.com/googleapis/gax-go/v2/apierror/internal/proto # github.com/gorilla/mux v1.8.0 ## explicit github.com/gorilla/mux -# github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1 +# github.com/grafana/dskit v0.0.0-20220105080720-01ce9286d7d5 ## explicit github.com/grafana/dskit/backoff -github.com/grafana/dskit/closer github.com/grafana/dskit/concurrency github.com/grafana/dskit/crypto/tls github.com/grafana/dskit/flagext github.com/grafana/dskit/grpcclient github.com/grafana/dskit/grpcencoding/snappy github.com/grafana/dskit/grpcutil +github.com/grafana/dskit/internal/math github.com/grafana/dskit/kv github.com/grafana/dskit/kv/codec github.com/grafana/dskit/kv/consul github.com/grafana/dskit/kv/etcd github.com/grafana/dskit/kv/memberlist github.com/grafana/dskit/limiter -github.com/grafana/dskit/math github.com/grafana/dskit/middleware github.com/grafana/dskit/modules github.com/grafana/dskit/multierror @@ -382,7 +381,6 @@ github.com/grafana/dskit/ring/util 
github.com/grafana/dskit/runtimeconfig github.com/grafana/dskit/runutil github.com/grafana/dskit/services -github.com/grafana/dskit/time # github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-middleware # github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2.0.20201207153454-9f6bf00c00a7