Skip to content

Create v8Schema with series index #430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/chunk/chunk_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestChunkStore(t *testing.T) {
{"v5 schema", v5Schema},
{"v6 schema", v6Schema},
{"v7 schema", v7Schema},
{"v8 schema", v8Schema},
}

nameMatcher := mustNewLabelMatcher(metric.Equal, model.MetricNameLabel, "foo")
Expand Down Expand Up @@ -282,6 +283,7 @@ func TestChunkStoreRandom(t *testing.T) {
{name: "v5 schema", fn: v5Schema},
{name: "v6 schema", fn: v6Schema},
{name: "v7 schema", fn: v7Schema},
{name: "v8 schema", fn: v8Schema},
}

for i := range schemas {
Expand Down
5 changes: 3 additions & 2 deletions pkg/chunk/inmemory_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,10 @@ func (m *MockStorage) BatchWrite(_ context.Context, batch WriteBatch) error {
items = append(items, mockItem{})
copy(items[i+1:], items[i:])
} else {
// Return error if duplicate write and not metric name entry
// Return error if duplicate write and not metric name entry or series entry
itemComponents := decodeRangeKey(items[i].rangeValue)
if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) {
if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) &&
!bytes.Equal(itemComponents[3], seriesRangeKeyV1) {
return fmt.Errorf("Dupe write")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should change this to error if its a duplicate within the batch, to match server behaviour.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are already doing so a few lines above. I didn't remove the Dupe write check across all writes as it's still useful for checking if chunks are no duplicated.

}
}
Expand Down
47 changes: 47 additions & 0 deletions pkg/chunk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chunk

import (
"crypto/sha1"
"encoding/json"
"errors"
"fmt"
"strings"
Expand All @@ -17,6 +18,7 @@ var (
chunkTimeRangeKeyV4 = []byte{'4'}
chunkTimeRangeKeyV5 = []byte{'5'}
metricNameRangeKeyV1 = []byte{'6'}
seriesRangeKeyV1 = []byte{'7'}
)

// Errors
Expand Down Expand Up @@ -132,6 +134,14 @@ func v7Schema(cfg SchemaConfig) Schema {
}
}

// v8 schema is an extension of v6, with support for a labelset/series index
func v8Schema(cfg SchemaConfig) Schema {
return schema{
cfg.dailyBuckets,
v8Entries{},
}
}

// schema implements Schema given a bucketing function and and set of range key callbacks
type schema struct {
buckets func(from, through model.Time, userID string) []Bucket
Expand Down Expand Up @@ -536,3 +546,40 @@ func (v7Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) {
},
}, nil
}

// v8Entries is the same as v7Entries however with a series index instead of a metric name index
type v8Entries struct {
v6Entries
}

func (entries v8Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) {
indexEntries, err := entries.v6Entries.GetWriteEntries(bucket, metricName, labels, chunkID)
if err != nil {
return nil, err
}

seriesID := metricSeriesID(labels)
seriesBytes, err := json.Marshal(labels)
if err != nil {
return nil, err
}

// Add IndexEntry for series with userID:bigBucket HashValue
indexEntries = append(indexEntries, IndexEntry{
TableName: bucket.tableName,
HashValue: bucket.hashKey,
RangeValue: encodeRangeKey([]byte(seriesID), nil, nil, seriesRangeKeyV1),
Value: seriesBytes,
})

return indexEntries, nil
}

func (v8Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) {
return []IndexQuery{
{
TableName: bucket.tableName,
HashValue: bucket.hashKey,
},
}, nil
}
8 changes: 8 additions & 0 deletions pkg/chunk/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type SchemaConfig struct {

// After this time, we will read and write v7 schemas.
V7SchemaFrom util.DayValue

// After this time, we will read and write v8 schemas.
V8SchemaFrom util.DayValue
}

