Skip to content

Commit 8bfb9ee

Browse files
authored
Merge pull request #981 from grafana/batch-index-lookups
Batch index lookups (take #2)
2 parents e1f71fb + f440561 commit 8bfb9ee

16 files changed

+681
-950
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ cmd/lite/lite
1313
pkg/ingester/client/cortex.pb.go
1414
pkg/querier/frontend/frontend.pb.go
1515
pkg/ring/ring.pb.go
16+
pkg/chunk/storage/caching_storage_client.pb.go
1617
images/

pkg/chunk/aws/storage_client.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/weaveworks/common/instrument"
3131
"github.com/weaveworks/common/user"
3232
"github.com/weaveworks/cortex/pkg/chunk"
33+
chunk_util "github.com/weaveworks/cortex/pkg/chunk/util"
3334
"github.com/weaveworks/cortex/pkg/util"
3435
)
3536

@@ -301,7 +302,11 @@ func (a storageClient) BatchWrite(ctx context.Context, input chunk.WriteBatch) e
301302
return backoff.Err()
302303
}
303304

304-
func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
305+
func (a storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error {
306+
return chunk_util.DoParallelQueries(ctx, a.query, queries, callback)
307+
}
308+
309+
func (a storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
305310
sp, ctx := ot.StartSpanFromContext(ctx, "QueryPages", ot.Tag{Key: "tableName", Value: query.TableName}, ot.Tag{Key: "hashValue", Value: query.HashValue})
306311
defer sp.Finish()
307312

@@ -371,7 +376,7 @@ func (a storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, c
371376
return nil
372377
}
373378

374-
func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (dynamoDBReadResponse, error) {
379+
func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput, page dynamoDBRequest) (*dynamoDBReadResponse, error) {
375380
backoff := util.NewBackoff(ctx, a.cfg.backoffConfig)
376381
defer func() {
377382
dynamoQueryRetryCount.WithLabelValues("queryPage").Observe(float64(backoff.NumRetries()))
@@ -401,7 +406,9 @@ func (a storageClient) queryPage(ctx context.Context, input *dynamodb.QueryInput
401406
}
402407

403408
queryOutput := page.Data().(*dynamodb.QueryOutput)
404-
return dynamoDBReadResponse(queryOutput.Items), nil
409+
return &dynamoDBReadResponse{
410+
items: queryOutput.Items,
411+
}, nil
405412
}
406413
return nil, fmt.Errorf("QueryPage error: %s for table %v, last error %v", backoff.Err(), *input.TableName, err)
407414
}
@@ -785,18 +792,33 @@ func (a storageClient) putS3Chunk(ctx context.Context, key string, buf []byte) e
785792
}
786793

787794
// Slice of values returned; map key is attribute name
788-
type dynamoDBReadResponse []map[string]*dynamodb.AttributeValue
795+
type dynamoDBReadResponse struct {
796+
items []map[string]*dynamodb.AttributeValue
797+
}
798+
799+
func (b *dynamoDBReadResponse) Iterator() chunk.ReadBatchIterator {
800+
return &dynamoDBReadResponseIterator{
801+
i: -1,
802+
dynamoDBReadResponse: b,
803+
}
804+
}
805+
806+
type dynamoDBReadResponseIterator struct {
807+
i int
808+
*dynamoDBReadResponse
809+
}
789810

790-
func (b dynamoDBReadResponse) Len() int {
791-
return len(b)
811+
func (b *dynamoDBReadResponseIterator) Next() bool {
812+
b.i++
813+
return b.i < len(b.items)
792814
}
793815

794-
func (b dynamoDBReadResponse) RangeValue(i int) []byte {
795-
return b[i][rangeKey].B
816+
func (b *dynamoDBReadResponseIterator) RangeValue() []byte {
817+
return b.items[b.i][rangeKey].B
796818
}
797819

798-
func (b dynamoDBReadResponse) Value(i int) []byte {
799-
chunkValue, ok := b[i][valueKey]
820+
func (b *dynamoDBReadResponseIterator) Value() []byte {
821+
chunkValue, ok := b.items[b.i][valueKey]
800822
if !ok {
801823
return nil
802824
}

pkg/chunk/cache/fifo_cache.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"sync"
66
"time"
77

8-
ot "github.com/opentracing/opentracing-go"
9-
otlog "github.com/opentracing/opentracing-go/log"
108
"github.com/prometheus/client_golang/prometheus"
119
"github.com/prometheus/client_golang/prometheus/promauto"
1210
)
@@ -132,9 +130,6 @@ func (c *FifoCache) Stop() error {
132130

133131
// Put stores the value against the key.
134132
func (c *FifoCache) Put(ctx context.Context, key string, value interface{}) {
135-
span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-put")
136-
defer span.Finish()
137-
138133
c.entriesAdded.Inc()
139134
if c.size == 0 {
140135
return
@@ -202,9 +197,6 @@ func (c *FifoCache) Put(ctx context.Context, key string, value interface{}) {
202197

203198
// Get returns the stored value against the key and when the key was last updated.
204199
func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) {
205-
span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-get")
206-
defer span.Finish()
207-
208200
c.totalGets.Inc()
209201
if c.size == 0 {
210202
return nil, false
@@ -217,17 +209,15 @@ func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) {
217209
if ok {
218210
updated := c.entries[index].updated
219211
if time.Now().Sub(updated) < c.validity {
220-
span.LogFields(otlog.Bool("hit", true))
212+
221213
return c.entries[index].value, true
222214
}
223215

224216
c.totalMisses.Inc()
225217
c.staleGets.Inc()
226-
span.LogFields(otlog.Bool("hit", false), otlog.Bool("stale", true))
227218
return nil, false
228219
}
229220

230-
span.LogFields(otlog.Bool("hit", false), otlog.Bool("stale", false))
231221
c.totalMisses.Inc()
232222
return nil, false
233223
}

pkg/chunk/cassandra/storage_client.go

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/prometheus/common/model"
1313

1414
"github.com/weaveworks/cortex/pkg/chunk"
15+
"github.com/weaveworks/cortex/pkg/chunk/util"
1516
)
1617

1718
const (
@@ -185,7 +186,11 @@ func (s *storageClient) BatchWrite(ctx context.Context, batch chunk.WriteBatch)
185186
return nil
186187
}
187188

188-
func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
189+
func (s *storageClient) QueryPages(ctx context.Context, queries []chunk.IndexQuery, callback func(chunk.IndexQuery, chunk.ReadBatch) bool) error {
190+
return util.DoParallelQueries(ctx, s.query, queries, callback)
191+
}
192+
193+
func (s *storageClient) query(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error {
189194
var q *gocql.Query
190195

191196
switch {
@@ -218,7 +223,7 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery,
218223
defer iter.Close()
219224
scanner := iter.Scanner()
220225
for scanner.Next() {
221-
var b readBatch
226+
b := &readBatch{}
222227
if err := scanner.Scan(&b.rangeValue, &b.value); err != nil {
223228
return errors.WithStack(err)
224229
}
@@ -231,27 +236,35 @@ func (s *storageClient) QueryPages(ctx context.Context, query chunk.IndexQuery,
231236

232237
// readBatch represents a batch of rows read from Cassandra.
233238
type readBatch struct {
239+
consumed bool
234240
rangeValue []byte
235241
value []byte
236242
}
237243

238-
// Len implements chunk.ReadBatch; in Cassandra we 'stream' results back
239-
// one-by-one, so this always returns 1.
240-
func (readBatch) Len() int {
241-
return 1
244+
func (r *readBatch) Iterator() chunk.ReadBatchIterator {
245+
return &readBatchIter{
246+
readBatch: r,
247+
}
248+
}
249+
250+
type readBatchIter struct {
251+
consumed bool
252+
*readBatch
242253
}
243254

244-
func (b readBatch) RangeValue(index int) []byte {
245-
if index != 0 {
246-
panic("index != 0")
255+
func (b *readBatchIter) Next() bool {
256+
if b.consumed {
257+
return false
247258
}
259+
b.consumed = true
260+
return true
261+
}
262+
263+
func (b *readBatchIter) RangeValue() []byte {
248264
return b.rangeValue
249265
}
250266

251-
func (b readBatch) Value(index int) []byte {
252-
if index != 0 {
253-
panic("index != 0")
254-
}
267+
func (b *readBatchIter) Value() []byte {
255268
return b.value
256269
}
257270

pkg/chunk/chunk_store.go

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -347,53 +347,23 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, from, through mode
347347
}
348348

349349
func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) {
350-
incomingEntries := make(chan []IndexEntry)
351-
incomingErrors := make(chan error)
352-
for _, query := range queries {
353-
go func(query IndexQuery) {
354-
entries, err := c.lookupEntriesByQuery(ctx, query)
355-
if err != nil {
356-
incomingErrors <- err
357-
} else {
358-
incomingEntries <- entries
359-
}
360-
}(query)
361-
}
362-
363-
// Combine the results into one slice
364-
var entries []IndexEntry
365-
var lastErr error
366-
for i := 0; i < len(queries); i++ {
367-
select {
368-
case incoming := <-incomingEntries:
369-
entries = append(entries, incoming...)
370-
case err := <-incomingErrors:
371-
lastErr = err
372-
}
373-
}
374-
375-
return entries, lastErr
376-
}
377-
378-
func (c *store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]IndexEntry, error) {
379350
var entries []IndexEntry
380-
381-
if err := c.storage.QueryPages(ctx, query, func(resp ReadBatch) (shouldContinue bool) {
382-
for i := 0; i < resp.Len(); i++ {
351+
err := c.storage.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool {
352+
iter := resp.Iterator()
353+
for iter.Next() {
383354
entries = append(entries, IndexEntry{
384355
TableName: query.TableName,
385356
HashValue: query.HashValue,
386-
RangeValue: resp.RangeValue(i),
387-
Value: resp.Value(i),
357+
RangeValue: iter.RangeValue(),
358+
Value: iter.Value(),
388359
})
389360
}
390361
return true
391-
}); err != nil {
362+
})
363+
if err != nil {
392364
level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "error querying storage", "err", err)
393-
return nil, err
394365
}
395-
396-
return entries, nil
366+
return entries, err
397367
}
398368

399369
func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) {

0 commit comments

Comments
 (0)