Skip to content

Tidy up and add more query metrics #138

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 14, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 70 additions & 35 deletions chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/url"
"sort"
"strings"
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -45,12 +46,6 @@ var (
// important. So use 8 buckets from 64us to 8s.
Buckets: prometheus.ExponentialBuckets(0.000128, 4, 8),
}, []string{"operation", "status_code"})
dynamoRequestPages = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "dynamo_request_pages",
Help: "Number of pages by DynamoDB request",
Buckets: prometheus.ExponentialBuckets(1, 2.0, 5),
})
dynamoConsumedCapacity = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "dynamo_consumed_capacity_total",
Expand All @@ -62,26 +57,49 @@ var (
Help: "Number of entries written to dynamodb per chunk.",
Buckets: prometheus.ExponentialBuckets(1, 2, 5),
})
droppedMatches = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "dropped_matches_total",
Help: "The number of chunks fetched but later dropped for not matching.",
})
s3RequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "s3_request_duration_seconds",
Help: "Time spent doing S3 requests.",
Buckets: []float64{.025, .05, .1, .25, .5, 1, 2},
}, []string{"operation", "status_code"})

queryChunks = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "query_chunks",
Help: "Number of chunks loaded per query.",
Buckets: prometheus.ExponentialBuckets(1, 4.0, 5),
})
queryDynamoLookups = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "query_dynamo_lookups",
Help: "Number of dynamo lookups per query.",
Buckets: prometheus.ExponentialBuckets(1, 4.0, 5),
})
queryRequestPages = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "query_dynamo_request_pages",
Help: "Number of pages per DynamoDB request",
Buckets: prometheus.ExponentialBuckets(1, 2.0, 5),
})
queryDroppedMatches = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "query_dynamo_dropped_matches_total",
Help: "The number of chunks IDs fetched from Dynamo but later dropped for not matching (per DynamoDB request).",
Buckets: prometheus.ExponentialBuckets(1, 2.0, 5),
})
)

func init() {
prometheus.MustRegister(dynamoRequestDuration)
prometheus.MustRegister(dynamoConsumedCapacity)
prometheus.MustRegister(dynamoRequestPages)
prometheus.MustRegister(indexEntriesPerChunk)
prometheus.MustRegister(droppedMatches)
prometheus.MustRegister(s3RequestDuration)

prometheus.MustRegister(queryChunks)
prometheus.MustRegister(queryDynamoLookups)
prometheus.MustRegister(queryRequestPages)
prometheus.MustRegister(queryDroppedMatches)
}

// Store type stores and indexes chunks
Expand Down Expand Up @@ -408,6 +426,8 @@ func (c *AWSStore) Get(ctx context.Context, from, through model.Time, matchers .
return nil, err
}

queryChunks.Observe(float64(len(missing)))

