Remove dynamo client goroutine pool #203

Merged 3 commits on Jan 12, 2017

6 changes: 0 additions & 6 deletions chunk/chunk_store.go
@@ -84,7 +84,6 @@ func init() {
 type Store interface {
 	Put(ctx context.Context, chunks []Chunk) error
 	Get(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]Chunk, error)
-	Stop()
 }

 // StoreConfig specifies config for a ChunkStore
@@ -127,11 +126,6 @@ func NewAWSStore(cfg StoreConfig) *AWSStore {
 	}
 }

-// Stop background goroutines.
-func (c *AWSStore) Stop() {
-	c.dynamo.Stop()
-}
-
 type bucketSpec struct {
 	tableName string
 	bucket    string
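
With Stop() gone from the Store interface, callers no longer own any teardown. A minimal sketch of post-change usage, assuming the weaveworks/cortex import paths of this era (the import lines and the writeChunks helper are illustrative, not code from this PR):

package main

import (
	"golang.org/x/net/context"

	"github.com/weaveworks/cortex/chunk"
)

// writeChunks sketches the caller's side after this change: build the store
// and write; there is no goroutine pool left to stop.
func writeChunks(ctx context.Context, cfg chunk.StoreConfig, chunks []chunk.Chunk) error {
	store := chunk.NewAWSStore(cfg)
	// Before this PR a `defer store.Stop()` was required here, as the test
	// diff below shows.
	return store.Put(ctx, chunks)
}
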
2 changes: 0 additions & 2 deletions chunk/chunk_store_test.go
@@ -40,7 +40,6 @@ func TestChunkStoreUnprocessed(t *testing.T) {
 		DynamoDB: dynamoDB,
 		S3:       NewMockS3(),
 	})
-	defer store.Stop()

 	ctx := user.WithID(context.Background(), "0")
 	now := model.Now()
@@ -76,7 +75,6 @@ func TestChunkStore(t *testing.T) {
 		DynamoDB: dynamoDB,
 		S3:       NewMockS3(),
 	})
-	defer store.Stop()

 	ctx := user.WithID(context.Background(), "0")
 	now := model.Now()
160 changes: 43 additions & 117 deletions chunk/dynamodb_client.go
@@ -4,7 +4,6 @@ import (
 	"math/rand"
 	"net/url"
 	"strings"
-	"sync"
 	"time"

 	"github.com/aws/aws-sdk-go/aws"
@@ -149,124 +148,16 @@ func (d dynamoRequestAdapter) Error() error {

 type dynamoDBBackoffClient struct {
 	client DynamoDBClient
-
-	dynamoRequests     chan dynamoOp
-	dynamoRequestsDone sync.WaitGroup
 }

 func newDynamoDBBackoffClient(client DynamoDBClient) *dynamoDBBackoffClient {
-	c := &dynamoDBBackoffClient{
-		client:         client,
-		dynamoRequests: make(chan dynamoOp),
-	}
-
-	c.dynamoRequestsDone.Add(numDynamoRequests)
-	for i := 0; i < numDynamoRequests; i++ {
-		go c.dynamoRequestLoop()
+	return &dynamoDBBackoffClient{
+		client: client,
 	}
-
-	return c
 }

-// Stop background goroutines.
-func (c *dynamoDBBackoffClient) Stop() {
-	close(c.dynamoRequests)
-	c.dynamoRequestsDone.Wait()
-}
-
 // batchWriteDynamo writes many requests to dynamo in a single batch.
 func (c *dynamoDBBackoffClient) batchWriteDynamo(ctx context.Context, reqs map[string][]*dynamodb.WriteRequest) error {
-	req := &dynamoBatchWriteItemsOp{
-		ctx:      ctx,
-		reqs:     reqs,
-		dynamodb: c.client,
-		done:     make(chan error),
-	}
-	c.dynamoRequests <- req
-	return <-req.done
-}
-
-func (c *dynamoDBBackoffClient) queryPages(ctx context.Context, input *dynamodb.QueryInput, callback func(resp interface{}, lastPage bool) (shouldContinue bool)) error {
-	page, _ := c.client.QueryRequest(input)
-	req := &dynamoQueryPagesOp{
-		ctx:      ctx,
-		request:  page,
-		callback: callback,
-		done:     make(chan error),
-	}
-	c.dynamoRequests <- req
-	return <-req.done
-}
-
-func (c *dynamoDBBackoffClient) dynamoRequestLoop() {
-	defer c.dynamoRequestsDone.Done()
-	for {
-		select {
-		case request, ok := <-c.dynamoRequests:
-			if !ok {
-				return
-			}
-			request.do()
-		}
-	}
-}
-
-type dynamoOp interface {
-	do()
-}
-
-type dynamoQueryPagesOp struct {
-	ctx      context.Context
-	request  dynamoRequest
-	callback func(resp interface{}, lastPage bool) (shouldContinue bool)
-	done     chan error
-}
-
-type dynamoBatchWriteItemsOp struct {
-	ctx      context.Context
-	reqs     map[string][]*dynamodb.WriteRequest
-	dynamodb DynamoDBClient
-	done     chan error
-}
-
-func (r *dynamoQueryPagesOp) do() {
-	backoff := minBackoff
-
-	for page := r.request; page != nil; page = page.NextPage() {
-		err := instrument.TimeRequestHistogram(r.ctx, "DynamoDB.QueryPages", dynamoRequestDuration, func(_ context.Context) error {
-			return page.Send()
-		})
-
-		if cc := page.Data().(*dynamodb.QueryOutput).ConsumedCapacity; cc != nil {
-			dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages").
-				Add(float64(*cc.CapacityUnits))
-		}
-
-		if err != nil {
-			recordDynamoError(err)
-
-			if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == provisionedThroughputExceededException {
-				time.Sleep(backoff)
-				backoff = nextBackoff(backoff)
-				continue
-			}
-
-			r.done <- page.Error()
-			return
-		}
-
-		if getNextPage := r.callback(page.Data(), !page.HasNextPage()); !getNextPage {
-			r.done <- page.Error()
-			return
-		}
-
-		backoff = minBackoff
-	}
-
-	r.done <- nil
-}
-
-func (r *dynamoBatchWriteItemsOp) do() {
 	min := func(i, j int) int {
 		if i < j {
 			return i
@@ -306,17 +197,17 @@ func (r *dynamoBatchWriteItemsOp) do() {
 		}
 	}

-	outstanding, unprocessed := r.reqs, map[string][]*dynamodb.WriteRequest{}
+	outstanding, unprocessed := reqs, map[string][]*dynamodb.WriteRequest{}
 	backoff := minBackoff
 	for dictLen(outstanding)+dictLen(unprocessed) > 0 {
 		reqs := map[string][]*dynamodb.WriteRequest{}
 		fillReq(unprocessed, reqs)
 		fillReq(outstanding, reqs)

 		var resp *dynamodb.BatchWriteItemOutput
-		err := instrument.TimeRequestHistogram(r.ctx, "DynamoDB.BatchWriteItem", dynamoRequestDuration, func(_ context.Context) error {
+		err := instrument.TimeRequestHistogram(ctx, "DynamoDB.BatchWriteItem", dynamoRequestDuration, func(_ context.Context) error {
 			var err error
-			resp, err = r.dynamodb.BatchWriteItem(&dynamodb.BatchWriteItemInput{
+			resp, err = c.client.BatchWriteItem(&dynamodb.BatchWriteItemInput{
 				RequestItems:           reqs,
 				ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
 			})
@@ -350,14 +241,49 @@ func (r *dynamoBatchWriteItemsOp) do() {

 		// All other errors are fatal.
 		if err != nil {
-			r.done <- err
-			return
+			return err
 		}

 		backoff = minBackoff
 	}

+	return nil
+}
+
+func (c *dynamoDBBackoffClient) queryPages(ctx context.Context, input *dynamodb.QueryInput, callback func(resp interface{}, lastPage bool) (shouldContinue bool)) error {
+	request, _ := c.client.QueryRequest(input)
+	backoff := minBackoff
+
+	for page := request; page != nil; page = page.NextPage() {
+		err := instrument.TimeRequestHistogram(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, func(_ context.Context) error {
+			return page.Send()
+		})
+
+		if cc := page.Data().(*dynamodb.QueryOutput).ConsumedCapacity; cc != nil {
+			dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages").
+				Add(float64(*cc.CapacityUnits))
+		}
+
+		if err != nil {
+			recordDynamoError(err)
+
+			if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == provisionedThroughputExceededException {
+				time.Sleep(backoff)
+				backoff = nextBackoff(backoff)
+				continue
+			}
+
+			return page.Error()
+		}
+
+		if getNextPage := callback(page.Data(), !page.HasNextPage()); !getNextPage {
+			return page.Error()
+		}
+
+		backoff = minBackoff
+	}
+
-	r.done <- nil
+	return nil
 }

 func nextBackoff(lastBackoff time.Duration) time.Duration {
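
The body of nextBackoff is collapsed out of the diff above. For context, a sketch of the capped exponential backoff the retry loops imply; the constants and the jitter strategy are assumptions, not this repo's actual values:

package chunk

import (
	"math/rand"
	"time"
)

const (
	minBackoff = 100 * time.Millisecond // assumed floor
	maxBackoff = 50 * time.Second       // assumed ceiling
)

// nextBackoff sketch: double the previous delay, cap it, and add jitter so
// goroutines throttled together do not retry in lockstep.
func nextBackoff(lastBackoff time.Duration) time.Duration {
	next := 2 * lastBackoff
	if next > maxBackoff {
		next = maxBackoff
	}
	// Jitter uniformly within [next/2, next). Callers pass at least
	// minBackoff, so next/2 is positive and rand.Int63n cannot panic.
	return next/2 + time.Duration(rand.Int63n(int64(next/2)))
}

Note how both batchWriteDynamo and queryPages reset backoff to minBackoff after any successful call, so a single throttled request does not inflate the delay for the rest of the work.
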
3 changes: 1 addition & 2 deletions cmd/cortex/main.go
@@ -115,7 +115,7 @@ func main() {
 	flag.DurationVar(&cfg.ingesterConfig.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
 	flag.DurationVar(&cfg.ingesterConfig.MaxChunkIdle, "ingester.max-chunk-idle", 1*time.Hour, "Maximum chunk idle time before flushing.")
 	flag.DurationVar(&cfg.ingesterConfig.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age time before flushing.")
-	flag.IntVar(&cfg.ingesterConfig.ConcurrentFlushes, "ingester.concurrent-flushes", 25, "Number of concurrent goroutines flushing to dynamodb.")
+	flag.IntVar(&cfg.ingesterConfig.ConcurrentFlushes, "ingester.concurrent-flushes", ingester.DefaultConcurrentFlush, "Number of concurrent goroutines flushing to dynamodb.")
 	flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
 	flag.IntVar(&cfg.ingesterConfig.GRPCListenPort, "ingester.grpc.listen-port", 9095, "gRPC server listen port.")

@@ -137,7 +137,6 @@ func main() {
 	if cfg.dynamodbPollInterval < 1*time.Minute {
 		log.Warnf("Polling DynamoDB more than once a minute. Likely to get throttled: %v", cfg.dynamodbPollInterval)
 	}
-	defer chunkStore.Stop()

 	consul, err := ring.NewConsulClient(cfg.consulHost)
 	if err != nil {
5 changes: 4 additions & 1 deletion ingester/ingester.go
@@ -26,6 +26,9 @@ const (
 	// Reasons to discard samples.
 	outOfOrderTimestamp = "timestamp_out_of_order"
 	duplicateSample     = "multiple_values_for_timestamp"
+
+	// DefaultConcurrentFlush is the number of series to flush concurrently
+	DefaultConcurrentFlush = 50
 )

 var (
@@ -129,7 +132,7 @@ func New(cfg Config, chunkStore cortex_chunk.Store) (*Ingester, error) {
 		cfg.RateUpdatePeriod = 15 * time.Second
 	}
 	if cfg.ConcurrentFlushes <= 0 {
-		cfg.ConcurrentFlushes = 25
+		cfg.ConcurrentFlushes = DefaultConcurrentFlush
 	}

 	i := &Ingester{
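
The flush path itself is outside this diff. As a standalone sketch of the worker-pool pattern that ConcurrentFlushes sizes (the queue contents and the flush body are placeholders, not the ingester's real code):

package main

import (
	"fmt"
	"sync"
)

// flushConcurrently drains a queue with a fixed-size pool of goroutines,
// which is the shape of concurrency ConcurrentFlushes controls.
func flushConcurrently(queue <-chan string, concurrentFlushes int) {
	var wg sync.WaitGroup
	wg.Add(concurrentFlushes)
	for i := 0; i < concurrentFlushes; i++ {
		go func() {
			defer wg.Done()
			for series := range queue {
				fmt.Println("flushing", series) // stand-in for the real chunk write
			}
		}()
	}
	wg.Wait()
}

func main() {
	queue := make(chan string, 3)
	for _, s := range []string{"series-a", "series-b", "series-c"} {
		queue <- s
	}
	close(queue)
	// In the ingester this would be cfg.ConcurrentFlushes, which now
	// defaults to DefaultConcurrentFlush (50).
	flushConcurrently(queue, 2)
}
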