Skip to content

Commit 3402152

Browse files
authored
Don't convert chunks to matrixes and then merge them... (#713)
* Make chunk store return chunks; converting to series is now separate. - Refactor chunk store to return chunks. - Refactor querier package to use new Prometheus 2.0 interfaces. - Have separate querier for ingesters, chunk store and metadata. - Make remote read handler take a Queryable. Signed-off-by: Tom Wilkie <[email protected]> * Simple query benchmark, using in-memory chunks, but running an actual PromQL query. Signed-off-by: Tom Wilkie <[email protected]> * Don't convert chunks to matrixes and then merge them; use iterators and the upstream heap-based merging code. Signed-off-by: Tom Wilkie <[email protected]> * Optimise the ChunksToMatrix function. Signed-off-by: Tom Wilkie <[email protected]> * Update prometheus to pull in 78efdc6d - Avoid infinite loop on duplicate NaN values Signed-off-by: Tom Wilkie <[email protected]> * Register query engine metrics. Signed-off-by: Tom Wilkie <[email protected]> * Review feedback: clarify log message. Signed-off-by: Tom Wilkie <[email protected]>
1 parent 1ea271d commit 3402152

38 files changed

+1125
-504
lines changed

Gopkg.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/lite/main.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/prometheus/client_golang/prometheus"
1212
"github.com/prometheus/common/route"
1313
"github.com/prometheus/prometheus/config"
14-
"github.com/prometheus/prometheus/promql"
1514
"github.com/prometheus/prometheus/web/api/v1"
1615
"github.com/prometheus/tsdb"
1716
"google.golang.org/grpc"
@@ -124,8 +123,7 @@ func main() {
124123
tableManager.Start()
125124
defer tableManager.Stop()
126125

127-
engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout)
128-
queryable := querier.NewQueryable(dist, chunkStore)
126+
queryable, engine := querier.Make(querierConfig, dist, chunkStore)
129127

130128
if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" {
131129
rulesAPI, err := ruler.NewRulesAPI(configStoreConfig)
@@ -186,7 +184,7 @@ func main() {
186184

187185
subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
188186
subrouter.PathPrefix("/api/v1").Handler(activeMiddleware.Wrap(promRouter))
189-
subrouter.Path("/read").Handler(activeMiddleware.Wrap(http.HandlerFunc(queryable.RemoteReadHandler)))
187+
subrouter.Path("/read").Handler(activeMiddleware.Wrap(querier.RemoteReadHandler(queryable)))
190188
subrouter.Path("/validate_expr").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.ValidateExprHandler)))
191189
subrouter.Path("/user_stats").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.UserStatsHandler)))
192190

cmd/querier/main.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"github.com/prometheus/client_golang/prometheus"
1212
"github.com/prometheus/common/route"
1313
"github.com/prometheus/prometheus/config"
14-
"github.com/prometheus/prometheus/promql"
1514
"github.com/prometheus/prometheus/web/api/v1"
1615
"github.com/prometheus/tsdb"
1716

@@ -88,9 +87,7 @@ func main() {
8887
}
8988
defer chunkStore.Stop()
9089

91-
queryable := querier.NewQueryable(dist, chunkStore)
92-
93-
engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout)
90+
queryable, engine := querier.Make(querierConfig, dist, chunkStore)
9491
api := v1.NewAPI(
9592
engine,
9693
queryable,
@@ -107,7 +104,7 @@ func main() {
107104

108105
subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
109106
subrouter.PathPrefix("/api/v1").Handler(middleware.AuthenticateUser.Wrap(promRouter))
110-
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(queryable.RemoteReadHandler)))
107+
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(querier.RemoteReadHandler(queryable)))
111108
subrouter.Path("/validate_expr").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.ValidateExprHandler)))
112109
subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.UserStatsHandler)))
113110

cmd/ruler/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func main() {
3636
schemaConfig chunk.SchemaConfig
3737
storageConfig storage.Config
3838
configStoreConfig ruler.ConfigStoreConfig
39+
querierConfig querier.Config
3940
logLevel util.LogLevel
4041
)
4142

@@ -44,7 +45,8 @@ func main() {
4445
defer trace.Close()
4546

4647
util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig,
47-
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, &logLevel)
48+
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig,
49+
&querierConfig, &logLevel)
4850
flag.Parse()
4951

