Skip to content

Lazily fetch series chunks #456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions k8s/querier-dep.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ spec:
- -dynamodb.v4-schema-from=2017-02-05
- -dynamodb.v5-schema-from=2017-02-22
- -dynamodb.v7-schema-from=2017-03-19
- -dynamodb.v8-schema-from=2017-06-05
- -dynamodb.chunk-table.from=2017-04-17
- -memcached.hostname=memcached.default.svc.cluster.local
- -memcached.timeout=100ms
Expand Down
1 change: 1 addition & 0 deletions k8s/ruler-dep.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ spec:
- -dynamodb.v4-schema-from=2017-02-05
- -dynamodb.v5-schema-from=2017-02-22
- -dynamodb.v7-schema-from=2017-03-19
- -dynamodb.v8-schema-from=2017-06-05
- -dynamodb.chunk-table.from=2017-04-17
- -memcached.hostname=memcached.default.svc.cluster.local
- -memcached.timeout=100ms
Expand Down
32 changes: 31 additions & 1 deletion pkg/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/golang/snappy"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/local"
prom_chunk "github.com/prometheus/prometheus/storage/local/chunk"

"github.com/weaveworks/common/errors"
Expand Down Expand Up @@ -278,6 +279,35 @@ func (c *Chunk) decode(input []byte) error {
})
}

// chunksToIterators converts a slice of chunks into one SeriesIterator per
// distinct series. Chunks are grouped by their metric fingerprint, and the
// samples of each group are merged (sorted and deduplicated) via
// util.MergeSampleSets before an iterator is built for the combined stream.
func chunksToIterators(chunks []Chunk) ([]local.SeriesIterator, error) {
	// Accumulate one SampleStream per unique series fingerprint.
	streamsByFP := map[model.Fingerprint]*model.SampleStream{}
	for i := range chunks {
		chunk := &chunks[i]
		fp := chunk.Metric.Fingerprint()

		stream, found := streamsByFP[fp]
		if !found {
			stream = &model.SampleStream{Metric: chunk.Metric}
			streamsByFP[fp] = stream
		}

		samples, err := chunk.samples()
		if err != nil {
			return nil, err
		}

		// Merge keeps the accumulated sample set sorted and deduped.
		stream.Values = util.MergeSampleSets(stream.Values, samples)
	}

	result := make([]local.SeriesIterator, 0, len(streamsByFP))
	for _, stream := range streamsByFP {
		result = append(result, util.NewSampleStreamIterator(stream))
	}
	return result, nil
}

// ChunksToMatrix converts a slice of chunks into a model.Matrix.
func ChunksToMatrix(chunks []Chunk) (model.Matrix, error) {
// Group chunks by series, sort and dedupe samples.
Expand All @@ -297,7 +327,7 @@ func ChunksToMatrix(chunks []Chunk) (model.Matrix, error) {
return nil, err
}

ss.Values = util.MergeSamples(ss.Values, samples)
ss.Values = util.MergeSampleSets(ss.Values, samples)
}

matrix := make(model.Matrix, 0, len(sampleStreams))
Expand Down
92 changes: 47 additions & 45 deletions pkg/chunk/chunk_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage/local"
"github.com/prometheus/prometheus/storage/metric"
"golang.org/x/net/context"

Expand Down Expand Up @@ -151,17 +152,35 @@ func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch
}

// Get implements ChunkStore
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]Chunk, error) {
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
if through < from {
return nil, fmt.Errorf("invalid query, through < from (%d < %d)", through, from)
}

filters, matchers := util.SplitFiltersAndMatchers(allMatchers)

// Fetch chunk descriptors (just ID really) from storage
chunks, err := c.lookupChunksByMatchers(ctx, from, through, matchers)
// Fetch metric name chunks if the matcher is of type equal,
metricNameMatcher, matchers, ok := util.ExtractMetricNameMatcherFromMatchers(matchers)
if ok && metricNameMatcher.Type == metric.Equal {
return c.getMetricNameIterators(ctx, from, through, filters, matchers, metricNameMatcher.Value)
}

// Otherwise we will create lazy iterators for all series in our index
return c.getFuzzyMetricLazySeriesIterators(ctx, from, through, filters, matchers, metricNameMatcher)
}

