Skip to content

Metrics for Dynamo capacity #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions chunk/dynamo_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package chunk

import (
"net/url"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
)

// readLabel is the label value passed to the capacity gauges for read capacity.
const readLabel = "read"

// writeLabel is the label value passed to the capacity gauges for write capacity.
const writeLabel = "write"

type dynamoWatcher struct {
accountMaxCapacity *prometheus.GaugeVec
tableCapacity *prometheus.GaugeVec

dynamoDB *dynamodb.DynamoDB
tableName string

updateInterval time.Duration
quit chan struct{}
wait sync.WaitGroup
}

// Watcher is a Prometheus collector for something that is being watched,
// with a Stop method to end the watching.
type Watcher interface {
	prometheus.Collector

	// Stop stops the watcher.
	Stop()
}

// WatchDynamo watches Dynamo and reports on resource limits.
//
// dynamoDBURL is parsed as a URL. The parsed URL is handed to
// awsConfigFromURL to build the client configuration, and its path, minus a
// leading "/", is used as the name of the table to watch. interval is the
// period between polls and must be positive, because time.NewTicker panics
// on a non-positive duration.
//
// On success a background goroutine is started that refreshes the gauges
// once per interval until Stop is called. An error is returned if the URL
// cannot be parsed or the AWS configuration cannot be derived from it.
func WatchDynamo(dynamoDBURL string, interval time.Duration) (Watcher, error) {
	// Named parsedURL rather than url so the net/url package is not shadowed.
	parsedURL, err := url.Parse(dynamoDBURL)
	if err != nil {
		return nil, err
	}
	dynamoDBConfig, err := awsConfigFromURL(parsedURL)
	if err != nil {
		return nil, err
	}
	client := dynamodb.New(session.New(dynamoDBConfig))

	tableName := strings.TrimPrefix(parsedURL.Path, "/")
	w := &dynamoWatcher{
		accountMaxCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: "cortex",
			Name:      "dynamo_account_max_capacity_units",
			Help:      "Account-wide DynamoDB capacity, measured in DynamoDB capacity units.",
		}, []string{"op"}),
		tableCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Namespace: "cortex",
			Name:      "dynamo_table_capacity_units",
			Help:      "Per-table DynamoDB capacity, measured in DynamoDB capacity units.",
		}, []string{"op", "table"}),
		dynamoDB:       client,
		tableName:      tableName,
		updateInterval: interval,
		quit:           make(chan struct{}),
	}
	// Register the goroutine with the WaitGroup before starting it, so that
	// the deferred Done in updateLoop is balanced and Stop's Wait really
	// blocks until the loop has exited.
	w.wait.Add(1)
	go w.updateLoop()
	return w, nil
}

// Stop stops the dynamo watcher and blocks until its update goroutine has
// exited. It must be called at most once: a second call would close an
// already-closed channel and panic.
func (w *dynamoWatcher) Stop() {
	close(w.quit)
	w.wait.Wait()
}

// updateLoop refreshes the account and table gauges on every tick of
// updateInterval until quit is closed. Fetch failures are logged and the
// loop carries on with the next tick.
func (w *dynamoWatcher) updateLoop() {
	defer w.wait.Done()
	ticker := time.NewTicker(w.updateInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			log.Debugf("Updating limits from dynamo")
			err := w.updateAccountLimits()
			if err != nil {
				// TODO: Back off if err is throttling related.
				log.Warnf("Could not fetch account limits from dynamo: %v", err)
			}
			err = w.updateTableLimits()
			if err != nil {
				log.Warnf("Could not fetch table limits from dynamo: %v", err)
			}
		case <-w.quit:
			// Return instead of looping again: a closed channel is always
			// ready to receive, so staying in the loop would spin forever
			// and Stop would never see the goroutine finish.
			return
		}
	}
}

// updateAccountLimits calls DescribeLimits and copies the account-wide
// maximum read and write capacity units into the accountMaxCapacity gauge.
// It returns the error from DescribeLimits, if any, leaving the gauges
// untouched. The pointer fields of the response are dereferenced without a
// nil check.
func (w *dynamoWatcher) updateAccountLimits() error {
	resp, err := w.dynamoDB.DescribeLimits(&dynamodb.DescribeLimitsInput{})
	if err != nil {
		return err
	}

	readGauge := w.accountMaxCapacity.WithLabelValues(readLabel)
	readGauge.Set(float64(*resp.AccountMaxReadCapacityUnits))

	writeGauge := w.accountMaxCapacity.WithLabelValues(writeLabel)
	writeGauge.Set(float64(*resp.AccountMaxWriteCapacityUnits))

	return nil
}

