Skip to content

Commit dac838b

Browse files
authored
Fix Ingester timeouts (#673)
* Set timeout for ingester Push() calls
* Remove now-unused timeout parameter from ingester Dial — the semantics of Dial and grpc.WithTimeout() have changed since this was written
* Add a timeout in ruler for each rule-group evaluation
1 parent 65e5ed1 commit dac838b

File tree

7 files changed: +22 −11 lines changed

7 files changed

+22
-11
lines changed

pkg/distributor/distributor.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type Config struct {
7979
CompressToIngester bool
8080

8181
// for testing
82-
ingesterClientFactory func(addr string, timeout time.Duration, withCompression bool) (client.IngesterClient, error)
82+
ingesterClientFactory func(addr string, withCompression bool) (client.IngesterClient, error)
8383
}
8484

8585
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -224,7 +224,7 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.Ingester
224224
return client, nil
225225
}
226226

227-
client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.RemoteTimeout, d.cfg.CompressToIngester)
227+
client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.CompressToIngester)
228228
if err != nil {
229229
return nil, err
230230
}
@@ -347,6 +347,8 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
347347
done: make(chan struct{}),
348348
err: make(chan error),
349349
}
350+
ctx, cancel := context.WithTimeout(ctx, d.cfg.RemoteTimeout)
351+
defer cancel() // cancel the timeout to release resources
350352
for ingester, samples := range samplesByIngester {
351353
go func(ingester *ring.IngesterDesc, samples []*sampleTracker) {
352354
d.sendSamples(ctx, ingester, samples, &pushTracker)

pkg/distributor/distributor_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func TestDistributorPush(t *testing.T) {
165165
IngestionRateLimit: 10000,
166166
IngestionBurstSize: 10000,
167167

168-
ingesterClientFactory: func(addr string, _ time.Duration, _ bool) (client.IngesterClient, error) {
168+
ingesterClientFactory: func(addr string, _ bool) (client.IngesterClient, error) {
169169
return ingesters[addr], nil
170170
},
171171
}, ring)
@@ -305,7 +305,7 @@ func TestDistributorQuery(t *testing.T) {
305305
IngestionRateLimit: 10000,
306306
IngestionBurstSize: 10000,
307307

308-
ingesterClientFactory: func(addr string, _ time.Duration, _ bool) (client.IngesterClient, error) {
308+
ingesterClientFactory: func(addr string, _ bool) (client.IngesterClient, error) {
309309
return ingesters[addr], nil
310310
},
311311
}, ring)

pkg/ingester/client/client.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package client
22

33
import (
4-
"time"
5-
64
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
75
"github.com/mwitkow/go-grpc-middleware"
86
"github.com/opentracing/opentracing-go"
@@ -18,8 +16,8 @@ type closableIngesterClient struct {
1816
}
1917

2018
// MakeIngesterClient makes a new IngesterClient
21-
func MakeIngesterClient(addr string, timeout time.Duration, withCompression bool) (IngesterClient, error) {
22-
opts := []grpc.DialOption{grpc.WithTimeout(timeout),
19+
func MakeIngesterClient(addr string, withCompression bool) (IngesterClient, error) {
20+
opts := []grpc.DialOption{
2321
grpc.WithInsecure(),
2422
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
2523
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),

pkg/ingester/ingester.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ type Config struct {
9090
infName string
9191
id string
9292
skipUnregister bool
93-
ingesterClientFactory func(addr string, timeout time.Duration, withCompression bool) (client.IngesterClient, error)
93+
ingesterClientFactory func(addr string, withCompression bool) (client.IngesterClient, error)
9494
KVClient ring.KVClient
9595
}
9696

pkg/ingester/ingester_lifecycle.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ func (i *Ingester) transferChunks() error {
359359
}
360360

361361
level.Info(util.Logger).Log("msg", "sending chunks to ingester", "ingester", targetIngester.Addr)
362-
c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, i.cfg.SearchPendingFor, false)
362+
c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, false)
363363
if err != nil {
364364
return err
365365
}

pkg/ingester/ingester_lifecycle_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func TestIngesterTransfer(t *testing.T) {
118118
require.NoError(t, err)
119119

120120
// Let ing2 send chunks to ing1
121-
ing1.cfg.ingesterClientFactory = func(addr string, timeout time.Duration, _ bool) (client.IngesterClient, error) {
121+
ing1.cfg.ingesterClientFactory = func(addr string, _ bool) (client.IngesterClient, error) {
122122
return ingesterClientAdapater{
123123
ingester: ing2,
124124
}, nil

pkg/ruler/ruler.go

+11
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ type Config struct {
8787
NotificationQueueCapacity int
8888
// HTTP timeout duration when sending notifications to the Alertmanager.
8989
NotificationTimeout time.Duration
90+
// Timeout for rule group evaluation, including sending result to ingester
91+
GroupTimeout time.Duration
9092
}
9193

9294
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -100,6 +102,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
100102
f.DurationVar(&cfg.AlertmanagerRefreshInterval, "ruler.alertmanager-refresh-interval", 1*time.Minute, "How long to wait between refreshing alertmanager hosts.")
101103
f.IntVar(&cfg.NotificationQueueCapacity, "ruler.notification-queue-capacity", 10000, "Capacity of the queue for notifications to be sent to the Alertmanager.")
102104
f.DurationVar(&cfg.NotificationTimeout, "ruler.notification-timeout", 10*time.Second, "HTTP timeout duration when sending notifications to the Alertmanager.")
105+
f.DurationVar(&cfg.GroupTimeout, "ruler.group-timeout", 10*time.Second, "Timeout for rule group evaluation, including sending result to ingester")
103106
}
104107

105108
// Ruler evaluates rules.
@@ -109,6 +112,7 @@ type Ruler struct {
109112
alertURL *url.URL
110113
notifierCfg *config.Config
111114
queueCapacity int
115+
groupTimeout time.Duration
112116

113117
// Per-user notifiers with separate queues.
114118
notifiersMtx sync.Mutex
@@ -191,6 +195,7 @@ func NewRuler(cfg Config, d *distributor.Distributor, c *chunk.Store) (*Ruler, e
191195
notifierCfg: ncfg,
192196
queueCapacity: cfg.NotificationQueueCapacity,
193197
notifiers: map[string]*rulerNotifier{},
198+
groupTimeout: cfg.GroupTimeout,
194199
}, nil
195200
}
196201

@@ -352,12 +357,18 @@ func (r *Ruler) Evaluate(ctx context.Context, rs []rules.Rule) {
352357
logger := util.WithContext(ctx, util.Logger)
353358
level.Debug(logger).Log("msg", "evaluating rules...", "num_rules", len(rs))
354359
start := time.Now()
360+
ctx, cancelTimeout := context.WithTimeout(ctx, r.groupTimeout)
355361
g, err := r.newGroup(ctx, rs)
356362
if err != nil {
357363
level.Error(logger).Log("msg", "failed to create rule group", "err", err)
358364
return
359365
}
360366
g.Eval(ctx, start)
367+
if err := ctx.Err(); err == nil {
368+
cancelTimeout() // release resources
369+
} else {
370+
level.Warn(util.Logger).Log("msg", "context error", "error", err)
371+
}
361372

362373
// The prometheus routines we're calling have their own instrumentation
363374
// but, a) it's rule-based, not group-based, b) it's a summary, not a

0 commit comments

Comments
 (0)