Skip to content

Commit 9c97fb0

Browse files
committed
Review feedback.
Signed-off-by: Tom Wilkie <[email protected]>
1 parent 7d27cf1 commit 9c97fb0

File tree

4 files changed

+12
-48
lines changed

4 files changed

+12
-48
lines changed

pkg/ingester/ingester.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ type Config struct {
8080
LifecyclerConfig ring.LifecyclerConfig
8181

8282
// Config for transferring chunks.
83-
SearchPendingFor time.Duration
8483
MaxTransferRetries int
8584

8685
// Config for chunk flushing.
@@ -102,8 +101,7 @@ type Config struct {
102101
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
103102
cfg.LifecyclerConfig.RegisterFlags(f)
104103

105-
f.DurationVar(&cfg.SearchPendingFor, "ingester.search-pending-for", 30*time.Second, "Time to spend searching for a pending ingester when shutting down.")
106-
f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 5, "Number of times to try and transfer chunks before falling back to flushing.")
104+
f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing.")
107105
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.")
108106
f.DurationVar(&cfg.RetainPeriod, "ingester.retain-period", 5*time.Minute, "Period chunks will remain in memory after flushing.")
109107
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 1*time.Minute, "Timeout for individual flush operations.")
@@ -114,6 +112,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
114112
f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
115113

116114
// DEPRECATED, no-op
115+
f.Duration("ingester.search-pending-for", 30*time.Second, "DEPRECATED. Time to spend searching for a pending ingester when shutting down.")
117116
f.Bool("ingester.reject-old-samples", false, "DEPRECATED. Reject old samples.")
118117
f.Duration("ingester.reject-old-samples.max-age", 0, "DEPRECATED. Maximum accepted sample age before rejecting.")
119118
f.Int("ingester.validation.max-length-label-name", 0, "DEPRECATED. Maximum length accepted for label names.")

pkg/ingester/lifecycle_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ func TestIngesterTransfer(t *testing.T) {
9696
cfg1.LifecyclerConfig.Addr = "ingester1"
9797
cfg1.LifecyclerConfig.ClaimOnRollout = true
9898
cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second
99-
cfg1.SearchPendingFor = 1 * time.Second
10099
ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil)
101100
require.NoError(t, err)
102101

@@ -178,7 +177,6 @@ func TestIngesterBadTransfer(t *testing.T) {
178177
cfg.LifecyclerConfig.Addr = "ingester1"
179178
cfg.LifecyclerConfig.ClaimOnRollout = true
180179
cfg.LifecyclerConfig.JoinAfter = 100 * time.Second
181-
cfg.SearchPendingFor = 1 * time.Second
182180
ing, err := New(cfg, defaultClientTestConfig(), limits, nil)
183181
require.NoError(t, err)
184182

pkg/ingester/transfer.go

Lines changed: 9 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@ import (
2020
"github.com/weaveworks/common/user"
2121
)
2222

23-
const (
24-
pendingSearchIterations = 10
25-
)
26-
2723
var (
2824
sentChunks = prometheus.NewCounter(prometheus.CounterOpts{
2925
Name: "cortex_ingester_sent_chunks",
@@ -190,7 +186,7 @@ func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) {
190186
func (i *Ingester) TransferOut(ctx context.Context) error {
191187
backoff := util.NewBackoff(ctx, util.BackoffConfig{
192188
MinBackoff: 100 * time.Millisecond,
193-
MaxBackoff: 1 * time.Second,
189+
MaxBackoff: 5 * time.Second,
194190
MaxRetries: i.cfg.MaxTransferRetries,
195191
})
196192

@@ -279,44 +275,15 @@ func (i *Ingester) transferOut(ctx context.Context) error {
279275

280276
// findTargetIngester finds an ingester in PENDING state.
281277
func (i *Ingester) findTargetIngester(ctx context.Context) (*ring.IngesterDesc, error) {
282-
findIngester := func(ctx context.Context) (*ring.IngesterDesc, error) {
283-
ringDesc, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
284-
if err != nil {
285-
return nil, err
286-
}
287-
288-
ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING)
289-
if len(ingesters) <= 0 {
290-
return nil, fmt.Errorf("no pending ingesters")
291-
}
292-
293-
return &ingesters[0], nil
278+
ringDesc, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
279+
if err != nil {
280+
return nil, err
294281
}
295282

296-
deadline := time.NewTimer(i.cfg.SearchPendingFor)
297-
defer deadline.Stop()
298-
299-
ticker := time.NewTicker(i.cfg.SearchPendingFor / pendingSearchIterations)
300-
defer ticker.Stop()
301-
302-
for {
303-
select {
304-
case <-ticker.C:
305-
ctx, cancel := context.WithTimeout(ctx, i.cfg.SearchPendingFor/pendingSearchIterations)
306-
ingester, err := findIngester(ctx)
307-
cancel()
308-
if err != nil {
309-
level.Warn(util.Logger).Log("msg", "Error looking for pending ingester", "err", err)
310-
continue
311-
}
312-
return ingester, nil
313-
314-
case <-deadline.C:
315-
level.Warn(util.Logger).Log("msg", "Could not find pending ingester before deadline")
316-
return nil, fmt.Errorf("could not find pending ingester before deadline")
317-
318-
case <-ctx.Done():
319-
return nil, ctx.Err()
320-
}
283+
ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING)
284+
if len(ingesters) <= 0 {
285+
return nil, fmt.Errorf("no pending ingesters")
321286
}
287+
288+
return &ingesters[0], nil
322289
}

pkg/ring/lifecycler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ var (
3333
shutdownDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
3434
Name: "cortex_shutdown_duration_seconds",
3535
Help: "Duration (in seconds) of cortex shutdown procedure (ie transfer or flush).",
36-
Buckets: prometheus.DefBuckets,
36+
Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(8-1) = 1280s, or ~21 mins.
3737
}, []string{"op", "status"})
3838
)
3939

0 commit comments

Comments
 (0)