Skip to content

Commit 0c977d5

Browse files
authored
feat: drift for Kubelet Client ID (#888)
* Add kubernetes.azure.com/kubelet-identity-client-id label to all nodes, to match what AKS does. * Drift nodes if the expected kubelet-identity-client-id has changed. * Does not drift if the node doesn't have the AKS Kubelet ClientID label set.
1 parent fe55ae4 commit 0c977d5

File tree

6 files changed

+196
-126
lines changed

6 files changed

+196
-126
lines changed

pkg/apis/v1beta1/labels.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ var (
9494
// AKS labels
9595
AKSLabelDomain = "kubernetes.azure.com"
9696

97-
AKSLabelCluster = AKSLabelDomain + "/cluster"
97+
AKSLabelCluster = AKSLabelDomain + "/cluster"
98+
AKSLabelKubeletIdentityClientID = AKSLabelDomain + "/kubelet-identity-client-id"
9899

99100
AnnotationAKSNodeClassHash = apis.Group + "/aksnodeclass-hash"
100101
AnnotationAKSNodeClassHashVersion = apis.Group + "/aksnodeclass-hash-version"

pkg/cloudprovider/drift.go

Lines changed: 102 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -23,71 +23,76 @@ import (
2323

2424
sdkerrors "github.com/Azure/azure-sdk-for-go-extensions/pkg/errors"
2525
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork"
26+
"github.com/samber/lo"
27+
v1 "k8s.io/api/core/v1"
28+
"sigs.k8s.io/controller-runtime/pkg/log"
29+
2630
"github.com/Azure/karpenter-provider-azure/pkg/apis/v1beta1"
2731
"github.com/Azure/karpenter-provider-azure/pkg/operator/options"
2832
"github.com/Azure/karpenter-provider-azure/pkg/providers/instance"
2933
"github.com/Azure/karpenter-provider-azure/pkg/utils"
30-
"github.com/samber/lo"
31-
"sigs.k8s.io/controller-runtime/pkg/log"
3234

3335
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
3436
"sigs.k8s.io/karpenter/pkg/cloudprovider"
3537
nodeclaimutils "sigs.k8s.io/karpenter/pkg/utils/nodeclaim"
3638
)
3739

3840
const (
39-
NodeClassDrift cloudprovider.DriftReason = "NodeClassDrift"
40-
K8sVersionDrift cloudprovider.DriftReason = "K8sVersionDrift"
41-
ImageDrift cloudprovider.DriftReason = "ImageDrift"
42-
SubnetDrift cloudprovider.DriftReason = "SubnetDrift"
41+
NodeClassDrift cloudprovider.DriftReason = "NodeClassDrift"
42+
K8sVersionDrift cloudprovider.DriftReason = "K8sVersionDrift"
43+
ImageDrift cloudprovider.DriftReason = "ImageDrift"
44+
SubnetDrift cloudprovider.DriftReason = "SubnetDrift"
45+
KubeletIdentityDrift cloudprovider.DriftReason = "KubeletIdentityDrift"
4346

4447
// TODO (charliedmcb): Use this const across code and test locations which are signaling/checking for "no drift"
4548
NoDrift cloudprovider.DriftReason = ""
4649
)
4750

4851
func (c *CloudProvider) isNodeClassDrifted(ctx context.Context, nodeClaim *karpv1.NodeClaim, nodeClass *v1beta1.AKSNodeClass) (cloudprovider.DriftReason, error) {
49-
// First check if the node class is statically staticFieldsDrifted to save on API calls.
50-
if staticFieldsDrifted := c.areStaticFieldsDrifted(nodeClaim, nodeClass); staticFieldsDrifted != "" {
51-
return staticFieldsDrifted, nil
52-
}
53-
k8sVersionDrifted, err := c.isK8sVersionDrifted(ctx, nodeClaim, nodeClass)
54-
if err != nil {
55-
return "", err
56-
}
57-
if k8sVersionDrifted != "" {
58-
return k8sVersionDrifted, nil
59-
}
60-
imageVersionDrifted, err := c.isImageVersionDrifted(ctx, nodeClaim, nodeClass)
61-
if err != nil {
62-
return "", err
63-
}
64-
if imageVersionDrifted != "" {
65-
return imageVersionDrifted, nil
66-
}
67-
subnetDrifted, err := c.isSubnetDrifted(ctx, nodeClaim, nodeClass)
68-
if err != nil {
69-
return "", err
52+
// TODO: if we find more expensive checks, such as reading VMs or NICs from Azure, are being duplicated between checks, we should
53+
// produce a lazy at-most-once that allows a check to cache a value for later checks to read.
54+
checks := []func(ctx context.Context, nodeClaim *karpv1.NodeClaim, nodeClass *v1beta1.AKSNodeClass) (cloudprovider.DriftReason, error){
55+
c.areStaticFieldsDrifted,
56+
c.isK8sVersionDrifted,
57+
c.isKubeletIdentityDrifted,
58+
c.isImageVersionDrifted,
59+
c.isSubnetDrifted,
7060
}
71-
if subnetDrifted != "" {
72-
return subnetDrifted, nil
61+
for _, check := range checks {
62+
driftReason, err := check(ctx, nodeClaim, nodeClass)
63+
if err != nil {
64+
return "", err
65+
}
66+
if driftReason != "" {
67+
return driftReason, nil
68+
}
7369
}
70+
7471
return "", nil
7572
}
7673

77-
func (c *CloudProvider) areStaticFieldsDrifted(nodeClaim *karpv1.NodeClaim, nodeClass *v1beta1.AKSNodeClass) cloudprovider.DriftReason {
74+
func (c *CloudProvider) areStaticFieldsDrifted(ctx context.Context, nodeClaim *karpv1.NodeClaim, nodeClass *v1beta1.AKSNodeClass) (cloudprovider.DriftReason, error) {
75+
logger := log.FromContext(ctx)
76+
7877
nodeClassHash, foundNodeClassHash := nodeClass.Annotations[v1beta1.AnnotationAKSNodeClassHash]
7978
nodeClassHashVersion, foundNodeClassHashVersion := nodeClass.Annotations[v1beta1.AnnotationAKSNodeClassHashVersion]
8079
nodeClaimHash, foundNodeClaimHash := nodeClaim.Annotations[v1beta1.AnnotationAKSNodeClassHash]
8180
nodeClaimHashVersion, foundNodeClaimHashVersion := nodeClaim.Annotations[v1beta1.AnnotationAKSNodeClassHashVersion]
8281

8382
if !foundNodeClassHash || !foundNodeClaimHash || !foundNodeClassHashVersion || !foundNodeClaimHashVersion {
84-
return ""
83+
return "", nil
8584
}
8685
// validate that the hash version for the AKSNodeClass is the same as the NodeClaim before evaluating for static drift
8786
if nodeClassHashVersion != nodeClaimHashVersion {
88-
return ""
87+
return "", nil
88+
}
89+
90+
if nodeClassHash != nodeClaimHash {
91+
logger.V(1).Info(fmt.Sprintf("drift triggered for %s, as nodeClassHash (%s) != nodeClaimHash (%s)", NodeClassDrift, nodeClassHash, nodeClaimHash))
92+
return NodeClassDrift, nil
8993
}
90-
return lo.Ternary(nodeClassHash != nodeClaimHash, NodeClassDrift, "")
94+
95+
return "", nil
9196
}
9297

9398
func (c *CloudProvider) isK8sVersionDrifted(ctx context.Context, nodeClaim *karpv1.NodeClaim, nodeClass *v1beta1.AKSNodeClass) (cloudprovider.DriftReason, error) {
@@ -103,34 +108,12 @@ func (c *CloudProvider) isK8sVersionDrifted(ctx context.Context, nodeClaim *karp
103108
return "", nil //nolint:nilerr
104109
}
105110

106-
nodeName := nodeClaim.Status.NodeName
107-
if nodeName == "" {
108-
// We do not return an error here as its expected within the lifecycle of the nodeclaims registration.
109-
// Drift can be called for a nodeclaim once its launched, but .Status.NodeName is only filled out after the node is registered:
110-
// https://github.com/kubernetes-sigs/karpenter/blob/8b9ea2e7cd10acdb40bccdf91a153a2e69b71107/pkg/controllers/nodeclaim/lifecycle/registration.go#L83
111-
return "", nil
112-
}
113-
114-
n, err := nodeclaimutils.NodeForNodeClaim(ctx, c.kubeClient, nodeClaim)
115-
if err != nil {
116-
if nodeclaimutils.IsNodeNotFoundError(err) {
117-
// We do not return an error here as its expected within the lifecycle of the nodeclaims registration.
118-
// Core's checks only for Launched status which means we've started the create, but the node doesn't nessicarially exist yet
119-
// https://github.com/kubernetes-sigs/karpenter/blob/9877cf639e665eadcae9e46e5a702a1b30ced1d3/pkg/controllers/nodeclaim/disruption/drift.go#L51
120-
return "", nil
121-
}
122-
if nodeclaimutils.IsDuplicateNodeError(err) {
123-
logger.V(1).Info("WARN: Duplicate node error, invariant violated.")
124-
}
111+
node, err := c.getNodeForDrift(ctx, nodeClaim)
112+
if err != nil || node == nil {
125113
return "", err
126114
}
127-
if !n.DeletionTimestamp.IsZero() {
128-
// We do not need to check for drift if the node is being deleted.
129-
return "", nil
130-
}
131-
132-
nodeK8sVersion := strings.TrimPrefix(n.Status.NodeInfo.KubeletVersion, "v")
133115

116+
nodeK8sVersion := strings.TrimPrefix(node.Status.NodeInfo.KubeletVersion, "v")
134117
if nodeK8sVersion != k8sVersion {
135118
logger.V(1).Info(fmt.Sprintf("drift triggered for %s, with expected k8s version %s, and actual k8s version %s", K8sVersionDrift, k8sVersion, nodeK8sVersion))
136119
return K8sVersionDrift, nil
@@ -143,7 +126,10 @@ func (c *CloudProvider) isK8sVersionDrifted(ctx context.Context, nodeClaim *karp
143126
// Feel reassessing this within the future with a potential minor refactor would be best to fix the gocyclo.
144127
// nolint: gocyclo
145128
func (c *CloudProvider) isImageVersionDrifted(
146-
ctx context.Context, nodeClaim *karpv1.NodeClaim, nodeClass *v1beta1.AKSNodeClass) (cloudprovider.DriftReason, error) {
129+
ctx context.Context,
130+
nodeClaim *karpv1.NodeClaim,
131+
nodeClass *v1beta1.AKSNodeClass,
132+
) (cloudprovider.DriftReason, error) {
147133
logger := log.FromContext(ctx)
148134

149135
id, err := utils.GetVMName(nodeClaim.Status.ProviderID)
@@ -226,6 +212,63 @@ func (c *CloudProvider) isSubnetDrifted(ctx context.Context, nodeClaim *karpv1.N
226212
return "", nil
227213
}
228214

215+
// isKubeletIdentityDrifted returns drift if the kubelet identity has drifted
216+
func (c *CloudProvider) isKubeletIdentityDrifted(ctx context.Context, nodeClaim *karpv1.NodeClaim, _ *v1beta1.AKSNodeClass) (cloudprovider.DriftReason, error) {
217+
opts := options.FromContext(ctx)
218+
logger := log.FromContext(ctx)
219+
220+
node, err := c.getNodeForDrift(ctx, nodeClaim)
221+
if err != nil || node == nil {
222+
return "", err
223+
}
224+
225+
kubeletIdentityClientID := node.Labels[v1beta1.AKSLabelKubeletIdentityClientID]
226+
// The kubelet identity label is supposed to be set on every node, but prior to
227+
// 1.4.0 it was not set by Karpenter. In order to avoid rolling all existing nodes,
228+
// we don't count a missing kubelet identity as drift. This situation should resolve itself as
229+
// image version and Kubernetes version drift is performed.
230+
// TODO: This short-circuit should be removed post 1.4.0 (~2025-07-01)
231+
if kubeletIdentityClientID == "" {
232+
return "", nil
233+
}
234+
235+
if kubeletIdentityClientID != opts.KubeletIdentityClientID {
236+
logger.V(1).Info(
237+
fmt.Sprintf("drift triggered for %s, with expected kubelet identity client id %s, and actual kubelet identity client id %s",
238+
KubeletIdentityDrift,
239+
opts.KubeletIdentityClientID,
240+
kubeletIdentityClientID),
241+
)
242+
return KubeletIdentityDrift, nil
243+
}
244+
245+
return "", nil
246+
}
247+
248+
func (c *CloudProvider) getNodeForDrift(ctx context.Context, nodeClaim *karpv1.NodeClaim) (*v1.Node, error) {
249+
logger := log.FromContext(ctx)
250+
251+
n, err := nodeclaimutils.NodeForNodeClaim(ctx, c.kubeClient, nodeClaim)
252+
if err != nil {
253+
if nodeclaimutils.IsNodeNotFoundError(err) {
254+
// We do not return an error here as its expected within the lifecycle of the nodeclaims registration.
255+
// Core's checks only for Launched status which means we've started the create, but the node doesn't nessicarially exist yet
256+
// https://github.com/kubernetes-sigs/karpenter/blob/9877cf639e665eadcae9e46e5a702a1b30ced1d3/pkg/controllers/nodeclaim/disruption/drift.go#L51
257+
return nil, nil
258+
}
259+
if nodeclaimutils.IsDuplicateNodeError(err) {
260+
logger.V(1).Info("WARN: Duplicate node error, invariant violated.")
261+
}
262+
return nil, err
263+
}
264+
if !n.DeletionTimestamp.IsZero() {
265+
// We do not need to check for drift if the node is being deleted.
266+
return nil, nil
267+
}
268+
269+
return n, nil
270+
}
271+
229272
func getSubnetFromPrimaryIPConfig(nic *armnetwork.Interface) string {
230273
for _, ipConfig := range nic.Properties.IPConfigurations {
231274
if ipConfig.Properties.Subnet != nil && lo.FromPtr(ipConfig.Properties.Primary) {

pkg/cloudprovider/suite_test.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,38 +23,33 @@ import (
2323
"testing"
2424
"time"
2525

26-
"sigs.k8s.io/controller-runtime/pkg/client"
27-
"sigs.k8s.io/karpenter/pkg/test/v1alpha1"
28-
2926
"github.com/awslabs/operatorpkg/object"
3027
"github.com/blang/semver/v4"
3128
. "github.com/onsi/ginkgo/v2"
3229
. "github.com/onsi/gomega"
3330
"github.com/samber/lo"
3431
v1 "k8s.io/api/core/v1"
3532
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36-
3733
"k8s.io/client-go/tools/record"
3834
clock "k8s.io/utils/clock/testing"
39-
40-
"github.com/Azure/karpenter-provider-azure/pkg/utils"
35+
"sigs.k8s.io/controller-runtime/pkg/client"
4136
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
4237
corecloudprovider "sigs.k8s.io/karpenter/pkg/cloudprovider"
43-
4438
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
4539
"sigs.k8s.io/karpenter/pkg/controllers/state"
4640
"sigs.k8s.io/karpenter/pkg/events"
4741
coreoptions "sigs.k8s.io/karpenter/pkg/operator/options"
4842
coretest "sigs.k8s.io/karpenter/pkg/test"
49-
43+
. "sigs.k8s.io/karpenter/pkg/test/expectations"
44+
"sigs.k8s.io/karpenter/pkg/test/v1alpha1"
5045
. "sigs.k8s.io/karpenter/pkg/utils/testing"
5146

5247
"github.com/Azure/karpenter-provider-azure/pkg/apis"
5348
"github.com/Azure/karpenter-provider-azure/pkg/apis/v1beta1"
5449
"github.com/Azure/karpenter-provider-azure/pkg/operator/options"
5550
"github.com/Azure/karpenter-provider-azure/pkg/providers/instance"
5651
"github.com/Azure/karpenter-provider-azure/pkg/test"
57-
. "sigs.k8s.io/karpenter/pkg/test/expectations"
52+
"github.com/Azure/karpenter-provider-azure/pkg/utils"
5853
)
5954

6055
var ctx context.Context
@@ -176,16 +171,24 @@ var _ = Describe("CloudProvider", func() {
176171
Context("Drift", func() {
177172
var nodeClaim *karpv1.NodeClaim
178173
var pod *v1.Pod
174+
var node *v1.Node
175+
179176
BeforeEach(func() {
180177
instanceType := "Standard_D2_v2"
181178
ExpectApplied(ctx, env.Client, nodePool, nodeClass)
182179
pod = coretest.UnschedulablePod(coretest.PodOptions{
183180
NodeSelector: map[string]string{v1.LabelInstanceTypeStable: instanceType},
184181
})
185182
ExpectProvisioned(ctx, env.Client, cluster, cloudProvider, prov, pod)
186-
node := ExpectScheduled(ctx, env.Client, pod)
183+
node = ExpectScheduled(ctx, env.Client, pod)
187184
// KubeletVersion must be applied to the node to satisfy k8s drift
188185
node.Status.NodeInfo.KubeletVersion = "v" + nodeClass.Status.KubernetesVersion
186+
node.Labels[v1beta1.AKSLabelKubeletIdentityClientID] = "61f71907-753f-4802-a901-47361c3664f2" // random UUID
187+
// Context must have same kubelet client id
188+
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{
189+
KubeletIdentityClientID: lo.ToPtr(node.Labels[v1beta1.AKSLabelKubeletIdentityClientID]),
190+
}))
191+
189192
ExpectApplied(ctx, env.Client, node)
190193
Expect(azureEnv.NetworkInterfacesAPI.NetworkInterfacesCreateOrUpdateBehavior.CalledWithInput.Len()).To(Equal(1))
191194
Expect(azureEnv.VirtualMachinesAPI.VirtualMachineCreateOrUpdateBehavior.CalledWithInput.Len()).To(Equal(1))
@@ -330,7 +333,7 @@ var _ = Describe("CloudProvider", func() {
330333
})
331334

332335
It("shouldn't error or be drifted when node is deleting", func() {
333-
node := ExpectNodeExists(ctx, env.Client, nodeClaim.Status.NodeName)
336+
node = ExpectNodeExists(ctx, env.Client, nodeClaim.Status.NodeName)
334337
node.Finalizers = append(node.Finalizers, test.TestingFinalizer)
335338
ExpectApplied(ctx, env.Client, node)
336339
Expect(env.Client.Delete(ctx, node)).ToNot(HaveOccurred())
@@ -362,5 +365,26 @@ var _ = Describe("CloudProvider", func() {
362365
Expect(drifted).To(Equal(K8sVersionDrift))
363366
})
364367
})
368+
369+
Context("Kubelet Client ID", func() {
370+
It("should NOT trigger drift if node doesn't have kubelet client ID label", func() {
371+
node.Labels[v1beta1.AKSLabelKubeletIdentityClientID] = "" // Not set
372+
373+
drifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
374+
Expect(err).ToNot(HaveOccurred())
375+
Expect(drifted).To(BeEmpty())
376+
})
377+
378+
It("should trigger drift if node kubelet client ID doesn't match options", func() {
379+
ctx = options.ToContext(ctx, test.Options(test.OptionsFields{
380+
KubeletIdentityClientID: lo.ToPtr("3824ff7a-93b6-40af-b861-2eb621ba437a"), // a different random UUID
381+
}))
382+
383+
drifted, err := cloudProvider.IsDrifted(ctx, nodeClaim)
384+
Expect(err).ToNot(HaveOccurred())
385+
Expect(drifted).To(Equal(KubeletIdentityDrift))
386+
})
387+
})
388+
365389
})
366390
})

pkg/providers/imagefamily/bootstrap/aksbootstrap.go

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/samber/lo"
2727
v1 "k8s.io/api/core/v1"
2828

29+
"github.com/Azure/karpenter-provider-azure/pkg/providers/imagefamily/labels"
2930
"github.com/Azure/karpenter-provider-azure/pkg/utils"
3031

3132
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -307,7 +308,7 @@ func (a AKS) applyOptions(nbv *NodeBootstrapVariables) {
307308

308309
// merge and stringify labels
309310
kubeletLabels := lo.Assign(getBaseKubeletNodeLabels(), a.Labels)
310-
getAgentbakerGeneratedLabels(a.ResourceGroup, kubeletLabels)
311+
labels.AddAgentBakerGeneratedLabels(a.ResourceGroup, a.KubeletIdentityClientID, kubeletLabels)
311312

312313
subnetParts, _ := utils.GetVnetSubnetIDComponents(a.SubnetID)
313314
nbv.Subnet = subnetParts.SubnetName
@@ -368,31 +369,6 @@ func getCustomDataFromNodeBootstrapVars(nbv *NodeBootstrapVariables) (string, er
368369
return buffer.String(), nil
369370
}
370371

371-
func getAgentbakerGeneratedLabels(nodeResourceGroup string, nodeLabels map[string]string) {
372-
nodeLabels["kubernetes.azure.com/role"] = "agent"
373-
nodeLabels["kubernetes.azure.com/cluster"] = normalizeResourceGroupNameForLabel(nodeResourceGroup)
374-
}
375-
376-
func normalizeResourceGroupNameForLabel(resourceGroupName string) string {
377-
truncated := resourceGroupName
378-
truncated = strings.ReplaceAll(truncated, "(", "-")
379-
truncated = strings.ReplaceAll(truncated, ")", "-")
380-
const maxLen = 63
381-
if len(truncated) > maxLen {
382-
truncated = truncated[0:maxLen]
383-
}
384-
385-
if strings.HasSuffix(truncated, "-") ||
386-
strings.HasSuffix(truncated, "_") ||
387-
strings.HasSuffix(truncated, ".") {
388-
if len(truncated) > 62 {
389-
return truncated[0:len(truncated)-1] + "z"
390-
}
391-
return truncated + "z"
392-
}
393-
return truncated
394-
}
395-
396372
func KubeletConfigToMap(kubeletConfig *KubeletConfiguration) map[string]string {
397373
args := make(map[string]string)
398374

0 commit comments

Comments
 (0)