Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/compute-domain-controller/cdclique.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ func (m *ComputeDomainCliqueManager) List() ([]*nvapi.ComputeDomainClique, error
return m.lister.ComputeDomainCliques(m.config.driverNamespace).List(labels.Everything())
}

// GetLive returns the ComputeDomainClique from the API server (not the informer cache).
// Callers that perform read-modify-write updates should use this before Update to reduce
// stale resourceVersion conflicts.
func (m *ComputeDomainCliqueManager) GetLive(ctx context.Context, name string) (*nvapi.ComputeDomainClique, error) {
return m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(m.config.driverNamespace).Get(ctx, name, metav1.GetOptions{})
}

// Update updates a ComputeDomainClique and caches the result in the mutation cache.
func (m *ComputeDomainCliqueManager) Update(ctx context.Context, clique *nvapi.ComputeDomainClique) (*nvapi.ComputeDomainClique, error) {
updatedClique, err := m.config.clientsets.Nvidia.ResourceV1beta1().ComputeDomainCliques(clique.Namespace).Update(ctx, clique, metav1.UpdateOptions{})
Expand Down
206 changes: 166 additions & 40 deletions cmd/compute-domain-controller/cdstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"fmt"
"maps"
"slices"
"strings"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"

nvapi "sigs.k8s.io/dra-driver-nvidia-gpu/api/nvidia.com/resource/v1beta1"
Expand All @@ -46,12 +48,13 @@ type ComputeDomainStatusManager struct {
cliqueManager *ComputeDomainCliqueManager
podManager *DaemonSetPodManager

getComputeDomain GetComputeDomainFunc
listComputeDomains ListComputeDomainsFunc
updateComputeDomainStatus UpdateComputeDomainStatusFunc
}

// NewComputeDomainStatusManager creates a new ComputeDomainStatusManager.
func NewComputeDomainStatusManager(config *ManagerConfig, listComputeDomains ListComputeDomainsFunc, updateComputeDomainStatus UpdateComputeDomainStatusFunc) *ComputeDomainStatusManager {
func NewComputeDomainStatusManager(config *ManagerConfig, getComputeDomain GetComputeDomainFunc, listComputeDomains ListComputeDomainsFunc, updateComputeDomainStatus UpdateComputeDomainStatusFunc) *ComputeDomainStatusManager {
// Create cliqueManager if feature gate is enabled
var cliqueManager *ComputeDomainCliqueManager
if featuregates.Enabled(featuregates.ComputeDomainCliques) {
Expand All @@ -65,6 +68,7 @@ func NewComputeDomainStatusManager(config *ManagerConfig, listComputeDomains Lis
config: config,
cliqueManager: cliqueManager,
podManager: podManager,
getComputeDomain: getComputeDomain,
listComputeDomains: listComputeDomains,
updateComputeDomainStatus: updateComputeDomainStatus,
}
Expand Down Expand Up @@ -155,14 +159,9 @@ func (m *ComputeDomainStatusManager) sync(ctx context.Context) {
klog.Errorf("CDStatusSync: error listing cliques: %v", err)
return
}

// Clean up stale entries from cliques in parallel
for _, clique := range cliques {
go m.cleanupClique(ctx, clique, pods)
}
}

// Group cliques by CD UID
// Group cliques by CD UID (used for status sync and per-clique daemon cleanup)
cliquesByCD := make(map[string][]*nvapi.ComputeDomainClique)
for _, clique := range cliques {
cdUID := clique.Labels[computeDomainLabelKey]
Expand All @@ -172,6 +171,17 @@ func (m *ComputeDomainStatusManager) sync(ctx context.Context) {
cliquesByCD[cdUID] = append(cliquesByCD[cdUID], clique)
}

if m.cliqueManager != nil {
// Clean up stale entries from cliques in parallel (pods scoped per clique)
for _, clique := range cliques {
cdUID := clique.Labels[computeDomainLabelKey]
if cdUID == "" {
continue
}
go m.cleanupClique(ctx, clique, pods, len(cliquesByCD[cdUID]))
}
}

// Group pods by CD UID and type (fabric-attached vs non-fabric-attached)
fabricPodsByCD := make(map[string][]*corev1.Pod)
nonFabricPodsByCD := make(map[string][]*corev1.Pod)
Expand Down Expand Up @@ -206,6 +216,15 @@ func (m *ComputeDomainStatusManager) sync(ctx context.Context) {

// syncCD synchronizes node information to a single ComputeDomain's status.
func (m *ComputeDomainStatusManager) syncCD(ctx context.Context, cd *nvapi.ComputeDomain, cliques []*nvapi.ComputeDomainClique, fabricPods []*corev1.Pod, nonFabricPods []*corev1.Pod) {
latestCD, err := m.getComputeDomain(string(cd.UID))
if err != nil {
klog.Errorf("CDStatusSync: error getting ComputeDomain %s: %v", cd.Name, err)
return
}
if latestCD == nil {
return
}

var fabricNodes, nonFabricNodes, newNodes []*nvapi.ComputeDomainNode

if m.cliqueManager != nil {
Expand All @@ -215,27 +234,26 @@ func (m *ComputeDomainStatusManager) syncCD(ctx context.Context, cd *nvapi.Compu
newNodes = slices.Concat(fabricNodes, nonFabricNodes)
} else {
// Feature gate disabled: filter stale fabric nodes + rebuild non-fabric nodes
fabricNodes = m.getNonStaleFabricNodes(cd.Status.Nodes, fabricPods)
fabricNodes = m.getNonStaleFabricNodes(latestCD.Status.Nodes, fabricPods)
nonFabricNodes = m.buildNodesFromPods(nonFabricPods)
newNodes = slices.Concat(fabricNodes, nonFabricNodes)
}

// Check if update is needed
if m.nodesEqual(cd.Status.Nodes, newNodes) {
if m.nodesEqual(latestCD.Status.Nodes, newNodes) {
return
}

klog.V(6).Infof("CDStatusSync: syncing ComputeDomain %s/%s: fabric=%d non-fabric=%d", cd.Namespace, cd.Name, len(fabricNodes), len(nonFabricNodes))
klog.V(6).Infof("CDStatusSync: syncing ComputeDomain %s/%s: fabric=%d non-fabric=%d", latestCD.Namespace, latestCD.Name, len(fabricNodes), len(nonFabricNodes))

// Update status
newCD := cd.DeepCopy()
// Update status (use latest object for resourceVersion)
newCD := latestCD.DeepCopy()
newCD.Status.Nodes = newNodes
if _, err := m.updateComputeDomainStatus(ctx, newCD); err != nil {
klog.Errorf("CDStatusSync: error updating ComputeDomain %s status: %v", cd.Name, err)
klog.Errorf("CDStatusSync: error updating ComputeDomain %s status: %v", latestCD.Name, err)
return
}

klog.V(4).Infof("CDStatusSync: updated ComputeDomain %s/%s: total nodes=%d", cd.Namespace, cd.Name, len(newNodes))
klog.V(4).Infof("CDStatusSync: updated ComputeDomain %s/%s: total nodes=%d", latestCD.Namespace, latestCD.Name, len(newNodes))
}

// buildNodesFromCliques builds a nodes list from fabric-attached cliques.
Expand Down Expand Up @@ -282,44 +300,135 @@ func (m *ComputeDomainStatusManager) buildNodesFromPods(pods []*corev1.Pod) []*n
return nodes
}

// cleanupClique removes stale daemon entries from a single clique.
func (m *ComputeDomainStatusManager) cleanupClique(ctx context.Context, clique *nvapi.ComputeDomainClique, pods []*corev1.Pod) {
// Build set of node names that have running daemon pods
runningNodes := make(map[string]struct{})
for _, pod := range pods {
if pod.Spec.NodeName != "" {
runningNodes[pod.Spec.NodeName] = struct{}{}
}
// fabricCliqueIDFromClique returns the fabric clique ID from object labels, or from
// metadata name "<computeDomainUID>.<cliqueID>" when the clique ID label is unset.
func fabricCliqueIDFromClique(clique *nvapi.ComputeDomainClique) string {
if clique == nil {
return ""
}
if id := clique.Labels[computeDomainCliqueLabelKey]; id != "" {
return id
}
cdUID := clique.Labels[computeDomainLabelKey]
if cdUID == "" {
return ""
}
prefix := cdUID + "."
if strings.HasPrefix(clique.Name, prefix) {
return strings.TrimPrefix(clique.Name, prefix)
}
return ""
}

var updatedDaemons []*nvapi.ComputeDomainDaemonInfo
var removedNodes []string
// podCountsForCliqueFabricDaemon is true when this pod is the fabric-attached daemon
// for the same ComputeDomain and fabric clique as clique. Non-fabric pods (explicit
// empty clique label) are excluded. Pods without a clique label only match when the
// CD has a single fabric clique so attribution is unambiguous.
func podCountsForCliqueFabricDaemon(pod *corev1.Pod, clique *nvapi.ComputeDomainClique, fabricCliqueCountForCD int) bool {
if pod == nil || clique == nil {
return false
}
cdUID := clique.Labels[computeDomainLabelKey]
if cdUID == "" || pod.Labels[computeDomainLabelKey] != cdUID {
return false
}
podCliqueID, podHasCliqueLabel := pod.Labels[computeDomainCliqueLabelKey]
if podHasCliqueLabel && podCliqueID == "" {
return false
}
expected := fabricCliqueIDFromClique(clique)
if expected == "" {
return false
}
if podHasCliqueLabel && podCliqueID != "" {
return podCliqueID == expected
}
return fabricCliqueCountForCD == 1
}

for _, daemon := range clique.Daemons {
if _, exists := runningNodes[daemon.NodeName]; exists {
updatedDaemons = append(updatedDaemons, daemon)
} else {
removedNodes = append(removedNodes, daemon.NodeName)
}
// cleanupClique removes stale daemon entries from a single clique.
func (m *ComputeDomainStatusManager) cleanupClique(ctx context.Context, clique *nvapi.ComputeDomainClique, pods []*corev1.Pod, fabricCliqueCountForCD int) {
cdUID := clique.Labels[computeDomainLabelKey]
cliqueID := fabricCliqueIDFromClique(clique)
if cdUID == "" || cliqueID == "" {
return
}

// Nothing to clean up
if len(removedNodes) == 0 {
ns, name := clique.Namespace, clique.Name
if ns == "" || name == "" {
return
}

klog.Infof("CliqueCleanup: removing stale daemon entries from clique %s/%s: %v", clique.Namespace, clique.Name, removedNodes)
// Quick exit if cache already matches desired state (avoids live Get on every tick).
if cached := m.cliqueManager.Get(cdUID, cliqueID); cached != nil {
if running := runningFabricNodesForClique(pods, cached, fabricCliqueCountForCD); daemonsEqual(cached.Daemons, filterDaemonsByRunningNodes(cached.Daemons, running)) {
return
}
}

// Update the clique with the filtered daemon list
newClique := clique.DeepCopy()
newClique.Daemons = updatedDaemons
var removedLogged bool
var lastRemoved []string
var updateSucceeded bool
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
live, err := m.cliqueManager.GetLive(ctx, name)
if err != nil {
return err
}
runningNodes := runningFabricNodesForClique(pods, live, fabricCliqueCountForCD)
updatedDaemons := filterDaemonsByRunningNodes(live.Daemons, runningNodes)
if daemonsEqual(live.Daemons, updatedDaemons) {
return nil
}
var removedNodes []string
for _, daemon := range live.Daemons {
if _, exists := runningNodes[daemon.NodeName]; !exists {
removedNodes = append(removedNodes, daemon.NodeName)
}
}
if !removedLogged {
klog.Infof("CliqueCleanup: removing stale daemon entries from clique %s/%s: %v", ns, name, removedNodes)
removedLogged = true
}
lastRemoved = removedNodes

if _, err := m.cliqueManager.Update(ctx, newClique); err != nil {
klog.Errorf("CliqueCleanup: error updating ComputeDomainClique %s/%s: %v", clique.Namespace, clique.Name, err)
newClique := live.DeepCopy()
newClique.Daemons = updatedDaemons
_, err = m.cliqueManager.Update(ctx, newClique)
if err == nil {
updateSucceeded = true
}
return err
})
if err != nil {
klog.Errorf("CliqueCleanup: error updating ComputeDomainClique %s/%s: %v", ns, name, err)
return
}
if updateSucceeded {
klog.Infof("CliqueCleanup: successfully removed %d stale daemon entries from clique %s/%s", len(lastRemoved), ns, name)
}
}

klog.Infof("CliqueCleanup: successfully removed %d stale daemon entries from clique %s/%s", len(removedNodes), clique.Namespace, clique.Name)
func runningFabricNodesForClique(pods []*corev1.Pod, clique *nvapi.ComputeDomainClique, fabricCliqueCountForCD int) map[string]struct{} {
runningNodes := make(map[string]struct{})
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
if podCountsForCliqueFabricDaemon(pod, clique, fabricCliqueCountForCD) {
runningNodes[pod.Spec.NodeName] = struct{}{}
}
}
return runningNodes
}

func filterDaemonsByRunningNodes(daemons []*nvapi.ComputeDomainDaemonInfo, runningNodes map[string]struct{}) []*nvapi.ComputeDomainDaemonInfo {
var out []*nvapi.ComputeDomainDaemonInfo
for _, daemon := range daemons {
if _, exists := runningNodes[daemon.NodeName]; exists {
out = append(out, daemon)
}
}
return out
}

// filterStaleNodes removes nodes from CD status if their pod no longer exists.
Expand Down Expand Up @@ -351,6 +460,23 @@ func (m *ComputeDomainStatusManager) getNonStaleFabricNodes(existingNodes []*nva
return result
}

// daemonsEqual checks if two daemon slices are semantically equal (per nodeName key).
func daemonsEqual(a, b []*nvapi.ComputeDomainDaemonInfo) bool {
aMap := make(map[string]nvapi.ComputeDomainDaemonInfo)
for _, d := range a {
if d != nil {
aMap[d.NodeName] = *d
}
}
bMap := make(map[string]nvapi.ComputeDomainDaemonInfo)
for _, d := range b {
if d != nil {
bMap[d.NodeName] = *d
}
}
return maps.Equal(aMap, bMap)
}

// nodesEqual checks if two slices of ComputeDomainNode are equal.
func (m *ComputeDomainStatusManager) nodesEqual(a, b []*nvapi.ComputeDomainNode) bool {
aMap := make(map[string]nvapi.ComputeDomainNode)
Expand Down
2 changes: 1 addition & 1 deletion cmd/compute-domain-controller/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func NewDaemonSetManager(config *ManagerConfig, getComputeDomain GetComputeDomai
// Create ComputeDomainStatusManager to sync node info to CD status
// - When feature gate ON: syncs from CDCliques + non-fabric-attached pods
// - When feature gate OFF: syncs from non-fabric-attached pods + handles deletions
m.cdStatusManager = NewComputeDomainStatusManager(config, listComputeDomains, updateComputeDomainStatus)
m.cdStatusManager = NewComputeDomainStatusManager(config, getComputeDomain, listComputeDomains, updateComputeDomainStatus)

m.cleanupManager = NewCleanupManager[*appsv1.DaemonSet](informer, getComputeDomain, m.cleanup)

Expand Down
Loading