Fix Ingester timeouts #673

Merged: 3 commits, Feb 7, 2018
pkg/distributor/distributor.go (6 changes: 4 additions & 2 deletions)

@@ -79,7 +79,7 @@ type Config struct {
     CompressToIngester bool

     // for testing
-    ingesterClientFactory func(addr string, timeout time.Duration, withCompression bool) (client.IngesterClient, error)
+    ingesterClientFactory func(addr string, withCompression bool) (client.IngesterClient, error)
 }

 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -224,7 +224,7 @@ func (d *Distributor) getClientFor(ingester *ring.IngesterDesc) (client.Ingester
         return client, nil
     }

-    client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.RemoteTimeout, d.cfg.CompressToIngester)
+    client, err := d.cfg.ingesterClientFactory(ingester.Addr, d.cfg.CompressToIngester)
     if err != nil {
         return nil, err
     }
@@ -347,6 +347,8 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
         done: make(chan struct{}),
         err:  make(chan error),
     }
+    ctx, cancel := context.WithTimeout(ctx, d.cfg.RemoteTimeout)
+    defer cancel() // cancel the timeout to release resources
     for ingester, samples := range samplesByIngester {
         go func(ingester *ring.IngesterDesc, samples []*sampleTracker) {
             d.sendSamples(ctx, ingester, samples, &pushTracker)
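
The Push hunk above is the heart of the fix: rather than a timeout baked into the gRPC client at dial time, each push now derives a context deadline that every fan-out RPC to the ingesters inherits. A minimal sketch of that pattern, with push, send, and the addresses as stand-ins for the Cortex types rather than the real code:

package main

import (
	"context"
	"fmt"
	"time"
)

// push bounds a whole fan-out with one deadline; every goroutine shares ctx,
// so a single expiry fails all outstanding sends with context.DeadlineExceeded.
func push(ctx context.Context, remoteTimeout time.Duration, targets []string) {
	ctx, cancel := context.WithTimeout(ctx, remoteTimeout)
	defer cancel() // release the timer as soon as push returns

	errs := make(chan error, len(targets))
	for _, t := range targets {
		go func(addr string) {
			errs <- send(ctx, addr)
		}(t)
	}
	for range targets {
		if err := <-errs; err != nil {
			fmt.Println("send failed:", err)
		}
	}
}

// send simulates one RPC that respects the caller's deadline.
func send(ctx context.Context, addr string) error {
	select {
	case <-time.After(50 * time.Millisecond): // simulated RPC latency
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	push(context.Background(), 20*time.Millisecond, []string{"ingester-1", "ingester-2"})
}

This is what d.cfg.RemoteTimeout now means: a bound on the whole remote write, not on establishing the connection.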
pkg/distributor/distributor_test.go (4 changes: 2 additions & 2 deletions)

@@ -165,7 +165,7 @@ func TestDistributorPush(t *testing.T) {
         IngestionRateLimit: 10000,
         IngestionBurstSize: 10000,

-        ingesterClientFactory: func(addr string, _ time.Duration, _ bool) (client.IngesterClient, error) {
+        ingesterClientFactory: func(addr string, _ bool) (client.IngesterClient, error) {
             return ingesters[addr], nil
         },
     }, ring)
@@ -305,7 +305,7 @@ func TestDistributorQuery(t *testing.T) {
         IngestionRateLimit: 10000,
         IngestionBurstSize: 10000,

-        ingesterClientFactory: func(addr string, _ time.Duration, _ bool) (client.IngesterClient, error) {
+        ingesterClientFactory: func(addr string, _ bool) (client.IngesterClient, error) {
             return ingesters[addr], nil
         },
     }, ring)
pkg/ingester/client/client.go (6 changes: 2 additions & 4 deletions)

@@ -1,8 +1,6 @@
 package client

 import (
-    "time"
-
     "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
     "github.com/mwitkow/go-grpc-middleware"
     "github.com/opentracing/opentracing-go"
@@ -18,8 +16,8 @@ type closableIngesterClient struct {
 }

 // MakeIngesterClient makes a new IngesterClient
-func MakeIngesterClient(addr string, timeout time.Duration, withCompression bool) (IngesterClient, error) {
-    opts := []grpc.DialOption{grpc.WithTimeout(timeout),
+func MakeIngesterClient(addr string, withCompression bool) (IngesterClient, error) {
+    opts := []grpc.DialOption{
         grpc.WithInsecure(),
         grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
             otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
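
Dropping grpc.WithTimeout is what makes the change above safe: that dial option only bounds connection establishment, and only when the dial blocks, so it never limited individual RPCs. A sketch of the model the code moves to, where contexts bound both the (optional) blocking dial and each call; the address and the commented Push call are placeholders, not Cortex's actual wiring:

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
)

func main() {
	// If you want connection establishment bounded, use a context with
	// DialContext + WithBlock instead of the removed grpc.WithTimeout.
	dialCtx, cancelDial := context.WithTimeout(context.Background(), time.Second)
	defer cancelDial()
	conn, err := grpc.DialContext(dialCtx, "localhost:9095", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		fmt.Println("dial:", err) // deadline exceeded if nothing is listening
		return
	}
	defer conn.Close()

	// Per-RPC deadline: any call made with callCtx fails once it expires,
	// e.g. pb.NewIngesterClient(conn).Push(callCtx, req) with a generated client.
	callCtx, cancelCall := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancelCall()
	_ = callCtx
}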
pkg/ingester/ingester.go (2 changes: 1 addition & 1 deletion)

@@ -90,7 +90,7 @@ type Config struct {
     infName        string
     id             string
     skipUnregister bool
-    ingesterClientFactory func(addr string, timeout time.Duration, withCompression bool) (client.IngesterClient, error)
+    ingesterClientFactory func(addr string, withCompression bool) (client.IngesterClient, error)
     KVClient ring.KVClient
 }
pkg/ingester/ingester_lifecycle.go (2 changes: 1 addition & 1 deletion)

@@ -359,7 +359,7 @@ func (i *Ingester) transferChunks() error {
     }

     level.Info(util.Logger).Log("msg", "sending chunks to ingester", "ingester", targetIngester.Addr)
-    c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, i.cfg.SearchPendingFor, false)
+    c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, false)
     if err != nil {
         return err
     }
pkg/ingester/ingester_lifecycle_test.go (2 changes: 1 addition & 1 deletion)

@@ -118,7 +118,7 @@ func TestIngesterTransfer(t *testing.T) {
     require.NoError(t, err)

     // Let ing2 send chunks to ing1
-    ing1.cfg.ingesterClientFactory = func(addr string, timeout time.Duration, _ bool) (client.IngesterClient, error) {
+    ing1.cfg.ingesterClientFactory = func(addr string, _ bool) (client.IngesterClient, error) {
         return ingesterClientAdapater{
             ingester: ing2,
         }, nil
pkg/ruler/ruler.go (11 changes: 11 additions & 0 deletions)

@@ -87,6 +87,8 @@ type Config struct {
     NotificationQueueCapacity int
     // HTTP timeout duration when sending notifications to the Alertmanager.
     NotificationTimeout time.Duration
+    // Timeout for rule group evaluation, including sending result to ingester
+    GroupTimeout time.Duration
 }

 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -100,6 +102,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     f.DurationVar(&cfg.AlertmanagerRefreshInterval, "ruler.alertmanager-refresh-interval", 1*time.Minute, "How long to wait between refreshing alertmanager hosts.")
     f.IntVar(&cfg.NotificationQueueCapacity, "ruler.notification-queue-capacity", 10000, "Capacity of the queue for notifications to be sent to the Alertmanager.")
     f.DurationVar(&cfg.NotificationTimeout, "ruler.notification-timeout", 10*time.Second, "HTTP timeout duration when sending notifications to the Alertmanager.")
+    f.DurationVar(&cfg.GroupTimeout, "ruler.group-timeout", 10*time.Second, "Timeout for rule group evaluation, including sending result to ingester")
 }

 // Ruler evaluates rules.
@@ -109,6 +112,7 @@ type Ruler struct {
     alertURL      *url.URL
     notifierCfg   *config.Config
     queueCapacity int
+    groupTimeout  time.Duration

     // Per-user notifiers with separate queues.
     notifiersMtx sync.Mutex
@@ -191,6 +195,7 @@ func NewRuler(cfg Config, d *distributor.Distributor, c *chunk.Store) (*Ruler, e
         notifierCfg:   ncfg,
         queueCapacity: cfg.NotificationQueueCapacity,
         notifiers:     map[string]*rulerNotifier{},
+        groupTimeout:  cfg.GroupTimeout,
     }, nil
 }

@@ -352,12 +357,18 @@ func (r *Ruler) Evaluate(ctx context.Context, rs []rules.Rule) {
     logger := util.WithContext(ctx, util.Logger)
     level.Debug(logger).Log("msg", "evaluating rules...", "num_rules", len(rs))
     start := time.Now()
+    ctx, cancelTimeout := context.WithTimeout(ctx, r.groupTimeout)
     g, err := r.newGroup(ctx, rs)
     if err != nil {
         level.Error(logger).Log("msg", "failed to create rule group", "err", err)
         return
     }
     g.Eval(ctx, start)
+    if err := ctx.Err(); err == nil {
+        cancelTimeout() // release resources
+    } else {
+        level.Warn(util.Logger).Log("msg", "context error", "error", err)
+    }

     // The prometheus routines we're calling have their own instrumentation
     // but, a) it's rule-based, not group-based, b) it's a summary, not a
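
The ruler gets the same treatment: every rule-group evaluation now runs under its own deadline, configurable via -ruler.group-timeout. A standalone sketch of the pattern, with evalGroup standing in for g.Eval; unlike the diff above, which cancels conditionally so it can log a deadline hit, this version defers the cancel so the timer is released on every return path and inspects ctx.Err() afterwards:

package main

import (
	"context"
	"log"
	"time"
)

// evaluate bounds one rule-group evaluation with its own deadline.
func evaluate(ctx context.Context, groupTimeout time.Duration) {
	ctx, cancel := context.WithTimeout(ctx, groupTimeout)
	defer cancel() // releases the timer on every return path

	evalGroup(ctx)

	// Distinguish "finished in time" from "deadline hit", as the diff above
	// does with its ctx.Err() check.
	if err := ctx.Err(); err != nil {
		log.Printf("group evaluation hit its deadline: %v", err)
	}
}

// evalGroup simulates evaluation work that respects ctx.
func evalGroup(ctx context.Context) {
	select {
	case <-time.After(50 * time.Millisecond): // simulated rule evaluation
	case <-ctx.Done():
	}
}

func main() {
	evaluate(context.Background(), 10*time.Millisecond) // logs a deadline hit
	evaluate(context.Background(), 10*time.Second)      // completes quietly
}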