diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go
index 22839ca5de..1c8138931d 100644
--- a/pkg/chunk/aws/storage_client.go
+++ b/pkg/chunk/aws/storage_client.go
@@ -188,6 +188,24 @@ func (a storageClient) NewWriteBatch() chunk.WriteBatch {
 	return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{})
 }
 
+func logRetry(unprocessed dynamoDBWriteBatch) {
+	for table, reqs := range unprocessed {
+		for _, req := range reqs {
+			item := req.PutRequest.Item
+			var hash, rnge string
+			if hashAttr, ok := item[hashKey]; ok {
+				if hashAttr.S != nil {
+					hash = *hashAttr.S
+				}
+			}
+			if rangeAttr, ok := item[rangeKey]; ok {
+				rnge = string(rangeAttr.B)
+			}
+			util.Event().Log("msg", "store retry", "table", table, "hashKey", hash, "rangeKey", rnge)
+		}
+	}
+}
+
 // BatchWrite writes requests to the underlying storage, handling retries and backoff.
 // Structure is identical to getDynamoDBChunks(), but operating on different datatypes
 // so cannot share implementation. If you fix a bug here fix it there too.
@@ -228,6 +246,7 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e
 			// If we get provisionedThroughputExceededException, then no items were processed,
 			// so back off and retry all.
 			if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
+				logRetry(requests)
 				unprocessed.TakeReqs(requests, -1)
 				backoff.Wait()
 				continue
@@ -239,6 +258,7 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e
 
 		// If there are unprocessed items, backoff and retry those items.
 		if unprocessedItems := resp.UnprocessedItems; unprocessedItems != nil && dynamoDBWriteBatch(unprocessedItems).Len() > 0 {
+			logRetry(dynamoDBWriteBatch(unprocessedItems))
 			unprocessed.TakeReqs(unprocessedItems, -1)
 			// I am unclear why we don't count here; perhaps the idea is
 			// that while we are making _some_ progress we should carry on.
@@ -250,9 +270,6 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e
 	}
 
 	if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 {
-		if valuesLeft < 4 { // protect against logging lots of data
-			level.Info(util.Logger).Log("msg", "DynamoDB BatchWrite values left", "count", valuesLeft, "outstanding", outstanding, "unprocessed", unprocessed)
-		}
 		return fmt.Errorf("failed to write chunk, %d values remaining: %s", valuesLeft, backoff.Err())
 	}
 	return backoff.Err()
@@ -800,30 +817,6 @@ func (b dynamoDBWriteBatch) TakeReqs(from dynamoDBWriteBatch, max int) {
 	}
 }
 
-func (b dynamoDBWriteBatch) String() string {
-	buf := &bytes.Buffer{}
-	for table, reqs := range b {
-		for _, req := range reqs {
-			item := req.PutRequest.Item
-			hash := ""
-			if hashAttr, ok := item[hashKey]; ok {
-				if hashAttr.S != nil {
-					hash = *hashAttr.S
-				}
-			}
-			var rnge, value []byte
-			if rangeAttr, ok := item[rangeKey]; ok {
-				rnge = rangeAttr.B
-			}
-			if valueAttr, ok := item[valueKey]; ok {
-				value = valueAttr.B
-			}
-			fmt.Fprintf(buf, "%s: %s,%.32s,%.32s; ", table, hash, rnge, value)
-		}
-	}
-	return buf.String()
-}
-
 // map key is table name
 type dynamoDBReadRequest map[string]*dynamodb.KeysAndAttributes
 
diff --git a/pkg/ingester/ingester_flush.go b/pkg/ingester/ingester_flush.go
index 640041a566..0f34d9b08d 100644
--- a/pkg/ingester/ingester_flush.go
+++ b/pkg/ingester/ingester_flush.go
@@ -82,8 +82,9 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memo
 
 	if flush != noFlush {
 		flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes))
-		util.Event().Log("msg", "add to flush queue", "userID", userID, "reason", flush, "firstTime", firstTime, "fp", fp, "series", series.metric)
-		i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate})
+		if i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate}) {
+			util.Event().Log("msg", "add to flush queue", "userID", userID, "reason", flush, "firstTime", firstTime, "fp", fp, "series", series.metric, "queue", flushQueueIndex)
+		}
 	}
 }
 
@@ -131,7 +132,7 @@ func (i *Ingester) flushLoop(j int) {
 		}
 		op := o.(*flushOp)
 
-		err := i.flushUserSeries(op.userID, op.fp, op.immediate)
+		err := i.flushUserSeries(j, op.userID, op.fp, op.immediate)
 		if err != nil {
 			level.Error(util.WithUserID(op.userID, util.Logger)).Log("msg", "failed to flush user", "err", err)
 		}
@@ -145,7 +146,7 @@ func (i *Ingester) flushLoop(j int) {
 	}
 }
 
