Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,4 @@ status:
kind: ""
plural: ""
conditions: []
storedVersions: []
storedVersions: []
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ import (
// +kubebuilder:validation:Enum=SupportsPersistentVolumeBlock;SupportsPersistentVolumeFilesystem;SupportsHighPerformanceLinkedClone;SupportsLinkedClone
type VolumeCapability string

const (
// SupportsVolumeModeBlock indicates that the policy supports PersistentVolume with Block volume mode.
SupportsVolumeModeBlock VolumeCapability = "SupportsPersistentVolumeBlock"
// SupportsVolumeModeFilesystem indicates that the policy supports PersistentVolume with Filesystem volume mode.
SupportsVolumeModeFilesystem VolumeCapability = "SupportsPersistentVolumeFilesystem"
// SupportsHighPerformanceLinkedClone indicates that the policy supports high-performance linked clones
// on vSAN ESA clusters with ESXi 9.1 or above hosts.
SupportsHighPerformanceLinkedClone VolumeCapability = "SupportsHighPerformanceLinkedClone"
// SupportsLinkedClone indicates that the policy supports linked clones.
SupportsLinkedClone VolumeCapability = "SupportsLinkedClone"
)

// Topology describes topology accessibility for the storage policy within the cluster.
type Topology struct {

Expand Down
3 changes: 3 additions & 0 deletions pkg/csi/service/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ const (
// StorageClassVsanFileServicePolicyLateBinding is the supervisor StorageClass for vSAN file service (late binding).
StorageClassVsanFileServicePolicyLateBinding = "vsan-file-service-policy-latebinding"

// StoragePolicyMarkerVsanFileService is the k8scompliantname for the vSAN file service marker policy.
StoragePolicyMarkerVsanFileService = "vsan-file-service-policy"

// FVSVolumeIDPrefix is the CSI volume ID prefix for the FVS FileVolume CR workflow
// (fv:<instance-namespace>:<filevolume-name>).
FVSVolumeIDPrefix = "fv:"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import (
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/csi/service/logger"
k8s "sigs.k8s.io/vsphere-csi-driver/v3/pkg/kubernetes"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/types"
"sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/util"
cnsoperatorutil "sigs.k8s.io/vsphere-csi-driver/v3/pkg/syncer/cnsoperator/util"
)

// backOffDuration is a map of ClusterStoragePolicyInfo names to the time after
Expand Down Expand Up @@ -132,7 +132,7 @@ func newReconciler(mgr manager.Manager, configInfo *config.ConfigurationInfo,

func add(mgr manager.Manager, r *ReconcileClusterStoragePolicyInfo) error {
ctx, log := logger.GetNewContextWithLogger()
maxWorkerThreads := util.GetMaxWorkerThreads(ctx, workerThreadsEnvVar, defaultMaxWorkerThreads)
maxWorkerThreads := cnsoperatorutil.GetMaxWorkerThreads(ctx, workerThreadsEnvVar, defaultMaxWorkerThreads)
scVacPredicates := predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return e.Object != nil
Expand Down Expand Up @@ -650,35 +650,53 @@ func (r *ReconcileClusterStoragePolicyInfo) syncInfraSPIAttributes(ctx context.C
log.Infof("Syncing InfraSPI attributes for %q (policy ID: %s, name: %s)",
instance.Name, profile.ID, profile.Name)

// Populate topology capabilities for InfraSPI if policy uses zonal topology
err := r.populateTopologyCapabilities(ctx, instance, infraSPI, profile.ID, vc)
var overallErr error

// Populate topology capabilities for InfraSPI if policy uses zonal topology.
clusterDatastoreCache, err := r.populateTopologyCapabilitiesWithCache(ctx, instance, infraSPI, profile.ID, vc)
if err != nil {
log.Errorf("Failed to populate topology capabilities for profile %s: %v", profile.ID, err)
return err
overallErr = errors.Join(overallErr, err)
}

return nil
// Populate volume capabilities (filesystem support and high-performance linked clone).
// Reuse the cache from topology calculation to avoid duplicate vCenter calls.
if err := populateVolumeCapabilities(ctx, infraSPI, vc, profile.ID, r.topologyMgr, clusterDatastoreCache); err != nil {
log.Errorf("Failed to populate volume capabilities for profile %s: %v", profile.ID, err)
overallErr = errors.Join(overallErr, err)
}

return overallErr
}

// populateTopologyCapabilities populates topology information for the storage policy in InfraSPI
func (r *ReconcileClusterStoragePolicyInfo) populateTopologyCapabilities(ctx context.Context,
clusterSPI *clusterspiv1alpha1.ClusterStoragePolicyInfo,
infraSPI *infraspiv1alpha1.InfraStoragePolicyInfo, profileID string,
vc *cnsvsphere.VirtualCenter) error {
_, err := r.populateTopologyCapabilitiesWithCache(ctx, clusterSPI, infraSPI, profileID, vc)
return err
}

// populateTopologyCapabilitiesWithCache populates topology information and returns the cache for reuse
func (r *ReconcileClusterStoragePolicyInfo) populateTopologyCapabilitiesWithCache(ctx context.Context,
clusterSPI *clusterspiv1alpha1.ClusterStoragePolicyInfo,
infraSPI *infraspiv1alpha1.InfraStoragePolicyInfo, profileID string,
vc *cnsvsphere.VirtualCenter) (map[string][]*cnsvsphere.DatastoreInfo, error) {
log := logger.GetLogger(ctx)

// Get StorageClass that references this policy
storageClass, err := getStorageClassForPolicy(ctx, r.client, profileID)
if err != nil {
return fmt.Errorf("failed to get StorageClass for policy: %w", err)
return nil, fmt.Errorf("failed to get StorageClass for policy: %w", err)
}

// Get the StorageTopologyType parameter value from StorageClass
topologyType, err := getStorageTopologyType(ctx, storageClass)
if err != nil {
log.Errorf("Storage policy %s does not have StorageTopologyType parameter, skipping topology population: %v",
profileID, err)
return err
return nil, err
}

log.Infof("Storage policy %s has topology type: %q", profileID, topologyType)
Expand All @@ -690,17 +708,17 @@ func (r *ReconcileClusterStoragePolicyInfo) populateTopologyCapabilities(ctx con

log.Infof("Storage policy %s uses zonal topology, populating accessible zones", profileID)

// Get accessible zones directly using topology service
accessibleZones, err := getAccessibleZonesForPolicy(ctx, r.topologyMgr, vc, profileID)
// Get accessible zones directly using topology service and return the cache
accessibleZones, clusterDatastoreCache, err := getAccessibleZonesForPolicy(ctx, r.topologyMgr, vc, profileID)
if err != nil {
return fmt.Errorf("failed to get accessible zones: %w", err)
return nil, fmt.Errorf("failed to get accessible zones: %w", err)
}

// Update InfraSPI with topology information including accessible zones
infraSPI.Status.Topology.AccessibleZones = accessibleZones
log.Infof("Storage policy %s is accessible in zones: %v", profileID, accessibleZones)

return nil
return clusterDatastoreCache, nil
}

// setClusterSPIError sets error and records an event on the ClusterStoragePolicyInfo instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2394,7 +2394,7 @@ func TestGetAccessibleZonesForPolicy(t *testing.T) {
// the logic we can control and provide a simplified test structure

if tt.topologyMgr == nil {
result, err := getAccessibleZonesForPolicy(ctx, nil, mockVC, tt.profileID)
result, _, err := getAccessibleZonesForPolicy(ctx, nil, mockVC, tt.profileID)
if tt.expectedError != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), tt.expectedError)
Expand All @@ -2406,7 +2406,7 @@ func TestGetAccessibleZonesForPolicy(t *testing.T) {
}

if len(tt.topologyMgr.GetAZClustersMap(ctx)) == 0 {
result, err := getAccessibleZonesForPolicy(ctx, tt.topologyMgr, mockVC, tt.profileID)
result, _, err := getAccessibleZonesForPolicy(ctx, tt.topologyMgr, mockVC, tt.profileID)
require.NoError(t, err)
assert.Equal(t, tt.expectedZones, result)
return
Expand Down
Loading