Skip to content

Don't convert chunks to matrices and then merge them... #713

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions cmd/lite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/web/api/v1"
"github.com/prometheus/tsdb"
"google.golang.org/grpc"
Expand Down Expand Up @@ -124,8 +123,7 @@ func main() {
tableManager.Start()
defer tableManager.Stop()

engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout)
queryable := querier.NewQueryable(dist, chunkStore)
queryable, engine := querier.Make(querierConfig, dist, chunkStore)

if configStoreConfig.ConfigsAPIURL.String() != "" || configStoreConfig.DBConfig.URI != "" {
rulesAPI, err := ruler.NewRulesAPI(configStoreConfig)
Expand Down Expand Up @@ -186,7 +184,7 @@ func main() {

subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
subrouter.PathPrefix("/api/v1").Handler(activeMiddleware.Wrap(promRouter))
subrouter.Path("/read").Handler(activeMiddleware.Wrap(http.HandlerFunc(queryable.RemoteReadHandler)))
subrouter.Path("/read").Handler(activeMiddleware.Wrap(querier.RemoteReadHandler(queryable)))
subrouter.Path("/validate_expr").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.ValidateExprHandler)))
subrouter.Path("/user_stats").Handler(activeMiddleware.Wrap(http.HandlerFunc(dist.UserStatsHandler)))

Expand Down
7 changes: 2 additions & 5 deletions cmd/querier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/web/api/v1"
"github.com/prometheus/tsdb"

Expand Down Expand Up @@ -88,9 +87,7 @@ func main() {
}
defer chunkStore.Stop()

queryable := querier.NewQueryable(dist, chunkStore)

engine := promql.NewEngine(util.Logger, nil, querierConfig.MaxConcurrent, querierConfig.Timeout)
queryable, engine := querier.Make(querierConfig, dist, chunkStore)
api := v1.NewAPI(
engine,
queryable,
Expand All @@ -107,7 +104,7 @@ func main() {

subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter()
subrouter.PathPrefix("/api/v1").Handler(middleware.AuthenticateUser.Wrap(promRouter))
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(queryable.RemoteReadHandler)))
subrouter.Path("/read").Handler(middleware.AuthenticateUser.Wrap(querier.RemoteReadHandler(queryable)))
subrouter.Path("/validate_expr").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.ValidateExprHandler)))
subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(dist.UserStatsHandler)))

Expand Down
6 changes: 4 additions & 2 deletions cmd/ruler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func main() {
schemaConfig chunk.SchemaConfig
storageConfig storage.Config
configStoreConfig ruler.ConfigStoreConfig
querierConfig querier.Config
logLevel util.LogLevel
)

Expand All @@ -44,7 +45,8 @@ func main() {
defer trace.Close()

util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig,
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, &logLevel)
&rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig,
&querierConfig, &logLevel)
flag.Parse()

