diff --git a/cmd/compute-domain-controller/cdclique.go b/cmd/compute-domain-controller/cdclique.go
index 73cce98dc..2c0c625b9 100644
--- a/cmd/compute-domain-controller/cdclique.go
+++ b/cmd/compute-domain-controller/cdclique.go
@@ -128,6 +128,13 @@ func (m *ComputeDomainCliqueManager) List() ([]*nvapi.ComputeDomainClique, error
 	return m.lister.ComputeDomainCliques(m.config.driverNamespace).List(labels.Everything())
 }

+// GetLive returns the ComputeDomainClique from the API server (not the informer cache).
+// Callers that perform read-modify-write updates should use this before Update to reduce
+// stale resourceVersion conflicts.
+func (m *ComputeDomainCliqueManager) GetLive(ctx context.Context, name string) (*nvapi.ComputeDomainClique, error) {
+	return m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(m.config.driverNamespace).Get(ctx, name, metav1.GetOptions{})
+}
+
 // Update updates a ComputeDomainClique and caches the result in the mutation cache.
 func (m *ComputeDomainCliqueManager) Update(ctx context.Context, clique *nvapi.ComputeDomainClique) (*nvapi.ComputeDomainClique, error) {
 	updatedClique, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(clique.Namespace).Update(ctx, clique, metav1.UpdateOptions{})
diff --git a/cmd/compute-domain-controller/cdstatus.go b/cmd/compute-domain-controller/cdstatus.go
index 94e71a884..c9fc22c4c 100644
--- a/cmd/compute-domain-controller/cdstatus.go
+++ b/cmd/compute-domain-controller/cdstatus.go
@@ -21,10 +21,12 @@ import (
 	"fmt"
 	"maps"
 	"slices"
+	"strings"
 	"sync"
 	"time"

 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/util/retry"
 	"k8s.io/klog/v2"

 	nvapi "sigs.k8s.io/dra-driver-nvidia-gpu/api/nvidia.com/resource/v1beta1"
@@ -46,12 +48,13 @@ type ComputeDomainStatusManager struct {
 	cliqueManager *ComputeDomainCliqueManager
 	podManager    *DaemonSetPodManager

+	getComputeDomain          GetComputeDomainFunc
 	listComputeDomains        ListComputeDomainsFunc
 	updateComputeDomainStatus UpdateComputeDomainStatusFunc
 }

 // NewComputeDomainStatusManager creates a new ComputeDomainStatusManager.
-func NewComputeDomainStatusManager(config *ManagerConfig, listComputeDomains ListComputeDomainsFunc, updateComputeDomainStatus UpdateComputeDomainStatusFunc) *ComputeDomainStatusManager {
+func NewComputeDomainStatusManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc, listComputeDomains ListComputeDomainsFunc, updateComputeDomainStatus UpdateComputeDomainStatusFunc) *ComputeDomainStatusManager {
 	// Create cliqueManager if feature gate is enabled
 	var cliqueManager *ComputeDomainCliqueManager
 	if featuregates.Enabled(featuregates.ComputeDomainCliques) {
@@ -65,6 +68,7 @@ func NewComputeDomainStatusManager(config *ManagerConfig, listComputeDomains Lis
 		config:                    config,
 		cliqueManager:             cliqueManager,
 		podManager:                podManager,
+		getComputeDomain:          getComputeDomain,
 		listComputeDomains:        listComputeDomains,
 		updateComputeDomainStatus: updateComputeDomainStatus,
 	}
@@ -155,14 +159,9 @@ func (m *ComputeDomainStatusManager) sync(ctx context.Context) {
 			klog.Errorf("CDStatusSync: error listing cliques: %v", err)
 			return
 		}
-
-		// Clean up stale entries from cliques in parallel
-		for _, clique := range cliques {
-			go m.cleanupClique(ctx, clique, pods)
-		}
 	}

-	// Group cliques by CD UID
+	// Group cliques by CD UID (used for status sync and per-clique daemon cleanup)
 	cliquesByCD := make(map[string][]*nvapi.ComputeDomainClique)
 	for _, clique := range cliques {
 		cdUID := clique.Labels[computeDomainLabelKey]
@@ -172,6 +171,17 @@ func (m *ComputeDomainStatusManager) sync(ctx context.Context) {
 		cliquesByCD[cdUID] = append(cliquesByCD[cdUID], clique)
 	}

+	if m.cliqueManager != nil {
+		// Clean up stale entries from cliques in parallel (pods scoped per clique)
+		for _, clique := range cliques {
+			cdUID := clique.Labels[computeDomainLabelKey]
+			if cdUID == "" {
+				continue
+			}
+			go m.cleanupClique(ctx, clique, pods, len(cliquesByCD[cdUID]))
+		}
+	}
+
 	// Group pods by CD UID and type (fabric-attached vs non-fabric-attached)
 	fabricPodsByCD := make(map[string][]*corev1.Pod)
 	nonFabricPodsByCD := make(map[string][]*corev1.Pod)
@@ -206,6 +216,15 @@ func (m *ComputeDomainStatusManager) sync(ctx context.Context) {

 // syncCD synchronizes node information to a single ComputeDomain's status.
 func (m *ComputeDomainStatusManager) syncCD(ctx context.Context, cd *nvapi.ComputeDomain, cliques []*nvapi.ComputeDomainClique, fabricPods []*corev1.Pod, nonFabricPods []*corev1.Pod) {
+	latestCD, err := m.getComputeDomain(string(cd.UID))
+	if err != nil {
+		klog.Errorf("CDStatusSync: error getting ComputeDomain %s: %v", cd.Name, err)
+		return
+	}
+	if latestCD == nil {
+		return
+	}
+
 	var fabricNodes, nonFabricNodes, newNodes []*nvapi.ComputeDomainNode

 	if m.cliqueManager != nil {
@@ -215,27 +234,26 @@ func (m *ComputeDomainStatusManager) syncCD(ctx context.Context, cd *nvapi.Compu
 		newNodes = slices.Concat(fabricNodes, nonFabricNodes)
 	} else {
 		// Feature gate disabled: filter stale fabric nodes + rebuild non-fabric nodes
-		fabricNodes = m.getNonStaleFabricNodes(cd.Status.Nodes, fabricPods)
+		fabricNodes = m.getNonStaleFabricNodes(latestCD.Status.Nodes, fabricPods)
 		nonFabricNodes = m.buildNodesFromPods(nonFabricPods)
 		newNodes = slices.Concat(fabricNodes, nonFabricNodes)
 	}

-	// Check if update is needed
-	if m.nodesEqual(cd.Status.Nodes, newNodes) {
+	if m.nodesEqual(latestCD.Status.Nodes, newNodes) {
 		return
 	}

-	klog.V(6).Infof("CDStatusSync: syncing ComputeDomain %s/%s: fabric=%d non-fabric=%d", cd.Namespace, cd.Name, len(fabricNodes), len(nonFabricNodes))
+	klog.V(6).Infof("CDStatusSync: syncing ComputeDomain %s/%s: fabric=%d non-fabric=%d", latestCD.Namespace, latestCD.Name, len(fabricNodes), len(nonFabricNodes))

-	// Update status
-	newCD := cd.DeepCopy()
+	// Update status (use latest object for resourceVersion)
+	newCD := latestCD.DeepCopy()
 	newCD.Status.Nodes = newNodes
 	if _, err := m.updateComputeDomainStatus(ctx, newCD); err != nil {
-		klog.Errorf("CDStatusSync: error updating ComputeDomain %s status: %v", cd.Name, err)
+		klog.Errorf("CDStatusSync: error updating ComputeDomain %s status: %v", latestCD.Name, err)
 		return
 	}

-	klog.V(4).Infof("CDStatusSync: updated ComputeDomain %s/%s: total nodes=%d", cd.Namespace, cd.Name, len(newNodes))
+	klog.V(4).Infof("CDStatusSync: updated ComputeDomain %s/%s: total nodes=%d", latestCD.Namespace, latestCD.Name, len(newNodes))
 }

 // buildNodesFromCliques builds a nodes list from fabric-attached cliques.
@@ -282,44 +300,147 @@ func (m *ComputeDomainStatusManager) buildNodesFromPods(pods []*corev1.Pod) []*n
 	return nodes
 }

-// cleanupClique removes stale daemon entries from a single clique.
-func (m *ComputeDomainStatusManager) cleanupClique(ctx context.Context, clique *nvapi.ComputeDomainClique, pods []*corev1.Pod) {
-	// Build set of node names that have running daemon pods
-	runningNodes := make(map[string]struct{})
-	for _, pod := range pods {
-		if pod.Spec.NodeName != "" {
-			runningNodes[pod.Spec.NodeName] = struct{}{}
-		}
+// fabricCliqueIDFromClique returns the fabric clique ID from object labels, or from
+// metadata name "<cdUID>.<cliqueID>" when the clique ID label is unset.
+func fabricCliqueIDFromClique(clique *nvapi.ComputeDomainClique) string {
+	if clique == nil {
+		return ""
+	}
+	if id := clique.Labels[computeDomainCliqueLabelKey]; id != "" {
+		return id
+	}
+	cdUID := clique.Labels[computeDomainLabelKey]
+	if cdUID == "" {
+		return ""
 	}
+	if id, ok := strings.CutPrefix(clique.Name, cdUID+"."); ok {
+		return id
+	}
+	return ""
+}

-	var updatedDaemons []*nvapi.ComputeDomainDaemonInfo
-	var removedNodes []string
+// podCountsForCliqueFabricDaemon is true when this pod is the fabric-attached daemon
+// for the same ComputeDomain and fabric clique as clique. Non-fabric pods (explicit
+// empty clique label) are excluded. Pods without a clique label only match when the
+// CD has a single fabric clique so attribution is unambiguous.
+func podCountsForCliqueFabricDaemon(pod *corev1.Pod, clique *nvapi.ComputeDomainClique, fabricCliqueCountForCD int) bool {
+	if pod == nil || clique == nil {
+		return false
+	}
+	cdUID := clique.Labels[computeDomainLabelKey]
+	if cdUID == "" || pod.Labels[computeDomainLabelKey] != cdUID {
+		return false
+	}
+	podCliqueID, podHasCliqueLabel := pod.Labels[computeDomainCliqueLabelKey]
+	if podHasCliqueLabel && podCliqueID == "" {
+		return false
+	}
+	expected := fabricCliqueIDFromClique(clique)
+	if expected == "" {
+		return false
+	}
+	if podHasCliqueLabel { // value is non-empty: the empty-label case returned above
+		return podCliqueID == expected
+	}
+	return fabricCliqueCountForCD == 1
+}

-	for _, daemon := range clique.Daemons {
-		if _, exists := runningNodes[daemon.NodeName]; exists {
-			updatedDaemons = append(updatedDaemons, daemon)
-		} else {
-			removedNodes = append(removedNodes, daemon.NodeName)
-		}
+// cleanupClique removes stale daemon entries from a single clique.
+// Only pods attributed to this clique by podCountsForCliqueFabricDaemon count as
+// running. The clique is re-read via GetLive inside retry.RetryOnConflict, so a
+// conflicting concurrent write leads to a fresh read-filter-update attempt.
+func (m *ComputeDomainStatusManager) cleanupClique(ctx context.Context, clique *nvapi.ComputeDomainClique, pods []*corev1.Pod, fabricCliqueCountForCD int) {
+	cdUID := clique.Labels[computeDomainLabelKey]
+	cliqueID := fabricCliqueIDFromClique(clique)
+	if cdUID == "" || cliqueID == "" {
+		return
 	}

-	// Nothing to clean up
-	if len(removedNodes) == 0 {
+	ns, name := clique.Namespace, clique.Name
+	if ns == "" || name == "" {
 		return
 	}

-	klog.Infof("CliqueCleanup: removing stale daemon entries from clique %s/%s: %v", clique.Namespace, clique.Name, removedNodes)
+	// Quick exit if cache already matches desired state (avoids live Get on every tick).
+	if cached := m.cliqueManager.Get(cdUID, cliqueID); cached != nil {
+		if running := runningFabricNodesForClique(pods, cached, fabricCliqueCountForCD); daemonsEqual(cached.Daemons, filterDaemonsByRunningNodes(cached.Daemons, running)) {
+			return
+		}
+	}

-	// Update the clique with the filtered daemon list
-	newClique := clique.DeepCopy()
-	newClique.Daemons = updatedDaemons
+	var removedLogged bool
+	var lastRemoved []string
+	var updateSucceeded bool
+	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		live, err := m.cliqueManager.GetLive(ctx, name)
+		if err != nil {
+			return err
+		}
+		runningNodes := runningFabricNodesForClique(pods, live, fabricCliqueCountForCD)
+		updatedDaemons := filterDaemonsByRunningNodes(live.Daemons, runningNodes)
+		if daemonsEqual(live.Daemons, updatedDaemons) {
+			return nil
+		}
+		var removedNodes []string
+		for _, daemon := range live.Daemons {
+			if daemon == nil {
+				continue
+			}
+			if _, exists := runningNodes[daemon.NodeName]; !exists {
+				removedNodes = append(removedNodes, daemon.NodeName)
+			}
+		}
+		if !removedLogged {
+			klog.Infof("CliqueCleanup: removing stale daemon entries from clique %s/%s: %v", ns, name, removedNodes)
+			removedLogged = true
+		}
+		lastRemoved = removedNodes

-	if _, err := m.cliqueManager.Update(ctx, newClique); err != nil {
-		klog.Errorf("CliqueCleanup: error updating ComputeDomainClique %s/%s: %v", clique.Namespace, clique.Name, err)
+		newClique := live.DeepCopy()
+		newClique.Daemons = updatedDaemons
+		_, err = m.cliqueManager.Update(ctx, newClique)
+		if err == nil {
+			updateSucceeded = true
+		}
+		return err
+	})
+	if err != nil {
+		klog.Errorf("CliqueCleanup: error updating ComputeDomainClique %s/%s: %v", ns, name, err)
 		return
 	}
+	if updateSucceeded {
+		klog.Infof("CliqueCleanup: successfully removed %d stale daemon entries from clique %s/%s", len(lastRemoved), ns, name)
+	}
+}

-	klog.Infof("CliqueCleanup: successfully removed %d stale daemon entries from clique %s/%s", len(removedNodes), clique.Namespace, clique.Name)
+// runningFabricNodesForClique returns the set of node names that host a pod counted
+// by podCountsForCliqueFabricDaemon for clique. Nil and unscheduled pods are skipped.
+func runningFabricNodesForClique(pods []*corev1.Pod, clique *nvapi.ComputeDomainClique, fabricCliqueCountForCD int) map[string]struct{} {
+	runningNodes := make(map[string]struct{})
+	for _, pod := range pods {
+		if pod == nil || pod.Spec.NodeName == "" {
+			continue
+		}
+		if podCountsForCliqueFabricDaemon(pod, clique, fabricCliqueCountForCD) {
+			runningNodes[pod.Spec.NodeName] = struct{}{}
+		}
+	}
+	return runningNodes
+}
+
+// filterDaemonsByRunningNodes returns the daemons whose NodeName is in runningNodes,
+// preserving order. Nil entries are dropped.
+func filterDaemonsByRunningNodes(daemons []*nvapi.ComputeDomainDaemonInfo, runningNodes map[string]struct{}) []*nvapi.ComputeDomainDaemonInfo {
+	var out []*nvapi.ComputeDomainDaemonInfo
+	for _, daemon := range daemons {
+		if daemon == nil {
+			continue
+		}
+		if _, exists := runningNodes[daemon.NodeName]; exists {
+			out = append(out, daemon)
+		}
+	}
+	return out
 }

 // filterStaleNodes removes nodes from CD status if their pod no longer exists.
@@ -351,6 +472,23 @@ func (m *ComputeDomainStatusManager) getNonStaleFabricNodes(existingNodes []*nva
 	return result
 }

+// daemonsEqual checks if two daemon slices are semantically equal (per nodeName key).
+func daemonsEqual(a, b []*nvapi.ComputeDomainDaemonInfo) bool {
+	aMap := make(map[string]nvapi.ComputeDomainDaemonInfo, len(a))
+	for _, d := range a {
+		if d != nil {
+			aMap[d.NodeName] = *d
+		}
+	}
+	bMap := make(map[string]nvapi.ComputeDomainDaemonInfo, len(b))
+	for _, d := range b {
+		if d != nil {
+			bMap[d.NodeName] = *d
+		}
+	}
+	return maps.Equal(aMap, bMap)
+}
+
 // nodesEqual checks if two slices of ComputeDomainNode are equal.
 func (m *ComputeDomainStatusManager) nodesEqual(a, b []*nvapi.ComputeDomainNode) bool {
 	aMap := make(map[string]nvapi.ComputeDomainNode)
diff --git a/cmd/compute-domain-controller/daemonset.go b/cmd/compute-domain-controller/daemonset.go
index 4a3aeae97..db8c71db5 100644
--- a/cmd/compute-domain-controller/daemonset.go
+++ b/cmd/compute-domain-controller/daemonset.go
@@ -105,7 +105,7 @@ func NewDaemonSetManager(config *ManagerConfig, getComputeDomain GetComputeDomai
 	// Create ComputeDomainStatusManager to sync node info to CD status
 	// - When feature gate ON: syncs from CDCliques + non-fabric-attached pods
 	// - When feature gate OFF: syncs from non-fabric-attached pods + handles deletions
-	m.cdStatusManager = NewComputeDomainStatusManager(config, listComputeDomains, updateComputeDomainStatus)
+	m.cdStatusManager = NewComputeDomainStatusManager(config, getComputeDomain, listComputeDomains, updateComputeDomainStatus)

 	m.cleanupManager = NewCleanupManager[*appsv1.DaemonSet](informer, getComputeDomain, m.cleanup)

diff --git a/cmd/compute-domain-daemon/cdclique.go b/cmd/compute-domain-daemon/cdclique.go
index f0b9461b7..18d0e10ea 100644
--- a/cmd/compute-domain-daemon/cdclique.go
+++ b/cmd/compute-domain-daemon/cdclique.go
@@ -18,20 +18,83 @@ package main

 import (
 	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
 	"fmt"
 	"maps"
 	"sync"
 	"time"

+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/klog/v2"
+	"k8s.io/utils/ptr"

 	nvapi "sigs.k8s.io/dra-driver-nvidia-gpu/api/nvidia.com/resource/v1beta1"
 	nvinformers "sigs.k8s.io/dra-driver-nvidia-gpu/pkg/nvidia.com/informers/externalversions"
 )

+// cdDaemonRowFieldManager identifies this node's SSA ownership of its entry in
+// CDClique.daemons (list map key nodeName). Must stay ≤128 printable chars.
+func (m *ComputeDomainCliqueManager) cdDaemonRowFieldManager() string {
+	const p = "compute-domain-daemon:"
+	node := m.config.nodeName
+	if len(p)+len(node) <= 128 {
+		return p + node
+	}
+	sum := sha256.Sum256([]byte(node))
+	return p + hex.EncodeToString(sum[:])
+}
+
+// cdDaemonOwnerFieldManager identifies this pod's SSA ownership of its entry in
+// CDClique.metadata.ownerReferences. Must stay ≤128 printable chars.
+func (m *ComputeDomainCliqueManager) cdDaemonOwnerFieldManager() string {
+	const p = "compute-domain-daemon-owner:"
+	uid := m.config.podUID
+	if len(p)+len(uid) <= 128 {
+		return p + uid
+	}
+	sum := sha256.Sum256([]byte(uid))
+	return p + hex.EncodeToString(sum[:])
+}
+
+// patchCDCliqueSSA sends patch as a forced server-side apply to this daemon's
+// CDClique under the given field manager and returns the result of the Patch
+// call. apiVersion and kind are filled in here; the caller's map is not modified.
+func (m *ComputeDomainCliqueManager) patchCDCliqueSSA(ctx context.Context, patch map[string]any, fieldManager string) (*nvapi.ComputeDomainClique, error) {
+	// Stamp the type meta on a copy so the caller's map is not mutated.
+	body := make(map[string]any, len(patch)+2)
+	maps.Copy(body, patch)
+	body["apiVersion"] = nvapi.SchemeGroupVersion.String()
+	body["kind"] = nvapi.ComputeDomainCliqueKind
+	data, err := json.Marshal(body)
+	if err != nil {
+		return nil, fmt.Errorf("marshal CDClique SSA patch: %w", err)
+	}
+	opts := metav1.PatchOptions{
+		FieldManager: fieldManager,
+		Force:        ptr.To(true),
+	}
+	return m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(m.config.podNamespace).
+		Patch(ctx, m.cliqueName(), types.ApplyPatchType, data, opts)
+}
+
+// hasOwnerReferenceForPod reports whether cd already lists this pod (by UID) as an owner.
+func (m *ComputeDomainCliqueManager) hasOwnerReferenceForPod(cd *nvapi.ComputeDomainClique) bool {
+	if cd == nil {
+		return false
+	}
+	for _, ref := range cd.OwnerReferences {
+		if string(ref.UID) == m.config.podUID {
+			return true
+		}
+	}
+	return false
+}
+
 // ComputeDomainCliqueManager watches ComputeDomainClique objects and updates them with
 // info about the ComputeDomain daemon running on this node. This is an alternative
 // to ComputeDomainStatusManager that works directly with CDClique objects instead
@@ -218,6 +281,10 @@ func (m *ComputeDomainCliqueManager) ensureCliqueExists(ctx context.Context) err

 	createdClique, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(m.config.podNamespace).Create(ctx, newClique, metav1.CreateOptions{})
 	if err != nil {
+		if apierrors.IsAlreadyExists(err) {
+			klog.Infof("CDClique '%s' already exists (concurrent create)", m.cliqueName())
+			return nil
+		}
 		return fmt.Errorf("failed to create CDClique '%s': %w", m.cliqueName(), err)
 	}
 	m.mutationCache.Mutation(createdClique)
@@ -319,14 +386,41 @@ func (m *ComputeDomainCliqueManager) syncDaemonInfoToClique(ctx context.Context,
 	// across pod restarts.
 	myDaemon.IPAddress = m.config.podIP

-	// Ensure this pod is an owner of the clique
-	m.ensureOwnerReference(newClique)
-
-	// Update the clique and (upon success) store the latest version of the object
-	// (as returned by the API server) in the mutation cache.
-	newClique, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(m.config.podNamespace).Update(ctx, newClique, metav1.UpdateOptions{})
+	row := map[string]any{
+		"nodeName":  myDaemon.NodeName,
+		"ipAddress": myDaemon.IPAddress,
+		"cliqueID":  myDaemon.CliqueID,
+		"index":     myDaemon.Index,
+		"status":    myDaemon.Status,
+	}
+	daemonPatch := map[string]any{
+		"metadata": map[string]any{
+			"name":      m.cliqueName(),
+			"namespace": m.config.podNamespace,
+		},
+		"daemons": []any{row},
+	}
+	newClique, err := m.patchCDCliqueSSA(ctx, daemonPatch, m.cdDaemonRowFieldManager())
 	if err != nil {
-		return nil, fmt.Errorf("error updating CDClique: %w", err)
+		return nil, fmt.Errorf("error applying CDClique daemon row (SSA): %w", err)
+	}
+	if !m.hasOwnerReferenceForPod(newClique) {
+		ownerPatch := map[string]any{
+			"metadata": map[string]any{
+				"name":      m.cliqueName(),
+				"namespace": m.config.podNamespace,
+				"ownerReferences": []any{map[string]any{
+					"apiVersion": "v1",
+					"kind":       "Pod",
+					"name":       m.config.podName,
+					"uid":        m.config.podUID,
+				}},
+			},
+		}
+		newClique, err = m.patchCDCliqueSSA(ctx, ownerPatch, m.cdDaemonOwnerFieldManager())
+		if err != nil {
+			return nil, fmt.Errorf("error applying CDClique owner reference (SSA): %w", err)
+		}
 	}

 	m.mutationCache.Mutation(newClique)
@@ -381,28 +475,65 @@ func (m *ComputeDomainCliqueManager) removeDaemonInfoFromClique(ctx context.Cont
 		return nil
 	}

-	// Create a deep copy and filter out the daemon with the current pod's IP address
-	newClique := clique.DeepCopy()
-	var newDaemons []*nvapi.ComputeDomainDaemonInfo
-	for _, d := range newClique.Daemons {
-		if d.IPAddress != m.config.podIP {
-			newDaemons = append(newDaemons, d)
-		}
+	removePatch := map[string]any{
+		"metadata": map[string]any{
+			"name":      m.cliqueName(),
+			"namespace": m.config.podNamespace,
+		},
+		"daemons": []any{},
 	}
-	newClique.Daemons = newDaemons
-
-	// Update the clique and (upon success) store the latest version of the object
-	// (as returned by the API server) in the mutation cache.
-	newClique, err = m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(m.config.podNamespace).Update(ctx, newClique, metav1.UpdateOptions{})
+	newClique, err := m.patchCDCliqueSSA(ctx, removePatch, m.cdDaemonRowFieldManager())
 	if err != nil {
-		return fmt.Errorf("error updating CDClique: %w", err)
+		return fmt.Errorf("error applying CDClique daemon removal (SSA): %w", err)
+	}
+	// Rows created before SSA (client Update) are not owned by our field manager; fall back once.
+	if m.cliqueStillListsThisNode(newClique) {
+		newClique, err = m.removeDaemonRowByUpdate(ctx)
+		if err != nil {
+			return err
+		}
 	}

 	m.mutationCache.Mutation(newClique)
-	klog.Infof("Successfully removed daemon with IP %s from CDClique %s (from ComputeDomain %s/%s)", m.config.podIP, m.cliqueName(), m.config.computeDomainNamespace, m.config.computeDomainName)
+	klog.Infof("Successfully removed daemon for node %s from CDClique %s (from ComputeDomain %s/%s)", m.config.nodeName, m.cliqueName(), m.config.computeDomainNamespace, m.config.computeDomainName)
 	return nil
 }

+// cliqueStillListsThisNode reports whether cd has a daemon row for this node's name.
+func (m *ComputeDomainCliqueManager) cliqueStillListsThisNode(cd *nvapi.ComputeDomainClique) bool {
+	if cd == nil {
+		return false
+	}
+	for _, d := range cd.Daemons {
+		if d != nil && d.NodeName == m.config.nodeName {
+			return true
+		}
+	}
+	return false
+}
+
+// removeDaemonRowByUpdate drops this node's daemon row using a live read and
+// optimistic Update (legacy rows not owned by our SSA field manager).
+func (m *ComputeDomainCliqueManager) removeDaemonRowByUpdate(ctx context.Context) (*nvapi.ComputeDomainClique, error) {
+	live, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(m.config.podNamespace).Get(ctx, m.cliqueName(), metav1.GetOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("get CDClique for legacy daemon removal: %w", err)
+	}
+	updated := live.DeepCopy()
+	var kept []*nvapi.ComputeDomainDaemonInfo
+	for _, d := range updated.Daemons {
+		if d != nil && d.NodeName != m.config.nodeName {
+			kept = append(kept, d)
+		}
+	}
+	updated.Daemons = kept
+	out, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(m.config.podNamespace).Update(ctx, updated, metav1.UpdateOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("error updating CDClique (legacy daemon removal): %w", err)
+	}
+	return out, nil
+}
+
 // If there was actually a change compared to the previously known set of
 // daemons: pass info to IMEX daemon controller.
 func (m *ComputeDomainCliqueManager) maybePushDaemonsUpdate(clique *nvapi.ComputeDomainClique) {
@@ -464,11 +595,23 @@ func (m *ComputeDomainCliqueManager) updateDaemonStatus(ctx context.Context, rea
 	// Update the status
 	myDaemon.Status = status

-	// Update the clique and (upon success) store the latest version of the object
-	// (as returned by the API server) in the mutation cache.
-	newClique, err = m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(m.config.podNamespace).Update(ctx, newClique, metav1.UpdateOptions{})
+	row := map[string]any{
+		"nodeName":  myDaemon.NodeName,
+		"ipAddress": myDaemon.IPAddress,
+		"cliqueID":  myDaemon.CliqueID,
+		"index":     myDaemon.Index,
+		"status":    myDaemon.Status,
+	}
+	daemonPatch := map[string]any{
+		"metadata": map[string]any{
+			"name":      m.cliqueName(),
+			"namespace": m.config.podNamespace,
+		},
+		"daemons": []any{row},
+	}
+	newClique, err = m.patchCDCliqueSSA(ctx, daemonPatch, m.cdDaemonRowFieldManager())
 	if err != nil {
-		return fmt.Errorf("error updating CDClique: %w", err)
+		return fmt.Errorf("error applying CDClique daemon status (SSA): %w", err)
 	}

 	m.mutationCache.Mutation(newClique)