-func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
+func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.Fingerprint, immediate bool) error {
 	if i.preFlushUserSeries != nil {
 		i.preFlushUserSeries()
 	}
@@ -185,10 +186,9 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
 	ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
 	defer cancel() // releases resources if slowOperation completes before timeout elapses
 
-	util.Event().Log("msg", "flush chunks", "userID", userID, "reason", reason, "numChunks", len(chunks), "firstTime", chunks[0].FirstTime, "fp", fp, "series", series.metric)
+	util.Event().Log("msg", "flush chunks", "userID", userID, "reason", reason, "numChunks", len(chunks), "firstTime", chunks[0].FirstTime, "fp", fp, "series", series.metric, "queue", flushQueueIndex)
 	err := i.flushChunks(ctx, fp, series.metric, chunks)
 	if err != nil {
-		util.Event().Log("msg", "flush error", "userID", userID, "err", err, "fp", fp, "series", series.metric)
 		return err
 	}
 
@@ -220,8 +220,10 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, metric
 
 	// Record statistsics only when actual put request did not return error.
 	for _, chunkDesc := range chunkDescs {
-		i.chunkUtilization.Observe(chunkDesc.C.Utilization())
-		i.chunkLength.Observe(float64(chunkDesc.C.Len()))
+		utilization, length := chunkDesc.C.Utilization(), chunkDesc.C.Len()
+		util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", metric, "utilization", utilization, "length", length, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime)
+		i.chunkUtilization.Observe(utilization)
+		i.chunkLength.Observe(float64(length))
 		i.chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds())
 	}
 
diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go
index eff775af7d..ce53083a83 100644
--- a/pkg/ingester/series.go
+++ b/pkg/ingester/series.go
@@ -12,12 +12,18 @@ import (
 	"github.com/weaveworks/cortex/pkg/prom1/storage/metric"
 )
 
-var discardedSamples = prometheus.NewCounterVec(
-	prometheus.CounterOpts{
-		Name: "cortex_ingester_out_of_order_samples_total",
-		Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
-	},
-	[]string{discardReasonLabel},
+var (
+	discardedSamples = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "cortex_ingester_out_of_order_samples_total",
+			Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
+		},
+		[]string{discardReasonLabel},
+	)
+	createdChunks = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "cortex_ingester_chunks_created_total",
+		Help: "The total number of chunks the ingester has created.",
+	})
 )
 
 func init() {
@@ -77,6 +83,7 @@ func (s *memorySeries) add(v model.SamplePair) error {
 		newHead := newDesc(chunk.New(), v.Timestamp, v.Timestamp)
 		s.chunkDescs = append(s.chunkDescs, newHead)
 		s.headChunkClosed = false
+		createdChunks.Inc()
 	}
 
 	chunks, err := s.head().add(v)
@@ -97,6 +104,7 @@ func (s *memorySeries) add(v model.SamplePair) error {
 			return err
 		}
 		s.chunkDescs = append(s.chunkDescs, newDesc(c, c.FirstTime(), lastTime))
+		createdChunks.Inc()
 	}
 }
 
diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go
index 6e20315df8..ba9cbd2a59 100644
--- a/pkg/ingester/user_state.go
+++ b/pkg/ingester/user_state.go
@@ -214,6 +214,7 @@ func (u *userState) unlockedGet(metric model.Metric, cfg *UserStatesConfig) (mod
 			return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-metric series limit (%d) exceeded for %s: %s", cfg.MaxSeriesPerMetric, metricName, metric)
 		}
 
+		util.Event().Log("msg", "new series", "userID", u.userID, "fp", fp, "series", metric)
 		series = newMemorySeries(metric)
 		u.fpToSeries.put(fp, series)
 		u.index.add(metric, fp)
diff --git a/pkg/util/priority_queue.go b/pkg/util/priority_queue.go
index ce70cf94af..2b3b6d6178 100644
--- a/pkg/util/priority_queue.go
+++ b/pkg/util/priority_queue.go
@@ -76,9 +76,9 @@ func (pq *PriorityQueue) DrainAndClose() {
 	pq.cond.Broadcast()
 }
 
-// Enqueue adds an operation to the queue in priority order. If the operation
-// is already on the queue, it will be ignored.
-func (pq *PriorityQueue) Enqueue(op Op) {
+// Enqueue adds an operation to the queue in priority order. Returns
+// true if added; false if the operation was already on the queue.
+func (pq *PriorityQueue) Enqueue(op Op) bool {
 	pq.lock.Lock()
 	defer pq.lock.Unlock()
 
@@ -88,12 +88,13 @@ func (pq *PriorityQueue) Enqueue(op Op) {
 
 	_, enqueued := pq.hit[op.Key()]
 	if enqueued {
-		return
+		return false
 	}
 
 	pq.hit[op.Key()] = struct{}{}
 	heap.Push(&pq.queue, op)
 	pq.cond.Broadcast()
+	return true
 }
 
 // Dequeue will return the op with the highest priority; block if queue is