5052
util.InitLogger(logLevel.AllowedLevel)
@@ -78,7 +80,7 @@ func main() {
7880
prometheus.MustRegister(dist)
7981

8082
engine := promql.NewEngine(util.Logger, prometheus.DefaultRegisterer, rulerConfig.NumWorkers, rulerConfig.GroupTimeout)
81-
queryable := querier.NewQueryable(dist, chunkStore)
83+
queryable := querier.NewQueryable(dist, chunkStore, querierConfig.Iterators)
8284

8385
rlr, err := ruler.NewRuler(rulerConfig, engine, queryable, dist)
8486
if err != nil {

pkg/chunk/chunk.go

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -319,36 +319,31 @@ func equalByKey(a, b Chunk) bool {
319319
a.From == b.From && a.Through == b.Through && a.Checksum == b.Checksum
320320
}
321321

322-
func chunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) {
322+
// ChunksToMatrix converts a set of chunks to a model.Matrix.
323+
func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) {
323324
sp, ctx := ot.StartSpanFromContext(ctx, "chunksToMatrix")
324325
defer sp.Finish()
325326
sp.LogFields(otlog.Int("chunks", len(chunks)))
326327

327328
// Group chunks by series, sort and dedupe samples.
328-
sampleStreams := map[model.Fingerprint]*model.SampleStream{}
329+
metrics := map[model.Fingerprint]model.Metric{}
330+
samplesBySeries := map[model.Fingerprint][][]model.SamplePair{}
329331
for _, c := range chunks {
330-
ss, ok := sampleStreams[c.Fingerprint]
331-
if !ok {
332-
ss = &model.SampleStream{
333-
Metric: c.Metric,
334-
}
335-
sampleStreams[c.Fingerprint] = ss
336-
}
337-
338-
samples, err := c.Samples(from, through)
332+
ss, err := c.Samples(from, through)
339333
if err != nil {
340334
return nil, err
341335
}
342336

343-
ss.Values = util.MergeSampleSets(ss.Values, samples)
337+
metrics[c.Fingerprint] = c.Metric
338+
samplesBySeries[c.Fingerprint] = append(samplesBySeries[c.Fingerprint], ss)
344339
}
345-
sp.LogFields(otlog.Int("sample streams", len(sampleStreams)))
340+
sp.LogFields(otlog.Int("series", len(samplesBySeries)))
346341

347-
matrix := make(model.Matrix, 0, len(sampleStreams))
348-
for _, ss := range sampleStreams {
342+
matrix := make(model.Matrix, 0, len(samplesBySeries))
343+
for fp, ss := range samplesBySeries {
349344
matrix = append(matrix, &model.SampleStream{
350-
Metric: ss.Metric,
351-
Values: ss.Values,
345+
Metric: metrics[fp],
346+
Values: util.MergeNSampleSets(ss...),
352347
})
353348
}
354349

pkg/chunk/chunk_store.go

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ func (s *spanLogger) Log(kvps ...interface{}) error {
195195
}
196196

197197
// Get implements ChunkStore
198-
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) (model.Matrix, error) {
198+
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) {
199199
log, ctx := newSpanLogger(ctx, "ChunkStore.Get")
200200
defer log.Span.Finish()
201201

@@ -227,19 +227,11 @@ func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers .
227227
metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers)
228228
if ok && metricNameMatcher.Type == labels.MatchEqual {
229229
log.Span.SetTag("metric", metricNameMatcher.Value)
230-
return c.getMetricNameMatrix(ctx, from, through, matchers, metricNameMatcher.Value)
230+
return c.getMetricNameChunks(ctx, from, through, matchers, metricNameMatcher.Value)
231231
}
232232

233233
// Otherwise we consult the metric name index first and then create queries for each matching metric name.
234-
return c.getSeriesMatrix(ctx, from, through, matchers, metricNameMatcher)
235-
}
236-
237-
func (c *Store) getMetricNameMatrix(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) (model.Matrix, error) {
238-
chunks, err := c.getMetricNameChunks(ctx, from, through, allMatchers, metricName)
239-
if err != nil {
240-
return nil, err
241-
}
242-
return chunksToMatrix(ctx, chunks, from, through)
234+
return c.getSeriesChunks(ctx, from, through, matchers, metricNameMatcher)
243235
}
244236

245237
func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) {
@@ -346,7 +338,7 @@ func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found [
346338
return
347339
}
348340

349-
func (c *Store) getSeriesMatrix(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) (model.Matrix, error) {
341+
func (c *Store) getSeriesChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) ([]Chunk, error) {
350342
// Get all series from the index
351343
userID, err := user.ExtractOrgID(ctx)
352344
if err != nil {
@@ -407,7 +399,7 @@ outer:
407399
}
408400
}
409401
}
410-
return chunksToMatrix(ctx, chunks, from, through)
402+
return chunks, nil
411403
}
412404

