From 3daa05e549e288f17bca1c574cb0914013a6c21c Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Thu, 18 May 2017 15:32:47 +0100 Subject: [PATCH 1/4] Create v8Schema with series index --- pkg/chunk/chunk_store_test.go | 2 ++ pkg/chunk/inmemory_storage_client.go | 5 ++-- pkg/chunk/schema.go | 41 ++++++++++++++++++++++++++ pkg/chunk/schema_config.go | 8 +++++ pkg/chunk/schema_test.go | 44 ++++++++++++++++++++++++++++ pkg/chunk/schema_util.go | 22 ++++++++++++++ pkg/chunk/schema_util_test.go | 27 +++++++++++++++++ 7 files changed, 147 insertions(+), 2 deletions(-) diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index d3a5a882e5..10fd2eaff8 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -57,6 +57,7 @@ func TestChunkStore(t *testing.T) { {"v5 schema", v5Schema}, {"v6 schema", v6Schema}, {"v7 schema", v7Schema}, + {"v8 schema", v8Schema}, } nameMatcher := mustNewLabelMatcher(metric.Equal, model.MetricNameLabel, "foo") @@ -282,6 +283,7 @@ func TestChunkStoreRandom(t *testing.T) { {name: "v5 schema", fn: v5Schema}, {name: "v6 schema", fn: v6Schema}, {name: "v7 schema", fn: v7Schema}, + {name: "v8 schema", fn: v8Schema}, } for i := range schemas { diff --git a/pkg/chunk/inmemory_storage_client.go b/pkg/chunk/inmemory_storage_client.go index ccc1256e80..66fdfadcd7 100644 --- a/pkg/chunk/inmemory_storage_client.go +++ b/pkg/chunk/inmemory_storage_client.go @@ -139,9 +139,10 @@ func (m *MockStorage) BatchWrite(_ context.Context, batch WriteBatch) error { items = append(items, mockItem{}) copy(items[i+1:], items[i:]) } else { - // Return error if duplicate write and not metric name entry + // Return error if duplicate write and not metric name entry or series entry itemComponents := decodeRangeKey(items[i].rangeValue) - if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) { + if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) && + !bytes.Equal(itemComponents[3], seriesRangeKeyV1) { return fmt.Errorf("Dupe write") } } diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go index 298c4bedb3..b8cc7e3e7e 100644 --- a/pkg/chunk/schema.go +++ b/pkg/chunk/schema.go @@ -2,6 +2,8 @@ package chunk import ( "crypto/sha1" + "encoding/binary" + "encoding/json" "errors" "fmt" "strings" @@ -17,6 +19,7 @@ var ( chunkTimeRangeKeyV4 = []byte{'4'} chunkTimeRangeKeyV5 = []byte{'5'} metricNameRangeKeyV1 = []byte{'6'} + seriesRangeKeyV1 = []byte{'7'} ) // Errors @@ -132,6 +135,14 @@ func v7Schema(cfg SchemaConfig) Schema { } } +// v8 schema is an extension of v6, with support for a labelset/series index +func v8Schema(cfg SchemaConfig) Schema { + return schema{ + cfg.dailyBuckets, + v8Entries{}, + } +} + // schema implements Schema given a bucketing function and and set of range key callbacks type schema struct { buckets func(from, through model.Time, userID string) []Bucket @@ -536,3 +547,33 @@ func (v7Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) { }, }, nil } + +// v8Entries is the same as v7Entries however with a series index instead of a metric name index +type v8Entries struct { + v6Entries +} + +func (entries v8Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) { + indexEntries, err := entries.v6Entries.GetWriteEntries(bucket, metricName, labels, chunkID) + if err != nil { + return nil, err + } + + fingerprintBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(fingerprintBytes, uint64(labels.Fingerprint())) + + seriesBytes, err := json.Marshal(labels) + if err != nil { + return nil, err + } + + // Add IndexEntry for series with userID:bigBucket HashValue + indexEntries = append(indexEntries, IndexEntry{ + TableName: bucket.tableName, + HashValue: bucket.hashKey, + RangeValue: encodeRangeKey(encodeBase64Bytes(fingerprintBytes), nil, nil, seriesRangeKeyV1), + Value: seriesBytes, + }) + + return indexEntries, nil +} diff --git a/pkg/chunk/schema_config.go b/pkg/chunk/schema_config.go index 3c69150f60..e39947ec56 100644 --- a/pkg/chunk/schema_config.go +++ b/pkg/chunk/schema_config.go @@ -41,6 +41,9 @@ type SchemaConfig struct { // After this time, we will read and write v7 schemas. V7SchemaFrom util.DayValue + + // After this time, we will read and write v8 schemas. + V8SchemaFrom util.DayValue } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -53,6 +56,7 @@ func (cfg *SchemaConfig) RegisterFlags(f *flag.FlagSet) { f.Var(&cfg.V5SchemaFrom, "dynamodb.v5-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v5 schema.") f.Var(&cfg.V6SchemaFrom, "dynamodb.v6-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v6 schema.") f.Var(&cfg.V7SchemaFrom, "dynamodb.v7-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v7 schema.") + f.Var(&cfg.V8SchemaFrom, "dynamodb.v8-schema-from", "The date (in the format YYYY-MM-DD) after which we enable v8 schema.") } func (cfg *SchemaConfig) tableForBucket(bucketStart int64) string { @@ -177,6 +181,10 @@ func newCompositeSchema(cfg SchemaConfig) (Schema, error) { schemas = append(schemas, compositeSchemaEntry{cfg.V7SchemaFrom.Time, v7Schema(cfg)}) } + if cfg.V8SchemaFrom.IsSet() { + schemas = append(schemas, compositeSchemaEntry{cfg.V8SchemaFrom.Time, v8Schema(cfg)}) + } + if !sort.IsSorted(byStart(schemas)) { return nil, fmt.Errorf("schemas not in time-sorted order") } diff --git a/pkg/chunk/schema_test.go b/pkg/chunk/schema_test.go index 2af6663e57..cbbb71da1e 100644 --- a/pkg/chunk/schema_test.go +++ b/pkg/chunk/schema_test.go @@ -4,6 +4,8 @@ import ( "bytes" "crypto/sha1" "encoding/base64" + "encoding/binary" + "encoding/json" "fmt" "reflect" "sort" @@ -236,6 +238,7 @@ func TestSchemaHashKeys(t *testing.T) { const ( MetricNameRangeValue = iota + 1 ChunkTimeRangeValue + SeriesRangeValue ) // parseRangeValueType returns the type of rangeValue @@ -269,6 +272,10 @@ func parseRangeValueType(rangeValue []byte) (int, error) { case bytes.Equal(components[3], metricNameRangeKeyV1): return MetricNameRangeValue, nil + // series range values + case bytes.Equal(components[3], seriesRangeKeyV1): + return SeriesRangeValue, nil + default: return 0, fmt.Errorf("unrecognised range value type. version: '%v'", string(components[3])) } @@ -295,6 +302,7 @@ func TestSchemaRangeKey(t *testing.T) { tsRangeKeys = v5Schema(cfg) v6RangeKeys = v6Schema(cfg) v7RangeKeys = v7Schema(cfg) + v8RangeKeys = v8Schema(cfg) metric = model.Metric{ model.MetricNameLabel: metricName, "bar": "bary", @@ -303,6 +311,11 @@ func TestSchemaRangeKey(t *testing.T) { fooSha1Hash = sha1.Sum([]byte("foo")) ) + metricFingerprintBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(metricFingerprintBytes, uint64(metric.Fingerprint())) + metricBytes, err := json.Marshal(metric) + require.NoError(t, err) + mkEntries := func(hashKey string, callback func(labelName model.LabelName, labelValue model.LabelValue) ([]byte, []byte)) []IndexEntry { result := []IndexEntry{} for labelName, labelValue := range metric { @@ -434,6 +447,34 @@ func TestSchemaRangeKey(t *testing.T) { }, }, }, + { + v8RangeKeys, + []IndexEntry{ + { + TableName: table, + HashValue: "userid:d0", + RangeValue: append(encodeBase64Bytes(metricFingerprintBytes), []byte("\x00\x00\x007\x00")...), + Value: metricBytes, + }, + { + TableName: table, + HashValue: "userid:d0:foo", + RangeValue: []byte("0036ee7f\x00\x00chunkID\x003\x00"), + }, + { + TableName: table, + HashValue: "userid:d0:foo:bar", + RangeValue: []byte("0036ee7f\x00\x00chunkID\x005\x00"), + Value: []byte("bary"), + }, + { + TableName: table, + HashValue: "userid:d0:foo:baz", + RangeValue: []byte("0036ee7f\x00\x00chunkID\x005\x00"), + Value: []byte("bazy"), + }, + }, + }, } { t.Run(fmt.Sprintf("TestSchameRangeKey[%d]", i), func(t *testing.T) { have, err := tc.Schema.GetWriteEntries( @@ -462,6 +503,9 @@ func TestSchemaRangeKey(t *testing.T) { case ChunkTimeRangeValue: _, _, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) require.NoError(t, err) + case SeriesRangeValue: + _, err := parseSeriesRangeValue(entry.RangeValue, entry.Value) + require.NoError(t, err) } } }) diff --git a/pkg/chunk/schema_util.go b/pkg/chunk/schema_util.go index 6ffb92778f..ca69449583 100644 --- a/pkg/chunk/schema_util.go +++ b/pkg/chunk/schema_util.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "encoding/binary" "encoding/hex" + "encoding/json" "fmt" @@ -96,6 +97,27 @@ func parseMetricNameRangeValue(rangeValue []byte, value []byte) (model.LabelValu } } +// parseSeriesRangeValue returns the model.Metric stored in metric fingerprint +// range values. +func parseSeriesRangeValue(rangeValue []byte, value []byte) (model.Metric, error) { + components := decodeRangeKey(rangeValue) + switch { + case len(components) < 4: + return nil, fmt.Errorf("invalid metric range value: %x", rangeValue) + + // v1 has the metric name as the value (with the hash as the first component) + case bytes.Equal(components[3], seriesRangeKeyV1): + var series model.Metric + if err := json.Unmarshal(value, &series); err != nil { + return nil, err + } + return series, nil + + default: + return nil, fmt.Errorf("unrecognised seriesRangeKey version: '%v'", string(components[3])) + } +} + // parseChunkTimeRangeValue returns the chunkKey, labelValue and metadataInIndex // for chunk time range values. func parseChunkTimeRangeValue(rangeValue []byte, value []byte) (string, model.LabelValue, bool, error) { diff --git a/pkg/chunk/schema_util_test.go b/pkg/chunk/schema_util_test.go index 660e72fbab..3e9d95748e 100644 --- a/pkg/chunk/schema_util_test.go +++ b/pkg/chunk/schema_util_test.go @@ -2,6 +2,8 @@ package chunk import ( "bytes" + "encoding/binary" + "encoding/json" "math" "math/rand" "testing" @@ -83,3 +85,28 @@ func TestParseMetricNameRangeValue(t *testing.T) { assert.Equal(t, model.LabelValue(c.expMetricName), metricName) } } + +func TestParseSeriesRangeValue(t *testing.T) { + metric := model.Metric{ + model.MetricNameLabel: "foo", + "bar": "bary", + "baz": "bazy", + } + + fingerprintBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(fingerprintBytes, uint64(metric.Fingerprint())) + metricBytes, err := json.Marshal(metric) + require.NoError(t, err) + + for _, c := range []struct { + encoded []byte + value []byte + expMetric model.Metric + }{ + {encodeRangeKey(fingerprintBytes, nil, nil, seriesRangeKeyV1), metricBytes, metric}, + } { + metric, err := parseSeriesRangeValue(c.encoded, c.value) + require.NoError(t, err) + assert.Equal(t, c.expMetric, metric) + } +} From 27e1b56df271f450dce0719d7c93d940aa423225 Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Thu, 18 May 2017 15:50:45 +0100 Subject: [PATCH 2/4] Update comment --- pkg/chunk/schema_util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/chunk/schema_util.go b/pkg/chunk/schema_util.go index ca69449583..b164f25d03 100644 --- a/pkg/chunk/schema_util.go +++ b/pkg/chunk/schema_util.go @@ -105,7 +105,7 @@ func parseSeriesRangeValue(rangeValue []byte, value []byte) (model.Metric, error case len(components) < 4: return nil, fmt.Errorf("invalid metric range value: %x", rangeValue) - // v1 has the metric name as the value (with the hash as the first component) + // v1 has the encoded json metric as the value (with the fingerprint as the first component) case bytes.Equal(components[3], seriesRangeKeyV1): var series model.Metric if err := json.Unmarshal(value, &series); err != nil { From 91f7fb1326cf10ad3ff6391d18070ed72d71578c Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Fri, 19 May 2017 12:12:37 +0100 Subject: [PATCH 3/4] Use sha256 for seriesID --- pkg/chunk/schema.go | 7 ++----- pkg/chunk/schema_test.go | 6 ++---- pkg/chunk/schema_util.go | 6 ++++++ pkg/chunk/schema_util_test.go | 28 ++++++++++++++++++++++++++++ 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go index b8cc7e3e7e..c3b0735ace 100644 --- a/pkg/chunk/schema.go +++ b/pkg/chunk/schema.go @@ -2,7 +2,6 @@ package chunk import ( "crypto/sha1" - "encoding/binary" "encoding/json" "errors" "fmt" @@ -559,9 +558,7 @@ func (entries v8Entries) GetWriteEntries(bucket Bucket, metricName model.LabelVa return nil, err } - fingerprintBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(fingerprintBytes, uint64(labels.Fingerprint())) - + seriesID := metricSeriesID(labels) seriesBytes, err := json.Marshal(labels) if err != nil { return nil, err @@ -571,7 +568,7 @@ func (entries v8Entries) GetWriteEntries(bucket Bucket, metricName model.LabelVa indexEntries = append(indexEntries, IndexEntry{ TableName: bucket.tableName, HashValue: bucket.hashKey, - RangeValue: encodeRangeKey(encodeBase64Bytes(fingerprintBytes), nil, nil, seriesRangeKeyV1), + RangeValue: encodeRangeKey([]byte(seriesID), nil, nil, seriesRangeKeyV1), Value: seriesBytes, }) diff --git a/pkg/chunk/schema_test.go b/pkg/chunk/schema_test.go index cbbb71da1e..0178a843dc 100644 --- a/pkg/chunk/schema_test.go +++ b/pkg/chunk/schema_test.go @@ -4,7 +4,6 @@ import ( "bytes" "crypto/sha1" "encoding/base64" - "encoding/binary" "encoding/json" "fmt" "reflect" @@ -311,8 +310,7 @@ func TestSchemaRangeKey(t *testing.T) { fooSha1Hash = sha1.Sum([]byte("foo")) ) - metricFingerprintBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(metricFingerprintBytes, uint64(metric.Fingerprint())) + seriesID := metricSeriesID(metric) metricBytes, err := json.Marshal(metric) require.NoError(t, err) @@ -453,7 +451,7 @@ func TestSchemaRangeKey(t *testing.T) { { TableName: table, HashValue: "userid:d0", - RangeValue: append(encodeBase64Bytes(metricFingerprintBytes), []byte("\x00\x00\x007\x00")...), + RangeValue: append([]byte(seriesID), []byte("\x00\x00\x007\x00")...), Value: metricBytes, }, { diff --git a/pkg/chunk/schema_util.go b/pkg/chunk/schema_util.go index b164f25d03..b7503e2583 100644 --- a/pkg/chunk/schema_util.go +++ b/pkg/chunk/schema_util.go @@ -2,6 +2,7 @@ package chunk import ( "bytes" + "crypto/sha256" "encoding/base64" "encoding/binary" "encoding/hex" @@ -12,6 +13,11 @@ import ( "github.com/prometheus/common/model" ) +func metricSeriesID(m model.Metric) string { + h := sha256.Sum256([]byte(m.String())) + return string(encodeBase64Bytes(h[:])) +} + func encodeRangeKey(ss ...[]byte) []byte { length := 0 for _, s := range ss { diff --git a/pkg/chunk/schema_util_test.go b/pkg/chunk/schema_util_test.go index 3e9d95748e..d4cc2cab28 100644 --- a/pkg/chunk/schema_util_test.go +++ b/pkg/chunk/schema_util_test.go @@ -13,6 +13,34 @@ import ( "github.com/stretchr/testify/require" ) +func TestMetricSeriesID(t *testing.T) { + for _, c := range []struct { + metric model.Metric + expected string + }{ + { + model.Metric{model.MetricNameLabel: "foo"}, + "LCa0a2j/xo/5m0U8HTBBNBNCLXBkg7+g+YpeiGJm564", + }, + { + model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + "toms": "code", + "flip": "flop", + }, + "KrbXMezYneba+o7wfEdtzOdAWhbfWcDrlVfs1uOCX3M", + }, + { + model.Metric{}, + "RBNvo1WzZ4oRRq0W9+hknpT7T8If536DEMBg9hyq/4o", + }, + } { + seriesID := metricSeriesID(c.metric) + assert.Equal(t, c.expected, seriesID) + } +} + func TestSchemaTimeEncoding(t *testing.T) { assert.Equal(t, uint32(0), decodeTime(encodeTime(0)), "0") assert.Equal(t, uint32(math.MaxUint32), decodeTime(encodeTime(math.MaxUint32)), "MaxUint32") From 83bb69d02a11ef471aebf9408c6a1f1190a6a9da Mon Sep 17 00:00:00 2001 From: Aaron Kirkbride Date: Wed, 24 May 2017 17:59:35 +0100 Subject: [PATCH 4/4] Add GetReadQueries for v8Schema --- pkg/chunk/schema.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go index c3b0735ace..f50e6e10f8 100644 --- a/pkg/chunk/schema.go +++ b/pkg/chunk/schema.go @@ -574,3 +574,12 @@ func (entries v8Entries) GetWriteEntries(bucket Bucket, metricName model.LabelVa return indexEntries, nil } + +func (v8Entries) GetReadQueries(bucket Bucket) ([]IndexQuery, error) { + return []IndexQuery{ + { + TableName: bucket.tableName, + HashValue: bucket.hashKey, + }, + }, nil +}