// RegisterFlags adds the flags required to config this to the given FlagSet
Expand All @@ -53,6 +56,7 @@ func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.V5SchemaFrom, "dynamodb.v5-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v5 schema.")
f.Var(&cfg.V6SchemaFrom, "dynamodb.v6-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v6 schema.")
f.Var(&cfg.V7SchemaFrom, "dynamodb.v7-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v7 schema.")
f.Var(&cfg.V8SchemaFrom, "dynamodb.v8-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v8 schema.")
}

func (cfg *SchemaConfig) tableForBucket(bucketStart int64) string {
Expand Down Expand Up @@ -177,6 +181,10 @@ func newCompositeSchema(cfg SchemaConfig) (Schema, error) {
schemas = append(schemas, compositeSchemaEntry{cfg.V7SchemaFrom.Time, v7Schema(cfg)})
}

if cfg.V8SchemaFrom.IsSet() {
schemas = append(schemas, compositeSchemaEntry{cfg.V8SchemaFrom.Time, v8Schema(cfg)})
}

if !sort.IsSorted(byStart(schemas)) {
return nil, fmt.Errorf("schemas not in time-sorted order")
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/chunk/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"crypto/sha1"
"encoding/base64"
"encoding/json"
"fmt"
"reflect"
"sort"
Expand Down Expand Up @@ -236,6 +237,7 @@ func TestSchemaHashKeys(t *testing.T) {
const (
MetricNameRangeValue = iota + 1
ChunkTimeRangeValue
SeriesRangeValue
)

// parseRangeValueType returns the type of rangeValue
Expand Down Expand Up @@ -269,6 +271,10 @@ func parseRangeValueType(rangeValue []byte) (int, error) {
case bytes.Equal(components[3], metricNameRangeKeyV1):
return MetricNameRangeValue, nil

// series range values
case bytes.Equal(components[3], seriesRangeKeyV1):
return SeriesRangeValue, nil

default:
return 0, fmt.Errorf("unrecognised range value type. version: '%v'", string(components[3]))
}
Expand All @@ -295,6 +301,7 @@ func TestSchemaRangeKey(t *testing.T) {
tsRangeKeys = v5Schema(cfg)
v6RangeKeys = v6Schema(cfg)
v7RangeKeys = v7Schema(cfg)
v8RangeKeys = v8Schema(cfg)
metric = model.Metric{
model.MetricNameLabel: metricName,
"bar": "bary",
Expand All @@ -303,6 +310,10 @@ func TestSchemaRangeKey(t *testing.T) {
fooSha1Hash = sha1.Sum([]byte("foo"))
)

seriesID := metricSeriesID(metric)
metricBytes, err := json.Marshal(metric)
require.NoError(t, err)

mkEntries := func(hashKey string, callback func(labelName model.LabelName, labelValue model.LabelValue) ([]byte, []byte)) []IndexEntry {
result := []IndexEntry{}
for labelName, labelValue := range metric {
Expand Down Expand Up @@ -434,6 +445,34 @@ func TestSchemaRangeKey(t *testing.T) {
},
},
},
{
v8RangeKeys,
[]IndexEntry{
{
TableName: table,
HashValue: "userid:d0",
RangeValue: append([]byte(seriesID), []byte("\x00\x00\x007\x00")...),
Value: metricBytes,
},
{
TableName: table,
HashValue: "userid:d0:foo",
RangeValue: []byte("0036ee7f\x00\x00chunkID\x003\x00"),
},
{
TableName: table,
HashValue: "userid:d0:foo:bar",
RangeValue: []byte("0036ee7f\x00\x00chunkID\x005\x00"),
Value: []byte("bary"),
},
{
TableName: table,
HashValue: "userid:d0:foo:baz",
RangeValue: []byte("0036ee7f\x00\x00chunkID\x005\x00"),
Value: []byte("bazy"),
},
},
},
} {
t.Run(fmt.Sprintf("TestSchameRangeKey[%d]", i), func(t *testing.T) {
have, err := tc.Schema.GetWriteEntries(
Expand Down Expand Up @@ -462,6 +501,9 @@ func TestSchemaRangeKey(t *testing.T) {
case ChunkTimeRangeValue:
_, _, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value)
require.NoError(t, err)
case SeriesRangeValue:
_, err := parseSeriesRangeValue(entry.RangeValue, entry.Value)
require.NoError(t, err)
}
}
})
Expand Down
28 changes: 28 additions & 0 deletions pkg/chunk/schema_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@ package chunk

import (
"bytes"
"crypto/sha256"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"encoding/json"

"fmt"

"github.com/prometheus/common/model"
)

func metricSeriesID(m model.Metric) string {
h := sha256.Sum256([]byte(m.String()))
return string(encodeBase64Bytes(h[:]))
}

func encodeRangeKey(ss ...[]byte) []byte {
length := 0
for _, s := range ss {
Expand Down Expand Up @@ -96,6 +103,27 @@ func parseMetricNameRangeValue(rangeValue []byte, value []byte) (model.LabelValu
}
}

// parseSeriesRangeValue returns the model.Metric stored in metric fingerprint
// range values.
func parseSeriesRangeValue(rangeValue []byte, value []byte) (model.Metric, error) {
components := decodeRangeKey(rangeValue)
switch {
case len(components) < 4:
return nil, fmt.Errorf("invalid metric range value: %x", rangeValue)

// v1 has the encoded json metric as the value (with the fingerprint as the first component)
case bytes.Equal(components[3], seriesRangeKeyV1):
var series model.Metric
if err := json.Unmarshal(value, &series); err != nil {
return nil, err
}
return series, nil

default:
return nil, fmt.Errorf("unrecognised seriesRangeKey version: '%v'", string(components[3]))
}
}

// parseChunkTimeRangeValue returns the chunkKey, labelValue and metadataInIndex
// for chunk time range values.
func parseChunkTimeRangeValue(rangeValue []byte, value []byte) (string, model.LabelValue, bool, error) {
Expand Down
55 changes: 55 additions & 0 deletions pkg/chunk/schema_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package chunk

import (
"bytes"
"encoding/binary"
"encoding/json"
"math"
"math/rand"
"testing"
Expand All @@ -11,6 +13,34 @@ import (
"github.com/stretchr/testify/require"
)

func TestMetricSeriesID(t *testing.T) {
for _, c := range []struct {
metric model.Metric
expected string
}{
{
model.Metric{model.MetricNameLabel: "foo"},
"LCa0a2j/xo/5m0U8HTBBNBNCLXBkg7+g+YpeiGJm564",
},
{
model.Metric{
model.MetricNameLabel: "foo",
"bar": "baz",
"toms": "code",
"flip": "flop",
},
"KrbXMezYneba+o7wfEdtzOdAWhbfWcDrlVfs1uOCX3M",
},
{
model.Metric{},
"RBNvo1WzZ4oRRq0W9+hknpT7T8If536DEMBg9hyq/4o",
},
} {
seriesID := metricSeriesID(c.metric)
assert.Equal(t, c.expected, seriesID)
}
}

func TestSchemaTimeEncoding(t *testing.T) {
assert.Equal(t, uint32(0), decodeTime(encodeTime(0)), "0")
assert.Equal(t, uint32(math.MaxUint32), decodeTime(encodeTime(math.MaxUint32)), "MaxUint32")
Expand Down Expand Up @@ -83,3 +113,28 @@ func TestParseMetricNameRangeValue(t *testing.T) {
assert.Equal(t, model.LabelValue(c.expMetricName), metricName)
}
}

func TestParseSeriesRangeValue(t *testing.T) {
metric := model.Metric{
model.MetricNameLabel: "foo",
"bar": "bary",
"baz": "bazy",
}

fingerprintBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(fingerprintBytes, uint64(metric.Fingerprint()))
metricBytes, err := json.Marshal(metric)
require.NoError(t, err)

for _, c := range []struct {
encoded []byte
value []byte
expMetric model.Metric
}{
{encodeRangeKey(fingerprintBytes, nil, nil, seriesRangeKeyV1), metricBytes, metric},
} {
metric, err := parseSeriesRangeValue(c.encoded, c.value)
require.NoError(t, err)
assert.Equal(t, c.expMetric, metric)
}
}