Skip to content

Add initial ingester end-to-end tests #78

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 28, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 170 additions & 0 deletions ingester/ingester_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package ingester

import (
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"

"github.com/prometheus/common/model"
prom_chunk "github.com/prometheus/prometheus/storage/local/chunk"
"github.com/prometheus/prometheus/storage/metric"
"golang.org/x/net/context"

"github.com/weaveworks/cortex/chunk"
"github.com/weaveworks/cortex/user"
)

type testStore struct {
mtx sync.Mutex
// Chunks keyed by userID.
chunks map[string][]chunk.Chunk
}

func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
s.mtx.Lock()
defer s.mtx.Unlock()

userID, err := user.GetID(ctx)
if err != nil {
return err
}
s.chunks[userID] = append(s.chunks[userID], chunks...)
return nil
}

func (s *testStore) Get(ctx context.Context, from, through model.Time, matchers ...*metric.LabelMatcher) ([]chunk.Chunk, error) {
return nil, nil
}

func (s *testStore) queryMatrix(userID string) model.Matrix {
sampleStreams := map[model.Fingerprint]*model.SampleStream{}

for _, c := range s.chunks[userID] {
fp := c.Metric.Fingerprint()
ss, ok := sampleStreams[fp]
if !ok {
ss = &model.SampleStream{
Metric: c.Metric,
}
sampleStreams[fp] = ss
}

lc, err := prom_chunk.NewForEncoding(prom_chunk.DoubleDelta)
if err != nil {
panic(err)
}
lc.UnmarshalFromBuf(c.Data)
it := lc.NewIterator()
var samples []model.SamplePair
for it.Scan() {
samples = append(samples, it.Value())
}

ss.Values = append(ss.Values, samples...)
}

matrix := make(model.Matrix, 0, len(sampleStreams))
for _, ss := range sampleStreams {
matrix = append(matrix, ss)
}
return matrix
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

@juliusv juliusv Oct 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's where I copied it from :) Was thinking about factoring that part out, but then I wasn't sure where to put it. Maybe just a ChunksToMatrix([]Chunk) (model.Matrix, error) function in the chunk package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ends up being a larger refactoring due to transitive dependencies and entanglements. I've got the result already locally, but I'd want to test it a bit more to make sure I didn't break any of the chunk sorting in other places which were touched as well. How about merging this one first and I'll do the refactoring as a followup.


func buildTestMatrix(numSeries int, samplesPerSeries int, offset int) model.Matrix {
m := make(model.Matrix, 0, numSeries)
for i := 0; i < numSeries; i++ {
ss := model.SampleStream{
Metric: model.Metric{
model.MetricNameLabel: model.LabelValue(fmt.Sprintf("testmetric_%d", i)),
model.JobLabel: "testjob",
},
Values: make([]model.SamplePair, 0, samplesPerSeries),
}
for j := 0; j < samplesPerSeries; j++ {
ss.Values = append(ss.Values, model.SamplePair{
Timestamp: model.Time(i + j + offset),
Value: model.SampleValue(i + j + offset),
})
}
m = append(m, &ss)
}
sort.Sort(m)
return m
}

func matrixToSamples(m model.Matrix) []*model.Sample {
var samples []*model.Sample
for _, ss := range m {
for _, sp := range ss.Values {
samples = append(samples, &model.Sample{
Metric: ss.Metric,
Timestamp: sp.Timestamp,
Value: sp.Value,
})
}
}
return samples
}

func TestIngesterAppend(t *testing.T) {
cfg := Config{
FlushCheckPeriod: 99999 * time.Hour,
MaxChunkAge: 99999 * time.Hour,
}
store := &testStore{
chunks: map[string][]chunk.Chunk{},
}
ing, err := New(cfg, store)
if err != nil {
t.Fatal(err)
}

userIDs := []string{"1", "2", "3"}

// Create test samples.
testData := map[string]model.Matrix{}
for i, userID := range userIDs {
testData[userID] = buildTestMatrix(10, 1000, i)
}

// Append samples.
for _, userID := range userIDs {
ctx := user.WithID(context.Background(), userID)
err = ing.Append(ctx, matrixToSamples(testData[userID]))
if err != nil {
t.Fatal(err)
}
}

// Read samples back via ingester queries.
for _, userID := range userIDs {
ctx := user.WithID(context.Background(), userID)
matcher, err := metric.NewLabelMatcher(metric.RegexMatch, model.JobLabel, ".+")
if err != nil {
t.Fatal(err)
}
res, err := ing.Query(ctx, model.Earliest, model.Latest, matcher)
if err != nil {
t.Fatal(err)
}
sort.Sort(res)

if !reflect.DeepEqual(res, testData[userID]) {
t.Fatalf("unexpected query result\n\nwant:\n\n%v\n\ngot:\n\n%v\n\n", testData[userID], res)
}
}

// Read samples back via chunk store.
ing.Stop()
for _, userID := range userIDs {
res := store.queryMatrix(userID)
sort.Sort(res)

if !reflect.DeepEqual(res, testData[userID]) {
t.Fatalf("unexpected chunk store result\n\nwant:\n\n%v\n\ngot:\n\n%v\n\n", testData[userID], res)
}
}
}