Skip to content

feat: support customization of sharding labels #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 42 additions & 6 deletions pkg/apis/ctrlmesh/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ limitations under the License.

package ctrlmesh

import (
"flag"
)

// Environments
const (
EnvEnableWebhookServer = "ENABLE_WEBHOOK_SERVER"
Expand All @@ -28,9 +32,6 @@ const (
// Labels
const (
CtrlmeshControlPrefix = "ctrlmesh.kusionstack.io/"
CtrlmeshShardHashKey = "ctrlmesh.kusionstack.io/shard-hash"
CtrlmeshControlKey = "ctrlmesh.kusionstack.io/control"
CtrlmeshNamespaceKey = "ctrlmesh.kusionstack.io/namespace"
CtrlmeshIgnoreWebhookLabel = "ctrlmesh.kusionstack.io/ignore-webhook"
CtrlmeshIgnoreValidateLabel = "ctrlmesh.kusionstack.io/ignore-validate"
CtrlmeshDefaultReplicasLabel = "ctrlmesh.kusionstack.io/default-replicas"
Expand Down Expand Up @@ -62,7 +63,42 @@ const (
ProtectFinalizer = "finalizer.ctrlmesh.kusionstack.io/protected"
)

// Name
const (
ShardingConfigMapName = "ctrlmesh-sharding-config"
var (
shardHashLabel string
meshControlLabel string
namespaceLabel string
)

func init() {
flag.StringVar(&shardHashLabel, "label-shard-hash", "ctrlmesh.kusionstack.io/shard-hash", "The sharding hash label.")
flag.StringVar(&meshControlLabel, "label-mesh-control", "ctrlmesh.kusionstack.io/control", "The controller mesh control label.")
flag.StringVar(&namespaceLabel, "label-namespace", "ctrlmesh.kusionstack.io/namespace", "The namespace label.")
}

func ShardHashLabel() string {
return shardHashLabel
}

func MeshControlLabel() string {
return meshControlLabel
}

func NamespaceShardLabel() string {
return namespaceLabel
}

func IsControlledByMesh(labels map[string]string) bool {
if labels == nil {
return false
}
val, ok := labels[MeshControlLabel()]
return ok && val == "true"
}

func ShouldSyncLabels() []string {
return []string{
ShardHashLabel(),
MeshControlLabel(),
NamespaceShardLabel(),
}
}
14 changes: 8 additions & 6 deletions pkg/cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")

restConfigQPS = flag.Int("rest-config-qps", 30, "QPS of rest config.")
restConfigBurst = flag.Int("rest-config-burst", 50, "Burst of rest config.")
restConfigQPS = flag.Int("rest-config-qps", 30, "QPS of rest config.")
restConfigBurst = flag.Int("rest-config-burst", 50, "Burst of rest config.")
enablePatchRunnable = flag.Bool("enable-patch-runnable", true, "")
)

func init() {
Expand Down Expand Up @@ -161,10 +162,11 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "CircuitBreaker")
os.Exit(1)
}

if err = patchrunnable.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create runnable", "runnable", "PatchRunnable")
os.Exit(1)
if *enablePatchRunnable {
if err = patchrunnable.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create runnable", "runnable", "PatchRunnable")
os.Exit(1)
}
}
}()

