@@ -30,9 +30,10 @@ import (
30
30
"google.golang.org/grpc/codes"
31
31
"google.golang.org/grpc/status"
32
32
33
- proxmox "github.com/sergelogvinov/proxmox-cloud-controller-manager/pkg/cluster"
33
+ cluster "github.com/sergelogvinov/proxmox-cloud-controller-manager/pkg/cluster"
34
34
"github.com/sergelogvinov/proxmox-csi-plugin/pkg/helpers/ptr"
35
35
"github.com/sergelogvinov/proxmox-csi-plugin/pkg/metrics"
36
+ "github.com/sergelogvinov/proxmox-csi-plugin/pkg/proxmox"
36
37
"github.com/sergelogvinov/proxmox-csi-plugin/pkg/tools"
37
38
volume "github.com/sergelogvinov/proxmox-csi-plugin/pkg/volume"
38
39
@@ -60,22 +61,22 @@ var controllerCaps = []csi.ControllerServiceCapability_RPC_Type{
60
61
61
62
// ControllerService is the controller service for the CSI driver
62
63
type ControllerService struct {
63
- Cluster * proxmox .Cluster
64
+ Cluster * cluster .Cluster
64
65
Kclient clientkubernetes.Interface
65
- Provider proxmox .Provider
66
+ Provider cluster .Provider
66
67
volumeLocks sync.Mutex
67
68
68
69
csi.UnimplementedControllerServer
69
70
}
70
71
71
72
// NewControllerService returns a new controller service
72
73
func NewControllerService (kclient * clientkubernetes.Clientset , cloudConfig string ) (* ControllerService , error ) {
73
- cfg , err := proxmox .ReadCloudConfigFromFile (cloudConfig )
74
+ cfg , err := cluster .ReadCloudConfigFromFile (cloudConfig )
74
75
if err != nil {
75
76
return nil , fmt .Errorf ("failed to read config: %v" , err )
76
77
}
77
78
78
- cluster , err := proxmox .NewCluster (& cfg , nil )
79
+ cluster , err := cluster .NewCluster (& cfg , nil )
79
80
if err != nil {
80
81
return nil , fmt .Errorf ("failed to create proxmox cluster client: %v" , err )
81
82
}
@@ -110,7 +111,7 @@ func (d *ControllerService) CreateVolume(_ context.Context, request *csi.CreateV
110
111
111
112
klog .V (5 ).InfoS ("CreateVolume: parameters" , "parameters" , params )
112
113
113
- _ , err := ExtractAndDefaultParameters (params )
114
+ paramsSC , err := ExtractAndDefaultParameters (params )
114
115
if err != nil {
115
116
return nil , status .Error (codes .InvalidArgument , err .Error ())
116
117
}
@@ -192,6 +193,33 @@ func (d *ControllerService) CreateVolume(_ context.Context, request *csi.CreateV
192
193
vmr .SetNode (zone )
193
194
vmr .SetVmType ("qemu" )
194
195
196
+ if paramsSC .Replicate != nil && * paramsSC .Replicate {
197
+ if storageConfig ["type" ].(string ) != "zfspool" { //nolint:errcheck
198
+ return nil , status .Error (codes .Internal , "error: storage type is not zfs in replication mode" )
199
+ }
200
+
201
+ vmr , err = cl .GetVmRefByName (pvc )
202
+ if err != nil {
203
+ id , err := cl .GetNextID (vmID + 1 )
204
+ if err != nil {
205
+ klog .ErrorS (err , "CreateVolume: failed to get next id" , "cluster" , region )
206
+
207
+ return nil , status .Error (codes .Internal , err .Error ())
208
+ }
209
+
210
+ vmr = pxapi .NewVmRef (id )
211
+ vmr .SetNode (zone )
212
+ vmr .SetVmType ("qemu" )
213
+
214
+ mc := metrics .NewMetricContext ("CreateVm" )
215
+ if err := proxmox .CreateQemuVM (cl , vmr , pvc ); mc .ObserveRequest (err ) != nil {
216
+ klog .ErrorS (err , "CreateVolume: failed to create vm" , "cluster" , region )
217
+
218
+ return nil , status .Error (codes .Internal , err .Error ())
219
+ }
220
+ }
221
+ }
222
+
195
223
vol := volume .NewVolume (region , zone , params [StorageIDKey ], fmt .Sprintf ("vm-%d-%s" , vmr .VmId (), pvc ))
196
224
if storageConfig ["path" ] != nil && storageConfig ["path" ].(string ) != "" { //nolint:errcheck
197
225
vol = volume .NewVolume (region , zone , params [StorageIDKey ], fmt .Sprintf ("%d/vm-%d-%s.raw" , vmr .VmId (), vmr .VmId (), pvc ))
@@ -218,6 +246,35 @@ func (d *ControllerService) CreateVolume(_ context.Context, request *csi.CreateV
218
246
return nil , status .Error (codes .AlreadyExists , "volume already exists with same name and different capacity" )
219
247
}
220
248
249
+ if paramsSC .Replicate != nil && * paramsSC .Replicate {
250
+ _ , err := attachVolume (cl , vmr , vol .Storage (), vol .Disk (), paramsSC .ToMap ())
251
+ if err != nil {
252
+ klog .ErrorS (err , "CreateVolume: failed to attach volume" , "cluster" , region , "volumeID" , vol .VolumeID (), "vmID" , vmr .VmId ())
253
+
254
+ return nil , status .Error (codes .Internal , err .Error ())
255
+ }
256
+
257
+ if paramsSC .ReplicateZones != "" {
258
+ var replicaZone string
259
+
260
+ for _ , z := range strings .Split (paramsSC .ReplicateZones , "," ) {
261
+ if z != zone {
262
+ replicaZone = z
263
+
264
+ break
265
+ }
266
+ }
267
+
268
+ if replicaZone != "" {
269
+ if err := proxmox .SetQemuVMReplication (cl , vmr , replicaZone , paramsSC .ReplicateSchedule ); err != nil {
270
+ klog .ErrorS (err , "CreateVolume: failed to set replication" , "cluster" , region , "zone" , replicaZone , "volumeID" , vol .VolumeID (), "vmID" , vmr .VmId ())
271
+
272
+ return nil , status .Error (codes .Internal , err .Error ())
273
+ }
274
+ }
275
+ }
276
+ }
277
+
221
278
volID := vol .VolumeID ()
222
279
if storageConfig ["shared" ] != nil && int (storageConfig ["shared" ].(float64 )) == 1 { //nolint:errcheck
223
280
volID = vol .VolumeSharedID ()
@@ -279,6 +336,25 @@ func (d *ControllerService) DeleteVolume(_ context.Context, request *csi.DeleteV
279
336
return nil , status .Error (codes .Internal , err .Error ())
280
337
}
281
338
339
+ if vmr .VmId () != vmID {
340
+ config , err := cl .GetVmConfig (vmr )
341
+ if err != nil {
342
+ klog .ErrorS (err , "DeleteVolume: failed to get vm config" , "cluster" , vol .Cluster (), "volumeName" , vol .Disk ())
343
+ }
344
+
345
+ if config != nil {
346
+ vmName := config ["name" ].(string ) //nolint:errcheck
347
+ if vmName != "" && strings .HasSuffix (vol .Disk (), vmName ) {
348
+ mc := metrics .NewMetricContext ("deleteVm" )
349
+ if err := proxmox .DeleteQemuVM (cl , vmr ); mc .ObserveRequest (err ) != nil {
350
+ klog .ErrorS (err , "DeleteVolume: failed to delete vm" , "cluster" , vol .Cluster (), "volumeName" , vol .Disk ())
351
+
352
+ return nil , status .Error (codes .Internal , fmt .Sprintf ("failed to delete volume: %s" , vol .Disk ()))
353
+ }
354
+ }
355
+ }
356
+ }
357
+
282
358
mc := metrics .NewMetricContext ("deleteVolume" )
283
359
if _ , err := cl .DeleteVolume (vmr , vol .Storage (), vol .Disk ()); mc .ObserveRequest (err ) != nil {
284
360
klog .ErrorS (err , "DeleteVolume: failed to delete volume" , "cluster" , vol .Cluster (), "volumeName" , vol .Disk ())
@@ -729,7 +805,7 @@ func (d *ControllerService) getVMRefbyNodeID(ctx context.Context, cl *pxapi.Clie
729
805
return nil , status .Error (codes .InvalidArgument , err .Error ())
730
806
}
731
807
732
- if d .Provider == proxmox .ProviderCapmox {
808
+ if d .Provider == cluster .ProviderCapmox {
733
809
vmr , _ , err = d .Cluster .FindVMByUUID (node .Status .NodeInfo .SystemUUID )
734
810
if err != nil {
735
811
return nil , status .Error (codes .Internal , err .Error ())
0 commit comments