Skip to content

Commit c008545

Browse files
committed
Monitor DynamoDB account limits
1 parent e76f5d7 commit c008545

File tree

2 files changed

+109
-0
lines changed

2 files changed

+109
-0
lines changed

chunk/dynamo_client.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package chunk
2+
3+
import (
4+
"net/url"
5+
"sync"
6+
"time"
7+
8+
"github.com/aws/aws-sdk-go/aws/session"
9+
"github.com/aws/aws-sdk-go/service/dynamodb"
10+
"github.com/prometheus/client_golang/prometheus"
11+
"github.com/prometheus/common/log"
12+
)
13+
14+
const (
15+
readLabel = "read"
16+
writeLabel = "write"
17+
)
18+
19+
type dynamoWatcher struct {
20+
accountMaxCapacity *prometheus.GaugeVec
21+
22+
dynamoDB *dynamodb.DynamoDB
23+
updateInterval time.Duration
24+
quit chan struct{}
25+
wait sync.WaitGroup
26+
}
27+
28+
// Watcher watches something and reports to Prometheus.
29+
type Watcher interface {
30+
Stop()
31+
prometheus.Collector
32+
}
33+
34+
// WatchDynamo watches Dynamo and reports on resource limits.
35+
func WatchDynamo(dynamoDBURL string, interval time.Duration) (Watcher, error) {
36+
url, err := url.Parse(dynamoDBURL)
37+
if err != nil {
38+
return nil, err
39+
}
40+
dynamoDBConfig, err := awsConfigFromURL(url)
41+
if err != nil {
42+
return nil, err
43+
}
44+
client := dynamodb.New(session.New(dynamoDBConfig))
45+
46+
// TODO: Report on table capacity.
47+
// tableName := strings.TrimPrefix(dynamoURL.Path, "/")
48+
w := &dynamoWatcher{
49+
accountMaxCapacity: prometheus.NewGaugeVec(prometheus.GaugeOpts{
50+
Namespace: "cortex",
51+
Name: "dynamo_account_max_capacity_units",
52+
Help: "Account-wide DynamoDB capacity, measured in DynamoDB capacity units.",
53+
}, []string{"op"}),
54+
dynamoDB: client,
55+
updateInterval: interval,
56+
quit: make(chan struct{}),
57+
}
58+
go w.updateLoop()
59+
return w, nil
60+
}
61+
62+
// Stop stops the dynamo watcher.
63+
func (w *dynamoWatcher) Stop() {
64+
close(w.quit)
65+
w.wait.Wait()
66+
}
67+
68+
func (w *dynamoWatcher) updateLoop() {
69+
defer w.wait.Done()
70+
ticker := time.NewTicker(w.updateInterval)
71+
for {
72+
select {
73+
case <-ticker.C:
74+
err := w.updateAccountLimits()
75+
if err != nil {
76+
// TODO: Back off if err is throttling related.
77+
log.Warnf("Could not fetch limits from dynamo: %v", err)
78+
}
79+
case <-w.quit:
80+
ticker.Stop()
81+
}
82+
}
83+
}
84+
85+
func (w *dynamoWatcher) updateAccountLimits() error {
86+
limits, err := w.dynamoDB.DescribeLimits(&dynamodb.DescribeLimitsInput{})
87+
if err != nil {
88+
return err
89+
}
90+
w.accountMaxCapacity.WithLabelValues(readLabel).Set(float64(*limits.AccountMaxReadCapacityUnits))
91+
w.accountMaxCapacity.WithLabelValues(writeLabel).Set(float64(*limits.AccountMaxWriteCapacityUnits))
92+
return nil
93+
}
94+
95+
// Describe implements prometheus.Collector.
96+
func (w *dynamoWatcher) Describe(ch chan<- *prometheus.Desc) {
97+
w.accountMaxCapacity.Describe(ch)
98+
}
99+
100+
// Collect implements prometheus.Collector.
101+
func (w *dynamoWatcher) Collect(ch chan<- prometheus.Metric) {
102+
w.accountMaxCapacity.Collect(ch)
103+
}

cmd/cortex/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ func main() {
115115
if err != nil {
116116
log.Fatalf("Error initializing chunk store: %v", err)
117117
}
118+
resourceWatcher, err := chunk.WatchDynamo(cfg.dynamodbURL, 2*time.Minute)
119+
if err != nil {
120+
log.Fatalf("Error initializing DynamoDB watcher: %v", err)
121+
}
122+
defer resourceWatcher.Stop()
123+
prometheus.MustRegister(resourceWatcher)
118124

119125
consul, err := ring.NewConsulClient(cfg.consulHost)
120126
if err != nil {

0 commit comments

Comments
 (0)