Skip to content

Commit 9a5771b

Browse files
committed
Other adjustments to new packages and types
1 parent 55b8d35 commit 9a5771b

37 files changed

+316
-414
lines changed

pkg/chunk/aws_storage_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package chunk
22

33
import (
44
"bytes"
5+
"context"
56
"flag"
67
"fmt"
78
"io/ioutil"
@@ -21,7 +22,6 @@ import (
2122
"github.com/aws/aws-sdk-go/service/s3/s3iface"
2223
"github.com/prometheus/client_golang/prometheus"
2324
"github.com/prometheus/common/log"
24-
"golang.org/x/net/context"
2525

2626
awscommon "github.com/weaveworks/common/aws"
2727
"github.com/weaveworks/common/instrument"

pkg/chunk/aws_storage_client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package chunk
22

33
import (
44
"bytes"
5+
"context"
56
"fmt"
67
"io/ioutil"
78
"math/rand"
@@ -22,7 +23,6 @@ import (
2223
"github.com/prometheus/common/log"
2324
"github.com/prometheus/common/model"
2425
"github.com/stretchr/testify/require"
25-
"golang.org/x/net/context"
2626

2727
"github.com/weaveworks/cortex/pkg/util"
2828
)

pkg/chunk/chunk.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ import (
1313
"github.com/golang/snappy"
1414
"github.com/pkg/errors"
1515
"github.com/prometheus/common/model"
16-
"github.com/prometheus/prometheus/storage/local"
17-
prom_chunk "github.com/prometheus/prometheus/storage/local/chunk"
16+
prom_chunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk"
1817

1918
errs "github.com/weaveworks/common/errors"
2019
"github.com/weaveworks/cortex/pkg/util"
@@ -280,7 +279,7 @@ func (c *Chunk) Decode(input []byte) error {
280279
})
281280
}
282281

283-
func chunksToIterators(chunks []Chunk) ([]local.SeriesIterator, error) {
282+
func chunksToMatrix(chunks []Chunk) (model.Matrix, error) {
284283
// Group chunks by series, sort and dedupe samples.
285284
sampleStreams := map[model.Fingerprint]*model.SampleStream{}
286285
for _, c := range chunks {
@@ -301,12 +300,15 @@ func chunksToIterators(chunks []Chunk) ([]local.SeriesIterator, error) {
301300
ss.Values = util.MergeSampleSets(ss.Values, samples)
302301
}
303302

304-
iterators := make([]local.SeriesIterator, 0, len(sampleStreams))
303+
matrix := make(model.Matrix, 0, len(sampleStreams))
305304
for _, ss := range sampleStreams {
306-
iterators = append(iterators, util.NewSampleStreamIterator(ss))
305+
matrix = append(matrix, &model.SampleStream{
306+
Metric: ss.Metric,
307+
Values: ss.Values,
308+
})
307309
}
308310

309-
return iterators, nil
311+
return matrix, nil
310312
}
311313

312314
// Samples returns all SamplePairs for the chunk.

pkg/chunk/chunk_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package chunk
22

33
import (
4+
"context"
45
"flag"
56
"sync"
67
"time"
@@ -9,7 +10,6 @@ import (
910
"github.com/prometheus/client_golang/prometheus"
1011
"github.com/prometheus/common/log"
1112
"github.com/weaveworks/common/instrument"
12-
"golang.org/x/net/context"
1313

1414
"github.com/weaveworks/cortex/pkg/util"
1515
)

pkg/chunk/chunk_cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77

88
"github.com/bradfitz/gomemcache/memcache"
99
"github.com/prometheus/common/model"
10-
"github.com/prometheus/prometheus/storage/local/chunk"
1110
"github.com/stretchr/testify/require"
11+
"github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk"
1212
"golang.org/x/net/context"
1313
)
1414

pkg/chunk/chunk_store.go

Lines changed: 47 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
package chunk
22

33
import (
4+
"context"
45
"encoding/json"
56
"flag"
67
"fmt"
78
"sort"
89

910
"github.com/prometheus/client_golang/prometheus"
1011
"github.com/prometheus/common/model"
12+
"github.com/prometheus/prometheus/pkg/labels"
1113
"github.com/prometheus/prometheus/promql"
12-
"github.com/prometheus/prometheus/storage/local"
13-
"github.com/prometheus/prometheus/storage/metric"
14-
"golang.org/x/net/context"
1514

1615
"github.com/weaveworks/common/user"
1716
"github.com/weaveworks/cortex/pkg/util"
@@ -150,7 +149,7 @@ func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch
150149
}
151150

152151
// Get implements ChunkStore
153-
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*metric.LabelMatcher) ([]local.SeriesIterator, error) {
152+
func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) (model.Matrix, error) {
154153
if through < from {
155154
return nil, fmt.Errorf("invalid query, through < from (%d < %d)", through, from)
156155
}
@@ -159,7 +158,7 @@ func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers .
159158
if from.After(now) {
160159
// time-span start is in future ... regard as legal
161160
util.WithContext(ctx).Debugf("Whole timerange %v..%v in future (now=%v) yield empty resultset", through, from, now)
162-
return []local.SeriesIterator{}, nil
161+
return nil, nil
163162
}
164163

165164
if through.After(now) {
@@ -170,23 +169,23 @@ func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers .
170169

171170
// Fetch metric name chunks if the matcher is of type equal,
172171
metricNameMatcher, matchers, ok := util.ExtractMetricNameMatcherFromMatchers(allMatchers)
173-
if ok && metricNameMatcher.Type == metric.Equal {
174-
return c.getMetricNameIterators(ctx, from, through, matchers, metricNameMatcher.Value)
172+
if ok && metricNameMatcher.Type == labels.MatchEqual {
173+
return c.getMetricNameMatrix(ctx, from, through, matchers, metricNameMatcher.Value)
175174
}
176175

177176
// Otherwise we will create lazy iterators for all series in our index
178-
return c.getSeriesIterators(ctx, from, through, matchers, metricNameMatcher)
177+
return c.getSeriesMatrix(ctx, from, through, matchers, metricNameMatcher)
179178
}
180179

181-
func (c *Store) getMetricNameIterators(ctx context.Context, from, through model.Time, allMatchers []*metric.LabelMatcher, metricName model.LabelValue) ([]local.SeriesIterator, error) {
180+
func (c *Store) getMetricNameMatrix(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) (model.Matrix, error) {
182181
chunks, err := c.getMetricNameChunks(ctx, from, through, allMatchers, metricName)
183182
if err != nil {
184183
return nil, err
185184
}
186-
return chunksToIterators(chunks)
185+
return chunksToMatrix(chunks)
187186
}
188187

189-
func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*metric.LabelMatcher, metricName model.LabelValue) ([]Chunk, error) {
188+
func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) {
190189
logger := util.WithContext(ctx)
191190
filters, matchers := util.SplitFiltersAndMatchers(allMatchers)
192191
chunks, err := c.lookupChunksByMetricName(ctx, from, through, matchers, metricName)
@@ -228,7 +227,7 @@ func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Tim
228227
outer:
229228
for _, chunk := range allChunks {
230229
for _, filter := range filters {
231-
if !filter.Match(chunk.Metric[filter.Name]) {
230+
if !filter.Matches(string(chunk.Metric[model.LabelName(filter.Name)])) {
232231
continue outer
233232
}
234233
}
@@ -238,7 +237,13 @@ outer:
238237
return filteredChunks, nil
239238
}
240239

241-
func (c *Store) getSeriesIterators(ctx context.Context, from, through model.Time, allMatchers []*metric.LabelMatcher, metricNameMatcher *metric.LabelMatcher) ([]local.SeriesIterator, error) {
240+
type byMatcherLabel []*labels.Matcher
241+
242+
func (lms byMatcherLabel) Len() int { return len(lms) }
243+
func (lms byMatcherLabel) Swap(i, j int) { lms[i], lms[j] = lms[j], lms[i] }
244+
func (lms byMatcherLabel) Less(i, j int) bool { return lms[i].Name < lms[j].Name }
245+
246+
func (c *Store) getSeriesMatrix(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) (model.Matrix, error) {
242247
// Get all series from the index
243248
userID, err := user.ExtractOrgID(ctx)
244249
if err != nil {
@@ -253,7 +258,7 @@ func (c *Store) getSeriesIterators(ctx context.Context, from, through model.Time
253258
return nil, err
254259
}
255260

256-
lazyIterators := make([]local.SeriesIterator, 0, len(seriesEntries))
261+
matrix := make(model.Matrix, 0, len(seriesEntries))
257262
outer:
258263
for _, seriesEntry := range seriesEntries {
259264
metric, err := parseSeriesRangeValue(seriesEntry.RangeValue, seriesEntry.Value)
@@ -262,40 +267,52 @@ outer:
262267
}
263268

264269
// Apply metric name matcher
265-
if metricNameMatcher != nil && !metricNameMatcher.Match(metric[metricNameMatcher.Name]) {
270+
if metricNameMatcher != nil && !metricNameMatcher.Matches(string(metric[model.LabelName(metricNameMatcher.Name)])) {
266271
continue outer
267272
}
268273

269274
// Apply matchers
270275
for _, matcher := range allMatchers {
271-
if !matcher.Match(metric[matcher.Name]) {
276+
if !matcher.Matches(string(metric[model.LabelName(matcher.Name)])) {
272277
continue outer
273278
}
274279
}
275280

276-
orgID, err := user.ExtractOrgID(ctx)
277-
if err != nil {
278-
return nil, err
281+
// TODO(prom2): Does this contraption over-match?
282+
var matchers []*labels.Matcher
283+
for labelName, labelValue := range metric {
284+
if labelName == "__name__" {
285+
continue
286+
}
287+
288+
matcher, err := labels.NewMatcher(labels.MatchEqual, string(labelName), string(labelValue))
289+
if err != nil {
290+
return nil, err
291+
}
292+
matchers = append(matchers, matcher)
279293
}
280-
newIterator, err := NewLazySeriesIterator(c, metric, from, through, orgID)
294+
// TODO(prom2): why is sorting needed?
295+
sort.Sort(byMatcherLabel(matchers))
296+
297+
m, err := c.getMetricNameMatrix(ctx, from, through, matchers, metricNameMatcher.Name)
281298
if err != nil {
282299
return nil, err
283300
}
284301

285-
lazyIterators = append(lazyIterators, newIterator)
302+
matrix = append(matrix, m...)
286303
}
287-
return lazyIterators, nil
304+
return matrix, nil
288305
}
289306

290-
func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*metric.LabelMatcher, metricName model.LabelValue) ([]Chunk, error) {
307+
func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) {
291308
userID, err := user.ExtractOrgID(ctx)
292309
if err != nil {
293310
return nil, err
294311
}
295312

296313
// Just get chunks for metric if there are no matchers
297314
if len(matchers) == 0 {
298-
queries, err := c.schema.GetReadQueriesForMetric(from, through, userID, metricName)
315+
queries, err := c.schema.GetReadQueriesForMetric(from, through, userID, model.LabelValue(metricName))
299316
if err != nil {
300317
return nil, err
301318
}
@@ -312,14 +329,14 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode
312329
incomingChunkSets := make(chan ByKey)
313330
incomingErrors := make(chan error)
314331
for _, matcher := range matchers {
315-
go func(matcher *metric.LabelMatcher) {
332+
go func(matcher *labels.Matcher) {
316333
// Lookup IndexQuery's
317334
var queries []IndexQuery
318335
var err error
319-
if matcher.Type != metric.Equal {
320-
queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, matcher.Name)
336+
if matcher.Type != labels.MatchEqual {
337+
queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name))
321338
} else {
322-
queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, matcher.Value)
339+
queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, model.LabelValue(metricName), model.LabelName(matcher.Name), model.LabelValue(matcher.Value))
323340
}
324341
if err != nil {
325342
incomingErrors <- err
@@ -409,7 +426,7 @@ func (c *Store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]I
409426
return entries, nil
410427
}
411428

412-
func (c *Store) convertIndexEntriesToChunks(ctx context.Context, entries []IndexEntry, matcher *metric.LabelMatcher) (ByKey, error) {
429+
func (c *Store) convertIndexEntriesToChunks(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) (ByKey, error) {
413430
userID, err := user.ExtractOrgID(ctx)
414431
if err != nil {
415432
return nil, err
@@ -437,7 +454,7 @@ func (c *Store) convertIndexEntriesToChunks(ctx context.Context, entries []Index
437454
chunk.metadataInIndex = true
438455
}
439456

440-
if matcher != nil && !matcher.Match(labelValue) {
457+
if matcher != nil && !matcher.Matches(string(labelValue)) {
441458
util.WithContext(ctx).Debug("Dropping chunk for non-matching metric ", chunk.Metric)
442459
continue
443460
}

0 commit comments

Comments
 (0)