Skip to content

Commit 46e68c6

Browse files
committed
Refactor chunks-to-matrix code and its deps
1 parent 1df27aa commit 46e68c6

File tree

5 files changed

+88
-134
lines changed

5 files changed

+88
-134
lines changed

chunk/chunk.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ package chunk
44

55
import (
66
"github.com/prometheus/common/model"
7+
prom_chunk "github.com/prometheus/prometheus/storage/local/chunk"
8+
9+
"github.com/weaveworks/cortex/util"
710
)
811

912
// Chunk contains encoded timeseries data
@@ -21,3 +24,50 @@ type ByID []Chunk
2124
func (cs ByID) Len() int { return len(cs) }
2225
func (cs ByID) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] }
2326
func (cs ByID) Less(i, j int) bool { return cs[i].ID < cs[j].ID }
27+
28+
// ChunksToMatrix converts a slice of chunks into a model.Matrix.
29+
func ChunksToMatrix(chunks []Chunk) (model.Matrix, error) {
30+
// Group chunks by series, sort and dedupe samples.
31+
sampleStreams := map[model.Fingerprint]*model.SampleStream{}
32+
33+
for _, c := range chunks {
34+
fp := c.Metric.Fingerprint()
35+
ss, ok := sampleStreams[fp]
36+
if !ok {
37+
ss = &model.SampleStream{
38+
Metric: c.Metric,
39+
}
40+
sampleStreams[fp] = ss
41+
}
42+
43+
samples, err := decodeChunk(c.Data)
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
ss.Values = util.MergeSamples(ss.Values, samples)
49+
}
50+
51+
matrix := make(model.Matrix, 0, len(sampleStreams))
52+
for _, ss := range sampleStreams {
53+
matrix = append(matrix, ss)
54+
}
55+
56+
return matrix, nil
57+
}
58+
59+
func decodeChunk(buf []byte) ([]model.SamplePair, error) {
60+
lc, err := prom_chunk.NewForEncoding(prom_chunk.DoubleDelta)
61+
if err != nil {
62+
return nil, err
63+
}
64+
lc.UnmarshalFromBuf(buf)
65+
it := lc.NewIterator()
66+
// TODO(juliusv): Pre-allocate this with the right length again once we
67+
// add a method upstream to get the number of samples in a chunk.
68+
var samples []model.SamplePair
69+
for it.Scan() {
70+
samples = append(samples, it.Value())
71+
}
72+
return samples, nil
73+
}

distributor.go

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/weaveworks/cortex/ingester"
2929
"github.com/weaveworks/cortex/ring"
3030
"github.com/weaveworks/cortex/user"
31+
"github.com/weaveworks/cortex/util"
3132
)
3233

3334
var (
@@ -294,7 +295,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
294295
Values: ss.Values,
295296
}
296297
} else {
297-
mss.Values = mergeSamples(fpToSampleStream[fp].Values, ss.Values)
298+
mss.Values = util.MergeSamples(fpToSampleStream[fp].Values, ss.Values)
298299
}
299300
}
300301
}
@@ -313,31 +314,6 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers .
313314
return result, err
314315
}
315316