var fromCache []Chunk
if c.chunkCache != nil {
fromCache, missing, err = c.chunkCache.FetchChunkData(userID, missing)
Expand Down Expand Up @@ -458,9 +478,11 @@ func (c *AWSStore) lookupChunks(userID string, from, through model.Time, matcher
incomingChunkSets := make(chan ByID)
incomingErrors := make(chan error)
buckets := bigBuckets(from, through)
totalLookups := int32(0)
for _, hour := range buckets {
go func(hour int64) {
incoming, err := c.lookupChunksFor(userID, hour, metricName, matchers)
incoming, lookups, err := c.lookupChunksFor(userID, hour, metricName, matchers)
atomic.AddInt32(&totalLookups, lookups)
if err != nil {
incomingErrors <- err
} else {
Expand All @@ -479,6 +501,8 @@ func (c *AWSStore) lookupChunks(userID string, from, through model.Time, matcher
lastErr = err
}
}

queryDynamoLookups.Observe(float64(atomic.LoadInt32(&totalLookups)))
return chunks, lastErr
}

Expand All @@ -489,13 +513,14 @@ func next(s string) string {
return result
}

func (c *AWSStore) lookupChunksFor(userID string, hour int64, metricName model.LabelValue, matchers []*metric.LabelMatcher) (ByID, error) {
func (c *AWSStore) lookupChunksFor(userID string, hour int64, metricName model.LabelValue, matchers []*metric.LabelMatcher) (ByID, int32, error) {
if len(matchers) == 0 {
return c.lookupChunksForMetricName(userID, hour, metricName)
}

incomingChunkSets := make(chan ByID)
incomingErrors := make(chan error)

for _, matcher := range matchers {
go func(matcher *metric.LabelMatcher) {
incoming, err := c.lookupChunksForMatcher(userID, hour, metricName, matcher)
Expand All @@ -516,12 +541,11 @@ func (c *AWSStore) lookupChunksFor(userID string, hour int64, metricName model.L
case err := <-incomingErrors:
lastErr = err
}

}
return nWayIntersect(chunkSets), lastErr
return nWayIntersect(chunkSets), int32(len(matchers)), lastErr
}

func (c *AWSStore) lookupChunksForMetricName(userID string, hour int64, metricName model.LabelValue) (ByID, error) {
func (c *AWSStore) lookupChunksForMetricName(userID string, hour int64, metricName model.LabelValue) (ByID, int32, error) {
hashValue := hashValue(userID, hour, metricName)
input := &dynamodb.QueryInput{
TableName: aws.String(c.tableName),
Expand All @@ -538,25 +562,31 @@ func (c *AWSStore) lookupChunksForMetricName(userID string, hour int64, metricNa

chunkSet := ByID{}
var processingError error
var pages int
defer func() { dynamoRequestPages.Observe(float64(pages)) }()
var pages, totalDropped int
defer func() {
queryRequestPages.Observe(float64(pages))
queryDroppedMatches.Observe(float64(totalDropped))
}()
if err := instrument.TimeRequestHistogram("QueryPages", dynamoRequestDuration, func() error {
pages++
return c.dynamodb.QueryPages(input, func(resp *dynamodb.QueryOutput, lastPage bool) (shouldContinue bool) {
processingError = processResponse(resp, &chunkSet, nil)
var dropped int
dropped, processingError = processResponse(resp, &chunkSet, nil)
totalDropped += dropped
pages++
return processingError != nil && !lastPage
})
}); err != nil {
log.Errorf("Error querying DynamoDB: %v", err)
return nil, err
return nil, 1, err
} else if processingError != nil {
log.Errorf("Error processing DynamoDB response: %v", processingError)
return nil, processingError
return nil, 1, processingError
}

sort.Sort(ByID(chunkSet))
chunkSet = unique(chunkSet)
return chunkSet, nil
return chunkSet, 1, nil
}

func (c *AWSStore) lookupChunksForMatcher(userID string, hour int64, metricName model.LabelValue, matcher *metric.LabelMatcher) (ByID, error) {
Expand Down Expand Up @@ -594,12 +624,17 @@ func (c *AWSStore) lookupChunksForMatcher(userID string, hour int64, metricName

chunkSet := ByID{}
var processingError error
var pages int
defer func() { dynamoRequestPages.Observe(float64(pages)) }()
var pages, totalDropped int
defer func() {
queryRequestPages.Observe(float64(pages))
queryDroppedMatches.Observe(float64(totalDropped))
}()
if err := instrument.TimeRequestHistogram("QueryPages", dynamoRequestDuration, func() error {
return c.dynamodb.QueryPages(input, func(resp *dynamodb.QueryOutput, lastPage bool) (shouldContinue bool) {
var dropped int
dropped, processingError = processResponse(resp, &chunkSet, matcher)
totalDropped += dropped
pages++
processingError = processResponse(resp, &chunkSet, matcher)
return processingError != nil && !lastPage
})
}); err != nil {
Expand All @@ -614,39 +649,39 @@ func (c *AWSStore) lookupChunksForMatcher(userID string, hour int64, metricName
return chunkSet, nil
}

func processResponse(resp *dynamodb.QueryOutput, chunkSet *ByID, matcher *metric.LabelMatcher) error {
func processResponse(resp *dynamodb.QueryOutput, chunkSet *ByID, matcher *metric.LabelMatcher) (int, error) {
if resp.ConsumedCapacity != nil {
dynamoConsumedCapacity.WithLabelValues("Query").
Add(float64(*resp.ConsumedCapacity.CapacityUnits))
}

dropped := 0
for _, item := range resp.Items {
rangeValue := item[rangeKey].B
if rangeValue == nil {
return fmt.Errorf("invalid item: %v", item)
return dropped, fmt.Errorf("invalid item: %v", item)
}
label, value, chunkID, err := parseRangeValue(rangeValue)
if err != nil {
return err
return dropped, err
}
chunkValue := item[chunkKey].B
if chunkValue == nil {
return fmt.Errorf("invalid item: %v", item)
return dropped, fmt.Errorf("invalid item: %v", item)
}
chunk := Chunk{
ID: chunkID,
}
if err := json.Unmarshal(chunkValue, &chunk); err != nil {
return err
return dropped, err
}
if matcher != nil && (label != matcher.Name || !matcher.Match(value)) {
log.Debugf("Dropping unexpected", chunk.Metric)
droppedMatches.Add(1)
dropped++
continue
}
*chunkSet = append(*chunkSet, chunk)
}
return nil
return dropped, nil
}

func (c *AWSStore) fetchChunkData(userID string, chunkSet []Chunk) ([]Chunk, error) {
Expand Down