413405
func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) {

pkg/chunk/chunk_store_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,10 @@ func TestChunkStore_Get(t *testing.T) {
218218
}
219219

220220
// Query with ordinary time-range
221-
matrix1, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...)
221+
chunks1, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...)
222+
require.NoError(t, err)
223+
224+
matrix1, err := ChunksToMatrix(ctx, chunks1, now.Add(-time.Hour), now)
222225
require.NoError(t, err)
223226

224227
sort.Sort(ByFingerprint(matrix1))
@@ -229,7 +232,10 @@ func TestChunkStore_Get(t *testing.T) {
229232
}
230233

231234
// Pushing end of time-range into future should yield exact same resultset
232-
matrix2, err := store.Get(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*30), matchers...)
235+
chunks2, err := store.Get(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*30), matchers...)
236+
require.NoError(t, err)
237+
238+
matrix2, err := ChunksToMatrix(ctx, chunks2, now.Add(-time.Hour), now)
233239
require.NoError(t, err)
234240

235241
sort.Sort(ByFingerprint(matrix2))

pkg/chunk/chunk_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func TestChunksToMatrix(t *testing.T) {
188188
},
189189
},
190190
} {
191-
matrix, err := chunksToMatrix(context.Background(), c.chunks, chunk1.From, chunk3.Through)
191+
matrix, err := ChunksToMatrix(context.Background(), c.chunks, chunk1.From, chunk3.Through)
192192
require.NoError(t, err)
193193

194194
sort.Sort(matrix)

pkg/ingester/ingester_test.go

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package ingester
33
import (
44
"context"
55
"fmt"
6+
"math"
67
"net/http"
78
"sort"
89
"sync"
@@ -18,7 +19,6 @@ import (
1819
"github.com/weaveworks/common/user"
1920
"github.com/weaveworks/cortex/pkg/chunk"
2021
"github.com/weaveworks/cortex/pkg/ingester/client"
21-
"github.com/weaveworks/cortex/pkg/util"
2222
)
2323

2424
type testStore struct {
@@ -86,36 +86,6 @@ func matrixToSamples(m model.Matrix) []model.Sample {
8686
return samples
8787
}
8888

89-
// chunksToMatrix converts a slice of chunks into a model.Matrix.
90-
func chunksToMatrix(chunks []chunk.Chunk) (model.Matrix, error) {
91-
// Group chunks by series, sort and dedupe samples.
92-
sampleStreams := map[model.Fingerprint]*model.SampleStream{}
93-
for _, c := range chunks {
94-
fp := c.Metric.Fingerprint()
95-
ss, ok := sampleStreams[fp]
96-
if !ok {
97-
ss = &model.SampleStream{
98-
Metric: c.Metric,
99-
}
100-
sampleStreams[fp] = ss
101-
}
102-
103-
samples, err := c.Samples(c.From, c.Through)
104-
if err != nil {
105-
return nil, err
106-
}
107-
108-
ss.Values = util.MergeSampleSets(ss.Values, samples)
109-
}
110-
111-
matrix := make(model.Matrix, 0, len(sampleStreams))
112-
for _, ss := range sampleStreams {
113-
matrix = append(matrix, ss)
114-
}
115-
116-
return matrix, nil
117-
}
118-
11989
func TestIngesterAppend(t *testing.T) {
12090
store, ing := newTestStore(t, defaultIngesterTestConfig())
12191

@@ -154,7 +124,7 @@ func TestIngesterAppend(t *testing.T) {
154124
// Read samples back via chunk store.
155125
ing.Shutdown()
156126
for _, userID := range userIDs {
157-
res, err := chunksToMatrix(store.chunks[userID])
127+
res, err := chunk.ChunksToMatrix(context.Background(), store.chunks[userID], model.Time(0), model.Time(math.MaxInt64))
158128
require.NoError(t, err)
159129
sort.Sort(res)
160130
assert.Equal(t, testData[userID], res)

pkg/ingester/lifecycle_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ingester
22

33
import (
44
"io"
5+
"math"
56
"reflect"
67
"runtime"
78
"testing"
@@ -17,6 +18,7 @@ import (
1718
"github.com/prometheus/prometheus/pkg/labels"
1819

1920
"github.com/weaveworks/common/user"
21+
"github.com/weaveworks/cortex/pkg/chunk"
2022
"github.com/weaveworks/cortex/pkg/ingester/client"
2123
"github.com/weaveworks/cortex/pkg/ring"
2224
"github.com/weaveworks/cortex/pkg/util"
@@ -314,7 +316,7 @@ func TestIngesterFlush(t *testing.T) {
314316
})
315317

316318
// And check the store has the chunk
317-
res, err := chunksToMatrix(store.chunks[userID])
319+
res, err := chunk.ChunksToMatrix(context.Background(), store.chunks[userID], model.Time(0), model.Time(math.MaxInt64))
318320
require.NoError(t, err)
319321
assert.Equal(t, model.Matrix{
320322
&model.SampleStream{

pkg/querier/benchmark_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package querier
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/prometheus/prometheus/promql"
8+
)
9+
10+
var result *promql.Result
11+
12+
func BenchmarkChunkQueryable(b *testing.B) {
13+
for _, encoding := range encodings {
14+
store, from := makeMockChunkStore(b, 24*30, encoding.e)
15+
16+
for _, q := range queryables {
17+
b.Run(fmt.Sprintf("%s/%s", q.name, encoding.name), func(b *testing.B) {
18+
queryable := q.f(store)
19+
var r *promql.Result
20+
for n := 0; n < b.N; n++ {
21+
r = testQuery(b, queryable, from)
22+
}
23+
result = r
24+
})
25+
}
26+
}
27+
}

pkg/querier/chunk_iterator.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package querier
2+
3+
import (
4+
"github.com/prometheus/common/model"
5+
"github.com/weaveworks/cortex/pkg/chunk"
6+
promchunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk"
7+
)
8+
9+
type chunkIterator struct {
10+
chunk.Chunk
11+
it promchunk.Iterator
12+
13+
// At() is called often in the heap code, so caching its result seems like
14+
// a good idea.
15+
cacheValid bool
16+
cachedTime int64
17+
cachedValue float64
18+
}
19+
20+
// Seek advances the iterator forward to the value at or after
21+
// the given timestamp.
22+
func (i *chunkIterator) Seek(t int64) bool {
23+
i.cacheValid = false
24+
25+
// We assume seeks only care about a specific window; if this chunk doesn't
26+
// contain samples in that window, we can shortcut.
27+
if int64(i.Through) < t {
28+
return false
29+
}
30+
31+
return i.it.FindAtOrAfter(model.Time(t))
32+
}
33+
34+
func (i *chunkIterator) AtTime() int64 {
35+
if !i.cacheValid {
36+
v := i.it.Value()
37+
i.cachedTime, i.cachedValue = int64(v.Timestamp), float64(v.Value)
38+
i.cacheValid = true
39+
}
40+
return i.cachedTime
41+
}
42+
43+
func (i *chunkIterator) At() (int64, float64) {
44+
if !i.cacheValid {
45+
v := i.it.Value()
46+
i.cachedTime, i.cachedValue = int64(v.Timestamp), float64(v.Value)
47+
i.cacheValid = true
48+
}
49+
return i.cachedTime, i.cachedValue
50+
}
51+
52+
func (i *chunkIterator) Next() bool {
53+
i.cacheValid = false
54+
return i.it.Scan()
55+
}
56+
57+
func (i *chunkIterator) Err() error {
58+
return i.it.Err()
59+
}

0 commit comments

Comments
 (0)