Skip to content

Commit e9f1da5

Browse files
committed
Use retry.RetryOnConflict in the CDClique cleanup logic and read the live object from the API server, rather than the possibly stale informer cache, before updating
Signed-off-by: Shiva Krishna, Merla <smerla@nvidia.com>
1 parent 0f15809 commit e9f1da5

2 files changed

Lines changed: 69 additions & 29 deletions

File tree

cmd/compute-domain-controller/cdclique.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,13 @@ func (m *ComputeDomainCliqueManager) List() ([]*nvapi.ComputeDomainClique, error
128128
return m.lister.ComputeDomainCliques(m.config.driverNamespace).List(labels.Everything())
129129
}
130130

131+
// GetLive fetches the ComputeDomainClique called name in the driver namespace straight from
132+
// the API server, bypassing the informer cache. Callers that perform read-modify-write
133+
// updates should call it right before Update to reduce stale resourceVersion conflicts.
134+
func (m *ComputeDomainCliqueManager) GetLive(ctx context.Context, name string) (*nvapi.ComputeDomainClique, error) {
135+
return m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(m.config.driverNamespace).Get(ctx, name, metav1.GetOptions{})
136+
}
137+
131138
// Update updates a ComputeDomainClique and caches the result in the mutation cache.
132139
func (m *ComputeDomainCliqueManager) Update(ctx context.Context, clique *nvapi.ComputeDomainClique) (*nvapi.ComputeDomainClique, error) {
133140
updatedClique, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(clique.Namespace).Update(ctx, clique, metav1.UpdateOptions{})

cmd/compute-domain-controller/cdstatus.go

Lines changed: 62 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"time"
2727

2828
corev1 "k8s.io/api/core/v1"
29+
"k8s.io/client-go/util/retry"
2930
"k8s.io/klog/v2"
3031

3132
nvapi "sigs.k8s.io/dra-driver-nvidia-gpu/api/nvidia.com/resource/v1beta1"
@@ -353,49 +354,81 @@ func (m *ComputeDomainStatusManager) cleanupClique(ctx context.Context, clique *
353354
return
354355
}
355356

356-
latest := m.cliqueManager.Get(cdUID, cliqueID)
357-
if latest == nil {
358-
latest = clique
357+
ns, name := clique.Namespace, clique.Name
358+
if ns == "" || name == "" {
359+
return
360+
}
361+
362+
// Quick exit if cache already matches desired state (avoids live Get on every tick).
363+
if cached := m.cliqueManager.Get(cdUID, cliqueID); cached != nil {
364+
if running := runningFabricNodesForClique(pods, cached, fabricCliqueCountForCD); daemonsEqual(cached.Daemons, filterDaemonsByRunningNodes(cached.Daemons, running)) {
365+
return
366+
}
359367
}
360368

361-
// Build set of node names that have a running fabric daemon pod for this clique.
369+
var removedLogged bool
370+
var lastRemoved []string
371+
var updateSucceeded bool
372+
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
373+
live, err := m.cliqueManager.GetLive(ctx, name)
374+
if err != nil {
375+
return err
376+
}
377+
runningNodes := runningFabricNodesForClique(pods, live, fabricCliqueCountForCD)
378+
updatedDaemons := filterDaemonsByRunningNodes(live.Daemons, runningNodes)
379+
if daemonsEqual(live.Daemons, updatedDaemons) {
380+
return nil
381+
}
382+
var removedNodes []string
383+
for _, daemon := range live.Daemons {
384+
if _, exists := runningNodes[daemon.NodeName]; !exists {
385+
removedNodes = append(removedNodes, daemon.NodeName)
386+
}
387+
}
388+
if !removedLogged {
389+
klog.Infof("CliqueCleanup: removing stale daemon entries from clique %s/%s: %v", ns, name, removedNodes)
390+
removedLogged = true
391+
}
392+
lastRemoved = removedNodes
393+
394+
newClique := live.DeepCopy()
395+
newClique.Daemons = updatedDaemons
396+
_, err = m.cliqueManager.Update(ctx, newClique)
397+
if err == nil {
398+
updateSucceeded = true
399+
}
400+
return err
401+
})
402+
if err != nil {
403+
klog.Errorf("CliqueCleanup: error updating ComputeDomainClique %s/%s: %v", ns, name, err)
404+
return
405+
}
406+
if updateSucceeded {
407+
klog.Infof("CliqueCleanup: successfully removed %d stale daemon entries from clique %s/%s", len(lastRemoved), ns, name)
408+
}
409+
}
410+
411+
func runningFabricNodesForClique(pods []*corev1.Pod, clique *nvapi.ComputeDomainClique, fabricCliqueCountForCD int) map[string]struct{} {
362412
runningNodes := make(map[string]struct{})
363413
for _, pod := range pods {
364414
if pod.Spec.NodeName == "" {
365415
continue
366416
}
367-
if podCountsForCliqueFabricDaemon(pod, latest, fabricCliqueCountForCD) {
417+
if podCountsForCliqueFabricDaemon(pod, clique, fabricCliqueCountForCD) {
368418
runningNodes[pod.Spec.NodeName] = struct{}{}
369419
}
370420
}
421+
return runningNodes
422+
}
371423

372-
var updatedDaemons []*nvapi.ComputeDomainDaemonInfo
373-
var removedNodes []string
374-
375-
for _, daemon := range latest.Daemons {
424+
func filterDaemonsByRunningNodes(daemons []*nvapi.ComputeDomainDaemonInfo, runningNodes map[string]struct{}) []*nvapi.ComputeDomainDaemonInfo {
425+
var out []*nvapi.ComputeDomainDaemonInfo
426+
for _, daemon := range daemons {
376427
if _, exists := runningNodes[daemon.NodeName]; exists {
377-
updatedDaemons = append(updatedDaemons, daemon)
378-
} else {
379-
removedNodes = append(removedNodes, daemon.NodeName)
428+
out = append(out, daemon)
380429
}
381430
}
382-
383-
if daemonsEqual(latest.Daemons, updatedDaemons) {
384-
return
385-
}
386-
387-
klog.Infof("CliqueCleanup: removing stale daemon entries from clique %s/%s: %v", latest.Namespace, latest.Name, removedNodes)
388-
389-
// Update the clique with the filtered daemon list (use latest for resourceVersion)
390-
newClique := latest.DeepCopy()
391-
newClique.Daemons = updatedDaemons
392-
393-
if _, err := m.cliqueManager.Update(ctx, newClique); err != nil {
394-
klog.Errorf("CliqueCleanup: error updating ComputeDomainClique %s/%s: %v", latest.Namespace, latest.Name, err)
395-
return
396-
}
397-
398-
klog.Infof("CliqueCleanup: successfully removed %d stale daemon entries from clique %s/%s", len(removedNodes), latest.Namespace, latest.Name)
431+
return out
399432
}
400433

401434
// filterStaleNodes removes nodes from CD status if their pod no longer exists.

0 commit comments

Comments
 (0)