Skip to content

More ingester events #772

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Mar 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 20 additions & 27 deletions pkg/chunk/aws/storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,24 @@ func (a storageClient) NewWriteBatch() chunk.WriteBatch {
return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{})
}

// logRetry emits one event per write request that is being retried, so
// individual store retries can be traced back to their table, hash key
// and range key.
func logRetry(unprocessed dynamoDBWriteBatch) {
	for table, reqs := range unprocessed {
		for _, req := range reqs {
			// A dynamodb.WriteRequest carries either a PutRequest or a
			// DeleteRequest; guard against a nil PutRequest to avoid a
			// nil-pointer panic on delete entries.
			if req.PutRequest == nil {
				util.Event().Log("msg", "store retry", "table", table)
				continue
			}
			item := req.PutRequest.Item
			var hash, rnge string
			if hashAttr, ok := item[hashKey]; ok && hashAttr.S != nil {
				hash = *hashAttr.S
			}
			if rangeAttr, ok := item[rangeKey]; ok {
				rnge = string(rangeAttr.B)
			}
			util.Event().Log("msg", "store retry", "table", table, "hashKey", hash, "rangeKey", rnge)
		}
	}
}

// BatchWrite writes requests to the underlying storage, handling retries and backoff.
// Structure is identical to getDynamoDBChunks(), but operating on different datatypes
// so cannot share implementation. If you fix a bug here fix it there too.
Expand Down Expand Up @@ -228,6 +246,7 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e
// If we get provisionedThroughputExceededException, then no items were processed,
// so back off and retry all.
if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
logRetry(requests)
unprocessed.TakeReqs(requests, -1)
backoff.Wait()
continue
Expand All @@ -239,6 +258,7 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e

// If there are unprocessed items, backoff and retry those items.
if unprocessedItems := resp.UnprocessedItems; unprocessedItems != nil && dynamoDBWriteBatch(unprocessedItems).Len() > 0 {
logRetry(dynamoDBWriteBatch(unprocessedItems))
unprocessed.TakeReqs(unprocessedItems, -1)
// I am unclear why we don't count here; perhaps the idea is
// that while we are making _some_ progress we should carry on.
Expand All @@ -250,9 +270,6 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e
}

if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 {
if valuesLeft < 4 { // protect against logging lots of data
level.Info(util.Logger).Log("msg", "DynamoDB BatchWrite values left", "count", valuesLeft, "outstanding", outstanding, "unprocessed", unprocessed)
}
return fmt.Errorf("failed to write chunk, %d values remaining: %s", valuesLeft, backoff.Err())
}
return backoff.Err()
Expand Down Expand Up @@ -800,30 +817,6 @@ func (b dynamoDBWriteBatch) TakeReqs(from dynamoDBWriteBatch, max int) {
}
}

// String renders the batch for logging: one "table: hash,range,value; "
// entry per request, with range and value truncated to 32 bytes.
func (b dynamoDBWriteBatch) String() string {
	var out bytes.Buffer
	for tableName, requests := range b {
		for _, writeReq := range requests {
			attrs := writeReq.PutRequest.Item
			var hashVal string
			if attr, found := attrs[hashKey]; found && attr.S != nil {
				hashVal = *attr.S
			}
			var rangeVal, chunkVal []byte
			if attr, found := attrs[rangeKey]; found {
				rangeVal = attr.B
			}
			if attr, found := attrs[valueKey]; found {
				chunkVal = attr.B
			}
			fmt.Fprintf(&out, "%s: %s,%.32s,%.32s; ", tableName, hashVal, rangeVal, chunkVal)
		}
	}
	return out.String()
}

// map key is table name
type dynamoDBReadRequest map[string]*dynamodb.KeysAndAttributes

Expand Down
18 changes: 10 additions & 8 deletions pkg/ingester/ingester_flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memo

if flush != noFlush {
flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes))
util.Event().Log("msg", "add to flush queue", "userID", userID, "reason", flush, "firstTime", firstTime, "fp", fp, "series", series.metric)
i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate})
if i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate}) {
util.Event().Log("msg", "add to flush queue", "userID", userID, "reason", flush, "firstTime", firstTime, "fp", fp, "series", series.metric, "queue", flushQueueIndex)
}
}
}

Expand Down Expand Up @@ -131,7 +132,7 @@ func (i *Ingester) flushLoop(j int) {
}
op := o.(*flushOp)