Expand Down
26 changes: 13 additions & 13 deletions pkg/manager/controllers/patchrunnable/labelpatch_runnable.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,25 +295,25 @@ func updateLabel(ns *v1.Namespace) (bool, error) {

nsHash := strconv.Itoa(rand.Hash(ns.Name, constants.DefaultShardingSize))

if _, ok := ns.Labels[ctrlmesh.CtrlmeshControlKey]; !ok {
if _, exist := ns.Labels[ctrlmesh.CtrlmeshShardHashKey]; exist {
return false, fmt.Errorf("label %s already exist but can not find %s", ctrlmesh.CtrlmeshShardHashKey, ctrlmesh.CtrlmeshControlKey)
if _, ok := ns.Labels[ctrlmesh.MeshControlLabel()]; !ok {
if _, exist := ns.Labels[ctrlmesh.ShardHashLabel()]; exist {
return false, fmt.Errorf("label %s already exist but can not find %s", ctrlmesh.ShardHashLabel(), ctrlmesh.MeshControlLabel())
}
if _, exist := ns.Labels[ctrlmesh.CtrlmeshNamespaceKey]; exist {
return false, fmt.Errorf("label %s already exist but can not find %s", ctrlmesh.CtrlmeshNamespaceKey, ctrlmesh.CtrlmeshControlKey)
if _, exist := ns.Labels[ctrlmesh.NamespaceShardLabel()]; exist {
return false, fmt.Errorf("label %s already exist but can not find %s", ctrlmesh.NamespaceShardLabel(), ctrlmesh.MeshControlLabel())
}
ns.Labels[ctrlmesh.CtrlmeshControlKey] = "true"
ns.Labels[ctrlmesh.CtrlmeshNamespaceKey] = ns.Name
ns.Labels[ctrlmesh.CtrlmeshShardHashKey] = nsHash
ns.Labels[ctrlmesh.MeshControlLabel()] = "true"
ns.Labels[ctrlmesh.NamespaceShardLabel()] = ns.Name
ns.Labels[ctrlmesh.ShardHashLabel()] = nsHash
return true, nil
} else {
if val, exist := ns.Labels[ctrlmesh.CtrlmeshShardHashKey]; !exist || nsHash != val {
ns.Labels[ctrlmesh.CtrlmeshNamespaceKey] = ns.Name
ns.Labels[ctrlmesh.CtrlmeshShardHashKey] = nsHash
if val, exist := ns.Labels[ctrlmesh.ShardHashLabel()]; !exist || nsHash != val {
ns.Labels[ctrlmesh.NamespaceShardLabel()] = ns.Name
ns.Labels[ctrlmesh.ShardHashLabel()] = nsHash
return true, nil
}
if val, exist := ns.Labels[ctrlmesh.CtrlmeshNamespaceKey]; !exist || val != ns.Name {
ns.Labels[ctrlmesh.CtrlmeshNamespaceKey] = ns.Name
if val, exist := ns.Labels[ctrlmesh.NamespaceShardLabel()]; !exist || val != ns.Name {
ns.Labels[ctrlmesh.NamespaceShardLabel()] = ns.Name
return true, nil
}
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/manager/controllers/patchrunnable/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"flag"
"reflect"
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -101,12 +100,13 @@ type ResourceConfig struct {

func getShardingLabel(obj client.Object) map[string]string {
shardingLabels := map[string]string{}
if beNil(obj) || obj.GetLabels() == nil {
if beNil(obj) || obj.GetLabels() == nil || !ctrlmesh.IsControlledByMesh(obj.GetLabels()) {
return shardingLabels
}
for k, v := range obj.GetLabels() {
if strings.HasPrefix(k, ctrlmesh.CtrlmeshControlPrefix) {
shardingLabels[k] = v
lbs := obj.GetLabels()
for _, k := range ctrlmesh.ShouldSyncLabels() {
if val, ok := lbs[k]; ok {
shardingLabels[k] = val
}
}
return shardingLabels
Expand All @@ -119,6 +119,7 @@ func beNil(a interface{}) bool {
switch reflect.TypeOf(a).Kind() {
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
return reflect.ValueOf(a).IsNil()
default:
return false
}
return false
}
10 changes: 5 additions & 5 deletions pkg/manager/controllers/shardingconfigserver/auto_sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,14 @@ func genCanaryLimits(canaryConfig *ctrlmeshv1alpha1.CanaryConfig, originLimiter
}
if len(canaryConfig.InNamespaces) > 0 {
newSel.Selector.MatchExpressions = append(newSel.Selector.MatchExpressions, metav1.LabelSelectorRequirement{
Key: ctrlmesh.CtrlmeshNamespaceKey,
Key: ctrlmesh.NamespaceShardLabel(),
Operator: metav1.LabelSelectorOpIn,
Values: canaryConfig.InNamespaces,
})
}
if len(canaryConfig.InShardHash) > 0 {
newSel.Selector.MatchExpressions = append(newSel.Selector.MatchExpressions, metav1.LabelSelectorRequirement{
Key: ctrlmesh.CtrlmeshShardHashKey,
Key: ctrlmesh.ShardHashLabel(),
Operator: metav1.LabelSelectorOpIn,
Values: canaryConfig.InShardHash,
})
Expand All @@ -214,20 +214,20 @@ func genShardingGlobalLimits(root *ctrlmeshv1alpha1.ShardingConfig, batch []stri
}
if canaryConfig != nil && len(canaryConfig.InNamespaces) > 0 && *root.Spec.Root.Canary.Replicas > 0 {
newSel.Selector.MatchExpressions = append(newSel.Selector.MatchExpressions, metav1.LabelSelectorRequirement{
Key: ctrlmesh.CtrlmeshNamespaceKey,
Key: ctrlmesh.NamespaceShardLabel(),
Operator: metav1.LabelSelectorOpNotIn,
Values: canaryConfig.InNamespaces,
})
}
if canaryConfig != nil && len(canaryConfig.InShardHash) > 0 && *root.Spec.Root.Canary.Replicas > 0 {
newSel.Selector.MatchExpressions = append(newSel.Selector.MatchExpressions, metav1.LabelSelectorRequirement{
Key: ctrlmesh.CtrlmeshShardHashKey,
Key: ctrlmesh.ShardHashLabel(),
Operator: metav1.LabelSelectorOpNotIn,
Values: canaryConfig.InShardHash,
})
}
newSel.Selector.MatchExpressions = append(newSel.Selector.MatchExpressions, metav1.LabelSelectorRequirement{
Key: ctrlmesh.CtrlmeshShardHashKey,
Key: ctrlmesh.ShardHashLabel(),
Operator: metav1.LabelSelectorOpIn,
Values: batch,
})
Expand Down
12 changes: 6 additions & 6 deletions pkg/webhook/ns/namespace_mutating_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,17 @@ func (h *MutatingHandler) Handle(ctx context.Context, req admission.Request) adm

func (h *MutatingHandler) shouldUpdateNs(ns *v1.Namespace) (shouldUpdate bool) {
shouldUpdate = false
if _, exist := ns.Labels[ctrlmesh.CtrlmeshControlKey]; !exist {
ns.Labels[ctrlmesh.CtrlmeshControlKey] = "true"
if _, exist := ns.Labels[ctrlmesh.MeshControlLabel()]; !exist {
ns.Labels[ctrlmesh.MeshControlLabel()] = "true"
shouldUpdate = true
}
if val, exist := ns.Labels[ctrlmesh.CtrlmeshNamespaceKey]; !exist || val != ns.Name {
ns.Labels[ctrlmesh.CtrlmeshNamespaceKey] = ns.Name
if val, exist := ns.Labels[ctrlmesh.NamespaceShardLabel()]; !exist || val != ns.Name {
ns.Labels[ctrlmesh.NamespaceShardLabel()] = ns.Name
shouldUpdate = true
}
nsHash := strconv.Itoa(rand.Hash(ns.Name, constants.DefaultShardingSize))
if val, exist := ns.Labels[ctrlmesh.CtrlmeshShardHashKey]; !exist || nsHash != val {
ns.Labels[ctrlmesh.CtrlmeshShardHashKey] = nsHash
if val, exist := ns.Labels[ctrlmesh.ShardHashLabel()]; !exist || nsHash != val {
ns.Labels[ctrlmesh.ShardHashLabel()] = nsHash
shouldUpdate = true
}
return shouldUpdate
Expand Down
27 changes: 15 additions & 12 deletions pkg/webhook/resources/resource_mutating_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"encoding/json"
"net/http"
"strings"

"gomodules.xyz/jsonpatch/v2"
admissionv1 "k8s.io/api/admission/v1"
Expand All @@ -34,6 +33,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"

"github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh"
)

var (
Expand Down Expand Up @@ -75,18 +76,10 @@ func (r *MutatingHandler) Handle(ctx context.Context, req admission.Request) adm
return admission.Errored(http.StatusBadRequest, err)
}
}
patchLabel := map[string]string{}
if ns.Labels != nil {
for key, val := range ns.Labels {
if r.isSelecterKey(key) {
patchLabel[key] = val
}
}
}
patchLabel := r.getSyncLabels(ns.Labels)
if len(patchLabel) == 0 {
return admission.Allowed("")
}

marshalled, err := json.Marshal(&PatchMeta{&metav1.ObjectMeta{Labels: patchLabel}})
if err != nil {
klog.Errorf("meta marshal error, %s", err)
Expand Down Expand Up @@ -119,8 +112,18 @@ func (r *MutatingHandler) getNamespaceWithRetry(c client.Client, ctx context.Con
return nil
}

func (r *MutatingHandler) isSelecterKey(key string) bool {
return strings.HasPrefix(key, SelectorPrefix)
func (r *MutatingHandler) getSyncLabels(labels map[string]string) map[string]string {
syncLabels := ctrlmesh.ShouldSyncLabels()
res := map[string]string{}
if !ctrlmesh.IsControlledByMesh(labels) {
return res
}
for _, l := range syncLabels {
if val, ok := labels[l]; ok {
res[l] = val
}
}
return res
}

func (r *MutatingHandler) InjectDecoder(d *admission.Decoder) error {
Expand Down