From 7ab6b9724625ce3444ba977eea6be03083144109 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Sat, 27 Jan 2018 23:26:17 +0000
Subject: [PATCH 1/3] Remove pointless timeout parameter from ingester Dial

grpc.WithTimeout() is documented to only do anything if grpc.WithBlock()
is also supplied, and we do not supply that. Even then, it puts a timeout
on the Dial() operation, which is not what we want.
---
 pkg/distributor/distributor.go          | 4 ++--
 pkg/distributor/distributor_test.go     | 4 ++--
 pkg/ingester/client/client.go           | 6 ++----
 pkg/ingester/ingester.go                | 2 +-
 pkg/ingester/ingester_lifecycle.go      | 2 +-
 pkg/ingester/ingester_lifecycle_test.go | 2 +-
 6 files changed, 9 insertions(+), 11 deletions(-)

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 96882ff2c1b..7c6fa6380f0 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -79,7 +79,7 @@ type Config struct {
     CompressToIngester bool

     // for testing
-    ingesterClientFactory func(addr string, timeout time.Duration, withCompression bool) (client.IngesterClient, error)
+    ingesterClientFactory func(addr string, withCompression bool) (client.IngesterClient, error)
 }

 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -224,7 +224,7 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.Ingester
         return client, nil
     }

-    client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.RemoteTimeout, d.cfg.CompressToIngester)
+    client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.CompressToIngester)
     if err != nil {
         return nil, err
     }
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index eacb3ae5a10..2db97ba023f 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -165,7 +165,7 @@ func TestDistributorPush(t *testing.T) {
         IngestionRateLimit: 10000,
         IngestionBurstSize: 10000,

-        ingesterClientFactory: func(addr string, _ time.Duration, _ bool) (client.IngesterClient, error) {
+        ingesterClientFactory: func(addr string, _ bool) (client.IngesterClient, error) {
             return ingesters[addr], nil
         },
     }, ring)
@@ -305,7 +305,7 @@ func TestDistributorQuery(t *testing.T) {
         IngestionRateLimit: 10000,
         IngestionBurstSize: 10000,

-        ingesterClientFactory: func(addr string, _ time.Duration, _ bool) (client.IngesterClient, error) {
+        ingesterClientFactory: func(addr string, _ bool) (client.IngesterClient, error) {
             return ingesters[addr], nil
         },
     }, ring)
diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go
index eba79b76041..9dc8d3fbd54 100644
--- a/pkg/ingester/client/client.go
+++ b/pkg/ingester/client/client.go
@@ -1,8 +1,6 @@
 package client

 import (
-    "time"
-
     "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
     "github.com/mwitkow/go-grpc-middleware"
     "github.com/opentracing/opentracing-go"
@@ -18,8 +16,8 @@ type closableIngesterClient struct {
 }

 // MakeIngesterClient makes a new IngesterClient
-func MakeIngesterClient(addr string, timeout time.Duration, withCompression bool) (IngesterClient, error) {
-    opts := []grpc.DialOption{grpc.WithTimeout(timeout),
+func MakeIngesterClient(addr string, withCompression bool) (IngesterClient, error) {
+    opts := []grpc.DialOption{
         grpc.WithInsecure(),
         grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
             otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 9085c8116ab..54e1a7384b5 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -90,7 +90,7 @@ type Config struct {
     infName        string
     id             string
     skipUnregister bool
-    ingesterClientFactory func(addr string, timeout time.Duration, withCompression bool) (client.IngesterClient, error)
+    ingesterClientFactory func(addr string, withCompression bool) (client.IngesterClient, error)

     KVClient ring.KVClient
 }
diff --git a/pkg/ingester/ingester_lifecycle.go b/pkg/ingester/ingester_lifecycle.go
index b13e02bbfb6..da6bf0dde7d 100644
--- a/pkg/ingester/ingester_lifecycle.go
+++ b/pkg/ingester/ingester_lifecycle.go
@@ -359,7 +359,7 @@ func (i *Ingester) transferChunks() error {
     }

     level.Info(util.Logger).Log("msg", "sending chunks to ingester", "ingester", targetIngester.Addr)
-    c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, i.cfg.SearchPendingFor, false)
+    c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, false)
     if err != nil {
         return err
     }
diff --git a/pkg/ingester/ingester_lifecycle_test.go b/pkg/ingester/ingester_lifecycle_test.go
index 5641a93cf5c..473b642c357 100644
--- a/pkg/ingester/ingester_lifecycle_test.go
+++ b/pkg/ingester/ingester_lifecycle_test.go
@@ -118,7 +118,7 @@ func TestIngesterTransfer(t *testing.T) {
     require.NoError(t, err)

     // Let ing2 send chunks to ing1
-    ing1.cfg.ingesterClientFactory = func(addr string, timeout time.Duration, _ bool) (client.IngesterClient, error) {
+    ing1.cfg.ingesterClientFactory = func(addr string, _ bool) (client.IngesterClient, error) {
         return ingesterClientAdapater{
             ingester: ing2,
         }, nil
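Note on PATCH 1/3: grpc.WithTimeout() only bounds the Dial() call itself, and only when grpc.WithBlock() makes Dial() synchronous; in the default non-blocking mode the connection is established in the background and the option is never consulted. A minimal sketch of the two dial modes, for illustration only (the "ingester:9095" address and 5-second value are invented here, not taken from the patch):

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
)

func main() {
	// Default, non-blocking mode (what MakeIngesterClient uses): Dial
	// returns immediately and connects in the background, so a dial
	// timeout option would never take effect.
	conn, err := grpc.Dial("ingester:9095", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Blocking mode: only here does a deadline bound Dial itself.
	// grpc.DialContext with a context deadline is equivalent to the
	// removed grpc.WithTimeout(), and both need grpc.WithBlock().
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	blockingConn, err := grpc.DialContext(ctx, "ingester:9095",
		grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatal(err) // e.g. context.DeadlineExceeded if unreachable
	}
	defer blockingConn.Close()
}

Either way, a dial timeout would bound connection establishment, not the RPCs themselves, which is why the following two patches move the timeout to the call sites.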
From fe0d7de6b1621e906780623d82c2ac3ca22fb7fb Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Sat, 27 Jan 2018 23:25:46 +0000
Subject: [PATCH 2/3] Set timeout for ingester Push() calls

---
 pkg/distributor/distributor.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 7c6fa6380f0..4fb3ddda91e 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -347,6 +347,8 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
         done: make(chan struct{}),
         err:  make(chan error),
     }
+    ctx, cancel := context.WithTimeout(ctx, d.cfg.RemoteTimeout)
+    defer cancel() // cancel the timeout to release resources
     for ingester, samples := range samplesByIngester {
         go func(ingester *ring.IngesterDesc, samples []*sampleTracker) {
             d.sendSamples(ctx, ingester, samples, &pushTracker)
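Note on PATCH 2/3: instead of a connection-level timeout, Push() now derives a context with a deadline before fanning out, so every per-ingester goroutine (and every gRPC call it makes) inherits the same bound, and the deferred cancel releases the timer once Push() returns. A self-contained sketch of that shape; sendOne, fanOut, and the concrete durations are illustrative stand-ins, not the distributor's real code:

package main

import (
	"context"
	"fmt"
	"time"
)

// sendOne stands in for Distributor.sendSamples: a fake RPC that takes
// 100ms per character of the address and honours ctx cancellation.
func sendOne(ctx context.Context, addr string) error {
	select {
	case <-time.After(time.Duration(len(addr)) * 100 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err() // context.DeadlineExceeded once the timeout fires
	}
}

// fanOut mirrors the shape of Push(): derive one deadline, then let every
// per-ingester goroutine inherit it through the shared context.
func fanOut(ctx context.Context, addrs []string, timeout time.Duration) {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel() // release the timer even on early return

	errs := make(chan error, len(addrs))
	for _, addr := range addrs {
		go func(addr string) { errs <- sendOne(ctx, addr) }(addr)
	}
	for range addrs {
		fmt.Println(<-errs) // <nil> for "db"; deadline exceeded for "ing-1"
	}
}

func main() {
	fanOut(context.Background(), []string{"db", "ing-1"}, 300*time.Millisecond)
}

The single derived context means one timer covers the whole fan-out rather than one per ingester, and a slow ingester cannot hold Push() open past the configured RemoteTimeout.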
From bb4a1f082e4f4c0e5222796028f48683f68e9bb9 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Sat, 27 Jan 2018 23:09:40 +0000
Subject: [PATCH 3/3] Add a timeout in ruler for each rule-group evaluation

---
 pkg/ruler/ruler.go | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go
index a651230ca3d..64ef72b27cb 100644
--- a/pkg/ruler/ruler.go
+++ b/pkg/ruler/ruler.go
@@ -87,6 +87,8 @@ type Config struct {
     NotificationQueueCapacity int
     // HTTP timeout duration when sending notifications to the Alertmanager.
     NotificationTimeout time.Duration
+    // Timeout for rule group evaluation, including sending result to ingester
+    GroupTimeout time.Duration
 }

 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -100,6 +102,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     f.DurationVar(&cfg.AlertmanagerRefreshInterval, "ruler.alertmanager-refresh-interval", 1*time.Minute, "How long to wait between refreshing alertmanager hosts.")
     f.IntVar(&cfg.NotificationQueueCapacity, "ruler.notification-queue-capacity", 10000, "Capacity of the queue for notifications to be sent to the Alertmanager.")
     f.DurationVar(&cfg.NotificationTimeout, "ruler.notification-timeout", 10*time.Second, "HTTP timeout duration when sending notifications to the Alertmanager.")
+    f.DurationVar(&cfg.GroupTimeout, "ruler.group-timeout", 10*time.Second, "Timeout for rule group evaluation, including sending result to ingester")
 }

 // Ruler evaluates rules.
@@ -109,6 +112,7 @@ type Ruler struct {
     alertURL      *url.URL
     notifierCfg   *config.Config
     queueCapacity int
+    groupTimeout  time.Duration

     // Per-user notifiers with separate queues.
     notifiersMtx sync.Mutex
@@ -191,6 +195,7 @@ func NewRuler(cfg Config, d *distributor.Distributor, c *chunk.Store) (*Ruler, e
     notifierCfg:   ncfg,
     queueCapacity: cfg.NotificationQueueCapacity,
     notifiers:     map[string]*rulerNotifier{},
+    groupTimeout:  cfg.GroupTimeout,
     }, nil
 }

@@ -352,12 +357,18 @@ func (r *Ruler) Evaluate(ctx context.Context, rs []rules.Rule) {
     logger := util.WithContext(ctx, util.Logger)
     level.Debug(logger).Log("msg", "evaluating rules...", "num_rules", len(rs))
     start := time.Now()
+    ctx, cancelTimeout := context.WithTimeout(ctx, r.groupTimeout)
     g, err := r.newGroup(ctx, rs)
     if err != nil {
         level.Error(logger).Log("msg", "failed to create rule group", "err", err)
         return
     }
     g.Eval(ctx, start)
+    if err := ctx.Err(); err == nil {
+        cancelTimeout() // release resources
+    } else {
+        level.Warn(util.Logger).Log("msg", "context error", "error", err)
+    }
     // The prometheus routines we're calling have their own instrumentation
     // but, a) it's rule-based, not group-based, b) it's a summary, not a
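One caution on PATCH 3/3: Evaluate() calls cancelTimeout() only on the success branch. If newGroup() fails, the function returns without ever cancelling, so the timeout's timer lives on until the deadline fires; go vet's lostcancel check exists to catch exactly this kind of path. The conventional shape is to defer the cancel immediately, which covers error, timeout, and success paths alike. A small sketch of that convention; evalGroup and the 10ms value are invented stand-ins for g.Eval() and the configured group timeout:

package main

import (
	"context"
	"log"
	"time"
)

// evalGroup stands in for g.Eval(): it runs until its work finishes or
// until the context's deadline fires, whichever comes first.
func evalGroup(ctx context.Context) {
	select {
	case <-time.After(50 * time.Millisecond):
	case <-ctx.Done():
	}
}

func main() {
	// Deferring the CancelFunc unconditionally releases the timer on
	// every exit path (early error return, timeout, success), which the
	// patch's explicit cancelTimeout() on one branch does not.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()

	evalGroup(ctx)
	if err := ctx.Err(); err != nil {
		log.Println("rule group evaluation hit its timeout:", err)
	}
}

Checking ctx.Err() after the call, as the patch does, remains a reasonable way to log that a group overran its budget; only the cancellation bookkeeping benefits from the defer.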