Skip to content

Commit b844be6

Browse files
Merge pull request #2845 from davidfestal/upsync-pods-from-synced-podspecable-resources
✨ Syncer: also mutate `StatefulSet`s and `ReplicaSet`s
2 parents df364a2 + e26c87b commit b844be6

File tree

7 files changed

+136
-84
lines changed

7 files changed

+136
-84
lines changed

pkg/syncer/spec/mutators/deployment.go renamed to pkg/syncer/spec/mutators/podspecable.go

Lines changed: 65 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323

2424
"github.com/kcp-dev/logicalcluster/v3"
2525

26-
appsv1 "k8s.io/api/apps/v1"
2726
corev1 "k8s.io/api/core/v1"
2827
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2928
"k8s.io/apimachinery/pkg/runtime"
@@ -38,7 +37,7 @@ import (
3837

3938
type ListSecretFunc func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error)
4039

41-
type DeploymentMutator struct {
40+
type PodSpecableMutator struct {
4241
upstreamURL *url.URL
4342
listSecrets ListSecretFunc
4443
serviceLister listerscorev1.ServiceLister
@@ -49,18 +48,30 @@ type DeploymentMutator struct {
4948
upsyncPods bool
5049
}
5150

52-
func (dm *DeploymentMutator) GVR() schema.GroupVersionResource {
53-
return schema.GroupVersionResource{
54-
Group: "apps",
55-
Version: "v1",
56-
Resource: "deployments",
51+
func (dm *PodSpecableMutator) GVRs() []schema.GroupVersionResource {
52+
return []schema.GroupVersionResource{
53+
{
54+
Group: "apps",
55+
Version: "v1",
56+
Resource: "deployments",
57+
},
58+
{
59+
Group: "apps",
60+
Version: "v1",
61+
Resource: "statefulsets",
62+
},
63+
{
64+
Group: "apps",
65+
Version: "v1",
66+
Resource: "replicasets",
67+
},
5768
}
5869
}
5970

60-
func NewDeploymentMutator(upstreamURL *url.URL, secretLister ListSecretFunc, serviceLister listerscorev1.ServiceLister,
71+
func NewPodspecableMutator(upstreamURL *url.URL, secretLister ListSecretFunc, serviceLister listerscorev1.ServiceLister,
6172
syncTargetClusterName logicalcluster.Name,
62-
syncTargetUID types.UID, syncTargetName, dnsNamespace string, upsyncPods bool) *DeploymentMutator {
63-
return &DeploymentMutator{
73+
syncTargetUID types.UID, syncTargetName, dnsNamespace string, upsyncPods bool) *PodSpecableMutator {
74+
return &PodSpecableMutator{
6475
upstreamURL: upstreamURL,
6576
listSecrets: secretLister,
6677
serviceLister: serviceLister,
@@ -73,24 +84,30 @@ func NewDeploymentMutator(upstreamURL *url.URL, secretLister ListSecretFunc, ser
7384
}
7485

7586
// Mutate applies the mutator changes to the object.
76-
func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
77-
var deployment appsv1.Deployment
78-
err := runtime.DefaultUnstructuredConverter.FromUnstructured(
79-
obj.UnstructuredContent(),
80-
&deployment)
87+
func (dm *PodSpecableMutator) Mutate(obj *unstructured.Unstructured) error {
88+
podTemplateUnstr, ok, err := unstructured.NestedMap(obj.UnstructuredContent(), "spec", "template")
8189
if err != nil {
8290
return err
8391
}
84-
upstreamLogicalName := logicalcluster.From(obj)
92+
if !ok {
93+
return fmt.Errorf("object should have a PodTemplate.Spec 'spec.template', but doesn't: %v", obj)
94+
}
8595

86-
templateSpec := &deployment.Spec.Template.Spec
96+
podTemplate := &corev1.PodTemplateSpec{}
97+
err = runtime.DefaultUnstructuredConverter.FromUnstructured(
98+
podTemplateUnstr,
99+
&podTemplate)
100+
if err != nil {
101+
return err
102+
}
103+
upstreamLogicalName := logicalcluster.From(obj)
87104

88105
desiredServiceAccountName := "default"
89-
if templateSpec.ServiceAccountName != "" && templateSpec.ServiceAccountName != "default" {
90-
desiredServiceAccountName = templateSpec.ServiceAccountName
106+
if podTemplate.Spec.ServiceAccountName != "" && podTemplate.Spec.ServiceAccountName != "default" {
107+
desiredServiceAccountName = podTemplate.Spec.ServiceAccountName
91108
}
92109

93-
rawSecretList, err := dm.listSecrets(upstreamLogicalName, deployment.Namespace)
110+
rawSecretList, err := dm.listSecrets(upstreamLogicalName, obj.GetNamespace())
94111
if err != nil {
95112
return fmt.Errorf("error listing secrets for workspace %s: %w", upstreamLogicalName.String(), err)
96113
}
@@ -123,14 +140,14 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
123140
}
124141

125142
if desiredSecretName == "" {
126-
return fmt.Errorf("couldn't find a token upstream for the serviceaccount %s/%s in workspace %s", desiredServiceAccountName, deployment.Namespace, upstreamLogicalName.String())
143+
return fmt.Errorf("couldn't find a token upstream for the serviceaccount %s/%s in workspace %s", desiredServiceAccountName, obj.GetNamespace(), upstreamLogicalName.String())
127144
}
128145

129146
// Setting AutomountServiceAccountToken to false allow us to control the ServiceAccount
130147
// VolumeMount and Volume definitions.
131-
templateSpec.AutomountServiceAccountToken = utilspointer.BoolPtr(false)
148+
podTemplate.Spec.AutomountServiceAccountToken = utilspointer.BoolPtr(false)
132149
// Set to empty the serviceAccountName on podTemplate as we are not syncing the serviceAccount down to the workload cluster.
133-
templateSpec.ServiceAccountName = ""
150+
podTemplate.Spec.ServiceAccountName = ""
134151

135152
kcpExternalHost := dm.upstreamURL.Hostname()
136153
kcpExternalPort := dm.upstreamURL.Port()
@@ -193,42 +210,42 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
193210
}
194211

195212
// Override Envs, resolve downwardAPI FieldRef and add the VolumeMount to all the containers
196-
for i := range deployment.Spec.Template.Spec.Containers {
213+
for i := range podTemplate.Spec.Containers {
197214
for _, overrideEnv := range overrideEnvs {
198-
templateSpec.Containers[i].Env = updateEnv(templateSpec.Containers[i].Env, overrideEnv)
215+
podTemplate.Spec.Containers[i].Env = updateEnv(podTemplate.Spec.Containers[i].Env, overrideEnv)
199216
}
200-
templateSpec.Containers[i].Env = resolveDownwardAPIFieldRefEnv(templateSpec.Containers[i].Env, deployment)
201-
templateSpec.Containers[i].VolumeMounts = updateVolumeMount(templateSpec.Containers[i].VolumeMounts, serviceAccountMount)
217+
podTemplate.Spec.Containers[i].Env = resolveDownwardAPIFieldRefEnv(podTemplate.Spec.Containers[i].Env, obj)
218+
podTemplate.Spec.Containers[i].VolumeMounts = updateVolumeMount(podTemplate.Spec.Containers[i].VolumeMounts, serviceAccountMount)
202219
}
203220

204221
// Override Envs, resolve downwardAPI FieldRef and add the VolumeMount to all the Init containers
205-
for i := range templateSpec.InitContainers {
222+
for i := range podTemplate.Spec.InitContainers {
206223
for _, overrideEnv := range overrideEnvs {
207-
templateSpec.InitContainers[i].Env = updateEnv(templateSpec.InitContainers[i].Env, overrideEnv)
224+
podTemplate.Spec.InitContainers[i].Env = updateEnv(podTemplate.Spec.InitContainers[i].Env, overrideEnv)
208225
}
209-
templateSpec.InitContainers[i].Env = resolveDownwardAPIFieldRefEnv(templateSpec.InitContainers[i].Env, deployment)
210-
templateSpec.InitContainers[i].VolumeMounts = updateVolumeMount(templateSpec.InitContainers[i].VolumeMounts, serviceAccountMount)
226+
podTemplate.Spec.InitContainers[i].Env = resolveDownwardAPIFieldRefEnv(podTemplate.Spec.InitContainers[i].Env, obj)
227+
podTemplate.Spec.InitContainers[i].VolumeMounts = updateVolumeMount(podTemplate.Spec.InitContainers[i].VolumeMounts, serviceAccountMount)
211228
}
212229

213230
// Override Envs, resolve downwardAPI FieldRef and add the VolumeMount to all the Ephemeral containers
214-
for i := range templateSpec.EphemeralContainers {
231+
for i := range podTemplate.Spec.EphemeralContainers {
215232
for _, overrideEnv := range overrideEnvs {
216-
templateSpec.EphemeralContainers[i].Env = updateEnv(templateSpec.EphemeralContainers[i].Env, overrideEnv)
233+
podTemplate.Spec.EphemeralContainers[i].Env = updateEnv(podTemplate.Spec.EphemeralContainers[i].Env, overrideEnv)
217234
}
218-
templateSpec.EphemeralContainers[i].Env = resolveDownwardAPIFieldRefEnv(templateSpec.EphemeralContainers[i].Env, deployment)
219-
templateSpec.EphemeralContainers[i].VolumeMounts = updateVolumeMount(templateSpec.EphemeralContainers[i].VolumeMounts, serviceAccountMount)
235+
podTemplate.Spec.EphemeralContainers[i].Env = resolveDownwardAPIFieldRefEnv(podTemplate.Spec.EphemeralContainers[i].Env, obj)
236+
podTemplate.Spec.EphemeralContainers[i].VolumeMounts = updateVolumeMount(podTemplate.Spec.EphemeralContainers[i].VolumeMounts, serviceAccountMount)
220237
}
221238

222239
// Add the ServiceAccount volume with our overrides.
223240
found := false
224-
for i := range templateSpec.Volumes {
225-
if templateSpec.Volumes[i].Name == "kcp-api-access" {
226-
templateSpec.Volumes[i] = serviceAccountVolume
241+
for i := range podTemplate.Spec.Volumes {
242+
if podTemplate.Spec.Volumes[i].Name == "kcp-api-access" {
243+
podTemplate.Spec.Volumes[i] = serviceAccountVolume
227244
found = true
228245
}
229246
}
230247
if !found {
231-
templateSpec.Volumes = append(templateSpec.Volumes, serviceAccountVolume)
248+
podTemplate.Spec.Volumes = append(podTemplate.Spec.Volumes, serviceAccountVolume)
232249
}
233250

234251
// Overrides DNS to point to the workspace DNS
@@ -238,8 +255,8 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
238255
return err // retry
239256
}
240257

241-
deployment.Spec.Template.Spec.DNSPolicy = corev1.DNSNone
242-
deployment.Spec.Template.Spec.DNSConfig = &corev1.PodDNSConfig{
258+
podTemplate.Spec.DNSPolicy = corev1.DNSNone
259+
podTemplate.Spec.DNSConfig = &corev1.PodDNSConfig{
243260
Nameservers: []string{dnsIP},
244261
Searches: []string{ // TODO(LV): from /etc/resolv.conf
245262
obj.GetNamespace() + ".svc.cluster.local",
@@ -256,31 +273,29 @@ func (dm *DeploymentMutator) Mutate(obj *unstructured.Unstructured) error {
256273

257274
if dm.upsyncPods {
258275
syncTargetKey := workloadv1alpha1.ToSyncTargetKey(dm.syncTargetClusterName, dm.syncTargetName)
259-
labels := deployment.Spec.Template.Labels
276+
labels := podTemplate.Labels
260277
if labels == nil {
261278
labels = map[string]string{}
262279
}
263280
labels[workloadv1alpha1.ClusterResourceStateLabelPrefix+syncTargetKey] = string(workloadv1alpha1.ResourceStateUpsync)
264281
labels[workloadv1alpha1.InternalDownstreamClusterLabel] = syncTargetKey
265-
deployment.Spec.Template.Labels = labels
282+
podTemplate.Labels = labels
266283

267284
// TODO(davidfestal): In the future we could add a diff annotation to transform the resource while upsyncing:
268285
// - remove unnecessary fields we don't want leaking to upstream
269286
// - add an owner reference to the upstream deployment
270287
}
271288

272-
unstructured, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&deployment)
289+
newPodTemplateUnstr, err := runtime.DefaultUnstructuredConverter.ToUnstructured(podTemplate)
273290
if err != nil {
274291
return err
275292
}
276293

277294
// Set the changes back into the obj.
278-
obj.SetUnstructuredContent(unstructured)
279-
280-
return nil
295+
return unstructured.SetNestedMap(obj.Object, newPodTemplateUnstr, "spec", "template")
281296
}
282297

283-
func (dm *DeploymentMutator) getDNSIPForWorkspace(workspace logicalcluster.Name) (string, error) {
298+
func (dm *PodSpecableMutator) getDNSIPForWorkspace(workspace logicalcluster.Name) (string, error) {
284299
// Retrieve the DNS IP associated to the workspace
285300
dnsServiceName := shared.GetDNSID(workspace, dm.syncTargetUID, dm.syncTargetName)
286301

@@ -299,13 +314,13 @@ func (dm *DeploymentMutator) getDNSIPForWorkspace(workspace logicalcluster.Name)
299314
}
300315

301316
// resolveDownwardAPIFieldRefEnv replaces the downwardAPI FieldRef EnvVars with the value from the deployment, right now it only replaces the metadata.namespace.
302-
func resolveDownwardAPIFieldRefEnv(envs []corev1.EnvVar, deployment appsv1.Deployment) []corev1.EnvVar {
317+
func resolveDownwardAPIFieldRefEnv(envs []corev1.EnvVar, podspecable *unstructured.Unstructured) []corev1.EnvVar {
303318
var result []corev1.EnvVar
304319
for _, env := range envs {
305320
if env.ValueFrom != nil && env.ValueFrom.FieldRef != nil && env.ValueFrom.FieldRef.FieldPath == "metadata.namespace" {
306321
result = append(result, corev1.EnvVar{
307322
Name: env.Name,
308-
Value: deployment.Namespace,
323+
Value: podspecable.GetNamespace(),
309324
})
310325
} else {
311326
result = append(result, env)

pkg/syncer/spec/mutators/deployment_test.go renamed to pkg/syncer/spec/mutators/podspecable_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -941,7 +941,7 @@ func TestDeploymentMutate(t *testing.T) {
941941
require.NoError(t, err, "Service Add() = %v", err)
942942
svcLister := listerscorev1.NewServiceLister(serviceIndexer)
943943

944-
dm := NewDeploymentMutator(upstreamURL, secretLister, svcLister, clusterName, "syncTargetUID", "syncTargetName", "dnsNamespace", c.upsyncPods)
944+
dm := NewPodspecableMutator(upstreamURL, secretLister, svcLister, clusterName, "syncTargetUID", "syncTargetName", "dnsNamespace", c.upsyncPods)
945945

946946
unstrOriginalDeployment, err := toUnstructured(c.originalDeployment)
947947
require.NoError(t, err, "toUnstructured() = %v", err)

pkg/syncer/spec/mutators/secrets.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@ import (
2525
type SecretMutator struct {
2626
}
2727

28-
func (sm *SecretMutator) GVR() schema.GroupVersionResource {
29-
return schema.GroupVersionResource{
30-
Group: "",
31-
Version: "v1",
32-
Resource: "secrets",
28+
func (sm *SecretMutator) GVRs() []schema.GroupVersionResource {
29+
return []schema.GroupVersionResource{
30+
{
31+
Group: "",
32+
Version: "v1",
33+
Resource: "secrets",
34+
},
3335
}
3436
}
3537

pkg/syncer/spec/spec_controller.go

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package spec
1919
import (
2020
"context"
2121
"encoding/json"
22-
"errors"
2322
"fmt"
2423
"net/url"
2524
"time"
@@ -33,8 +32,6 @@ import (
3332
apierrors "k8s.io/apimachinery/pkg/api/errors"
3433
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3534
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
36-
"k8s.io/apimachinery/pkg/labels"
37-
"k8s.io/apimachinery/pkg/runtime"
3835
"k8s.io/apimachinery/pkg/runtime/schema"
3936
"k8s.io/apimachinery/pkg/types"
4037
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -53,7 +50,6 @@ import (
5350
syncerindexers "github.com/kcp-dev/kcp/pkg/syncer/indexers"
5451
"github.com/kcp-dev/kcp/pkg/syncer/shared"
5552
"github.com/kcp-dev/kcp/pkg/syncer/spec/dns"
56-
specmutators "github.com/kcp-dev/kcp/pkg/syncer/spec/mutators"
5753
. "github.com/kcp-dev/kcp/tmc/pkg/logging"
5854
)
5955

@@ -63,10 +59,15 @@ const (
6359

6460
var namespaceGVR schema.GroupVersionResource = corev1.SchemeGroupVersion.WithResource("namespaces")
6561

62+
type Mutator interface {
63+
GVRs() []schema.GroupVersionResource
64+
Mutate(obj *unstructured.Unstructured) error
65+
}
66+
6667
type Controller struct {
6768
queue workqueue.RateLimitingInterface
6869

69-
mutators mutatorGvrMap
70+
mutators map[schema.GroupVersionResource]Mutator
7071
dnsProcessor *dns.DNSProcessor
7172

7273
upstreamClient kcpdynamic.ClusterInterface
@@ -94,7 +95,7 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
9495
dnsNamespace string,
9596
syncerNamespaceInformerFactory informers.SharedInformerFactory,
9697
dnsImage string,
97-
upsyncPods bool) (*Controller, error) {
98+
mutators ...Mutator) (*Controller, error) {
9899
c := Controller{
99100
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName),
100101

@@ -118,9 +119,9 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
118119
informer, ok := informers[gvr]
119120
if !ok {
120121
if shared.ContainsGVR(notSynced, gvr) {
121-
return nil, fmt.Errorf("informer for gvr %v not synced in the downstream informer factory", gvr)
122+
return nil, fmt.Errorf("informer for gvr %v not synced in the upstream informer factory", gvr)
122123
}
123-
return nil, fmt.Errorf("gvr %v should be known in the downstream informer factory", gvr)
124+
return nil, fmt.Errorf("gvr %v should be known in the upstream informer factory", gvr)
124125
}
125126
return informer.Lister(), nil
126127
},
@@ -137,6 +138,8 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
137138
syncTargetUID: syncTargetUID,
138139
syncTargetKey: syncTargetKey,
139140
advancedSchedulingEnabled: advancedSchedulingEnabled,
141+
142+
mutators: make(map[schema.GroupVersionResource]Mutator, 2),
140143
}
141144

142145
logger := logging.WithReconciler(syncerLogger, controllerName)
@@ -254,29 +257,18 @@ func NewSpecSyncer(syncerLogger logr.Logger, syncTargetClusterName logicalcluste
254257
},
255258
})
256259

257-
secretMutator := specmutators.NewSecretMutator()
258-
259-
dnsServiceLister := syncerNamespaceInformerFactory.Core().V1().Services().Lister()
260-
261-
deploymentMutator := specmutators.NewDeploymentMutator(upstreamURL, func(clusterName logicalcluster.Name, namespace string) ([]runtime.Object, error) {
262-
secretLister, err := c.getUpstreamLister(corev1.SchemeGroupVersion.WithResource("secrets"))
263-
if err != nil {
264-
return nil, errors.New("informer should be up and synced for namespaces in the upstream syncer informer factory")
260+
for _, mutator := range mutators {
261+
for _, gvr := range mutator.GVRs() {
262+
c.mutators[gvr] = mutator
265263
}
266-
return secretLister.ByCluster(clusterName).ByNamespace(namespace).List(labels.Everything())
267-
}, dnsServiceLister, syncTargetClusterName, syncTargetUID, syncTargetName, dnsNamespace, upsyncPods)
268-
269-
c.mutators = mutatorGvrMap{
270-
deploymentMutator.GVR(): deploymentMutator.Mutate,
271-
secretMutator.GVR(): secretMutator.Mutate,
272264
}
273265

274266
c.dnsProcessor = dns.NewDNSProcessor(downstreamKubeClient,
275267
syncerNamespaceInformerFactory.Core().V1().ServiceAccounts().Lister(),
276268
syncerNamespaceInformerFactory.Rbac().V1().Roles().Lister(),
277269
syncerNamespaceInformerFactory.Rbac().V1().RoleBindings().Lister(),
278270
syncerNamespaceInformerFactory.Apps().V1().Deployments().Lister(),
279-
dnsServiceLister,
271+
syncerNamespaceInformerFactory.Core().V1().Services().Lister(),
280272
syncerNamespaceInformerFactory.Core().V1().Endpoints().Lister(),
281273
syncerNamespaceInformerFactory.Networking().V1().NetworkPolicies().Lister(),
282274
syncTargetUID, syncTargetName, dnsNamespace, dnsImage)

pkg/syncer/spec/spec_process.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ const (
5151
syncerApplyManager = "syncer"
5252
)
5353

54-
type mutatorGvrMap map[schema.GroupVersionResource]func(obj *unstructured.Unstructured) error
55-
5654
func deepEqualApartFromStatus(logger logr.Logger, oldUnstrob, newUnstrob *unstructured.Unstructured) bool {
5755
// TODO(jmprusi): Remove this after switching to virtual workspaces.
5856
// remove status annotation from oldObj and newObj before comparing
@@ -443,7 +441,7 @@ func (c *Controller) applyToDownstream(ctx context.Context, gvr schema.GroupVers
443441

444442
// Run any transformations on the object before we apply it to the downstream cluster.
445443
if mutator, ok := c.mutators[gvr]; ok {
446-
if err := mutator(downstreamObj); err != nil {
444+
if err := mutator.Mutate(downstreamObj); err != nil {
447445
return err
448446
}
449447
}

0 commit comments

Comments
 (0)