1 change: 1 addition & 0 deletions pkg/ingester/client/client.go
@@ -27,6 +27,7 @@ var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.Histogra
type HealthAndIngesterClient interface {
IngesterClient
grpc_health_v1.HealthClient
Close() error
}

type closableHealthAndIngesterClient struct {
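With Close() promoted into HealthAndIngesterClient, callers can close the underlying gRPC connection through the interface instead of type-asserting to io.Closer (see the defer c.Close() change in transfer.go below). A minimal, self-contained sketch of that pattern; the client interface and methods here are stand-ins, not the real Cortex types:

package main

import (
	"fmt"
	"io"
)

// Stand-in for HealthAndIngesterClient: embedding io.Closer (or declaring
// Close() error directly) makes cleanup part of the client contract.
type ingesterClient interface {
	io.Closer
	Push() error // placeholder for the real RPC methods
}

type fakeClient struct{}

func (fakeClient) Push() error  { return nil }
func (fakeClient) Close() error { return nil }

func use(c ingesterClient) error {
	defer c.Close() // no c.(io.Closer).Close() assertion needed any more
	return c.Push()
}

func main() {
	fmt.Println(use(fakeClient{})) // <nil>
}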
5 changes: 3 additions & 2 deletions pkg/ingester/ingester.go
@@ -80,7 +80,7 @@ type Config struct {
LifecyclerConfig ring.LifecyclerConfig

// Config for transferring chunks.
SearchPendingFor time.Duration
MaxTransferRetries int

// Config for chunk flushing.
FlushCheckPeriod time.Duration
@@ -101,7 +101,7 @@ type Config struct
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.LifecyclerConfig.RegisterFlags(f)

f.DurationVar(&cfg.SearchPendingFor, "ingester.search-pending-for", 30*time.Second, "Time to spend searching for a pending ingester when shutting down.")
f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing.")
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
f.DurationVar(&cfg.RetainPeriod, "ingester.retain-period", 5*time.Minute, "Period chunks will remain in memory after flushing.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 1*time.Minute, "Timeout for individual flush operations.")
@@ -112,6 +112,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")

// DEPRECATED, no-op
f.Duration("ingester.search-pending-for", 30*time.Second, "DEPRECATED. Time to spend searching for a pending ingester when shutting down.")
f.Bool("ingester.reject-old-samples", false, "DEPRECATED. Reject old samples.")
f.Duration("ingester.reject-old-samples.max-age", 0, "DEPRECATED. Maximum accepted sample age before rejecting.")
f.Int("ingester.validation.max-length-label-name", 0, "DEPRECATED. Maximum length accepted for label names.")
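The deprecated flags above are registered without being bound to a Config field, so old command lines keep parsing but the values are silently dropped, while the new -ingester.max-transfer-retries flag is the one that takes effect. A small self-contained sketch of that registration pattern, reusing the same flag names purely for illustration:

package main

import (
	"flag"
	"fmt"
	"time"
)

type Config struct {
	MaxTransferRetries int
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	// Active flag: bound to a Config field.
	f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10,
		"Number of times to try and transfer chunks before falling back to flushing.")

	// Deprecated flag: still registered for backwards compatibility, but the
	// returned *time.Duration is ignored, so parsing it is a no-op.
	_ = f.Duration("ingester.search-pending-for", 30*time.Second,
		"DEPRECATED. Time to spend searching for a pending ingester when shutting down.")
}

func main() {
	var cfg Config
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	cfg.RegisterFlags(fs)
	// Old deployments can keep passing the deprecated flag without breaking.
	_ = fs.Parse([]string{"-ingester.search-pending-for=10s", "-ingester.max-transfer-retries=5"})
	fmt.Println(cfg.MaxTransferRetries) // 5
}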
7 changes: 5 additions & 2 deletions pkg/ingester/lifecycle_test.go
@@ -40,6 +40,7 @@ func defaultIngesterTestConfig() Config {
cfg.LifecyclerConfig.ListenPort = func(i int) *int { return &i }(0)
cfg.LifecyclerConfig.Addr = "localhost"
cfg.LifecyclerConfig.ID = "localhost"
cfg.LifecyclerConfig.FinalSleep = 0
return cfg
}

@@ -95,7 +96,6 @@ func TestIngesterTransfer(t *testing.T) {
cfg1.LifecyclerConfig.Addr = "ingester1"
cfg1.LifecyclerConfig.ClaimOnRollout = true
cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second
cfg1.SearchPendingFor = 1 * time.Second
ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil)
require.NoError(t, err)

@@ -177,7 +177,6 @@ func TestIngesterBadTransfer(t *testing.T) {
cfg.LifecyclerConfig.Addr = "ingester1"
cfg.LifecyclerConfig.ClaimOnRollout = true
cfg.LifecyclerConfig.JoinAfter = 100 * time.Second
cfg.SearchPendingFor = 1 * time.Second
ing, err := New(cfg, defaultClientTestConfig(), limits, nil)
require.NoError(t, err)

@@ -291,6 +290,10 @@ func (i ingesterClientAdapater) Close() error {
return nil
}

func (i ingesterClientAdapater) Check(ctx context.Context, in *grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
return nil, nil
}

// TestIngesterFlush tries to test that the ingester flushes chunks before
// removing itself from the ring.
func TestIngesterFlush(t *testing.T) {
69 changes: 34 additions & 35 deletions pkg/ingester/transfer.go
@@ -9,6 +9,7 @@ import (
"time"

"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"

@@ -19,10 +20,6 @@ import (
"github.com/weaveworks/common/user"
)

const (
pendingSearchIterations = 10
)

var (
sentChunks = prometheus.NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_sent_chunks",
@@ -187,6 +184,26 @@ func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) {
// TransferOut finds an ingester in PENDING state and transfers our chunks to it.
// Called as part of the ingester shutdown process.
func (i *Ingester) TransferOut(ctx context.Context) error {
backoff := util.NewBackoff(ctx, util.BackoffConfig{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 5 * time.Second,
MaxRetries: i.cfg.MaxTransferRetries,
})

for backoff.Ongoing() {
err := i.transferOut(ctx)
if err == nil {
return nil
}

level.Error(util.Logger).Log("msg", "transfer failed", "err", err)
backoff.Wait()
}

return backoff.Err()
}

func (i *Ingester) transferOut(ctx context.Context) error {
userStatesCopy := i.userStates.cp()
if len(userStatesCopy) == 0 {
level.Info(util.Logger).Log("msg", "nothing to transfer")
@@ -203,12 +220,12 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
if err != nil {
return err
}
defer c.(io.Closer).Close()
defer c.Close()

ctx = user.InjectOrgID(ctx, "-1")
stream, err := c.TransferChunks(ctx)
if err != nil {
return err
return errors.Wrap(err, "TransferChunks")
}

for userID, state := range userStatesCopy {
@@ -223,7 +240,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
chunks, err := toWireChunks(pair.series.chunkDescs)
if err != nil {
state.fpLocker.Unlock(pair.fp)
return err
return errors.Wrap(err, "toWireChunks")
}

err = stream.Send(&client.TimeSeriesChunk{
@@ -234,7 +251,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
})
state.fpLocker.Unlock(pair.fp)
if err != nil {
return err
return errors.Wrap(err, "Send")
}

sentChunks.Add(float64(len(chunks)))
@@ -243,7 +260,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error {

_, err = stream.CloseAndRecv()
if err != nil {
return err
return errors.Wrap(err, "CloseAndRecv")
}

// Close & empty all the flush queues, to unblock waiting workers.
@@ -258,33 +275,15 @@ func (i *Ingester) TransferOut(ctx context.Context) error {

// findTargetIngester finds an ingester in PENDING state.
func (i *Ingester) findTargetIngester(ctx context.Context) (*ring.IngesterDesc, error) {
findIngester := func() (*ring.IngesterDesc, error) {
ringDesc, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
if err != nil {
return nil, err
}

ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING)
if len(ingesters) <= 0 {
return nil, fmt.Errorf("no pending ingesters")
}

return &ingesters[0], nil
ringDesc, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
if err != nil {
return nil, err
}

deadline := time.Now().Add(i.cfg.SearchPendingFor)
for {
ingester, err := findIngester()
if err != nil {
level.Debug(util.Logger).Log("msg", "Error looking for pending ingester", "err", err)
if time.Now().Before(deadline) {
time.Sleep(i.cfg.SearchPendingFor / pendingSearchIterations)
continue
} else {
level.Warn(util.Logger).Log("msg", "Could not find pending ingester before deadline", "err", err)
return nil, err
}
}
return ingester, nil
ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING)
if len(ingesters) <= 0 {
return nil, fmt.Errorf("no pending ingesters")
}

return &ingesters[0], nil
}
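TransferOut is now a thin retry loop: it calls transferOut up to -ingester.max-transfer-retries times with exponential backoff between attempts, instead of having findTargetIngester poll for a pending ingester on its own SearchPendingFor timer. A standalone sketch of that retry shape using only the standard library (Cortex's util.NewBackoff handles this internally); the delays, retry count, and failing operation below are placeholders:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff retries op with exponential backoff capped at maxBackoff,
// giving up after maxRetries attempts or when ctx is cancelled. It mirrors
// the shape of the util.NewBackoff / backoff.Ongoing() loop in TransferOut.
func retryWithBackoff(ctx context.Context, maxRetries int, minBackoff, maxBackoff time.Duration, op func() error) error {
	var lastErr error
	delay := minBackoff
	for attempt := 0; attempt < maxRetries; attempt++ {
		if lastErr = op(); lastErr == nil {
			return nil
		}
		fmt.Printf("attempt %d failed: %v\n", attempt+1, lastErr)
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			return ctx.Err()
		}
		if delay *= 2; delay > maxBackoff {
			delay = maxBackoff
		}
	}
	return fmt.Errorf("terminated after %d retries: %w", maxRetries, lastErr)
}

func main() {
	calls := 0
	err := retryWithBackoff(context.Background(), 10, 100*time.Millisecond, 5*time.Second, func() error {
		calls++
		if calls < 3 {
			return errors.New("no pending ingesters")
		}
		return nil
	})
	fmt.Println(err) // <nil>: the third attempt succeeds
}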
15 changes: 15 additions & 0 deletions pkg/ring/lifecycler.go
@@ -30,6 +30,11 @@ var (
Name: "cortex_ingester_ring_tokens_to_own",
Help: "The number of tokens to own in the ring.",
})
shutdownDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_shutdown_duration_seconds",
Help: "Duration (in seconds) of cortex shutdown procedure (ie transfer or flush).",
Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(8-1) = 1280s, or ~21 mins.
}, []string{"op", "status"})
)

// LifecyclerConfig is the config to build a Lifecycler.
@@ -45,6 +50,7 @@ type LifecyclerConfig struct {
ClaimOnRollout bool `yaml:"claim_on_rollout,omitempty"`
NormaliseTokens bool `yaml:"normalise_tokens,omitempty"`
InfNames []string `yaml:"interface_names"`
FinalSleep time.Duration `yaml:"final_sleep"`

// For testing, you can override the address and ID of this ingester
Addr string `yaml:"address"`
@@ -63,6 +69,7 @@ func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.MinReadyDuration, "ingester.min-ready-duration", 1*time.Minute, "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring.")
f.BoolVar(&cfg.ClaimOnRollout, "ingester.claim-on-rollout", false, "Send chunks to PENDING ingesters on exit.")
f.BoolVar(&cfg.NormaliseTokens, "ingester.normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.")
f.DurationVar(&cfg.FinalSleep, "ingester.final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.")

hostname, err := os.Hostname()
if err != nil {
@@ -454,16 +461,24 @@ func (i *Lifecycler) changeState(ctx context.Context, state IngesterState) error
func (i *Lifecycler) processShutdown(ctx context.Context) {
flushRequired := true
if i.cfg.ClaimOnRollout {
transferStart := time.Now()
if err := i.flushTransferer.TransferOut(ctx); err != nil {
level.Error(util.Logger).Log("msg", "Failed to transfer chunks to another ingester", "err", err)
shutdownDuration.WithLabelValues("transfer", "fail").Observe(time.Since(transferStart).Seconds())
} else {
flushRequired = false
shutdownDuration.WithLabelValues("transfer", "success").Observe(time.Since(transferStart).Seconds())
}
}

if flushRequired {
flushStart := time.Now()
i.flushTransferer.Flush()
shutdownDuration.WithLabelValues("flush", "success").Observe(time.Since(flushStart).Seconds())
}

// Sleep so the shutdownDuration metric can be collected.
time.Sleep(i.cfg.FinalSleep)
}

// unregister removes our entry from consul.
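The shutdownDuration histogram and FinalSleep work as a pair: processShutdown records how long the transfer or fallback flush took, then sleeps so the last Prometheus scrape can still pick up those observations before the process exits. A compressed, self-contained sketch of that flow with the transfer and flush bodies stubbed out (only the metric name, labels, and buckets mirror the diff; the rest is illustrative):

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Mirrors the metric added in lifecycler.go: one histogram, labelled by the
// shutdown operation and whether it succeeded.
var shutdownDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "cortex_shutdown_duration_seconds",
	Help:    "Duration (in seconds) of cortex shutdown procedure (ie transfer or flush).",
	Buckets: prometheus.ExponentialBuckets(10, 2, 8),
}, []string{"op", "status"})

// shutdown sketches processShutdown: try to transfer, fall back to flushing,
// record how long each step took, then sleep before exiting.
func shutdown(transfer func() error, flush func(), finalSleep time.Duration) {
	flushRequired := true
	start := time.Now()
	if err := transfer(); err != nil {
		fmt.Println("transfer failed:", err)
		shutdownDuration.WithLabelValues("transfer", "fail").Observe(time.Since(start).Seconds())
	} else {
		flushRequired = false
		shutdownDuration.WithLabelValues("transfer", "success").Observe(time.Since(start).Seconds())
	}

	if flushRequired {
		flushStart := time.Now()
		flush()
		shutdownDuration.WithLabelValues("flush", "success").Observe(time.Since(flushStart).Seconds())
	}

	// Without this sleep the process could exit before Prometheus scrapes the
	// observations above; -ingester.final-sleep controls how long to wait.
	time.Sleep(finalSleep)
}

func main() {
	shutdown(
		func() error { return errors.New("no pending ingesters") },
		func() { /* flush chunks here */ },
		0, // keep the example fast; the default flag value is 30s
	)
}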
2 changes: 2 additions & 0 deletions pkg/ring/lifecycler_test.go
@@ -42,6 +42,7 @@ func TestRingNormaliseMigration(t *testing.T) {
lifecyclerConfig1.NumTokens = 1
lifecyclerConfig1.ClaimOnRollout = true
lifecyclerConfig1.ID = "ing1"
lifecyclerConfig1.FinalSleep = 0

ft := &flushTransferer{}
l1, err := NewLifecycler(lifecyclerConfig1, ft)
@@ -67,6 +68,7 @@ func TestRingNormaliseMigration(t *testing.T) {
lifecyclerConfig2.JoinAfter = 100 * time.Second
lifecyclerConfig2.NormaliseTokens = true
lifecyclerConfig2.ID = "ing2"
lifecyclerConfig2.FinalSleep = 0

l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{})
require.NoError(t, err)