
✨ Syncer: also mutate StatefulSets and ReplicaSets #2845

Merged
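
What this change does: the Deployment-specific mutator becomes a generic PodSpecableMutator that covers every apps/v1 resource carrying a pod template (Deployments, StatefulSets, ReplicaSets), mutators now report the set of GVRs they handle through GVRs() instead of a single GVR(), and the spec controller indexes whatever mutators the caller passes in by GVR. Below is a minimal sketch of that wiring, assuming only what the diff shows; the buildMutatorMap helper name is illustrative and not part of the PR.

```go
package sketch

import (
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// Mutator mirrors the interface introduced in pkg/syncer/spec/spec_controller.go:
// a mutator advertises the GVRs it handles and mutates unstructured objects in place.
type Mutator interface {
	GVRs() []schema.GroupVersionResource
	Mutate(obj *unstructured.Unstructured) error
}

// buildMutatorMap (illustrative name, not in the PR) shows how the controller
// indexes the mutators it receives: one map entry per advertised GVR, so a
// single PodSpecableMutator serves deployments, statefulsets and replicasets.
func buildMutatorMap(mutators ...Mutator) map[schema.GroupVersionResource]Mutator {
	m := make(map[schema.GroupVersionResource]Mutator, len(mutators))
	for _, mutator := range mutators {
		for _, gvr := range mutator.GVRs() {
			m[gvr] = mutator
		}
	}
	return m
}
```

At apply time the controller simply looks up c.mutators[gvr] and calls Mutate before the object is applied downstream, as shown in the spec_process.go hunk at the end.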
@@ -23,7 +23,6 @@ import (

"github.com/kcp-dev/logicalcluster/v3"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
@@ -38,7 +37,7 @@ import (

type ListSecretFunc func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error)

type DeploymentMutator struct {
type PodSpecableMutator struct {
upstreamURL *url.URL
listSecrets ListSecretFunc
serviceLister listerscorev1.ServiceLister
@@ -49,18 +48,30 @@ type DeploymentMutator struct {
upsyncPods bool
}

func (dm *DeploymentMutator) GVR() schema.GroupVersionResource {
return schema.GroupVersionResource{
Group: "apps",
Version: "v1",
Resource: "deployments",
func (dm *PodSpecableMutator) GVRs() []schema.GroupVersionResource {
return []schema.GroupVersionResource{
{
Group: "apps",
Version: "v1",
Resource: "deployments",
},
{
Group: "apps",
Version: "v1",
Resource: "statefulsets",
},
{
Group: "apps",
Version: "v1",
Resource: "replicasets",
},
}
}

func NewDeploymentMutator(upstreamURL *url.URL, secretLister ListSecretFunc, serviceLister listerscorev1.ServiceLister,
func NewPodspecableMutator(upstreamURL *url.URL, secretLister ListSecretFunc, serviceLister listerscorev1.ServiceLister,
syncTargetClusterName logicalcluster.Name,
syncTargetUID types.UID, syncTargetName, dnsNamespace string, upsyncPods bool) *DeploymentMutator {
return &DeploymentMutator{
syncTargetUID types.UID, syncTargetName, dnsNamespace string, upsyncPods bool) *PodSpecableMutator {
return &PodSpecableMutator{
upstreamURL: upstreamURL,
listSecrets: secretLister,
serviceLister: serviceLister,
@@ -73,24 +84,30 @@ func NewDeploymentMutator(upstreamURL *url.URL, secretLister ListSecretFunc, ser
}

// Mutate applies the mutator changes to the object.
func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
var deployment appsv1.Deployment
err := runtime.DefaultUnstructuredConverter.FromUnstructured(
obj.UnstructuredContent(),
&deployment)
func (dm *PodSpecableMutator) Mutate(obj *unstructured.Unstructured) error {
podTemplateUnstr, ok, err := unstructured.NestedMap(obj.UnstructuredContent(), "spec", "template")
if err != nil {
return err
}
upstreamLogicalName := logicalcluster.From(obj)
if !ok {
return fmt.Errorf("object should have a PodTemplate.Spec 'spec.template', but doesn't: %v", obj)
}

templateSpec := &deployment.Spec.Template.Spec
podTemplate := &corev1.PodTemplateSpec{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(
podTemplateUnstr,
&podTemplate)
if err != nil {
return err
}
upstreamLogicalName := logicalcluster.From(obj)

desiredServiceAccountName := "default"
if templateSpec.ServiceAccountName != "" && templateSpec.ServiceAccountName != "default" {
desiredServiceAccountName = templateSpec.ServiceAccountName
if podTemplate.Spec.ServiceAccountName != "" && podTemplate.Spec.ServiceAccountName != "default" {
desiredServiceAccountName = podTemplate.Spec.ServiceAccountName
}

rawSecretList, err := dm.listSecrets(upstreamLogicalName, deployment.Namespace)
rawSecretList, err := dm.listSecrets(upstreamLogicalName, obj.GetNamespace())
if err != nil {
return fmt.Errorf("error listing secrets for workspace %s: %w", upstreamLogicalName.String(), err)
}
@@ -123,14 +140,14 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
}

if desiredSecretName == "" {
return fmt.Errorf("couldn't find a token upstream for the serviceaccount %s/%s in workspace %s", desiredServiceAccountName, deployment.Namespace, upstreamLogicalName.String())
return fmt.Errorf("couldn't find a token upstream for the serviceaccount %s/%s in workspace %s", desiredServiceAccountName, obj.GetNamespace(), upstreamLogicalName.String())
}

// Setting AutomountServiceAccountToken to false allows us to control the ServiceAccount
// VolumeMount and Volume definitions.
templateSpec.AutomountServiceAccountToken = utilspointer.BoolPtr(false)
podTemplate.Spec.AutomountServiceAccountToken = utilspointer.BoolPtr(false)
// Clear the serviceAccountName on the podTemplate, as we are not syncing the serviceAccount down to the workload cluster.
templateSpec.ServiceAccountName = ""
podTemplate.Spec.ServiceAccountName = ""

kcpExternalHost := dm.upstreamURL.Hostname()
kcpExternalPort := dm.upstreamURL.Port()
@@ -193,42 +210,42 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
}

// Override Envs, resolve downwardAPI FieldRef and add the VolumeMount to all the containers
for i := range deployment.Spec.Template.Spec.Containers {
for i := range podTemplate.Spec.Containers {
for _, overrideEnv := range overrideEnvs {
templateSpec.Containers[i].Env = updateEnv(templateSpec.Containers[i].Env, overrideEnv)
podTemplate.Spec.Containers[i].Env = updateEnv(podTemplate.Spec.Containers[i].Env, overrideEnv)
}
templateSpec.Containers[i].Env = resolveDownwardAPIFieldRefEnv(templateSpec.Containers[i].Env, deployment)
templateSpec.Containers[i].VolumeMounts = updateVolumeMount(templateSpec.Containers[i].VolumeMounts, serviceAccountMount)
podTemplate.Spec.Containers[i].Env = resolveDownwardAPIFieldRefEnv(podTemplate.Spec.Containers[i].Env, obj)
podTemplate.Spec.Containers[i].VolumeMounts = updateVolumeMount(podTemplate.Spec.Containers[i].VolumeMounts, serviceAccountMount)
}

// Override Envs, resolve downwardAPI FieldRef and add the VolumeMount to all the Init containers
for i := range templateSpec.InitContainers {
for i := range podTemplate.Spec.InitContainers {
for _, overrideEnv := range overrideEnvs {
templateSpec.InitContainers[i].Env = updateEnv(templateSpec.InitContainers[i].Env, overrideEnv)
podTemplate.Spec.InitContainers[i].Env = updateEnv(podTemplate.Spec.InitContainers[i].Env, overrideEnv)
}
templateSpec.InitContainers[i].Env = resolveDownwardAPIFieldRefEnv(templateSpec.InitContainers[i].Env, deployment)
templateSpec.InitContainers[i].VolumeMounts = updateVolumeMount(templateSpec.InitContainers[i].VolumeMounts, serviceAccountMount)
podTemplate.Spec.InitContainers[i].Env = resolveDownwardAPIFieldRefEnv(podTemplate.Spec.InitContainers[i].Env, obj)
podTemplate.Spec.InitContainers[i].VolumeMounts = updateVolumeMount(podTemplate.Spec.InitContainers[i].VolumeMounts, serviceAccountMount)
}

// Override Envs, resolve downwardAPI FieldRef and add the VolumeMount to all the Ephemeral containers
for i := range templateSpec.EphemeralContainers {
for i := range podTemplate.Spec.EphemeralContainers {
for _, overrideEnv := range overrideEnvs {
templateSpec.EphemeralContainers[i].Env = updateEnv(templateSpec.EphemeralContainers[i].Env, overrideEnv)
podTemplate.Spec.EphemeralContainers[i].Env = updateEnv(podTemplate.Spec.EphemeralContainers[i].Env, overrideEnv)
}
templateSpec.EphemeralContainers[i].Env = resolveDownwardAPIFieldRefEnv(templateSpec.EphemeralContainers[i].Env, deployment)
templateSpec.EphemeralContainers[i].VolumeMounts = updateVolumeMount(templateSpec.EphemeralContainers[i].VolumeMounts, serviceAccountMount)
podTemplate.Spec.EphemeralContainers[i].Env = resolveDownwardAPIFieldRefEnv(podTemplate.Spec.EphemeralContainers[i].Env, obj)
podTemplate.Spec.EphemeralContainers[i].VolumeMounts = updateVolumeMount(podTemplate.Spec.EphemeralContainers[i].VolumeMounts, serviceAccountMount)
}

// Add the ServiceAccount volume with our overrides.
found := false
for i := range templateSpec.Volumes {
if templateSpec.Volumes[i].Name == "kcp-api-access" {
templateSpec.Volumes[i] = serviceAccountVolume
for i := range podTemplate.Spec.Volumes {
if podTemplate.Spec.Volumes[i].Name == "kcp-api-access" {
podTemplate.Spec.Volumes[i] = serviceAccountVolume
found = true
}
}
if !found {
templateSpec.Volumes = append(templateSpec.Volumes, serviceAccountVolume)
podTemplate.Spec.Volumes = append(podTemplate.Spec.Volumes, serviceAccountVolume)
}

// Overrides DNS to point to the workspace DNS
@@ -238,8 +255,8 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
return err // retry
}

deployment.Spec.Template.Spec.DNSPolicy = corev1.DNSNone
deployment.Spec.Template.Spec.DNSConfig = &corev1.PodDNSConfig{
podTemplate.Spec.DNSPolicy = corev1.DNSNone
podTemplate.Spec.DNSConfig = &corev1.PodDNSConfig{
Nameservers: []string{dnsIP},
Searches: []string{ // TODO(LV): from /etc/resolv.conf
obj.GetNamespace() + ".svc.cluster.local",
@@ -256,31 +273,29 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {

if dm.upsyncPods {
syncTargetKey := workloadv1alpha1.ToSyncTargetKey(dm.syncTargetClusterName, dm.syncTargetName)
labels := deployment.Spec.Template.Labels
labels := podTemplate.Labels
if labels == nil {
labels = map[string]string{}
}
labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] = string(workloadv1alpha1.ResourceStateUpsync)
labels[workloadv1alpha1.InternalDownstreamClusterLabel] = syncTargetKey
deployment.Spec.Template.Labels = labels
podTemplate.Labels = labels

// TODO(davidfestal): In the future we could add a diff annotation to transform the resource while upsyncing:
// - remove unnecessary fields we don't want leaking to upstream
// - add an owner reference to the upstream deployment
}

unstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&deployment)
newPodTemplateUnstr, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplate)
if err != nil {
return err
}

// Set the changes back into the obj.
obj.SetUnstructuredContent(unstructured)

return nil
return unstructured.SetNestedMap(obj.Object, newPodTemplateUnstr, "spec", "template")
}

func (dm *DeploymentMutator) getDNSIPForWorkspace(workspace logicalcluster.Name) (string, error) {
func (dm *PodSpecableMutator) getDNSIPForWorkspace(workspace logicalcluster.Name) (string, error) {
// Retrieve the DNS IP associated to the workspace
dnsServiceName := shared.GetDNSID(workspace, dm.syncTargetUID, dm.syncTargetName)

@@ -299,13 +314,13 @@ func (dm *DeploymentMutator) getDNSIPForWorkspace(workspace logicalcluster.Name)
}

// resolveDownwardAPIFieldRefEnv replaces the downwardAPI FieldRef EnvVars with the value from the deployment, right now it only replaces the metadata.namespace.
func resolveDownwardAPIFieldRefEnv(envs []corev1.EnvVar, deployment appsv1.Deployment) []corev1.EnvVar {
func resolveDownwardAPIFieldRefEnv(envs []corev1.EnvVar, podspecable *unstructured.Unstructured) []corev1.EnvVar {
var result []corev1.EnvVar
for _, env := range envs {
if env.ValueFrom != nil && env.ValueFrom.FieldRef != nil && env.ValueFrom.FieldRef.FieldPath == "metadata.namespace" {
result = append(result, corev1.EnvVar{
Name: env.Name,
Value: deployment.Namespace,
Value: podspecable.GetNamespace(),
})
} else {
result = append(result, env)
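The core mechanic of the mutator change above: instead of round-tripping the whole object through appsv1.Deployment, Mutate now pulls only spec.template out of the unstructured object, decodes it into a corev1.PodTemplateSpec, applies the typed changes, and writes the template back. A stand-alone sketch of that pattern, assuming nothing beyond the converter calls already used in the diff; mutatePodTemplate itself is an illustrative helper, not code from the PR:

```go
package sketch

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
)

// mutatePodTemplate (illustrative helper) applies a typed mutation to the
// spec.template of any pod-specable object: Deployment, StatefulSet or ReplicaSet.
func mutatePodTemplate(obj *unstructured.Unstructured, mutate func(*corev1.PodTemplateSpec)) error {
	// Pull out only the pod template instead of decoding the whole object.
	tmplUnstr, ok, err := unstructured.NestedMap(obj.UnstructuredContent(), "spec", "template")
	if err != nil {
		return err
	}
	if !ok {
		return fmt.Errorf("object %s/%s has no spec.template", obj.GetNamespace(), obj.GetName())
	}

	// Decode into a typed PodTemplateSpec so the mutation code stays readable.
	tmpl := &corev1.PodTemplateSpec{}
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(tmplUnstr, tmpl); err != nil {
		return err
	}

	mutate(tmpl) // env overrides, service-account volume, DNS config, labels, ...

	// Convert back and write the template into the original unstructured object.
	newTmplUnstr, err := runtime.DefaultUnstructuredConverter.ToUnstructured(tmpl)
	if err != nil {
		return err
	}
	return unstructured.SetNestedMap(obj.Object, newTmplUnstr, "spec", "template")
}
```

Because StatefulSets and ReplicaSets expose the same spec.template shape, this one code path serves all three GVRs returned by GVRs().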
@@ -941,7 +941,7 @@ func TestDeploymentMutate(t *testing.T) {
require.NoError(t, err, "Service Add() = %v", err)
svcLister := listerscorev1.NewServiceLister(serviceIndexer)

dm := NewDeploymentMutator(upstreamURL, secretLister, svcLister, clusterName, "syncTargetUID", "syncTargetName", "dnsNamespace", c.upsyncPods)
dm := NewPodspecableMutator(upstreamURL, secretLister, svcLister, clusterName, "syncTargetUID", "syncTargetName", "dnsNamespace", c.upsyncPods)

unstrOriginalDeployment, err := toUnstructured(c.originalDeployment)
require.NoError(t, err, "toUnstructured() = %v", err)
12 changes: 7 additions & 5 deletions pkg/syncer/spec/mutators/secrets.go
@@ -25,11 +25,13 @@ import (
type SecretMutator struct {
}

func (sm *SecretMutator) GVR() schema.GroupVersionResource {
return schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "secrets",
func (sm *SecretMutator) GVRs() []schema.GroupVersionResource {
return []schema.GroupVersionResource{
{
Group: "",
Version: "v1",
Resource: "secrets",
},
}
}

38 changes: 15 additions & 23 deletions pkg/syncer/spec/spec_controller.go
@@ -19,7 +19,6 @@ package spec
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"time"
@@ -33,8 +32,6 @@
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -53,7 +50,6 @@
syncerindexers "github.com/kcp-dev/kcp/pkg/syncer/indexers"
"github.com/kcp-dev/kcp/pkg/syncer/shared"
"github.com/kcp-dev/kcp/pkg/syncer/spec/dns"
specmutators "github.com/kcp-dev/kcp/pkg/syncer/spec/mutators"
. "github.com/kcp-dev/kcp/tmc/pkg/logging"
)

@@ -63,10 +59,15 @@ const (

var namespaceGVR schema.GroupVersionResource = corev1.SchemeGroupVersion.WithResource("namespaces")

type Mutator interface {
GVRs() []schema.GroupVersionResource
Mutate(obj *unstructured.Unstructured) error
}

type Controller struct {
queue workqueue.RateLimitingInterface

mutators mutatorGvrMap
mutators map[schema.GroupVersionResource]Mutator
dnsProcessor *dns.DNSProcessor

upstreamClient kcpdynamic.ClusterInterface
@@ -94,7 +95,7 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
dnsNamespace string,
syncerNamespaceInformerFactory informers.SharedInformerFactory,
dnsImage string,
upsyncPods bool) (*Controller, error) {
mutators ...Mutator) (*Controller, error) {
c := Controller{
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName),

@@ -118,9 +119,9 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
informer, ok := informers[gvr]
if !ok {
if shared.ContainsGVR(notSynced, gvr) {
return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory", gvr)
return nil, fmt.Errorf("informer for gvr %v not synced in the upstream informer factory", gvr)
}
return nil, fmt.Errorf("gvr %v should be known in the downstream informer factory", gvr)
return nil, fmt.Errorf("gvr %v should be known in the upstream informer factory", gvr)
}
return informer.Lister(), nil
},
@@ -137,6 +138,8 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
syncTargetUID: syncTargetUID,
syncTargetKey: syncTargetKey,
advancedSchedulingEnabled: advancedSchedulingEnabled,

mutators: make(map[schema.GroupVersionResource]Mutator, 2),
}

logger := logging.WithReconciler(syncerLogger, controllerName)
@@ -254,29 +257,18 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
},
})

secretMutator := specmutators.NewSecretMutator()

dnsServiceLister := syncerNamespaceInformerFactory.Core().V1().Services().Lister()

deploymentMutator := specmutators.NewDeploymentMutator(upstreamURL, func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error) {
secretLister, err := c.getUpstreamLister(corev1.SchemeGroupVersion.WithResource("secrets"))
if err != nil {
return nil, errors.New("informer should be up and synced for namespaces in the upstream syncer informer factory")
for _, mutator := range mutators {
for _, gvr := range mutator.GVRs() {
c.mutators[gvr] = mutator
}
return secretLister.ByCluster(clusterName).ByNamespace(namespace).List(labels.Everything())
}, dnsServiceLister, syncTargetClusterName, syncTargetUID, syncTargetName, dnsNamespace, upsyncPods)

c.mutators = mutatorGvrMap{
deploymentMutator.GVR(): deploymentMutator.Mutate,
secretMutator.GVR(): secretMutator.Mutate,
}

c.dnsProcessor = dns.NewDNSProcessor(downstreamKubeClient,
syncerNamespaceInformerFactory.Core().V1().ServiceAccounts().Lister(),
syncerNamespaceInformerFactory.Rbac().V1().Roles().Lister(),
syncerNamespaceInformerFactory.Rbac().V1().RoleBindings().Lister(),
syncerNamespaceInformerFactory.Apps().V1().Deployments().Lister(),
dnsServiceLister,
syncerNamespaceInformerFactory.Core().V1().Services().Lister(),
syncerNamespaceInformerFactory.Core().V1().Endpoints().Lister(),
syncerNamespaceInformerFactory.Networking().V1().NetworkPolicies().Lister(),
syncTargetUID, syncTargetName, dnsNamespace, dnsImage)
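
NewSpecSyncer no longer constructs the secret and deployment mutators itself, so the caller is now responsible for building them and handing them in through the new variadic mutators ...Mutator parameter. The updated call site is not part of this diff, so the following is a hypothetical sketch: buildMutators and its parameter list are placeholders, while the constructors, their arguments, and the spec.Mutator interface come from the hunks above.

```go
package sketch

import (
	"net/url"

	"github.com/kcp-dev/logicalcluster/v3"

	"k8s.io/apimachinery/pkg/types"
	listerscorev1 "k8s.io/client-go/listers/core/v1"

	"github.com/kcp-dev/kcp/pkg/syncer/spec"
	specmutators "github.com/kcp-dev/kcp/pkg/syncer/spec/mutators"
)

// buildMutators (placeholder function and parameters) sketches the caller-side
// wiring that replaces the construction removed from NewSpecSyncer: build the
// mutators first, then pass them to NewSpecSyncer via `mutators ...Mutator`.
func buildMutators(
	upstreamURL *url.URL,
	listSecrets specmutators.ListSecretFunc,
	dnsServiceLister listerscorev1.ServiceLister,
	syncTargetClusterName logicalcluster.Name,
	syncTargetUID types.UID,
	syncTargetName, dnsNamespace string,
	upsyncPods bool,
) []spec.Mutator {
	secretMutator := specmutators.NewSecretMutator()
	podspecableMutator := specmutators.NewPodspecableMutator(
		upstreamURL, listSecrets, dnsServiceLister,
		syncTargetClusterName, syncTargetUID, syncTargetName, dnsNamespace, upsyncPods)

	// NewSpecSyncer(..., secretMutator, podspecableMutator) would be called with
	// these, alongside its other (unchanged) arguments.
	return []spec.Mutator{secretMutator, podspecableMutator}
}
```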
4 changes: 1 addition & 3 deletions pkg/syncer/spec/spec_process.go
@@ -51,8 +51,6 @@
syncerApplyManager = "syncer"
)

type mutatorGvrMap map[schema.GroupVersionResource]func(obj *unstructured.Unstructured) error

func deepEqualApartFromStatus(logger logr.Logger, oldUnstrob, newUnstrob *unstructured.Unstructured) bool {
// TODO(jmprusi): Remove this after switching to virtual workspaces.
// remove status annotation from oldObj and newObj before comparing
@@ -443,7 +441,7 @@ func (c *Controller) applyToDownstream(ctx context.Context, gvr schema.GroupVers

// Run any transformations on the object before we apply it to the downstream cluster.
if mutator, ok := c.mutators[gvr]; ok {
if err := mutator(downstreamObj); err != nil {
if err := mutator.Mutate(downstreamObj); err != nil {
return err
}
}