Skip to content

Fix up DynamoDB Watcher #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 2 additions & 25 deletions chunk/dynamo_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ const (
)

type dynamoWatcher struct {
accountMaxCapacity *prometheus.GaugeVec
tableCapacity *prometheus.GaugeVec
tableCapacity *prometheus.GaugeVec

dynamoDB *dynamodb.DynamoDB
tableName string
Expand Down Expand Up @@ -49,11 +48,6 @@ func WatchDynamo(dynamoDBURL string, interval time.Duration) (Watcher, error) {

tableName := strings.TrimPrefix(url.Path, "/")
w := &dynamoWatcher{
accountMaxCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "dynamo_account_max_capacity_units",
Help: "Account-wide DynamoDB capacity, measured in DynamoDB capacity units.",
}, []string{"op"}),
tableCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "dynamo_table_capacity_units",
Expand Down Expand Up @@ -81,12 +75,7 @@ func (w *dynamoWatcher) updateLoop() {
select {
case <-ticker.C:
log.Debugf("Updating limits from dynamo")
err := w.updateAccountLimits()
if err != nil {
// TODO: Back off if err is throttling related.
log.Warnf("Could not fetch account limits from dynamo: %v", err)
}
err = w.updateTableLimits()
err := w.updateTableLimits()
if err != nil {
log.Warnf("Could not fetch table limits from dynamo: %v", err)
}
Expand All @@ -96,16 +85,6 @@ func (w *dynamoWatcher) updateLoop() {
}
}

func (w *dynamoWatcher) updateAccountLimits() error {
limits, err := w.dynamoDB.DescribeLimits(&dynamodb.DescribeLimitsInput{})
if err != nil {
return err
}
w.accountMaxCapacity.WithLabelValues(readLabel).Set(float64(*limits.AccountMaxReadCapacityUnits))
w.accountMaxCapacity.WithLabelValues(writeLabel).Set(float64(*limits.AccountMaxWriteCapacityUnits))
return nil
}

func (w *dynamoWatcher) updateTableLimits() error {
output, err := w.dynamoDB.DescribeTable(&dynamodb.DescribeTableInput{
TableName: &w.tableName,
Expand All @@ -121,12 +100,10 @@ func (w *dynamoWatcher) updateTableLimits() error {

// Describe implements prometheus.Collector.
func (w *dynamoWatcher) Describe(ch chan<- *prometheus.Desc) {
w.accountMaxCapacity.Describe(ch)
w.tableCapacity.Describe(ch)
}

// Collect implements prometheus.Collector.
func (w *dynamoWatcher) Collect(ch chan<- prometheus.Metric) {
w.accountMaxCapacity.Collect(ch)
w.tableCapacity.Collect(ch)
}
15 changes: 10 additions & 5 deletions cmd/cortex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type cfg struct {
remoteTimeout time.Duration
numTokens int
logSuccess bool
watchDynamo bool

ingesterConfig ingester.Config
distributorConfig distributor.Config
Expand Down Expand Up @@ -99,6 +100,7 @@ func main() {
flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.")
flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.")
flag.BoolVar(&cfg.logSuccess, "log.success", false, "Log successful requests")
flag.BoolVar(&cfg.watchDynamo, "watch-dynamo", false, "Periodically collect DynamoDB provisioned throughput.")
flag.Parse()

chunkStore, err := setupChunkStore(cfg)
Expand All @@ -108,12 +110,15 @@ func main() {
if cfg.dynamodbPollInterval < 1*time.Minute {
log.Warnf("Polling DynamoDB more than once a minute. Likely to get throttled: %v", cfg.dynamodbPollInterval)
}
resourceWatcher, err := chunk.WatchDynamo(cfg.dynamodbURL, cfg.dynamodbPollInterval)
if err != nil {
log.Fatalf("Error initializing DynamoDB watcher: %v", err)

if cfg.watchDynamo {
resourceWatcher, err := chunk.WatchDynamo(cfg.dynamodbURL, cfg.dynamodbPollInterval)
if err != nil {
log.Fatalf("Error initializing DynamoDB watcher: %v", err)
}
defer resourceWatcher.Stop()
prometheus.MustRegister(resourceWatcher)
}
defer resourceWatcher.Stop()
prometheus.MustRegister(resourceWatcher)

consul, err := ring.NewConsulClient(cfg.consulHost)
if err != nil {
Expand Down