@@ -12,6 +12,7 @@ import (
12
12
13
13
"github.com/go-kit/kit/log/level"
14
14
ot "github.com/opentracing/opentracing-go"
15
+ otlog "github.com/opentracing/opentracing-go/log"
15
16
"golang.org/x/time/rate"
16
17
17
18
"github.com/aws/aws-sdk-go/aws"
@@ -144,7 +145,6 @@ type dynamoDBStorageClient struct {
144
145
145
146
// These functions exists for mocking, so we don't have to write a whole load
146
147
// of boilerplate.
147
- queryRequestFn func (ctx context.Context , input * dynamodb.QueryInput ) dynamoDBRequest
148
148
batchGetItemRequestFn func (ctx context.Context , input * dynamodb.BatchGetItemInput ) dynamoDBRequest
149
149
batchWriteItemRequestFn func (ctx context.Context , input * dynamodb.BatchWriteItemInput ) dynamoDBRequest
150
150
}
@@ -172,7 +172,6 @@ func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig)
172
172
DynamoDB : dynamoDB ,
173
173
writeThrottle : rate .NewLimiter (rate .Limit (cfg .ThrottleLimit ), dynamoDBMaxWriteBatchSize ),
174
174
}
175
- client .queryRequestFn = client .queryRequest
176
175
client .batchGetItemRequestFn = client .batchGetItemRequest
177
176
client .batchWriteItemRequestFn = client .batchWriteItemRequest
178
177
return client , nil
@@ -327,71 +326,35 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery
327
326
}
328
327
}
329
328
330
- request := a .queryRequestFn (ctx , input )
331
329
pageCount := 0
332
330
defer func () {
333
331
dynamoQueryPagesCount .Observe (float64 (pageCount ))
334
332
}()
335
333
336
- for page := request ; page != nil ; page = page .NextPage () {
337
- pageCount ++
338
-
339
- response , err := a .queryPage (ctx , input , page , query .HashValue , pageCount )
340
- if err != nil {
341
- return err
334
+ retryer := newRetryer (ctx , a .cfg .backoffConfig )
335
+ err := instrument .CollectedRequest (ctx , "DynamoDB.QueryPages" , dynamoRequestDuration , instrument .ErrorCode , func (innerCtx context.Context ) error {
336
+ if sp := ot .SpanFromContext (innerCtx ); sp != nil {
337
+ sp .SetTag ("tableName" , query .TableName )
338
+ sp .SetTag ("hashValue" , query .HashValue )
342
339
}
343
-
344
- if ! callback (response ) {
345
- if err != nil {
346
- return fmt .Errorf ("QueryPages error: table=%v, err=%v" , * input .TableName , page .Error ())
347
- }
348
- return nil
349
- }
350
- if ! page .HasNextPage () {
351
- return nil
352
- }
353
- }
354
- return nil
355
- }
356
-
357
- func (a dynamoDBStorageClient ) queryPage (ctx context.Context , input * dynamodb.QueryInput , page dynamoDBRequest , hashValue string , pageCount int ) (* dynamoDBReadResponse , error ) {
358
- backoff := util .NewBackoff (ctx , a .cfg .backoffConfig )
359
-
360
- var err error
361
- for backoff .Ongoing () {
362
- err = instrument .CollectedRequest (ctx , "DynamoDB.QueryPages" , dynamoRequestDuration , instrument .ErrorCode , func (innerCtx context.Context ) error {
340
+ return a .DynamoDB .QueryPagesWithContext (ctx , input , func (output * dynamodb.QueryOutput , _ bool ) bool {
341
+ pageCount ++
363
342
if sp := ot .SpanFromContext (innerCtx ); sp != nil {
364
- sp .SetTag ("tableName" , aws .StringValue (input .TableName ))
365
- sp .SetTag ("hashValue" , hashValue )
366
- sp .SetTag ("page" , pageCount )
367
- sp .SetTag ("retry" , backoff .NumRetries ())
343
+ sp .LogFields (otlog .Int ("page" , pageCount ))
368
344
}
369
- return page .Send ()
370
- })
371
-
372
- if cc := page .Data ().(* dynamodb.QueryOutput ).ConsumedCapacity ; cc != nil {
373
- dynamoConsumedCapacity .WithLabelValues ("DynamoDB.QueryPages" , * cc .TableName ).
374
- Add (float64 (* cc .CapacityUnits ))
375
- }
376
345
377
- if err != nil {
378
- recordDynamoError (* input .TableName , err , "DynamoDB.QueryPages" )
379
- if awsErr , ok := err .(awserr.Error ); ok && ((awsErr .Code () == dynamodb .ErrCodeProvisionedThroughputExceededException ) || page .Retryable ()) {
380
- if awsErr .Code () != dynamodb .ErrCodeProvisionedThroughputExceededException {
381
- level .Warn (util .Logger ).Log ("msg" , "DynamoDB error" , "retry" , backoff .NumRetries (), "table" , * input .TableName , "err" , err )
382
- }
383
- backoff .Wait ()
384
- continue
346
+ if cc := output .ConsumedCapacity ; cc != nil {
347
+ dynamoConsumedCapacity .WithLabelValues ("DynamoDB.QueryPages" , * cc .TableName ).
348
+ Add (float64 (* cc .CapacityUnits ))
385
349
}
386
- return nil , fmt .Errorf ("QueryPage error: table=%v, err=%v" , * input .TableName , err )
387
- }
388
350
389
- queryOutput := page .Data ().(* dynamodb.QueryOutput )
390
- return & dynamoDBReadResponse {
391
- items : queryOutput .Items ,
392
- }, nil
351
+ return callback (& dynamoDBReadResponse {items : output .Items })
352
+ }, retryer .withRetrys , withErrorHandler (query .TableName , "DynamoDB.QueryPages" ))
353
+ })
354
+ if err != nil {
355
+ return fmt .Errorf ("QueryPage error: table=%v, err=%v" , query .TableName , err )
393
356
}
394
- return nil , fmt . Errorf ( "QueryPage error: %s for table %v, last error %v" , backoff . Err (), * input . TableName , err )
357
+ return err
395
358
}
396
359
397
360
type dynamoDBRequest interface {
@@ -403,12 +366,6 @@ type dynamoDBRequest interface {
403
366
Retryable () bool
404
367
}
405
368
406
- func (a dynamoDBStorageClient ) queryRequest (ctx context.Context , input * dynamodb.QueryInput ) dynamoDBRequest {
407
- req , _ := a .DynamoDB .QueryRequest (input )
408
- req .SetContext (ctx )
409
- return dynamoDBRequestAdapter {req }
410
- }
411
-
412
369
func (a dynamoDBStorageClient ) batchGetItemRequest (ctx context.Context , input * dynamodb.BatchGetItemInput ) dynamoDBRequest {
413
370
req , _ := a .DynamoDB .BatchGetItemRequest (input )
414
371
req .SetContext (ctx )
@@ -840,6 +797,16 @@ func (b dynamoDBReadRequest) TakeReqs(from dynamoDBReadRequest, max int) {
840
797
}
841
798
}
842
799
800
+ func withErrorHandler (tableName , operation string ) func (req * request.Request ) {
801
+ return func (req * request.Request ) {
802
+ req .Handlers .CompleteAttempt .PushBack (func (req * request.Request ) {
803
+ if req .Error != nil {
804
+ recordDynamoError (tableName , req .Error , operation )
805
+ }
806
+ })
807
+ }
808
+ }
809
+
843
810
func recordDynamoError (tableName string , err error , operation string ) {
844
811
if awsErr , ok := err .(awserr.Error ); ok {
845
812
dynamoFailures .WithLabelValues (tableName , awsErr .Code (), operation ).Add (float64 (1 ))
0 commit comments