func (c *Store) getMetricNameIterators(ctx context.Context, from, through model.Time, filters []*metric.LabelMatcher, matchers []*metric.LabelMatcher, metricName model.LabelValue) ([]local.SeriesIterator, error) {
chunks, err := c.getMetricNameChunks(ctx, from, through, filters, matchers, metricName)
if err != nil {
return nil, promql.ErrStorage(err)
return nil, err
}
return chunksToIterators(chunks)
}

func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, filters []*metric.LabelMatcher, matchers []*metric.LabelMatcher, metricName model.LabelValue) ([]Chunk, error) {
chunks, err := c.lookupChunksByMetricName(ctx, from, through, matchers, metricName)
if err != nil {
return nil, err
}

// Filter out chunks that are not in the selected time range.
Expand Down Expand Up @@ -209,69 +228,52 @@ outer:
return filteredChunks, nil
}

func (c *Store) lookupChunksByMatchers(ctx context.Context, from, through model.Time, matchers []*metric.LabelMatcher) ([]Chunk, error) {
metricNameMatcher, matchers, ok := util.ExtractMetricNameMatcherFromMatchers(matchers)

// Only lookup by metric name if the matcher is of type equal, otherwise we
// have to fetch chunks for all metric names as other metric names could match.
if ok && metricNameMatcher.Type == metric.Equal {
return c.lookupChunksByMetricName(ctx, from, through, matchers, metricNameMatcher.Value)
}

func (c *Store) getFuzzyMetricLazySeriesIterators(ctx context.Context, from, through model.Time, filters []*metric.LabelMatcher, matchers []*metric.LabelMatcher, metricNameMatcher *metric.LabelMatcher) ([]local.SeriesIterator, error) {
// Get all series from the index
userID, err := user.Extract(ctx)
if err != nil {
return nil, err
}

// If there is no metric name, we want return chunks for all metric names
metricNameQueries, err := c.schema.GetReadQueries(from, through, userID)
seriesQueries, err := c.schema.GetReadQueries(from, through, userID)
if err != nil {
return nil, err
}
metricNameEntries, err := c.lookupEntriesByQueries(ctx, metricNameQueries)
seriesEntries, err := c.lookupEntriesByQueries(ctx, seriesQueries)
if err != nil {
return nil, err
}

incomingChunkSets := make(chan ByKey)
incomingErrors := make(chan error)
skippedMetricNames := 0

for _, metricNameEntry := range metricNameEntries {
metricName, err := parseMetricNameRangeValue(metricNameEntry.RangeValue, metricNameEntry.Value)
// Create a LazySeriesIterator for each series
lazyIterators := make([]local.SeriesIterator, 0, len(seriesEntries))
outer:
for _, seriesEntry := range seriesEntries {
metric, err := parseSeriesRangeValue(seriesEntry.RangeValue, seriesEntry.Value)
if err != nil {
return nil, err
}

// We are fetching all metric name chunks, however if there is a metricNameMatcher,
// we only want metric names that match
if ok && !metricNameMatcher.Match(metricName) {
skippedMetricNames++
// Apply metricNameMatcher filter
if metricNameMatcher != nil && !metricNameMatcher.Match(metric[metricNameMatcher.Name]) {
continue
}

go func(metricName model.LabelValue) {
chunks, err := c.lookupChunksByMetricName(ctx, from, through, matchers, metricName)
if err != nil {
incomingErrors <- err
} else {
incomingChunkSets <- chunks
// Apply matchers
for _, matcher := range matchers {
if !matcher.Match(metric[matcher.Name]) {
continue outer
}
}(metricName)
}
}

var chunkSets []ByKey
var lastErr error
for i := 0; i < (len(metricNameEntries) - skippedMetricNames); i++ {
select {
case incoming := <-incomingChunkSets:
chunkSets = append(chunkSets, incoming)
case err := <-incomingErrors:
lastErr = err
// Apply filters
for _, filter := range filters {
if !filter.Match(metric[filter.Name]) {
continue outer
}
}
}

return nWayUnion(chunkSets), lastErr
lazyIterators = append(lazyIterators, NewLazySeriesIterator(c, metric, from, through, matchers))
}
return lazyIterators, nil
}

func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*metric.LabelMatcher, metricName model.LabelValue) ([]Chunk, error) {
Expand Down
Loading