Skip to content

Instrument Cortex with OpenTracing middleware #155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions chunk/chunk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/bradfitz/gomemcache/memcache"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/scope/common/instrument"
"golang.org/x/net/context"
)

var (
Expand Down Expand Up @@ -70,7 +71,7 @@ func memcacheKey(userID, chunkID string) string {
}

// FetchChunkData gets chunks from the chunk cache.
func (c *Cache) FetchChunkData(userID string, chunks []Chunk) (found []Chunk, missing []Chunk, err error) {
func (c *Cache) FetchChunkData(ctx context.Context, userID string, chunks []Chunk) (found []Chunk, missing []Chunk, err error) {
memcacheRequests.Add(float64(len(chunks)))

keys := make([]string, 0, len(chunks))
Expand All @@ -79,7 +80,7 @@ func (c *Cache) FetchChunkData(userID string, chunks []Chunk) (found []Chunk, mi
}

var items map[string]*memcache.Item
err = instrument.TimeRequestHistogramStatus("Get", memcacheRequestDuration, memcacheStatusCode, func() error {
err = instrument.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
var err error
items, err = c.Memcache.GetMulti(keys)
return err
Expand All @@ -106,7 +107,7 @@ func (c *Cache) FetchChunkData(userID string, chunks []Chunk) (found []Chunk, mi
}

// StoreChunkData serializes and stores a chunk in the chunk cache.
func (c *Cache) StoreChunkData(userID string, chunk *Chunk) error {
func (c *Cache) StoreChunkData(ctx context.Context, userID string, chunk *Chunk) error {
reader, err := chunk.reader()
if err != nil {
return err
Expand All @@ -117,7 +118,7 @@ func (c *Cache) StoreChunkData(userID string, chunk *Chunk) error {
return err
}

return instrument.TimeRequestHistogramStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error {
return instrument.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error {
item := memcache.Item{
Key: memcacheKey(userID, chunk.ID),
Value: buf,
Expand All @@ -128,11 +129,11 @@ func (c *Cache) StoreChunkData(userID string, chunk *Chunk) error {
}

// StoreChunks serializes and stores multiple chunks in the chunk cache.
func (c *Cache) StoreChunks(userID string, chunks []Chunk) error {
func (c *Cache) StoreChunks(ctx context.Context, userID string, chunks []Chunk) error {
errs := make(chan error)
for _, chunk := range chunks {
go func(chunk *Chunk) {
errs <- c.StoreChunkData(userID, chunk)
errs <- c.StoreChunkData(ctx, userID, chunk)
}(&chunk)
}
var errOut error
Expand Down
62 changes: 33 additions & 29 deletions chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,20 +370,20 @@ func (c *AWSStore) Put(ctx context.Context, chunks []Chunk) error {
return err
}

err = c.putChunks(userID, chunks)
err = c.putChunks(ctx, userID, chunks)
if err != nil {
return err
}

return c.updateIndex(userID, chunks)
return c.updateIndex(ctx, userID, chunks)
}

// putChunks writes a collection of chunks to S3 in parallel.
func (c *AWSStore) putChunks(userID string, chunks []Chunk) error {
func (c *AWSStore) putChunks(ctx context.Context, userID string, chunks []Chunk) error {
incomingErrors := make(chan error)
for _, chunk := range chunks {
go func(chunk Chunk) {
incomingErrors <- c.putChunk(userID, &chunk)
incomingErrors <- c.putChunk(ctx, userID, &chunk)
}(chunk)
}

Expand All @@ -398,13 +398,13 @@ func (c *AWSStore) putChunks(userID string, chunks []Chunk) error {
}

// putChunk puts a chunk into S3.
func (c *AWSStore) putChunk(userID string, chunk *Chunk) error {
func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) error {
body, err := chunk.reader()
if err != nil {
return err
}

err = instrument.TimeRequestHistogram("Put", s3RequestDuration, func() error {
err = instrument.TimeRequestHistogram(ctx, "S3.PutObject", s3RequestDuration, func(_ context.Context) error {
var err error
_, err = c.s3.PutObject(&s3.PutObjectInput{
Body: body,
Expand All @@ -418,20 +418,20 @@ func (c *AWSStore) putChunk(userID string, chunk *Chunk) error {
}

if c.chunkCache != nil {
if err = c.chunkCache.StoreChunkData(userID, chunk); err != nil {
if err = c.chunkCache.StoreChunkData(ctx, userID, chunk); err != nil {
log.Warnf("Could not store %v in chunk cache: %v", chunk.ID, err)
}
}
return nil
}

func (c *AWSStore) updateIndex(userID string, chunks []Chunk) error {
func (c *AWSStore) updateIndex(ctx context.Context, userID string, chunks []Chunk) error {
writeReqs, err := c.calculateDynamoWrites(userID, chunks)
if err != nil {
return err
}

return c.batchWriteDynamo(c.tableName, writeReqs)
return c.batchWriteDynamo(ctx, c.tableName, writeReqs)
}

// calculateDynamoWrites creates a set of batched WriteRequests to dynamo for all
Expand Down Expand Up @@ -478,8 +478,7 @@ func (c *AWSStore) Get(ctx context.Context, from, through model.Time, matchers .
return nil, err
}

// TODO push ctx all the way through, so we can do cancellation (eventually!)
chunks, err := c.lookupChunks(userID, from, through, matchers)
chunks, err := c.lookupChunks(ctx, userID, from, through, matchers)
if err != nil {
return nil, err
}
Expand All @@ -501,19 +500,19 @@ func (c *AWSStore) Get(ctx context.Context, from, through model.Time, matchers .
var fromCache []Chunk
var missing = filtered
if c.chunkCache != nil {
fromCache, missing, err = c.chunkCache.FetchChunkData(userID, missing)
fromCache, missing, err = c.chunkCache.FetchChunkData(ctx, userID, missing)
if err != nil {
log.Warnf("Error fetching from cache: %v", err)
}
}

fromS3, err := c.fetchChunkData(userID, missing)
fromS3, err := c.fetchChunkData(ctx, userID, missing)
if err != nil {
return nil, err
}

if c.chunkCache != nil {
if err = c.chunkCache.StoreChunks(userID, fromS3); err != nil {
if err = c.chunkCache.StoreChunks(ctx, userID, fromS3); err != nil {
log.Warnf("Could not store chunks in chunk cache: %v", err)
}
}
Expand All @@ -540,7 +539,7 @@ func extractMetricName(matchers []*metric.LabelMatcher) (model.LabelValue, []*me
return "", nil, fmt.Errorf("no matcher for MetricNameLabel")
}

func (c *AWSStore) lookupChunks(userID string, from, through model.Time, matchers []*metric.LabelMatcher) ([]Chunk, error) {
func (c *AWSStore) lookupChunks(ctx context.Context, userID string, from, through model.Time, matchers []*metric.LabelMatcher) ([]Chunk, error) {
metricName, matchers, err := extractMetricName(matchers)
if err != nil {
return nil, err
Expand All @@ -552,7 +551,7 @@ func (c *AWSStore) lookupChunks(userID string, from, through model.Time, matcher
totalLookups := int32(0)
for _, hour := range buckets {
go func(hour int64) {
incoming, lookups, err := c.lookupChunksFor(userID, hour, metricName, matchers)
incoming, lookups, err := c.lookupChunksFor(ctx, userID, hour, metricName, matchers)
atomic.AddInt32(&totalLookups, lookups)
if err != nil {
incomingErrors <- err
Expand Down Expand Up @@ -584,17 +583,17 @@ func next(s string) string {
return result
}

func (c *AWSStore) lookupChunksFor(userID string, hour int64, metricName model.LabelValue, matchers []*metric.LabelMatcher) (ByID, int32, error) {
func (c *AWSStore) lookupChunksFor(ctx context.Context, userID string, hour int64, metricName model.LabelValue, matchers []*metric.LabelMatcher) (ByID, int32, error) {
if len(matchers) == 0 {
return c.lookupChunksForMetricName(userID, hour, metricName)
return c.lookupChunksForMetricName(ctx, userID, hour, metricName)
}

incomingChunkSets := make(chan ByID)
incomingErrors := make(chan error)

for _, matcher := range matchers {
go func(matcher *metric.LabelMatcher) {
incoming, err := c.lookupChunksForMatcher(userID, hour, metricName, matcher)
incoming, err := c.lookupChunksForMatcher(ctx, userID, hour, metricName, matcher)
if err != nil {
incomingErrors <- err
} else {
Expand All @@ -616,7 +615,7 @@ func (c *AWSStore) lookupChunksFor(userID string, hour int64, metricName model.L
return nWayIntersect(chunkSets), int32(len(matchers)), lastErr
}

func (c *AWSStore) lookupChunksForMetricName(userID string, hour int64, metricName model.LabelValue) (ByID, int32, error) {
func (c *AWSStore) lookupChunksForMetricName(ctx context.Context, userID string, hour int64, metricName model.LabelValue) (ByID, int32, error) {
hashValue := hashValue(userID, hour, metricName)
input := &dynamodb.QueryInput{
TableName: aws.String(c.tableName),
Expand All @@ -638,7 +637,8 @@ func (c *AWSStore) lookupChunksForMetricName(userID string, hour int64, metricNa
queryRequestPages.Observe(float64(pages))
queryDroppedMatches.Observe(float64(totalDropped))
}()
if err := c.queryPages(input, func(resp interface{}, lastPage bool) (shouldContinue bool) {

if err := c.queryPages(ctx, input, func(resp interface{}, lastPage bool) (shouldContinue bool) {
var dropped int
dropped, processingError = processResponse(resp.(*dynamodb.QueryOutput), &chunkSet, nil)
totalDropped += dropped
Expand All @@ -656,7 +656,7 @@ func (c *AWSStore) lookupChunksForMetricName(userID string, hour int64, metricNa
return chunkSet, 1, nil
}

func (c *AWSStore) lookupChunksForMatcher(userID string, hour int64, metricName model.LabelValue, matcher *metric.LabelMatcher) (ByID, error) {
func (c *AWSStore) lookupChunksForMatcher(ctx context.Context, userID string, hour int64, metricName model.LabelValue, matcher *metric.LabelMatcher) (ByID, error) {
hashValue := hashValue(userID, hour, metricName)
var rangeMinValue, rangeMaxValue []byte
if matcher.Type == metric.Equal {
Expand Down Expand Up @@ -696,7 +696,7 @@ func (c *AWSStore) lookupChunksForMatcher(userID string, hour int64, metricName
queryRequestPages.Observe(float64(pages))
queryDroppedMatches.Observe(float64(totalDropped))
}()
if err := c.queryPages(input, func(resp interface{}, lastPage bool) (shouldContinue bool) {
if err := c.queryPages(ctx, input, func(resp interface{}, lastPage bool) (shouldContinue bool) {
var dropped int
dropped, processingError = processResponse(resp.(*dynamodb.QueryOutput), &chunkSet, matcher)
totalDropped += dropped
Expand Down Expand Up @@ -751,13 +751,13 @@ func processResponse(resp *dynamodb.QueryOutput, chunkSet *ByID, matcher *metric
return dropped, nil
}

func (c *AWSStore) fetchChunkData(userID string, chunkSet []Chunk) ([]Chunk, error) {
func (c *AWSStore) fetchChunkData(ctx context.Context, userID string, chunkSet []Chunk) ([]Chunk, error) {
incomingChunks := make(chan Chunk)
incomingErrors := make(chan error)
for _, chunk := range chunkSet {
go func(chunk Chunk) {
var resp *s3.GetObjectOutput
err := instrument.TimeRequestHistogram("Get", s3RequestDuration, func() error {
err := instrument.TimeRequestHistogram(ctx, "S3.GetObject", s3RequestDuration, func(_ context.Context) error {
var err error
resp, err = c.s3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(c.bucketName),
Expand Down Expand Up @@ -795,8 +795,9 @@ func (c *AWSStore) fetchChunkData(userID string, chunkSet []Chunk) ([]Chunk, err
}

// batchWriteDynamo writes many requests to dynamo in a single batch.
func (c *AWSStore) batchWriteDynamo(tableName string, reqs []*dynamodb.WriteRequest) error {
func (c *AWSStore) batchWriteDynamo(ctx context.Context, tableName string, reqs []*dynamodb.WriteRequest) error {
req := &dynamoBatchWriteItemsOp{
ctx: ctx,
tableName: tableName,
reqs: reqs,
dynamodb: c.dynamodb,
Expand All @@ -806,9 +807,10 @@ func (c *AWSStore) batchWriteDynamo(tableName string, reqs []*dynamodb.WriteRequ
return <-req.done
}

func (c *AWSStore) queryPages(input *dynamodb.QueryInput, callback func(resp interface{}, lastPage bool) (shouldContinue bool)) error {
func (c *AWSStore) queryPages(ctx context.Context, input *dynamodb.QueryInput, callback func(resp interface{}, lastPage bool) (shouldContinue bool)) error {
page, _ := c.dynamodb.QueryRequest(input)
req := &dynamoQueryPagesOp{
ctx: ctx,
request: page,
callback: callback,
done: make(chan error),
Expand All @@ -835,12 +837,14 @@ type dynamoOp interface {
}

type dynamoQueryPagesOp struct {
ctx context.Context
request dynamoRequest
callback func(resp interface{}, lastPage bool) (shouldContinue bool)
done chan error
}

type dynamoBatchWriteItemsOp struct {
ctx context.Context
tableName string
reqs []*dynamodb.WriteRequest
dynamodb dynamodbClient
Expand All @@ -851,7 +855,7 @@ func (r *dynamoQueryPagesOp) do() {
backoff := minBackoff

for page := r.request; page != nil; page = page.NextPage() {
err := instrument.TimeRequestHistogram("DynamoDB.QueryPages", dynamoRequestDuration, func() error {
err := instrument.TimeRequestHistogram(r.ctx, "DynamoDB.QueryPages", dynamoRequestDuration, func(_ context.Context) error {
return page.Send()
})

Expand Down Expand Up @@ -909,7 +913,7 @@ func (r *dynamoBatchWriteItemsOp) do() {
fillReq(&outstanding, &reqs)

var resp *dynamodb.BatchWriteItemOutput
err := instrument.TimeRequestHistogram("DynamoDB.BatchWriteItem", dynamoRequestDuration, func() error {
err := instrument.TimeRequestHistogram(r.ctx, "DynamoDB.BatchWriteItem", dynamoRequestDuration, func(_ context.Context) error {
var err error
resp, err = r.dynamodb.BatchWriteItem(&dynamodb.BatchWriteItemInput{
RequestItems: map[string][]*dynamodb.WriteRequest{r.tableName: reqs},
Expand Down
11 changes: 9 additions & 2 deletions cmd/cortex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (
"time"

"github.com/gorilla/mux"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/mwitkow/go-grpc-middleware"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/scope/common/middleware"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand Down Expand Up @@ -166,8 +169,9 @@ func main() {
}
grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
cortex_grpc_middleware.ServerInstrumentInterceptor(requestDuration),
cortex_grpc_middleware.ServerLoggingInterceptor(cfg.logSuccess),
cortex_grpc_middleware.ServerInstrumentInterceptor(requestDuration),
otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer()),
cortex_grpc_middleware.ServerUserHeaderInterceptor,
)),
)
Expand Down Expand Up @@ -204,6 +208,9 @@ func main() {

router.Handle("/metrics", prometheus.Handler())
instrumented := middleware.Merge(
middleware.Func(func(handler http.Handler) http.Handler {
return nethttp.Middleware(opentracing.GlobalTracer(), handler)
}),
middleware.Log{
LogSuccess: cfg.logSuccess,
},
Expand Down Expand Up @@ -284,7 +291,7 @@ func setupQuerier(
if userID == "" {
return nil, fmt.Errorf("no %s header", user.UserIDHeaderName)
}
return user.WithID(context.Background(), userID), nil
return user.WithID(r.Context(), userID), nil
}).WithPrefix("/api/prom/api/v1")
api.Register(promRouter)
router.PathPrefix("/api/v1").Handler(promRouter)
Expand Down
12 changes: 9 additions & 3 deletions distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"sync/atomic"
"time"

"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/mwitkow/go-grpc-middleware"
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/scope/common/instrument"
"golang.org/x/net/context"
"google.golang.org/grpc"
Expand Down Expand Up @@ -137,7 +140,10 @@ func (d *Distributor) getClientFor(ingester ring.IngesterDesc) (cortex.IngesterC
conn, err := grpc.Dial(
ingester.GRPCHostname,
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(middleware.ClientUserHeaderInterceptor),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
)),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -257,7 +263,7 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.IngesterDes
for i := range sampleTrackers {
samples[i] = sampleTrackers[i].sample
}
err = instrument.TimeRequestHistogram("send", d.sendDuration, func() error {
err = instrument.TimeRequestHistogram(ctx, "Distributor.sendSamples", d.sendDuration, func(ctx context.Context) error {
_, err := client.Push(ctx, util.ToWriteRequest(samples))
return err
})
Expand Down Expand Up @@ -286,7 +292,7 @@ func metricNameFromLabelMatchers(matchers ...*metric.LabelMatcher) (model.LabelV
// Query implements Querier.
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*metric.LabelMatcher) (model.Matrix, error) {
var result model.Matrix
err := instrument.TimeRequestHistogram("duration", d.queryDuration, func() error {
err := instrument.TimeRequestHistogram(ctx, "Distributor.Query", d.queryDuration, func(ctx context.Context) error {
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}

metricName, err := metricNameFromLabelMatchers(matchers...)
Expand Down
2 changes: 1 addition & 1 deletion util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func ParseProtoRequest(w http.ResponseWriter, r *http.Request, req proto.Message
return nil, true
}

ctx = user.WithID(context.Background(), userID)
ctx = user.WithID(r.Context(), userID)
if req == nil {
return ctx, false
}
Expand Down
Loading