Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 99 additions & 199 deletions pkg/chunk/chunk_store.go

Large diffs are not rendered by default.

247 changes: 104 additions & 143 deletions pkg/chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,51 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk"
"github.com/weaveworks/cortex/pkg/util/extract"
"golang.org/x/net/context"

"github.com/weaveworks/common/test"
"github.com/weaveworks/common/user"
"github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk"
"github.com/weaveworks/cortex/pkg/util"
"github.com/weaveworks/cortex/pkg/util/extract"
)

type schemaFactory func(cfg SchemaConfig) Schema
type storeFactory func(StoreConfig, Schema, StorageClient) (Store, error)

var schemas = []struct {
name string
schemaFn schemaFactory
storeFn storeFactory
requireMetricName bool
}{
{"v1 schema", v1Schema, newStore, true},
{"v2 schema", v2Schema, newStore, true},
{"v3 schema", v3Schema, newStore, true},
{"v4 schema", v4Schema, newStore, true},
{"v5 schema", v5Schema, newStore, true},
{"v6 schema", v6Schema, newStore, true},
{"v7 schema", v7Schema, newStore, true},
{"v8 schema", v8Schema, newStore, false},
{"v9 schema", v9Schema, newSeriesStore, true},
}

// newTestStore creates a new Store for testing.
func newTestChunkStore(t *testing.T, cfg StoreConfig) *Store {
func newTestChunkStore(t *testing.T, schemaFactory schemaFactory, storeFactory storeFactory) Store {
var (
storeCfg StoreConfig
schemaCfg SchemaConfig
)
util.DefaultValues(&storeCfg, &schemaCfg)

storage := NewMockStorage()
schemaCfg := SchemaConfig{}
tableManager, err := NewTableManager(schemaCfg, maxChunkAge, storage)
require.NoError(t, err)

err = tableManager.SyncTables(context.Background())
require.NoError(t, err)
store, err := NewStore(cfg, schemaCfg, storage)

store, err := storeFactory(storeCfg, schemaFactory(schemaCfg), storage)
require.NoError(t, err)
return store
}
Expand Down Expand Up @@ -102,21 +130,6 @@ func TestChunkStore_Get(t *testing.T) {
barSampleStream2, err := createSampleStreamFrom(barChunk2)
require.NoError(t, err)

schemas := []struct {
name string
fn func(cfg SchemaConfig) Schema
requireMetricName bool
}{
{"v1 schema", v1Schema, true},
{"v2 schema", v2Schema, true},
{"v3 schema", v3Schema, true},
{"v4 schema", v4Schema, true},
{"v5 schema", v5Schema, true},
{"v6 schema", v6Schema, true},
{"v7 schema", v7Schema, true},
{"v8 schema", v8Schema, false},
}

for _, tc := range []struct {
query string
expect model.Matrix
Expand Down Expand Up @@ -193,10 +206,7 @@ func TestChunkStore_Get(t *testing.T) {
for _, schema := range schemas {
t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) {
t.Log("========= Running query", tc.query, "with schema", schema.name)
store := newTestChunkStore(t, StoreConfig{
schemaFactory: schema.fn,
QueryChunkLimit: 2e6,
})
store := newTestChunkStore(t, schema.schemaFn, schema.storeFn)

if err := store.Put(ctx, []Chunk{
fooChunk1,
Expand Down Expand Up @@ -226,8 +236,6 @@ func TestChunkStore_Get(t *testing.T) {

sort.Sort(ByFingerprint(matrix1))
if !reflect.DeepEqual(tc.expect, matrix1) {
t.Fatalf("jml\nstart = %#v\nnow = %#v\nfooChunk1 = %#v\nfooChunk2 = %#v\nbarChunk1 = %#v\nbarChunk2 = %#v\n",
now.Add(-time.Hour), now, fooChunk1, fooChunk2, barChunk1, barChunk2)
t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, matrix1))
}

Expand Down Expand Up @@ -259,7 +267,6 @@ func TestChunkStore_Get(t *testing.T) {
func TestChunkStore_getMetricNameChunks(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), userID)
now := model.Now()
metricName := "foo"
chunk1 := dummyChunkFor(now, model.Metric{
model.MetricNameLabel: "foo",
"bar": "baz",
Expand All @@ -272,83 +279,62 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) {
"toms": "code",
})

schemas := []struct {
name string
fn func(cfg SchemaConfig) Schema
}{
{"v1 schema", v1Schema},
{"v2 schema", v2Schema},
{"v3 schema", v3Schema},
{"v4 schema", v4Schema},
{"v5 schema", v5Schema},
{"v6 schema", v6Schema},
{"v7 schema", v7Schema},
{"v8 schema", v8Schema},
}

for _, tc := range []struct {
query string
expect []Chunk
matchers []*labels.Matcher
query string
expect []Chunk
}{
{
`foo`,
[]Chunk{chunk1, chunk2},
[]*labels.Matcher{},
},
{
`foo{flip=""}`,
[]Chunk{chunk2},
[]*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "flip", "")},
},
{
`foo{bar="baz"}`,
[]Chunk{chunk1},
[]*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")},
},
{
`foo{bar="beep"}`,
[]Chunk{chunk2},
[]*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "beep")},
},
{
`foo{toms="code"}`,
[]Chunk{chunk1, chunk2},
[]*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "toms", "code")},
},
{
`foo{bar!="baz"}`,
[]Chunk{chunk2},
[]*labels.Matcher{mustNewLabelMatcher(labels.MatchNotEqual, "bar", "baz")},
},
{
`foo{bar=~"beep|baz"}`,
[]Chunk{chunk1, chunk2},
[]*labels.Matcher{mustNewLabelMatcher(labels.MatchRegexp, "bar", "beep|baz")},
},
{
`foo{toms="code", bar=~"beep|baz"}`,
[]Chunk{chunk1, chunk2},
[]*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "toms", "code"), mustNewLabelMatcher(labels.MatchRegexp, "bar", "beep|baz")},
},
{
`foo{toms="code", bar="baz"}`,
[]Chunk{chunk1}, []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "toms", "code"), mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")},
[]Chunk{chunk1},
},
} {
for _, schema := range schemas {
t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) {
t.Log("========= Running query", tc.query, "with schema", schema.name)
store := newTestChunkStore(t, StoreConfig{
schemaFactory: schema.fn,
QueryChunkLimit: 2e6,
})
store := newTestChunkStore(t, schema.schemaFn, schema.storeFn)

if err := store.Put(ctx, []Chunk{chunk1, chunk2}); err != nil {
t.Fatal(err)
}

chunks, err := store.getMetricNameChunks(ctx, now.Add(-time.Hour), now, tc.matchers, metricName)
matchers, err := promql.ParseMetricSelector(tc.query)
if err != nil {
t.Fatal(err)
}

chunks, err := store.Get(ctx, now.Add(-time.Hour), now, matchers...)
require.NoError(t, err)

if !reflect.DeepEqual(tc.expect, chunks) {
Expand All @@ -369,96 +355,73 @@ func mustNewLabelMatcher(matchType labels.MatchType, name string, value string)

func TestChunkStoreRandom(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), userID)
schemas := []struct {
name string
fn func(cfg SchemaConfig) Schema
store *Store
}{
{name: "v1 schema", fn: v1Schema},
{name: "v2 schema", fn: v2Schema},
{name: "v3 schema", fn: v3Schema},
{name: "v4 schema", fn: v4Schema},
{name: "v5 schema", fn: v5Schema},
{name: "v6 schema", fn: v6Schema},
{name: "v7 schema", fn: v7Schema},
{name: "v8 schema", fn: v8Schema},
}

for i := range schemas {
schemas[i].store = newTestChunkStore(t, StoreConfig{
schemaFactory: schemas[i].fn,
QueryChunkLimit: 2e6,
})
}
for _, schema := range schemas {
t.Run(schema.name, func(t *testing.T) {
store := newTestChunkStore(t, schema.schemaFn, schema.storeFn)

// put 100 chunks from 0 to 99
const chunkLen = 13 * 3600 // in seconds
for i := 0; i < 100; i++ {
ts := model.TimeFromUnix(int64(i * chunkLen))
chunks, _ := chunk.New().Add(model.SamplePair{
Timestamp: ts,
Value: model.SampleValue(float64(i)),
})
chunk := NewChunk(
userID,
model.Fingerprint(1),
model.Metric{
model.MetricNameLabel: "foo",
"bar": "baz",
},
chunks[0],
ts,
ts.Add(chunkLen*time.Second),
)

err := store.Put(ctx, []Chunk{chunk})
require.NoError(t, err)
}

// put 100 chunks from 0 to 99
const chunkLen = 13 * 3600 // in seconds
for i := 0; i < 100; i++ {
ts := model.TimeFromUnix(int64(i * chunkLen))
chunks, _ := chunk.New().Add(model.SamplePair{
Timestamp: ts,
Value: model.SampleValue(float64(i)),
})
chunk := NewChunk(
userID,
model.Fingerprint(1),
model.Metric{
model.MetricNameLabel: "foo",
"bar": "baz",
},
chunks[0],
ts,
ts.Add(chunkLen*time.Second),
)
for _, s := range schemas {
err := s.store.Put(ctx, []Chunk{chunk})
require.NoError(t, err)
}
}
// pick two random numbers and do a query
for i := 0; i < 100; i++ {
start := rand.Int63n(100 * chunkLen)
end := start + rand.Int63n((100*chunkLen)-start)
assert.True(t, start < end)

// pick two random numbers and do a query
for i := 0; i < 100; i++ {
start := rand.Int63n(100 * chunkLen)
end := start + rand.Int63n((100*chunkLen)-start)
assert.True(t, start < end)
startTime := model.TimeFromUnix(start)
endTime := model.TimeFromUnix(end)

startTime := model.TimeFromUnix(start)
endTime := model.TimeFromUnix(end)
matchers := []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo"),
mustNewLabelMatcher(labels.MatchEqual, "bar", "baz"),
}
chunks, err := store.Get(ctx, startTime, endTime, matchers...)
require.NoError(t, err)

metricNameLabel := mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo")
matchers := []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")}

for _, s := range schemas {
chunks, err := s.store.getMetricNameChunks(ctx, startTime, endTime,
matchers,
metricNameLabel.Value,
)
require.NoError(t, err)

// We need to check that each chunk is in the time range
for _, chunk := range chunks {
assert.False(t, chunk.From.After(endTime))
assert.False(t, chunk.Through.Before(startTime))
samples, err := chunk.Samples(chunk.From, chunk.Through)
assert.NoError(t, err)
assert.Equal(t, 1, len(samples))
// TODO verify chunk contents
}
// We need to check that each chunk is in the time range
for _, chunk := range chunks {
assert.False(t, chunk.From.After(endTime))
assert.False(t, chunk.Through.Before(startTime))
samples, err := chunk.Samples(chunk.From, chunk.Through)
assert.NoError(t, err)
assert.Equal(t, 1, len(samples))
// TODO verify chunk contents
}

// And check we got all the chunks we want
numChunks := (end / chunkLen) - (start / chunkLen) + 1
assert.Equal(t, int(numChunks), len(chunks), s.name)
}
// And check we got all the chunks we want
numChunks := (end / chunkLen) - (start / chunkLen) + 1
assert.Equal(t, int(numChunks), len(chunks))
}
})
}
}

func TestChunkStoreLeastRead(t *testing.T) {
// Test we don't read too much from the index
ctx := user.InjectOrgID(context.Background(), userID)
store := newTestChunkStore(t, StoreConfig{
schemaFactory: v6Schema,
QueryChunkLimit: 2e6,
})
store := newTestChunkStore(t, v6Schema, newStore)

// Put 24 chunks 1hr chunks in the store
const chunkLen = 60 // in seconds
Expand Down Expand Up @@ -492,14 +455,12 @@ func TestChunkStoreLeastRead(t *testing.T) {

startTime := model.TimeFromUnix(start)
endTime := model.TimeFromUnix(end)
matchers := []*labels.Matcher{
mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo"),
mustNewLabelMatcher(labels.MatchEqual, "bar", "baz"),
}

metricNameLabel := mustNewLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo")
matchers := []*labels.Matcher{mustNewLabelMatcher(labels.MatchEqual, "bar", "baz")}

chunks, err := store.getMetricNameChunks(ctx, startTime, endTime,
matchers,
metricNameLabel.Value,
)
chunks, err := store.Get(ctx, startTime, endTime, matchers...)
if err != nil {
t.Fatal(t, err)
}
Expand Down
Loading