Skip to content

Commit 9afacef

Browse files
authored
Merge pull request #203 from weaveworks/remove-dynamo-client-goroutine-pool
Remove dynamo client goroutine pool
2 parents d40ead0 + ba56027 commit 9afacef

File tree

5 files changed

+48
-128
lines changed

5 files changed

+48
-128
lines changed

chunk/chunk_store.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ func init() {
8484
type Store interface {
8585
Put(ctx context.Context, chunks []Chunk) error
8686
Get(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]Chunk, error)
87-
Stop()
8887
}
8988

9089
// StoreConfig specifies config for a ChunkStore
@@ -127,11 +126,6 @@ func NewAWSStore(cfg StoreConfig) *AWSStore {
127126
}
128127
}
129128

130-
// Stop background goroutines.
131-
func (c *AWSStore) Stop() {
132-
c.dynamo.Stop()
133-
}
134-
135129
type bucketSpec struct {
136130
tableName string
137131
bucket string

chunk/chunk_store_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ func TestChunkStoreUnprocessed(t *testing.T) {
4040
DynamoDB: dynamoDB,
4141
S3: NewMockS3(),
4242
})
43-
defer store.Stop()
4443

4544
ctx := user.WithID(context.Background(), "0")
4645
now := model.Now()
@@ -76,7 +75,6 @@ func TestChunkStore(t *testing.T) {
7675
DynamoDB: dynamoDB,
7776
S3: NewMockS3(),
7877
})
79-
defer store.Stop()
8078

8179
ctx := user.WithID(context.Background(), "0")
8280
now := model.Now()

chunk/dynamodb_client.go

Lines changed: 43 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"math/rand"
55
"net/url"
66
"strings"
7-
"sync"
87
"time"
98

