Skip to content

Commit 624b1f2

Browse files
authored
Merge pull request #119 from weaveworks/remove-put-parallelisation
Remove chunk store parallelisation; I want this controlled by the ingester
2 parents 357588f + 5b0761b commit 624b1f2

File tree

3 files changed

+25
-24
lines changed

3 files changed

+25
-24
lines changed

chunk/chunk_store.go

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ const (
3333

3434
// See http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Limits.html.
3535
dynamoMaxBatchSize = 25
36-
maxConcurrentPuts = 100
3736
)
3837

3938
var (
@@ -129,7 +128,6 @@ func NewAWSStore(cfg StoreConfig) (*AWSStore, error) {
129128
chunkCache: cfg.ChunkCache,
130129
tableName: tableName,
131130
bucketName: bucketName,
132-
putLimiter: NewSemaphore(maxConcurrentPuts),
133131
}, nil
134132
}
135133

@@ -155,7 +153,6 @@ type AWSStore struct {
155153
chunkCache *Cache
156154
tableName string
157155
bucketName string
158-
putLimiter Semaphore
159156
}
160157

161158
type dynamodbClient interface {
@@ -268,10 +265,8 @@ func (c *AWSStore) Put(ctx context.Context, chunks []Chunk) error {
268265
func (c *AWSStore) putChunks(userID string, chunks []Chunk) error {
269266
incomingErrors := make(chan error)
270267
for _, chunk := range chunks {
271-
c.putLimiter.Acquire()
272268
go func(chunk Chunk) {
273269
incomingErrors <- c.putChunk(userID, &chunk)
274-
c.putLimiter.Release()
275270
}(chunk)
276271
}
277272

@@ -314,25 +309,12 @@ func (c *AWSStore) updateIndex(userID string, chunks []Chunk) error {
314309
return err
315310
}
316311

317-
batches := c.batchRequests(writeReqs)
318-
319-
// Request all the batches in parallel.
320-
incomingErrors := make(chan error)
321-
for _, batch := range batches {
322-
c.putLimiter.Acquire()
323-
go func(batch []*dynamodb.WriteRequest) {
324-
incomingErrors <- c.batchWriteDynamo(batch)
325-
c.putLimiter.Release()
326-
}(batch)
327-
}
328-
var lastErr error
329-
for range batches {
330-
err = <-incomingErrors
331-
if err != nil {
332-
lastErr = err
312+
for _, batch := range c.batchRequests(writeReqs) {
313+
if err := c.batchWriteDynamo(batch); err != nil {
314+
return err
333315
}
334316
}
335-
return lastErr
317+
return nil
336318
}
337319

338320
// calculateDynamoWrites creates a set of batched WriteRequests to dynamo for all

chunk/chunk_store_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ func TestChunkStore(t *testing.T) {
4949
chunkCache: nil,
5050
tableName: "tablename",
5151
bucketName: "bucketname",
52-
putLimiter: NoopSemaphore,
5352
}
5453
store.CreateTables()
5554

ingester/ingester.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"sync"
77
"time"
88

9+
"github.com/aws/aws-sdk-go/aws/awserr"
910
"github.com/prometheus/client_golang/prometheus"
1011
"github.com/prometheus/common/log"
1112
"github.com/prometheus/common/model"
@@ -25,6 +26,12 @@ const (
2526
// Reasons to discard samples.
2627
outOfOrderTimestamp = "timestamp_out_of_order"
2728
duplicateSample = "multiple_values_for_timestamp"
29+
30+
// Backoff for flush
31+
minBackoff = 100 * time.Millisecond
32+
maxBackoff = 1 * time.Second
33+
34+
provisionedThroughputExceededException = "ProvisionedThroughputExceededException"
2835
)
2936

3037
var (
@@ -525,6 +532,8 @@ func (i *Ingester) flushSeries(u *userState, fp model.Fingerprint, series *memor
525532
}
526533

527534
func (i *Ingester) flushLoop(j int) {
535+
backoff := minBackoff
536+
528537
defer func() {
529538
log.Info("Ingester.flushLoop() exited")
530539
i.done.Done()
@@ -570,12 +579,23 @@ func (i *Ingester) flushLoop(j int) {
570579
}
571580

572581
// flush the chunks without locking the series
573-
if err := i.flushChunks(ctx, op.fp, series.metric, chunks); err != nil {
582+
err := i.flushChunks(ctx, op.fp, series.metric, chunks)
583+
if err != nil {
584+
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == provisionedThroughputExceededException {
585+
time.Sleep(backoff)
586+
backoff = backoff * 2
587+
if backoff > maxBackoff {
588+
backoff = maxBackoff
589+
}
590+
}
591+
574592
log.Errorf("Failed to flush chunks: %v", err)
575593
i.chunkStoreFailures.Add(float64(len(chunks)))
576594
continue
577595
}
578596

597+
backoff = minBackoff
598+
579599
// now remove the chunks
580600
userState.fpLocker.Lock(op.fp)
581601
series.chunkDescs = series.chunkDescs[len(chunks):]

0 commit comments

Comments (0)