Skip to content

Commit 577ec7c

Browse files
committed
Add initial ingester end-to-end tests
1 parent fede332 commit 577ec7c

File tree

1 file changed

+170
-0
lines changed

1 file changed

+170
-0
lines changed

ingester/ingester_test.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package ingester
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
"sort"
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
"github.com/prometheus/common/model"
12+
prom_chunk "github.com/prometheus/prometheus/storage/local/chunk"
13+
"github.com/prometheus/prometheus/storage/metric"
14+
"golang.org/x/net/context"
15+
16+
"github.com/weaveworks/cortex/chunk"
17+
"github.com/weaveworks/cortex/user"
18+
)
19+
20+
type testStore struct {
21+
mtx sync.Mutex
22+
// Chunks keyed by userID.
23+
chunks map[string][]chunk.Chunk
24+
}
25+
26+
func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
27+
s.mtx.Lock()
28+
defer s.mtx.Unlock()
29+
30+
userID, err := user.GetID(ctx)
31+
if err != nil {
32+
return err
33+
}
34+
s.chunks[userID] = append(s.chunks[userID], chunks...)
35+
return nil
36+
}
37+
38+
func (s *testStore) Get(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]chunk.Chunk, error) {
39+
return nil, nil
40+
}
41+
42+
func (s *testStore) queryMatrix(userID string) model.Matrix {
43+
sampleStreams := map[model.Fingerprint]*model.SampleStream{}
44+
45+
for _, c := range s.chunks[userID] {
46+
fp := c.Metric.Fingerprint()
47+
ss, ok := sampleStreams[fp]
48+
if !ok {
49+
ss = &model.SampleStream{
50+
Metric: c.Metric,
51+
}
52+
sampleStreams[fp] = ss
53+
}
54+
55+
lc, err := prom_chunk.NewForEncoding(prom_chunk.DoubleDelta)
56+
if err != nil {
57+
panic(err)
58+
}
59+
lc.UnmarshalFromBuf(c.Data)
60+
it := lc.NewIterator()
61+
var samples []model.SamplePair
62+
for it.Scan() {
63+
samples = append(samples, it.Value())
64+
}
65+
66+
ss.Values = append(ss.Values, samples...)
67+
}
68+
69+
matrix := make(model.Matrix, 0, len(sampleStreams))
70+
for _, ss := range sampleStreams {
71+
matrix = append(matrix, ss)
72+
}
73+
return matrix
74+
}
75+
76+
func buildTestMatrix(numSeries int, samplesPerSeries int, offset int) model.Matrix {
77+
m := make(model.Matrix, 0, numSeries)
78+
for i := 0; i < numSeries; i++ {
79+
ss := model.SampleStream{
80+
Metric: model.Metric{
81+
model.MetricNameLabel: model.LabelValue(fmt.Sprintf("testmetric_%d", i)),
82+
model.JobLabel: "testjob",
83+
},
84+
Values: make([]model.SamplePair, 0, samplesPerSeries),
85+
}
86+
for j := 0; j < samplesPerSeries; j++ {
87+
ss.Values = append(ss.Values, model.SamplePair{
88+
Timestamp: model.Time(i + j + offset),
89+
Value: model.SampleValue(i + j + offset),
90+
})
91+
}
92+
m = append(m, &ss)
93+
}
94+
sort.Sort(m)
95+
return m
96+
}
97+
98+
func matrixToSamples(m model.Matrix) []*model.Sample {
99+
var samples []*model.Sample
100+
for _, ss := range m {
101+
for _, sp := range ss.Values {
102+
samples = append(samples, &model.Sample{
103+
Metric: ss.Metric,
104+
Timestamp: sp.Timestamp,
105+
Value: sp.Value,
106+
})
107+
}
108+
}
109+
return samples
110+
}
111+
112+
func TestIngesterAppend(t *testing.T) {
113+
cfg := Config{
114+
FlushCheckPeriod: 99999 * time.Hour,
115+
MaxChunkAge: 99999 * time.Hour,
116+
}
117+
store := &testStore{
118+
chunks: map[string][]chunk.Chunk{},
119+
}
120+
ing, err := New(cfg, store)
121+
if err != nil {
122+
t.Fatal(err)
123+
}
124+
125+
userIDs := []string{"1", "2", "3"}
126+
127+
// Create test samples.
128+
testData := map[string]model.Matrix{}
129+
for i, userID := range userIDs {
130+
testData[userID] = buildTestMatrix(10, 1000, i)
131+
}
132+
133+
// Append samples.
134+
for _, userID := range userIDs {
135+
ctx := user.WithID(context.Background(), userID)
136+
err = ing.Append(ctx, matrixToSamples(testData[userID]))
137+
if err != nil {
138+
t.Fatal(err)
139+
}
140+
}
141+
142+
// Read samples back via ingester queries.
143+
for _, userID := range userIDs {
144+
ctx := user.WithID(context.Background(), userID)
145+
matcher, err := metric.NewLabelMatcher(metric.RegexMatch, model.JobLabel, ".+")
146+
if err != nil {
147+
t.Fatal(err)
148+
}
149+
res, err := ing.Query(ctx, model.Earliest, model.Latest, matcher)
150+
if err != nil {
151+
t.Fatal(err)
152+
}
153+
sort.Sort(res)
154+
155+
if !reflect.DeepEqual(res, testData[userID]) {
156+
t.Fatalf("unexpected query result\n\nwant:\n\n%v\n\ngot:\n\n%v\n\n", testData[userID], res)
157+
}
158+
}
159+
160+
// Read samples back via chunk store.
161+
ing.Stop()
162+
for _, userID := range userIDs {
163+
res := store.queryMatrix(userID)
164+
sort.Sort(res)
165+
166+
if !reflect.DeepEqual(res, testData[userID]) {
167+
t.Fatalf("unexpected chunk store result\n\nwant:\n\n%v\n\ngot:\n\n%v\n\n", testData[userID], res)
168+
}
169+
}
170+
}

0 commit comments

Comments
 (0)