Skip to content

fix: period_config table prefix key schema check #17265

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions pkg/storage/config/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
errInvalidSchemaVersion = errors.New("invalid schema version")
errInvalidTablePeriod = errors.New("the table period must be a multiple of 24h (1h for schema v1)")
errInvalidTableName = errors.New("invalid table name")
errInvalidTablePrefixValue = errors.New("table prefix can not contain a path delimiter")
errConfigFileNotSet = errors.New("schema config file needs to be set")
errConfigChunkPrefixNotSet = errors.New("schema config for chunks is missing the 'prefix' setting")
errSchemaIncreasingFromTime = errors.New("from time in schemas must be distinct and in increasing order")
Expand Down Expand Up @@ -579,15 +580,15 @@ func (cfg IndexPeriodicTableConfig) MarshalYAML() (interface{}, error) {

func ValidatePathPrefix(prefix string) error {
if prefix == "" {
return errors.New("prefix must be set")
return errors.New("path prefix must be set")
} else if strings.Contains(prefix, "\\") {
// When using windows filesystem as object store the implementation of ObjectClient in Cortex takes care of conversion of separator.
// We just need to always use `/` as a path separator.
return fmt.Errorf("prefix should only have '%s' as a path separator", pathPrefixDelimiter)
return fmt.Errorf("path prefix should only have '%s' as a path separator", pathPrefixDelimiter)
} else if strings.HasPrefix(prefix, pathPrefixDelimiter) {
return errors.New("prefix should never start with a path separator i.e '/'")
return errors.New("path prefix should never start with a path separator i.e '/'")
} else if !strings.HasSuffix(prefix, pathPrefixDelimiter) {
return errors.New("prefix should end with a path separator i.e '/'")
return errors.New("path prefix should end with a path separator i.e '/'")
}

return nil
Expand Down Expand Up @@ -639,6 +640,10 @@ func (cfg PeriodicTableConfig) Validate() error {
return errInvalidTablePeriod
}

if strings.Index(cfg.Prefix, pathPrefixDelimiter) > -1 {
return errInvalidTablePrefixValue
}

return nil
}

Expand Down
16 changes: 16 additions & 0 deletions pkg/storage/config/schema_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,22 @@ func TestSchemaConfig_Validate(t *testing.T) {
},
err: nil,
},
"should fail on index table prefix ending in path delimiter": {
config: &SchemaConfig{
Configs: []PeriodConfig{
{
Schema: "v13",
IndexTables: IndexPeriodicTableConfig{
PathPrefix: "index/",
PeriodicTableConfig: PeriodicTableConfig{
Prefix: "v13/key_",
},
},
},
},
},
err: errInvalidTablePrefixValue,
},
"should pass on index and chunk table period set to zero (no period tables)": {
config: &SchemaConfig{
Configs: []PeriodConfig{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter s

prefix = strings.TrimSuffix(prefix, delimiter)
ss := strings.Split(prefix, delimiter)
// should only accept something like: table or table/userid
if len(ss) > 2 {
return nil, nil, fmt.Errorf("invalid prefix %s", prefix)
}
Expand Down Expand Up @@ -304,19 +305,29 @@ func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient
}

ss := strings.Split(object.Key, delimiter)
if len(ss) < 2 || len(ss) > 3 {
return fmt.Errorf("invalid key: %s", object.Key)

// db.gz
if len(ss) < 2 {
return fmt.Errorf("bare key without context: %s", object.Key)
}

if len(ss) == 2 {
// table/db.gz
if len(ss) < 3 {
t.commonObjects = append(t.commonObjects, object)
} else {
continue
}

// table/userid/db.gz
if len(ss) < 4 {
userID := ss[1]
if len(t.userObjects[userID]) == 0 {
t.userIDs = append(t.userIDs, client.StorageCommonPrefix(path.Join(t.name, userID)))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this use path.Join?
This might be doing some golang-fu here, but I think this should use t.name + delimiter + userID or something like that?

}
t.userObjects[userID] = append(t.userObjects[userID], object)
continue
}

return fmt.Errorf("key too long: %s", object.Key)
}

t.cacheBuiltAt = time.Now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,106 @@ func TestCachedObjectClient_List(t *testing.T) {
require.Equal(t, objectsFromListCall, objects)
})

t.Run("supports path prefixed clients", func(t *testing.T) {
ctx := context.Background()

path_prefix := "my/long/path/prefix/"
objectsInStorage := []string{
path_prefix + "table1/db.gz",
path_prefix + "table2/db1.gz",
path_prefix + "table2/db2.gz",
path_prefix + "table3/user1/db.gz",
path_prefix + "table3/user2/db.gz",
}
objectClient := newMockObjectClient(t, objectsInStorage)
prefixedClient := client.NewPrefixedObjectClient(objectClient, path_prefix)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used path_prefix here because all the references to NewPrefixedObjectClient seemed to use the PathPrefix from the configuration.
These tests should match the usage more closly now

cachedObjectClient := newCachedObjectClient(prefixedClient)

objects, _, err := cachedObjectClient.List(ctx, "table1/", "/", false)
require.Nil(t, err)
require.Equal(t, []string{"table1/db.gz"}, objectKeys(objects))

objects, _, err = cachedObjectClient.List(ctx, "table2/", "/", false)
require.Nil(t, err)
require.Equal(t, []string{"table2/db1.gz", "table2/db2.gz"}, objectKeys(objects))

objects, _, err = cachedObjectClient.List(ctx, "table3/user1", "/", false)
require.Nil(t, err)
require.Equal(t, []string{"table3/user1/db.gz"}, objectKeys(objects))
})

t.Run("supports prefixed clients", func(t *testing.T) {
ctx := context.Background()

prefix := "my/amazing/prefix/"
path_prefix := "/"
prefix := "prefix_term_"
objectsInStorage := []string{
path_prefix + prefix + "table1/db.gz",
path_prefix + prefix + "table2/db1.gz",
path_prefix + prefix + "table2/db2.gz",
path_prefix + prefix + "table3/user1/db.gz",
path_prefix + prefix + "table3/user2/db.gz",
}
objectClient := newMockObjectClient(t, objectsInStorage)
prefixedClient := client.NewPrefixedObjectClient(objectClient, path_prefix)
cachedObjectClient := newCachedObjectClient(prefixedClient)

objects, _, err := cachedObjectClient.List(ctx, "prefix_term_table1/", "/", false)
require.Nil(t, err)
require.Equal(t, []string{"prefix_term_table1/db.gz"}, objectKeys(objects))

objects, _, err = cachedObjectClient.List(ctx, "prefix_term_table2/", "/", false)
require.Nil(t, err)
require.Equal(t, []string{"prefix_term_table2/db1.gz", "prefix_term_table2/db2.gz"}, objectKeys(objects))

objects, _, err = cachedObjectClient.List(ctx, "prefix_term_table3/user1/", "/", false)
require.Nil(t, err)
require.Equal(t, []string{"prefix_term_table3/user1/db.gz"}, objectKeys(objects))
})

t.Run("supports both path prefixed and prefix clients", func(t *testing.T) {
ctx := context.Background()

path_prefix := "my/long/path/prefix/"
prefix := "prefix_term_"
objectsInStorage := []string{
prefix + "table1/db.gz",
prefix + "table2/db.gz",
prefix + "table2/db2.gz",
path_prefix + prefix + "table1/db.gz",
path_prefix + prefix + "table2/db1.gz",
path_prefix + prefix + "table2/db2.gz",
path_prefix + prefix + "table3/user1/db.gz",
path_prefix + prefix + "table3/user2/db.gz",
}
objectClient := newMockObjectClient(t, objectsInStorage)
prefixedClient := client.NewPrefixedObjectClient(objectClient, prefix)
prefixedClient := client.NewPrefixedObjectClient(objectClient, path_prefix)
cachedObjectClient := newCachedObjectClient(prefixedClient)

objects, _, err := cachedObjectClient.List(ctx, "table2/", "/", false)
objects, _, err := cachedObjectClient.List(ctx, "prefix_term_table1/", "/", false)
require.Nil(t, err)
require.Equal(t, []string{"table2/db.gz", "table2/db2.gz"}, objectKeys(objects))
require.Equal(t, []string{"prefix_term_table1/db.gz"}, objectKeys(objects))

objects, _, err = cachedObjectClient.List(ctx, "prefix_term_table2/", "/", false)
require.Nil(t, err)
require.Equal(t, []string{"prefix_term_table2/db1.gz", "prefix_term_table2/db2.gz"}, objectKeys(objects))

objects, _, err = cachedObjectClient.List(ctx, "prefix_term_table3/user1/", "/", false)
require.Nil(t, err)
require.Equal(t, []string{"prefix_term_table3/user1/db.gz"}, objectKeys(objects))
})

t.Run("does not support prefix with delimiter", func(t *testing.T) {
ctx := context.Background()

path_prefix := "my/long/path/prefix/"
prefix := "prefix/term/with/delimiter_"
objectsInStorage := []string{
path_prefix + prefix + "table1/db.gz",
}
objectClient := newMockObjectClient(t, objectsInStorage)
prefixedClient := client.NewPrefixedObjectClient(objectClient, path_prefix)
cachedObjectClient := newCachedObjectClient(prefixedClient)

_, _, err := cachedObjectClient.List(ctx, "prefix/term/with/delimiter_table1/", "/", false)
require.Error(t, err)
})
}

Expand Down