util.InitLogger(logLevel.AllowedLevel)
Expand Down Expand Up @@ -78,7 +80,7 @@ func main() {
prometheus.MustRegister(dist)

engine := promql.NewEngine(util.Logger, prometheus.DefaultRegisterer, rulerConfig.NumWorkers, rulerConfig.GroupTimeout)
queryable := querier.NewQueryable(dist, chunkStore)
queryable := querier.NewQueryable(dist, chunkStore, querierConfig.Iterators)

rlr, err := ruler.NewRuler(rulerConfig, engine, queryable, dist)
if err != nil {
Expand Down
29 changes: 12 additions & 17 deletions pkg/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,36 +319,31 @@ func equalByKey(a, b Chunk) bool {
a.From == b.From && a.Through == b.Through && a.Checksum == b.Checksum
}

func chunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) {
// ChunksToMatrix converts a set of chunks to a model.Matrix.
func ChunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) {
sp, ctx := ot.StartSpanFromContext(ctx, "chunksToMatrix")
defer sp.Finish()
sp.LogFields(otlog.Int("chunks", len(chunks)))

// Group chunks by series, sort and dedupe samples.
sampleStreams := map[model.Fingerprint]*model.SampleStream{}
metrics := map[model.Fingerprint]model.Metric{}
samplesBySeries := map[model.Fingerprint][][]model.SamplePair{}
for _, c := range chunks {
ss, ok := sampleStreams[c.Fingerprint]
if !ok {
ss = &model.SampleStream{
Metric: c.Metric,
}
sampleStreams[c.Fingerprint] = ss
}

samples, err := c.Samples(from, through)
ss, err := c.Samples(from, through)
if err != nil {
return nil, err
}

ss.Values = util.MergeSampleSets(ss.Values, samples)
metrics[c.Fingerprint] = c.Metric
samplesBySeries[c.Fingerprint] = append(samplesBySeries[c.Fingerprint], ss)
}
sp.LogFields(otlog.Int("sample streams", len(sampleStreams)))
sp.LogFields(otlog.Int("series", len(samplesBySeries)))

matrix := make(model.Matrix, 0, len(sampleStreams))
for _, ss := range sampleStreams {
matrix := make(model.Matrix, 0, len(samplesBySeries))
for fp, ss := range samplesBySeries {
matrix = append(matrix, &model.SampleStream{
Metric: ss.Metric,
Values: ss.Values,
Metric: metrics[fp],
Values: util.MergeNSampleSets(ss...),
})
}

Expand Down
18 changes: 5 additions & 13 deletions pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (s *spanLogger) Log(kvps ...interface{}) error {
}

// Get implements ChunkStore
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) (model.Matrix, error) {
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) {
log, ctx := newSpanLogger(ctx, "ChunkStore.Get")
defer log.Span.Finish()

Expand Down Expand Up @@ -227,19 +227,11 @@ func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers .
metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(allMatchers)
if ok && metricNameMatcher.Type == labels.MatchEqual {
log.Span.SetTag("metric", metricNameMatcher.Value)
return c.getMetricNameMatrix(ctx, from, through, matchers, metricNameMatcher.Value)
return c.getMetricNameChunks(ctx, from, through, matchers, metricNameMatcher.Value)
}

// Otherwise we consult the metric name index first and then create queries for each matching metric name.
return c.getSeriesMatrix(ctx, from, through, matchers, metricNameMatcher)
}

func (c *Store) getMetricNameMatrix(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) (model.Matrix, error) {
chunks, err := c.getMetricNameChunks(ctx, from, through, allMatchers, metricName)
if err != nil {
return nil, err
}
return chunksToMatrix(ctx, chunks, from, through)
return c.getSeriesChunks(ctx, from, through, matchers, metricNameMatcher)
}

func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) {
Expand Down Expand Up @@ -345,7 +337,7 @@ func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found [
return
}

func (c *Store) getSeriesMatrix(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) (model.Matrix, error) {
func (c *Store) getSeriesChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) ([]Chunk, error) {
// Get all series from the index
userID, err := user.ExtractOrgID(ctx)
if err != nil {
Expand Down Expand Up @@ -406,7 +398,7 @@ outer:
}
}
}
return chunksToMatrix(ctx, chunks, from, through)
return chunks, nil
}

func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) {
Expand Down
10 changes: 8 additions & 2 deletions pkg/chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,10 @@ func TestChunkStore_Get(t *testing.T) {
}

// Query with ordinary time-range
matrix1, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...)
chunks1, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...)
require.NoError(t, err)

matrix1, err := ChunksToMatrix(ctx, chunks1, now.Add(-time.Hour), now)
require.NoError(t, err)

sort.Sort(ByFingerprint(matrix1))
Expand All @@ -229,7 +232,10 @@ func TestChunkStore_Get(t *testing.T) {
}

// Pushing end of time-range into future should yield exact same resultset
matrix2, err := store.Get(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*30), matchers...)
chunks2, err := store.Get(ctx, now.Add(-time.Hour), now.Add(time.Hour*24*30), matchers...)
require.NoError(t, err)

matrix2, err := ChunksToMatrix(ctx, chunks2, now.Add(-time.Hour), now)
require.NoError(t, err)