109
"github.com/aws/aws-sdk-go/aws"
@@ -149,124 +148,16 @@ func (d dynamoRequestAdapter) Error() error {
149148

150149
type dynamoDBBackoffClient struct {
151150
client DynamoDBClient
152-
153-
dynamoRequests chan dynamoOp
154-
dynamoRequestsDone sync.WaitGroup
155151
}
156152

157153
func newDynamoDBBackoffClient(client DynamoDBClient) *dynamoDBBackoffClient {
158-
c := &dynamoDBBackoffClient{
159-
client: client,
160-
dynamoRequests: make(chan dynamoOp),
161-
}
162-
163-
c.dynamoRequestsDone.Add(numDynamoRequests)
164-
for i := 0; i < numDynamoRequests; i++ {
165-
go c.dynamoRequestLoop()
154+
return &dynamoDBBackoffClient{
155+
client: client,
166156
}
167-
168-
return c
169-
}
170-
171-
// Stop background goroutines.
172-
func (c *dynamoDBBackoffClient) Stop() {
173-
close(c.dynamoRequests)
174-
c.dynamoRequestsDone.Wait()
175157
}
176158

177159
// batchWriteDynamo writes many requests to dynamo in a single batch.
178160
func (c *dynamoDBBackoffClient) batchWriteDynamo(ctx context.Context, reqs map[string][]*dynamodb.WriteRequest) error {
179-
req := &dynamoBatchWriteItemsOp{
180-
ctx: ctx,
181-
reqs: reqs,
182-
dynamodb: c.client,
183-
done: make(chan error),
184-
}
185-
c.dynamoRequests <- req
186-
return <-req.done
187-
}
188-
189-
func (c *dynamoDBBackoffClient) queryPages(ctx context.Context, input *dynamodb.QueryInput, callback func(resp interface{}, lastPage bool) (shouldContinue bool)) error {
190-
page, _ := c.client.QueryRequest(input)
191-
req := &dynamoQueryPagesOp{
192-
ctx: ctx,
193-
request: page,
194-
callback: callback,
195-
done: make(chan error),
196-
}
197-
c.dynamoRequests <- req
198-
return <-req.done
199-
}
200-
201-
func (c *dynamoDBBackoffClient) dynamoRequestLoop() {
202-
defer c.dynamoRequestsDone.Done()
203-
for {
204-
select {
205-
case request, ok := <-c.dynamoRequests:
206-
if !ok {
207-
return
208-
}
209-
request.do()
210-
}
211-
}
212-
}
213-
214-
type dynamoOp interface {
215-
do()
216-
}
217-
218-
type dynamoQueryPagesOp struct {
219-
ctx context.Context
220-
request dynamoRequest
221-
callback func(resp interface{}, lastPage bool) (shouldContinue bool)
222-
done chan error
223-
}
224-
225-
type dynamoBatchWriteItemsOp struct {
226-
ctx context.Context
227-
reqs map[string][]*dynamodb.WriteRequest
228-
dynamodb DynamoDBClient
229-
done chan error
230-
}
231-
232-
func (r *dynamoQueryPagesOp) do() {
233-
backoff := minBackoff
234-
235-
for page := r.request; page != nil; page = page.NextPage() {
236-
err := instrument.TimeRequestHistogram(r.ctx, "DynamoDB.QueryPages", dynamoRequestDuration, func(_ context.Context) error {
237-
return page.Send()
238-
})
239-
240-
if cc := page.Data().(*dynamodb.QueryOutput).ConsumedCapacity; cc != nil {
241-
dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages").
242-
Add(float64(*cc.CapacityUnits))
243-
}
244-
245-
if err != nil {
246-
recordDynamoError(err)
247-
248-
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == provisionedThroughputExceededException {
249-
time.Sleep(backoff)
250-
backoff = nextBackoff(backoff)
251-
continue
252-
}
253-
254-
r.done <- page.Error()
255-
return
256-
}
257-
258-
if getNextPage := r.callback(page.Data(), !page.HasNextPage()); !getNextPage {
259-
r.done <- page.Error()
260-
return
261-
}
262-
263-
backoff = minBackoff
264-
}
265-
266-
r.done <- nil
267-
}
268-
269-
func (r *dynamoBatchWriteItemsOp) do() {
270161
min := func(i, j int) int {
271162
if i < j {
272163
return i
@@ -306,17 +197,17 @@ func (r *dynamoBatchWriteItemsOp) do() {
306197
}
307198
}
308199

309-
outstanding, unprocessed := r.reqs, map[string][]*dynamodb.WriteRequest{}
200+
outstanding, unprocessed := reqs, map[string][]*dynamodb.WriteRequest{}
310201
backoff := minBackoff
311202
for dictLen(outstanding)+dictLen(unprocessed) > 0 {
312203
reqs := map[string][]*dynamodb.WriteRequest{}
313204
fillReq(unprocessed, reqs)
314205
fillReq(outstanding, reqs)
315206

316207
var resp *dynamodb.BatchWriteItemOutput
317-
err := instrument.TimeRequestHistogram(r.ctx, "DynamoDB.BatchWriteItem", dynamoRequestDuration, func(_ context.Context) error {
208+
err := instrument.TimeRequestHistogram(ctx, "DynamoDB.BatchWriteItem", dynamoRequestDuration, func(_ context.Context) error {
318209
var err error
319-
resp, err = r.dynamodb.BatchWriteItem(&dynamodb.BatchWriteItemInput{
210+
resp, err = c.client.BatchWriteItem(&dynamodb.BatchWriteItemInput{
320211
RequestItems: reqs,
321212
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityTotal),
322213
})
@@ -350,14 +241,49 @@ func (r *dynamoBatchWriteItemsOp) do() {
350241

351242
// All other errors are fatal.
352243
if err != nil {
353-
r.done <- err
354-
return
244+
return err
245+
}
246+
247+
backoff = minBackoff
248+
}
249+
250+
return nil
251+
}
252+
253+
func (c *dynamoDBBackoffClient) queryPages(ctx context.Context, input *dynamodb.QueryInput, callback func(resp interface{}, lastPage bool) (shouldContinue bool)) error {
254+
request, _ := c.client.QueryRequest(input)
255+
backoff := minBackoff
256+
257+
for page := request; page != nil; page = page.NextPage() {
258+
err := instrument.TimeRequestHistogram(ctx, "DynamoDB.QueryPages", dynamoRequestDuration, func(_ context.Context) error {
259+
return page.Send()
260+
})
261+
262+
if cc := page.Data().(*dynamodb.QueryOutput).ConsumedCapacity; cc != nil {
263+
dynamoConsumedCapacity.WithLabelValues("DynamoDB.QueryPages").
264+
Add(float64(*cc.CapacityUnits))
265+
}
266+
267+
if err != nil {
268+
recordDynamoError(err)
269+
270+
if awsErr, ok := err.(awserr.Error); ok && awsErr.Code() == provisionedThroughputExceededException {
271+
time.Sleep(backoff)
272+
backoff = nextBackoff(backoff)
273+
continue
274+
}
275+
276+
return page.Error()
277+
}
278+
279+
if getNextPage := callback(page.Data(), !page.HasNextPage()); !getNextPage {
280+
return page.Error()
355281
}
356282

357283
backoff = minBackoff
358284
}
359285

360-
r.done <- nil
286+
return nil
361287
}
362288

363289
func nextBackoff(lastBackoff time.Duration) time.Duration {

cmd/cortex/main.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func main() {
115115
flag.DurationVar(&cfg.ingesterConfig.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
116116
flag.DurationVar(&cfg.ingesterConfig.MaxChunkIdle, "ingester.max-chunk-idle", 1*time.Hour, "Maximum chunk idle time before flushing.")
117117
flag.DurationVar(&cfg.ingesterConfig.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age time before flushing.")
118-
flag.IntVar(&cfg.ingesterConfig.ConcurrentFlushes, "ingester.concurrent-flushes", 25, "Number of concurrent goroutines flushing to dynamodb.")
118+
flag.IntVar(&cfg.ingesterConfig.ConcurrentFlushes, "ingester.concurrent-flushes", ingester.DefaultConcurrentFlush, "Number of concurrent goroutines flushing to dynamodb.")
119119
flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.")
120120
flag.IntVar(&cfg.ingesterConfig.GRPCListenPort, "ingester.grpc.listen-port", 9095, "gRPC server listen port.")
121121

@@ -137,7 +137,6 @@ func main() {
137137
if cfg.dynamodbPollInterval < 1*time.Minute {
138138
log.Warnf("Polling DynamoDB more than once a minute. Likely to get throttled: %v", cfg.dynamodbPollInterval)
139139
}
140-
defer chunkStore.Stop()
141140

142141
consul, err := ring.NewConsulClient(cfg.consulHost)
143142
if err != nil {

ingester/ingester.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ const (
2626
// Reasons to discard samples.
2727
outOfOrderTimestamp = "timestamp_out_of_order"
2828
duplicateSample = "multiple_values_for_timestamp"
29+
30+
// DefaultConcurrentFlush is the number of series to flush concurrently
31+
DefaultConcurrentFlush = 50
2932
)
3033

3134
var (
@@ -129,7 +132,7 @@ func New(cfg Config, chunkStore cortex_chunk.Store) (*Ingester, error) {
129132
cfg.RateUpdatePeriod = 15 * time.Second
130133
}
131134
if cfg.ConcurrentFlushes <= 0 {
132-
cfg.ConcurrentFlushes = 25
135+
cfg.ConcurrentFlushes = DefaultConcurrentFlush
133136
}
134137

135138
i := &Ingester{

0 commit comments

Comments
 (0)