// updateTableLimits calls DescribeTable for the watched table and copies its
// provisioned read and write capacity units into the tableCapacity gauge,
// labelled with the table name. It returns the error from DescribeTable, if
// any, leaving the gauges untouched. The pointer fields of the response are
// dereferenced without a nil check.
func (w *dynamoWatcher) updateTableLimits() error {
	request := &dynamodb.DescribeTableInput{
		TableName: &w.tableName,
	}
	resp, err := w.dynamoDB.DescribeTable(request)
	if err != nil {
		return err
	}
	provisioned := resp.Table.ProvisionedThroughput

	readGauge := w.tableCapacity.WithLabelValues(readLabel, w.tableName)
	readGauge.Set(float64(*provisioned.ReadCapacityUnits))

	writeGauge := w.tableCapacity.WithLabelValues(writeLabel, w.tableName)
	writeGauge.Set(float64(*provisioned.WriteCapacityUnits))

	return nil
}

// Describe implements prometheus.Collector by forwarding to the account-wide
// gauge vector and then the per-table gauge vector.
func (w *dynamoWatcher) Describe(ch chan<- *prometheus.Desc) {
	for _, vec := range []*prometheus.GaugeVec{w.accountMaxCapacity, w.tableCapacity} {
		vec.Describe(ch)
	}
}

// Collect implements prometheus.Collector by forwarding to the account-wide
// gauge vector and then the per-table gauge vector.
func (w *dynamoWatcher) Collect(ch chan<- prometheus.Metric) {
	for _, vec := range []*prometheus.GaugeVec{w.accountMaxCapacity, w.tableCapacity} {
		vec.Collect(ch)
	}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could use a global gauge for this; this style is very verbose.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you & @juliusv please sort this out between yourselves? I don't have a strong preference either way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I favor encapsulated metrics that don't poop all over your global package namespace (was annoying in the past when reusing a package, especially when they also register themselves globally, but then aren't even set properly or aren't always relevant for the user of a package), but I don't care enough to fight about it here, and reuse of Cortex packages is less likely. Do it any way you guys prefer, you'll have to live with it longer :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strongly favour global metrics, mainly as it's less code, and it's harder to forget to register them! (140c822)

I seem to remember something about custom registries? Could each component have its own registry, and can you register one registry with another registry?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS: maybe let's not let this discussion hold up this PR.

13 changes: 12 additions & 1 deletion cmd/cortex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type cfg struct {
s3URL string
dynamodbURL string
dynamodbCreateTables bool
dynamodbPollInterval time.Duration
memcachedHostname string
memcachedTimeout time.Duration
memcachedExpiration time.Duration
Expand All @@ -83,6 +84,7 @@ func main() {
flag.StringVar(&cfg.s3URL, "s3.url", "localhost:4569", "S3 endpoint URL.")
flag.StringVar(&cfg.dynamodbURL, "dynamodb.url", "localhost:8000", "DynamoDB endpoint URL.")
flag.BoolVar(&cfg.dynamodbCreateTables, "dynamodb.create-tables", false, "Create required DynamoDB tables on startup.")
flag.DurationVar(&cfg.dynamodbPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.")
flag.StringVar(&cfg.memcachedHostname, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
flag.DurationVar(&cfg.memcachedTimeout, "memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.")
flag.DurationVar(&cfg.memcachedExpiration, "memcached.expiration", 0, "How long chunks stay in the memcache.")
Expand All @@ -102,6 +104,15 @@ func main() {
if err != nil {
log.Fatalf("Error initializing chunk store: %v", err)
}
if cfg.dynamodbPollInterval < 1*time.Minute {
log.Warnf("Polling DynamoDB more than once a minute. Likely to get throttled: %v", cfg.dynamodbPollInterval)
}
resourceWatcher, err := chunk.WatchDynamo(cfg.dynamodbURL, cfg.dynamodbPollInterval)
if err != nil {
log.Fatalf("Error initializing DynamoDB watcher: %v", err)
}
defer resourceWatcher.Stop()
prometheus.MustRegister(resourceWatcher)

consul, err := ring.NewConsulClient(cfg.consulHost)
if err != nil {
Expand Down Expand Up @@ -234,7 +245,7 @@ func setupQuerier(

engine := promql.NewEngine(queryable, nil)

api := v1.NewAPI(engine, querier.DummyStorage{queryable})
api := v1.NewAPI(engine, querier.DummyStorage{Queryable: queryable})
router := route.New(func(r *http.Request) (context.Context, error) {
userID := r.Header.Get(userIDHeaderName)
if r.Method != "OPTIONS" && userID == "" {
Expand Down