316-
func mergeSamples(a, b []model.SamplePair) []model.SamplePair {
317-
result := make([]model.SamplePair, 0, len(a)+len(b))
318-
i, j := 0, 0
319-
for i < len(a) && j < len(b) {
320-
if a[i].Timestamp < b[j].Timestamp {
321-
result = append(result, a[i])
322-
i++
323-
} else if a[i].Timestamp > b[j].Timestamp {
324-
result = append(result, b[j])
325-
j++
326-
} else {
327-
result = append(result, a[i])
328-
i++
329-
j++
330-
}
331-
}
332-
for ; i < len(a); i++ {
333-
result = append(result, a[i])
334-
}
335-
for ; j < len(b); j++ {
336-
result = append(result, b[j])
337-
}
338-
return result
339-
}
340-
341317
// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
342318
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName model.LabelName) (model.LabelValues, error) {
343319
valueSet := map[model.LabelValue]struct{}{}

ingester/ingester_test.go

Lines changed: 4 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"time"
1010

1111
"github.com/prometheus/common/model"
12-
prom_chunk "github.com/prometheus/prometheus/storage/local/chunk"
1312
"github.com/prometheus/prometheus/storage/metric"
1413
"golang.org/x/net/context"
1514

@@ -39,40 +38,6 @@ func (s *testStore) Get(ctx context.Context, from, through model.Time, matchers
3938
return nil, nil
4039
}
4140

42-
func (s *testStore) queryMatrix(userID string) model.Matrix {
43-
sampleStreams := map[model.Fingerprint]*model.SampleStream{}
44-
45-
for _, c := range s.chunks[userID] {
46-
fp := c.Metric.Fingerprint()
47-
ss, ok := sampleStreams[fp]
48-
if !ok {
49-
ss = &model.SampleStream{
50-
Metric: c.Metric,
51-
}
52-
sampleStreams[fp] = ss
53-
}
54-
55-
lc, err := prom_chunk.NewForEncoding(prom_chunk.DoubleDelta)
56-
if err != nil {
57-
panic(err)
58-
}
59-
lc.UnmarshalFromBuf(c.Data)
60-
it := lc.NewIterator()
61-
var samples []model.SamplePair
62-
for it.Scan() {
63-
samples = append(samples, it.Value())
64-
}
65-
66-
ss.Values = append(ss.Values, samples...)
67-
}
68-
69-
matrix := make(model.Matrix, 0, len(sampleStreams))
70-
for _, ss := range sampleStreams {
71-
matrix = append(matrix, ss)
72-
}
73-
return matrix
74-
}
75-
7641
func buildTestMatrix(numSeries int, samplesPerSeries int, offset int) model.Matrix {
7742
m := make(model.Matrix, 0, numSeries)
7843
for i := 0; i < numSeries; i++ {
@@ -160,7 +125,10 @@ func TestIngesterAppend(t *testing.T) {
160125
// Read samples back via chunk store.
161126
ing.Stop()
162127
for _, userID := range userIDs {
163-
res := store.queryMatrix(userID)
128+
res, err := chunk.ChunksToMatrix(store.chunks[userID])
129+
if err != nil {
130+
t.Fatal(err)
131+
}
164132
sort.Sort(res)
165133

166134
if !reflect.DeepEqual(res, testData[userID]) {

querier/querier.go

Lines changed: 3 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,15 @@ package querier
1515

1616
import (
1717
"fmt"
18-
"sort"
1918
"time"
2019

2120
"github.com/prometheus/common/model"
2221
"github.com/prometheus/prometheus/storage/local"
23-
prom_chunk "github.com/prometheus/prometheus/storage/local/chunk"
2422
"github.com/prometheus/prometheus/storage/metric"
2523
"golang.org/x/net/context"
2624

2725
"github.com/weaveworks/cortex/chunk"
26+
"github.com/weaveworks/cortex/util"
2827
)
2928

3029
// A Querier allows querying all samples in a given time range that match a set
@@ -48,68 +47,7 @@ func (q *ChunkQuerier) Query(ctx context.Context, from, to model.Time, matchers
4847
return nil, err
4948
}
5049

51-
// Group chunks by series, sort and dedupe samples.
52-
sampleStreams := map[model.Fingerprint]*model.SampleStream{}
53-
54-
for _, c := range chunks {
55-
fp := c.Metric.Fingerprint()
56-
ss, ok := sampleStreams[fp]
57-
if !ok {
58-
ss = &model.SampleStream{
59-
Metric: c.Metric,
60-
}
61-
sampleStreams[fp] = ss
62-
}
63-
64-
samples, err := decodeChunk(c.Data)
65-
if err != nil {
66-
return nil, err
67-
}
68-
69-
ss.Values = append(ss.Values, samples...)
70-
}
71-
72-
for _, ss := range sampleStreams {
73-
sort.Sort(timeSortableSamplePairs(ss.Values))
74-
// TODO: should we also dedupe samples here or leave that to the upper layers?
75-
}
76-
77-
matrix := make(model.Matrix, 0, len(sampleStreams))
78-
for _, ss := range sampleStreams {
79-
matrix = append(matrix, ss)
80-
}
81-
82-
return matrix, nil
83-
}
84-
85-
func decodeChunk(buf []byte) ([]model.SamplePair, error) {
86-
lc, err := prom_chunk.NewForEncoding(prom_chunk.DoubleDelta)
87-
if err != nil {
88-
return nil, err
89-
}
90-
lc.UnmarshalFromBuf(buf)
91-
it := lc.NewIterator()
92-
// TODO(juliusv): Pre-allocate this with the right length again once we
93-
// add a method upstream to get the number of samples in a chunk.
94-
var samples []model.SamplePair
95-
for it.Scan() {
96-
samples = append(samples, it.Value())
97-
}
98-
return samples, nil
99-
}
100-
101-
type timeSortableSamplePairs []model.SamplePair
102-
103-
func (ts timeSortableSamplePairs) Len() int {
104-
return len(ts)
105-
}
106-
107-
func (ts timeSortableSamplePairs) Less(i, j int) bool {
108-
return ts[i].Timestamp < ts[j].Timestamp
109-
}
110-
111-
func (ts timeSortableSamplePairs) Swap(i, j int) {
112-
ts[i], ts[j] = ts[j], ts[i]
50+
return chunk.ChunksToMatrix(chunks)
11351
}
11452

11553
// LabelValuesForLabelName returns all of the label values that are associated with a given label name.
@@ -145,18 +83,11 @@ func (qm MergeQuerier) QueryRange(ctx context.Context, from, to model.Time, matc
14583
}
14684
} else {
14785
ssIt := it.(sampleStreamIterator)
148-
ssIt.ss.Values = append(ssIt.ss.Values, ss.Values...)
86+
ssIt.ss.Values = util.MergeSamples(ssIt.ss.Values, ss.Values)
14987
}
15088
}
15189
}
15290

153-
// Sort and dedupe samples.
154-
for _, it := range fpToIt {
155-
sortable := timeSortableSamplePairs(it.(sampleStreamIterator).ss.Values)
156-
sort.Sort(sortable)
157-
// TODO: Dedupe samples. Not strictly necessary.
158-
}
159-
16091
iterators := make([]local.SeriesIterator, 0, len(fpToIt))
16192
for _, it := range fpToIt {
16293
iterators = append(iterators, it)

util/util.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package util
2+
3+
import "github.com/prometheus/common/model"
4+
5+
// MergeSamples merges and dedupes two sets of already sorted sample pairs.
6+
func MergeSamples(a, b []model.SamplePair) []model.SamplePair {
7+
result := make([]model.SamplePair, 0, len(a)+len(b))
8+
i, j := 0, 0
9+
for i < len(a) && j < len(b) {
10+
if a[i].Timestamp < b[j].Timestamp {
11+
result = append(result, a[i])
12+
i++
13+
} else if a[i].Timestamp > b[j].Timestamp {
14+
result = append(result, b[j])
15+
j++
16+
} else {
17+
result = append(result, a[i])
18+
i++
19+
j++
20+
}
21+
}
22+
for ; i < len(a); i++ {
23+
result = append(result, a[i])
24+
}
25+
for ; j < len(b); j++ {
26+
result = append(result, b[j])
27+
}
28+
return result
29+
}

0 commit comments

Comments
 (0)