Skip to content

Commit 2af690b

Browse files
authored
Merge pull request #89 from weaveworks/dynamo-monitoring
Metrics for Dynamo capacity
2 parents d412e26 + 0f0a393 commit 2af690b

File tree

2 files changed

+144
-1
lines changed

2 files changed

+144
-1
lines changed

chunk/dynamo_client.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package chunk
2+
3+
import (
	"errors"
	"fmt"
	"net/url"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/log"
)
14+
15+
// readLabel is the "op" label value attached to read-capacity gauges.
const readLabel = "read"

// writeLabel is the "op" label value attached to write-capacity gauges.
const writeLabel = "write"
19+
20+
type dynamoWatcher struct {
21+
accountMaxCapacity *prometheus.GaugeVec
22+
tableCapacity *prometheus.GaugeVec
23+
24+
dynamoDB *dynamodb.DynamoDB
25+
tableName string
26+
27+
updateInterval time.Duration
28+
quit chan struct{}
29+
wait sync.WaitGroup
30+
}
31+
32+
// Watcher watches something and reports to Prometheus.
33+
type Watcher interface {
34+
Stop()
35+
prometheus.Collector
36+
}
37+
38+
// WatchDynamo watches Dynamo and reports on resource limits.
39+
func WatchDynamo(dynamoDBURL string, interval time.Duration) (Watcher, error) {
40+
url, err := url.Parse(dynamoDBURL)
41+
if err != nil {
42+
return nil, err
43+
}
44+
dynamoDBConfig, err := awsConfigFromURL(url)
45+
if err != nil {
46+
return nil, err
47+
}
48+
client := dynamodb.New(session.New(dynamoDBConfig))
49+
50+
tableName := strings.TrimPrefix(url.Path, "/")
51+
w := &dynamoWatcher{
52+
accountMaxCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{
53+
Namespace: "cortex",
54+
Name: "dynamo_account_max_capacity_units",
55+
Help: "Account-wide DynamoDB capacity, measured in DynamoDB capacity units.",
56+
}, []string{"op"}),
57+
tableCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{
58+
Namespace: "cortex",
59+
Name: "dynamo_table_capacity_units",
60+
Help: "Per-table DynamoDB capacity, measured in DynamoDB capacity units.",
61+
}, []string{"op", "table"}),
62+
dynamoDB: client,
63+
tableName: tableName,
64+
updateInterval: interval,
65+
quit: make(chan struct{}),
66+
}
67+
go w.updateLoop()
68+
return w, nil
69+
}
70+
71+
// Stop stops the dynamo watcher.
72+
func (w *dynamoWatcher) Stop() {
73+
close(w.quit)
74+
w.wait.Wait()
75+
}
76+
77+
func (w *dynamoWatcher) updateLoop() {
78+
defer w.wait.Done()
79+
ticker := time.NewTicker(w.updateInterval)
80+
for {
81+
select {
82+
case <-ticker.C:
83+
log.Debugf("Updating limits from dynamo")
84+
err := w.updateAccountLimits()
85+
if err != nil {
86+
// TODO: Back off if err is throttling related.
87+
log.Warnf("Could not fetch account limits from dynamo: %v", err)
88+
}
89+
err = w.updateTableLimits()
90+
if err != nil {
91+
log.Warnf("Could not fetch table limits from dynamo: %v", err)
92+
}
93+
case <-w.quit:
94+
ticker.Stop()
95+
}
96+
}
97+
}
98+
99+
func (w *dynamoWatcher) updateAccountLimits() error {
100+
limits, err := w.dynamoDB.DescribeLimits(&dynamodb.DescribeLimitsInput{})
101+
if err != nil {
102+
return err
103+
}
104+
w.accountMaxCapacity.WithLabelValues(readLabel).Set(float64(*limits.AccountMaxReadCapacityUnits))
105+
w.accountMaxCapacity.WithLabelValues(writeLabel).Set(float64(*limits.AccountMaxWriteCapacityUnits))
106+
return nil
107+
}
108+
109+
func (w *dynamoWatcher) updateTableLimits() error {
110+
output, err := w.dynamoDB.DescribeTable(&dynamodb.DescribeTableInput{
111+
TableName: &w.tableName,
112+
})
113+
if err != nil {
114+
return err
115+
}
116+
throughput := output.Table.ProvisionedThroughput
117+
w.tableCapacity.WithLabelValues(readLabel, w.tableName).Set(float64(*throughput.ReadCapacityUnits))
118+
w.tableCapacity.WithLabelValues(writeLabel, w.tableName).Set(float64(*throughput.WriteCapacityUnits))
119+
return nil
120+
}
121+
122+
// Describe implements prometheus.Collector.
123+
func (w *dynamoWatcher) Describe(ch chan<- *prometheus.Desc) {
124+
w.accountMaxCapacity.Describe(ch)
125+
w.tableCapacity.Describe(ch)
126+
}
127+
128+
// Collect implements prometheus.Collector.
129+
func (w *dynamoWatcher) Collect(ch chan<- prometheus.Metric) {
130+
w.accountMaxCapacity.Collect(ch)
131+
w.tableCapacity.Collect(ch)
132+
}

cmd/cortex/main.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type cfg struct {
6262
s3URL string
6363
dynamodbURL string
6464
dynamodbCreateTables bool
65+
dynamodbPollInterval time.Duration
6566
memcachedHostname string
6667
memcachedTimeout time.Duration
6768
memcachedExpiration time.Duration
@@ -83,6 +84,7 @@ func main() {
8384
flag.StringVar(&cfg.s3URL, "s3.url", "localhost:4569", "S3 endpoint URL.")
8485
flag.StringVar(&cfg.dynamodbURL, "dynamodb.url", "localhost:8000", "DynamoDB endpoint URL.")
8586
flag.BoolVar(&cfg.dynamodbCreateTables, "dynamodb.create-tables", false, "Create required DynamoDB tables on startup.")
87+
flag.DurationVar(&cfg.dynamodbPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.")
8688
flag.StringVar(&cfg.memcachedHostname, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.")
8789
flag.DurationVar(&cfg.memcachedTimeout, "memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.")
8890
flag.DurationVar(&cfg.memcachedExpiration, "memcached.expiration", 0, "How long chunks stay in the memcache.")
@@ -102,6 +104,15 @@ func main() {
102104
if err != nil {
103105
log.Fatalf("Error initializing chunk store: %v", err)
104106
}
107+
if cfg.dynamodbPollInterval < 1*time.Minute {
108+
log.Warnf("Polling DynamoDB more than once a minute. Likely to get throttled: %v", cfg.dynamodbPollInterval)
109+
}
110+
resourceWatcher, err := chunk.WatchDynamo(cfg.dynamodbURL, cfg.dynamodbPollInterval)
111+
if err != nil {
112+
log.Fatalf("Error initializing DynamoDB watcher: %v", err)
113+
}
114+
defer resourceWatcher.Stop()
115+
prometheus.MustRegister(resourceWatcher)
105116

106117
consul, err := ring.NewConsulClient(cfg.consulHost)
107118
if err != nil {
@@ -234,7 +245,7 @@ func setupQuerier(
234245

235246
engine := promql.NewEngine(queryable, nil)
236247

237-
api := v1.NewAPI(engine, querier.DummyStorage{queryable})
248+
api := v1.NewAPI(engine, querier.DummyStorage{Queryable: queryable})
238249
router := route.New(func(r *http.Request) (context.Context, error) {
239250
userID := r.Header.Get(userIDHeaderName)
240251
if r.Method != "OPTIONS" && userID == "" {

0 commit comments

Comments
 (0)