diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go index 770f2a01e1..22839ca5de 100644 --- a/pkg/chunk/aws/storage_client.go +++ b/pkg/chunk/aws/storage_client.go @@ -250,6 +250,9 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e } if valuesLeft := outstanding.Len() + unprocessed.Len(); valuesLeft > 0 { + if valuesLeft < 4 { // protect against logging lots of data + level.Info(util.Logger).Log("msg", "DynamoDB BatchWrite values left", "count", valuesLeft, "outstanding", outstanding, "unprocessed", unprocessed) + } return fmt.Errorf("failed to write chunk, %d values remaining: %s", valuesLeft, backoff.Err()) } return backoff.Err() @@ -731,6 +734,7 @@ func (a storageClient) putS3Chunk(ctx context.Context, key string, buf []byte) e }) } +// Slice of values returned; map key is attribute name type dynamoDBReadResponse []map[string]*dynamodb.AttributeValue func (b dynamoDBReadResponse) Len() int { @@ -749,6 +753,7 @@ func (b dynamoDBReadResponse) Value(i int) []byte { return chunkValue.B } +// map key is table name; value is a slice of things to 'put' type dynamoDBWriteBatch map[string][]*dynamodb.WriteRequest func (b dynamoDBWriteBatch) Len() int { @@ -795,6 +800,31 @@ func (b dynamoDBWriteBatch) TakeReqs(from dynamoDBWriteBatch, max int) { } } +func (b dynamoDBWriteBatch) String() string { + buf := &bytes.Buffer{} + for table, reqs := range b { + for _, req := range reqs { + item := req.PutRequest.Item + hash := "" + if hashAttr, ok := item[hashKey]; ok { + if hashAttr.S != nil { + hash = *hashAttr.S + } + } + var rnge, value []byte + if rangeAttr, ok := item[rangeKey]; ok { + rnge = rangeAttr.B + } + if valueAttr, ok := item[valueKey]; ok { + value = valueAttr.B + } + fmt.Fprintf(buf, "%s: %s,%.32s,%.32s; ", table, hash, rnge, value) + } + } + return buf.String() +} + +// map key is table name type dynamoDBReadRequest map[string]*dynamodb.KeysAndAttributes func (b dynamoDBReadRequest) Len() int {