Commit f25533c

More ingester events (#772)
* Stop generating flush error events. They don't happen often enough to be meaningful.
* Generate an event on each chunk flushed. This will make it possible to drill into the reasons for very short chunks.
* Generate an event on DynamoDB unprocessed items. This should make it possible to analyze which keys generate the most retries. Replaces the code which would log a few keys after all retries were exhausted.
* Only report an event the first time a series is added to the flush queue.
* Add a counter for chunks created, so we can get the rate.
* Add an event for series creation.
1 parent f7f6cf9 commit f25533c

5 files changed: 50 additions and 45 deletions
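Most of these changes route extra operational detail through Cortex's util.Event() logger as structured key/value pairs. As a rough, standalone illustration only (the implementation of util.Event() is not part of this diff), the sketch below uses a go-kit logfmt logger as a stand-in for the event sink; all field values are made up:

package main

import (
    "os"

    "github.com/go-kit/kit/log"
)

func main() {
    // Stand-in for util.Event(): a structured logger that emits
    // logfmt key/value pairs to stdout.
    events := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))

    // Same shape as the events added in this commit, e.g.
    // util.Event().Log("msg", "chunk flushed", ...). Values are made up.
    events.Log("msg", "chunk flushed",
        "userID", "example-user",
        "utilization", 0.42,
        "length", 1024,
        "queue", 3,
    )
}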

pkg/chunk/aws/storage_client.go

Lines changed: 20 additions & 27 deletions
@@ -188,6 +188,24 @@ func (a storageClient) NewWriteBatch() chunk.WriteBatch {
 	return dynamoDBWriteBatch(map[string][]*dynamodb.WriteRequest{})
 }
 
+func logRetry(unprocessed dynamoDBWriteBatch) {
+	for table, reqs := range unprocessed {
+		for _, req := range reqs {
+			item := req.PutRequest.Item
+			var hash, rnge string
+			if hashAttr, ok := item[hashKey]; ok {
+				if hashAttr.S != nil {
+					hash = *hashAttr.S
+				}
+			}
+			if rangeAttr, ok := item[rangeKey]; ok {
+				rnge = string(rangeAttr.B)
+			}
+			util.Event().Log("msg", "store retry", "table", table, "hashKey", hash, "rangeKey", rnge)
+		}
+	}
+}
+
 // BatchWrite writes requests to the underlying storage, handling retries and backoff.
 // Structure is identical to getDynamoDBChunks(), but operating on different datatypes
 // so cannot share implementation. If you fix a bug here fix it there too.
@@ -228,6 +246,7 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e
 		// If we get provisionedThroughputExceededException, then no items were processed,
 		// so back off and retry all.
 		if awsErr, ok := err.(awserr.Error); ok && ((awsErr.Code() == dynamodb.ErrCodeProvisionedThroughputExceededException) || request.Retryable()) {
+			logRetry(requests)
 			unprocessed.TakeReqs(requests, -1)
 			backoff.Wait()
 			continue
@@ -239,6 +258,7 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e
 
 		// If there are unprocessed items, backoff and retry those items.
 		if unprocessedItems := resp.UnprocessedItems; unprocessedItems != nil && dynamoDBWriteBatch(unprocessedItems).Len() > 0 {
+			logRetry(dynamoDBWriteBatch(unprocessedItems))
 			unprocessed.TakeReqs(unprocessedItems, -1)
 			// I am unclear why we don't count here; perhaps the idea is
 			// that while we are making _some_ progress we should carry on.
@@ -250,9 +270,6 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e
 	}
 
 	if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 {
-		if valuesLeft < 4 { // protect against logging lots of data
-			level.Info(util.Logger).Log("msg", "DynamoDB BatchWrite values left", "count", valuesLeft, "outstanding", outstanding, "unprocessed", unprocessed)
-		}
 		return fmt.Errorf("failed to write chunk, %d values remaining: %s", valuesLeft, backoff.Err())
 	}
 	return backoff.Err()
@@ -800,30 +817,6 @@ func (b dynamoDBWriteBatch) TakeReqs(from dynamoDBWriteBatch, max int) {
 	}
 }
 
-func (b dynamoDBWriteBatch) String() string {
-	buf := &bytes.Buffer{}
-	for table, reqs := range b {
-		for _, req := range reqs {
-			item := req.PutRequest.Item
-			hash := ""
-			if hashAttr, ok := item[hashKey]; ok {
-				if hashAttr.S != nil {
-					hash = *hashAttr.S
-				}
-			}
-			var rnge, value []byte
-			if rangeAttr, ok := item[rangeKey]; ok {
-				rnge = rangeAttr.B
-			}
-			if valueAttr, ok := item[valueKey]; ok {
-				value = valueAttr.B
-			}
-			fmt.Fprintf(buf, "%s: %s,%.32s,%.32s; ", table, hash, rnge, value)
-		}
-	}
-	return buf.String()
-}
-
 // map key is table name
 type dynamoDBReadRequest map[string]*dynamodb.KeysAndAttributes
 
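The new logRetry walks a dynamoDBWriteBatch, which the surrounding code shows is a map from table name to a slice of *dynamodb.WriteRequest, and emits one "store retry" event per unprocessed item. A self-contained sketch of that traversal using the AWS SDK types follows; the "h" and "r" attribute names and all values are placeholders, since the real hashKey/rangeKey constants are defined elsewhere in the package:

package main

import (
    "fmt"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/dynamodb"
)

func main() {
    // A dynamoDBWriteBatch-shaped value: table name -> pending write requests.
    batch := map[string][]*dynamodb.WriteRequest{
        "example_table": {
            {PutRequest: &dynamodb.PutRequest{Item: map[string]*dynamodb.AttributeValue{
                "h": {S: aws.String("fake-hash")},
                "r": {B: []byte("fake-range")},
            }}},
        },
    }
    for table, reqs := range batch {
        for _, req := range reqs {
            item := req.PutRequest.Item
            var hash, rnge string
            if attr, ok := item["h"]; ok && attr.S != nil {
                hash = *attr.S
            }
            if attr, ok := item["r"]; ok {
                rnge = string(attr.B)
            }
            // The real code emits this as a "store retry" event rather than printing.
            fmt.Printf("store retry table=%s hashKey=%s rangeKey=%s\n", table, hash, rnge)
        }
    }
}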

pkg/ingester/ingester_flush.go

Lines changed: 10 additions & 8 deletions
@@ -82,8 +82,9 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memo
 
 	if flush != noFlush {
 		flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes))
-		util.Event().Log("msg", "add to flush queue", "userID", userID, "reason", flush, "firstTime", firstTime, "fp", fp, "series", series.metric)
-		i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate})
+		if i.flushQueues[flushQueueIndex].Enqueue(&flushOp{firstTime, userID, fp, immediate}) {
+			util.Event().Log("msg", "add to flush queue", "userID", userID, "reason", flush, "firstTime", firstTime, "fp", fp, "series", series.metric, "queue", flushQueueIndex)
+		}
 	}
 }
 
@@ -131,7 +132,7 @@ func (i *Ingester) flushLoop(j int) {
 		}
 		op := o.(*flushOp)
 
-		err := i.flushUserSeries(op.userID, op.fp, op.immediate)
+		err := i.flushUserSeries(j, op.userID, op.fp, op.immediate)
 		if err != nil {
 			level.Error(util.WithUserID(op.userID, util.Logger)).Log("msg", "failed to flush user", "err", err)
 		}
@@ -145,7 +146,7 @@ func (i *Ingester) flushLoop(j int) {
 	}
 }
 
-func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
+func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model.Fingerprint, immediate bool) error {
 	if i.preFlushUserSeries != nil {
 		i.preFlushUserSeries()
 	}
@@ -185,10 +186,9 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
 	ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
 	defer cancel() // releases resources if slowOperation completes before timeout elapses
 
-	util.Event().Log("msg", "flush chunks", "userID", userID, "reason", reason, "numChunks", len(chunks), "firstTime", chunks[0].FirstTime, "fp", fp, "series", series.metric)
+	util.Event().Log("msg", "flush chunks", "userID", userID, "reason", reason, "numChunks", len(chunks), "firstTime", chunks[0].FirstTime, "fp", fp, "series", series.metric, "queue", flushQueueIndex)
 	err := i.flushChunks(ctx, fp, series.metric, chunks)
 	if err != nil {
-		util.Event().Log("msg", "flush error", "userID", userID, "err", err, "fp", fp, "series", series.metric)
 		return err
 	}
 
@@ -220,8 +220,10 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, metric
 
 	// Record statistsics only when actual put request did not return error.
 	for _, chunkDesc := range chunkDescs {
-		i.chunkUtilization.Observe(chunkDesc.C.Utilization())
-		i.chunkLength.Observe(float64(chunkDesc.C.Len()))
+		utilization, length := chunkDesc.C.Utilization(), chunkDesc.C.Len()
+		util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", metric, "utilization", utilization, "length", length, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime)
+		i.chunkUtilization.Observe(utilization)
+		i.chunkLength.Observe(float64(length))
 		i.chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds())
 	}
 

pkg/ingester/series.go

Lines changed: 14 additions & 6 deletions
@@ -12,12 +12,18 @@ import (
 	"github.com/weaveworks/cortex/pkg/prom1/storage/metric"
 )
 
-var discardedSamples = prometheus.NewCounterVec(
-	prometheus.CounterOpts{
-		Name: "cortex_ingester_out_of_order_samples_total",
-		Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
-	},
-	[]string{discardReasonLabel},
+var (
+	discardedSamples = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "cortex_ingester_out_of_order_samples_total",
+			Help: "The total number of samples that were discarded because their timestamps were at or before the last received sample for a series.",
+		},
+		[]string{discardReasonLabel},
+	)
+	createdChunks = prometheus.NewCounter(prometheus.CounterOpts{
+		Name: "cortex_ingester_chunks_created_total",
+		Help: "The total number of chunks the ingester has created.",
+	})
 )
 
 func init() {
@@ -77,6 +83,7 @@ func (s *memorySeries) add(v model.SamplePair) error {
 		newHead := newDesc(chunk.New(), v.Timestamp, v.Timestamp)
 		s.chunkDescs = append(s.chunkDescs, newHead)
 		s.headChunkClosed = false
+		createdChunks.Inc()
 	}
 
 	chunks, err := s.head().add(v)
@@ -97,6 +104,7 @@ func (s *memorySeries) add(v model.SamplePair) error {
 			return err
 		}
 		s.chunkDescs = append(s.chunkDescs, newDesc(c, c.FirstTime(), lastTime))
+		createdChunks.Inc()
 	}
 }
 
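The createdChunks counter added above follows the standard client_golang pattern, and the commit message's "so we can get the rate" points at graphing it with rate(cortex_ingester_chunks_created_total[5m]). A minimal standalone sketch of defining, registering, and incrementing such a counter; registration is shown in main here, whereas in this package it would happen in the existing init(), which the hunk references but does not expand:

package main

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// Metric name and help text match the counter added in the diff.
var createdChunks = prometheus.NewCounter(prometheus.CounterOpts{
    Name: "cortex_ingester_chunks_created_total",
    Help: "The total number of chunks the ingester has created.",
})

func main() {
    prometheus.MustRegister(createdChunks)

    // Each new chunk bumps the counter; the creation rate can then be
    // graphed with: rate(cortex_ingester_chunks_created_total[5m])
    for i := 0; i < 3; i++ {
        createdChunks.Inc()
    }

    // Expose the counter for scraping.
    http.Handle("/metrics", promhttp.Handler())
    http.ListenAndServe(":8080", nil)
}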

pkg/ingester/user_state.go

Lines changed: 1 addition & 0 deletions
@@ -214,6 +214,7 @@ func (u *userState) unlockedGet(metric model.Metric, cfg *UserStatesConfig) (mod
 		return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-metric series limit (%d) exceeded for %s: %s", cfg.MaxSeriesPerMetric, metricName, metric)
 	}
 
+	util.Event().Log("msg", "new series", "userID", u.userID, "fp", fp, "series", metric)
 	series = newMemorySeries(metric)
 	u.fpToSeries.put(fp, series)
 	u.index.add(metric, fp)

pkg/util/priority_queue.go

Lines changed: 5 additions & 4 deletions
@@ -76,9 +76,9 @@ func (pq *PriorityQueue) DrainAndClose() {
 	pq.cond.Broadcast()
 }
 
-// Enqueue adds an operation to the queue in priority order. If the operation
-// is already on the queue, it will be ignored.
-func (pq *PriorityQueue) Enqueue(op Op) {
+// Enqueue adds an operation to the queue in priority order. Returns
+// true if added; false if the operation was already on the queue.
+func (pq *PriorityQueue) Enqueue(op Op) bool {
 	pq.lock.Lock()
 	defer pq.lock.Unlock()
 
@@ -88,12 +88,13 @@
 
 	_, enqueued := pq.hit[op.Key()]
 	if enqueued {
-		return
+		return false
 	}
 
 	pq.hit[op.Key()] = struct{}{}
 	heap.Push(&pq.queue, op)
 	pq.cond.Broadcast()
+	return true
 }
 
 // Dequeue will return the op with the highest priority; block if queue is
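With Enqueue now reporting whether the operation was newly added, callers can act only on the first enqueue of a key, which is how sweepSeries above logs "add to flush queue" once per series. A reduced sketch of that dedupe-on-enqueue pattern, leaving out the real queue's priority ordering (container/heap) and locking:

package main

import "fmt"

// dedupQueue keeps a "hit" set of keys, mirroring PriorityQueue.hit above.
type dedupQueue struct {
    hit   map[string]struct{}
    items []string
}

func newDedupQueue() *dedupQueue {
    return &dedupQueue{hit: map[string]struct{}{}}
}

// Enqueue returns true if the key was added, false if it was already queued.
func (q *dedupQueue) Enqueue(key string) bool {
    if _, ok := q.hit[key]; ok {
        return false
    }
    q.hit[key] = struct{}{}
    q.items = append(q.items, key)
    return true
}

func main() {
    q := newDedupQueue()
    for _, key := range []string{"series-1", "series-2", "series-1"} {
        if q.Enqueue(key) {
            fmt.Println("add to flush queue:", key) // logged once per key
        }
    }
}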