sort.Sort(ByFingerprint(matrix2))
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunk/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func TestChunksToMatrix(t *testing.T) {
},
},
} {
matrix, err := chunksToMatrix(context.Background(), c.chunks, chunk1.From, chunk3.Through)
matrix, err := ChunksToMatrix(context.Background(), c.chunks, chunk1.From, chunk3.Through)
require.NoError(t, err)

sort.Sort(matrix)
Expand Down
34 changes: 2 additions & 32 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ingester
import (
"context"
"fmt"
"math"
"net/http"
"sort"
"sync"
Expand All @@ -18,7 +19,6 @@ import (
"github.com/weaveworks/common/user"
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/ingester/client"
"github.com/weaveworks/cortex/pkg/util"
)

type testStore struct {
Expand Down Expand Up @@ -86,36 +86,6 @@ func matrixToSamples(m model.Matrix) []model.Sample {
return samples
}

// chunksToMatrix converts a slice of chunks into a model.Matrix.
func chunksToMatrix(chunks []chunk.Chunk) (model.Matrix, error) {
// Group chunks by series, sort and dedupe samples.
sampleStreams := map[model.Fingerprint]*model.SampleStream{}
for _, c := range chunks {
fp := c.Metric.Fingerprint()
ss, ok := sampleStreams[fp]
if !ok {
ss = &model.SampleStream{
Metric: c.Metric,
}
sampleStreams[fp] = ss
}

samples, err := c.Samples(c.From, c.Through)
if err != nil {
return nil, err
}

ss.Values = util.MergeSampleSets(ss.Values, samples)
}

matrix := make(model.Matrix, 0, len(sampleStreams))
for _, ss := range sampleStreams {
matrix = append(matrix, ss)
}

return matrix, nil
}

func TestIngesterAppend(t *testing.T) {
store, ing := newTestStore(t, defaultIngesterTestConfig())

Expand Down Expand Up @@ -154,7 +124,7 @@ func TestIngesterAppend(t *testing.T) {
// Read samples back via chunk store.
ing.Shutdown()
for _, userID := range userIDs {
res, err := chunksToMatrix(store.chunks[userID])
res, err := chunk.ChunksToMatrix(context.Background(), store.chunks[userID], model.Time(0), model.Time(math.MaxInt64))
require.NoError(t, err)
sort.Sort(res)
assert.Equal(t, testData[userID], res)
Expand Down
4 changes: 3 additions & 1 deletion pkg/ingester/lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ingester

import (
"io"
"math"
"reflect"
"runtime"
"testing"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"

"github.com/weaveworks/common/user"
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/ingester/client"
"github.com/weaveworks/cortex/pkg/ring"
"github.com/weaveworks/cortex/pkg/util"
Expand Down Expand Up @@ -314,7 +316,7 @@ func TestIngesterFlush(t *testing.T) {
})

// And check the store has the chunk
res, err := chunksToMatrix(store.chunks[userID])
res, err := chunk.ChunksToMatrix(context.Background(), store.chunks[userID], model.Time(0), model.Time(math.MaxInt64))
require.NoError(t, err)
assert.Equal(t, model.Matrix{
&model.SampleStream{
Expand Down
27 changes: 27 additions & 0 deletions pkg/querier/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package querier

import (
"fmt"
"testing"

"github.com/prometheus/prometheus/promql"
)

// result sinks the benchmark output at package level so the compiler
// cannot eliminate the benchmarked query as dead code.
var result *promql.Result

// BenchmarkChunkQueryable runs the test query against every combination of
// queryable implementation and chunk encoding, reporting each as a sub-benchmark.
func BenchmarkChunkQueryable(b *testing.B) {
	for _, enc := range encodings {
		// One mock store per encoding, shared by all queryable implementations.
		store, from := makeMockChunkStore(b, 24*30, enc.e)

		for _, impl := range queryables {
			name := fmt.Sprintf("%s/%s", impl.name, enc.name)
			b.Run(name, func(b *testing.B) {
				queryable := impl.f(store)
				// Keep the last result in a local and publish it after the
				// loop so the query itself is never optimised away.
				var res *promql.Result
				for i := 0; i < b.N; i++ {
					res = testQuery(b, queryable, from)
				}
				result = res
			})
		}
	}
}
59 changes: 59 additions & 0 deletions pkg/querier/chunk_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package querier

import (
"github.com/prometheus/common/model"
"github.com/weaveworks/cortex/pkg/chunk"
promchunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk"
)

// chunkIterator implements a sample iterator over a single chunk, with a
// one-sample cache in front of the underlying chunk iterator.
type chunkIterator struct {
	chunk.Chunk
	it promchunk.Iterator

	// At() is called often in the heap code, so caching its result seems like
	// a good idea.
	cacheValid  bool
	cachedTime  int64
	cachedValue float64
}

// Seek advances the iterator forward to the value at or after
// the given timestamp.
func (i *chunkIterator) Seek(t int64) bool {
	i.cacheValid = false

	// We assume seeks only care about a specific window; if this chunk doesn't
	// contain samples in that window, we can shortcut.
	if int64(i.Through) < t {
		return false
	}

	return i.it.FindAtOrAfter(model.Time(t))
}

// fillCache loads the underlying iterator's current sample into the cache
// if it is not already valid. Shared by At and AtTime to avoid duplicating
// the caching logic.
func (i *chunkIterator) fillCache() {
	if !i.cacheValid {
		v := i.it.Value()
		i.cachedTime, i.cachedValue = int64(v.Timestamp), float64(v.Value)
		i.cacheValid = true
	}
}

// AtTime returns the timestamp of the current sample.
func (i *chunkIterator) AtTime() int64 {
	i.fillCache()
	return i.cachedTime
}

// At returns the timestamp and value of the current sample.
func (i *chunkIterator) At() (int64, float64) {
	i.fillCache()
	return i.cachedTime, i.cachedValue
}

// Next advances to the next sample, invalidating the cached one.
func (i *chunkIterator) Next() bool {
	i.cacheValid = false
	return i.it.Scan()
}

// Err returns any error encountered by the underlying iterator.
func (i *chunkIterator) Err() error {
	return i.it.Err()
}
Loading