err := i.flushUserSeries(op.userID, op.fp, op.immediate)
err := i.flushUserSeries(j, op.userID, op.fp, op.immediate)
if err != nil {
level.Error(util.WithUserID(op.userID, util.Logger)).Log("msg", "failed to flush user", "err", err)
}
Expand All @@ -145,7 +146,7 @@ func (i *Ingester) flushLoop(j int) {
}
}

func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.Fingerprint, immediate bool) error {
if i.preFlushUserSeries != nil {
i.preFlushUserSeries()
}
Expand Down Expand Up @@ -185,10 +186,9 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel() // releases resources if slowOperation completes before timeout elapses

util.Event().Log("msg", "flush chunks", "userID", userID, "reason", reason, "numChunks", len(chunks), "firstTime", chunks[0].FirstTime, "fp", fp, "series", series.metric)
util.Event().Log("msg", "flush chunks", "userID", userID, "reason", reason, "numChunks", len(chunks), "firstTime", chunks[0].FirstTime, "fp", fp, "series", series.metric, "queue", flushQueueIndex)
err := i.flushChunks(ctx, fp, series.metric, chunks)
if err != nil {
util.Event().Log("msg", "flush error", "userID", userID, "err", err, "fp", fp, "series", series.metric)
return err
}

Expand Down Expand Up @@ -220,8 +220,10 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, metric

// Record statistics only when the actual put request did not return an error.
for _, chunkDesc := range chunkDescs {
i.chunkUtilization.Observe(chunkDesc.C.Utilization())
i.chunkLength.Observe(float64(chunkDesc.C.Len()))
utilization, length := chunkDesc.C.Utilization(), chunkDesc.C.Len()
util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", metric, "utilization", utilization, "length", length, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime)
i.chunkUtilization.Observe(utilization)
i.chunkLength.Observe(float64(length))
i.chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds())
}

Expand Down
20 changes: 14 additions & 6 deletions pkg/ingester/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ import (
"github.com/weaveworks/cortex/pkg/prom1/storage/metric"
)

var discardedSamples = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cortex_ingester_out_of_order_samples_total",
Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
},
[]string{discardReasonLabel},
var (
	// discardedSamples counts samples rejected by the ingester,
	// partitioned by discard reason.
	discardedSamples = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "cortex_ingester_out_of_order_samples_total",
			Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
		},
		[]string{discardReasonLabel},
	)
	// createdChunks counts every chunk the ingester opens — both new head
	// chunks and overflow chunks created while appending samples.
	createdChunks = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "cortex_ingester_chunks_created_total",
		Help: "The total number of chunks the ingester has created.",
	})
)

func init() {
Expand Down Expand Up @@ -77,6 +83,7 @@ func (s *memorySeries) add(v model.SamplePair) error {
newHead := newDesc(chunk.New(), v.Timestamp, v.Timestamp)
s.chunkDescs = append(s.chunkDescs, newHead)
s.headChunkClosed = false
createdChunks.Inc()
}

chunks, err := s.head().add(v)
Expand All @@ -97,6 +104,7 @@ func (s *memorySeries) add(v model.SamplePair) error {
return err
}
s.chunkDescs = append(s.chunkDescs, newDesc(c, c.FirstTime(), lastTime))
createdChunks.Inc()
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/ingester/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (u *userState) unlockedGet(metric model.Metric, cfg *UserStatesConfig) (mod
return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-metric series limit (%d) exceeded for %s: %s", cfg.MaxSeriesPerMetric, metricName, metric)
}

util.Event().Log("msg", "new series", "userID", u.userID, "fp", fp, "series", metric)
series = newMemorySeries(metric)
u.fpToSeries.put(fp, series)
u.index.add(metric, fp)
Expand Down
9 changes: 5 additions & 4 deletions pkg/util/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func (pq *PriorityQueue) DrainAndClose() {
pq.cond.Broadcast()
}

// Enqueue adds an operation to the queue in priority order. If the operation
// is already on the queue, it will be ignored.
func (pq *PriorityQueue) Enqueue(op Op) {
// Enqueue adds an operation to the queue in priority order. Returns
// true if added; false if the operation was already on the queue.
func (pq *PriorityQueue) Enqueue(op Op) bool {
pq.lock.Lock()
defer pq.lock.Unlock()

Expand All @@ -88,12 +88,13 @@ func (pq *PriorityQueue) Enqueue(op Op) {

_, enqueued := pq.hit[op.Key()]
if enqueued {
return
return false
}

pq.hit[op.Key()] = struct{}{}
heap.Push(&pq.queue, op)
pq.cond.Broadcast()
return true
}

// Dequeue will return the op with the highest priority; block if queue is
Expand Down