Skip to content

Commit e5fd206

Browse files
Merge pull request #498 from lminzhw/volumes
refresh volumes logic
2 parents 5ab6963 + 6fa3a4b commit e5fd206

6 files changed

Lines changed: 83 additions & 43 deletions

File tree

pkg/admission/admission_controller.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"k8s.io/apimachinery/pkg/runtime/serializer"
3131
"k8s.io/apimachinery/pkg/util/validation/field"
3232

33+
"k8s.io/kubernetes/pkg/apis/core/validation"
3334
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
3435
vcver "volcano.sh/volcano/pkg/client/clientset/versioned"
3536
)
@@ -241,6 +242,14 @@ func ValidateIO(volumes []v1alpha1.VolumeSpec) (string, bool) {
241242
if _, found := volumeMap[volume.MountPath]; found {
242243
return fmt.Sprintf(" duplicated mountPath: %s;", volume.MountPath), true
243244
}
245+
if len(volume.VolumeClaimName) != 0 {
246+
if volume.VolumeClaim != nil {
247+
return fmt.Sprintf("Confilct: If you want to use an existing PVC, just specify VolumeClaimName. If you want to create a new PVC, you do not need to specify VolumeClaimName."), true
248+
}
249+
if errMsgs := validation.ValidatePersistentVolumeName(volume.VolumeClaimName, false); len(errMsgs) > 0 {
250+
return fmt.Sprintf("Illegal VolumeClaimName %s : %v", volume.VolumeClaimName, errMsgs), true
251+
}
252+
}
244253
volumeMap[volume.MountPath] = true
245254
}
246255
return "", false

pkg/controllers/job/helpers/helpers.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ func MakePodName(jobName string, taskName string, index int) string {
4949
return fmt.Sprintf(PodNameFmt, jobName, taskName, index)
5050
}
5151

52-
func genRandomStr(l int) string {
52+
// GenRandomStr generate random str with specified length l
53+
func GenRandomStr(l int) string {
5354
str := "0123456789abcdefghijklmnopqrstuvwxyz"
5455
bytes := []byte(str)
5556
var result []byte
@@ -62,7 +63,7 @@ func genRandomStr(l int) string {
6263

6364
// MakeVolumeClaimName creates volume claim name
6465
func MakeVolumeClaimName(jobName string) string {
65-
return fmt.Sprintf(VolumeClaimFmt, jobName, genRandomStr(12))
66+
return fmt.Sprintf(VolumeClaimFmt, jobName, GenRandomStr(12))
6667
}
6768

6869
// GetJobKeyByReq gets the key for the job request

pkg/controllers/job/job_controller_actions.go

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package job
1818

1919
import (
20+
"errors"
2021
"fmt"
2122
"sort"
2223
"sync"
@@ -157,19 +158,19 @@ func (cc *Controller) createJob(job *vkv1.Job) (*vkv1.Job, error) {
157158
return nil, err
158159
}
159160

160-
if err := cc.createPodGroupIfNotExist(job); err != nil {
161-
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PodGroupError),
162-
fmt.Sprintf("Failed to create PodGroup, err: %v", err))
163-
return nil, err
164-
}
165-
166161
newJob, err := cc.createJobIOIfNotExist(job)
167162
if err != nil {
168163
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PVCError),
169164
fmt.Sprintf("Failed to create PVC, err: %v", err))
170165
return nil, err
171166
}
172167

168+
if err := cc.createPodGroupIfNotExist(newJob); err != nil {
169+
cc.recorder.Event(job, v1.EventTypeWarning, string(vkv1.PodGroupError),
170+
fmt.Sprintf("Failed to create PodGroup, err: %v", err))
171+
return nil, err
172+
}
173+
173174
return newJob, nil
174175
}
175176

@@ -336,18 +337,20 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
336337

337338
func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (*vkv1.Job, error) {
338339
// If PVC does not exist, create them for Job.
339-
var needUpdate, nameExist bool
340+
var needUpdate bool
340341
volumes := job.Spec.Volumes
342+
if job.Status.ControlledResources == nil {
343+
job.Status.ControlledResources = make(map[string]string)
344+
}
341345
for index, volume := range volumes {
342-
nameExist = false
343346
vcName := volume.VolumeClaimName
344347
if len(vcName) == 0 {
345348
//NOTE(k82cn): Ensure never have duplicated generated names.
346349
for {
347350
vcName = vkjobhelpers.MakeVolumeClaimName(job.Name)
348351
exist, err := cc.checkPVCExist(job, vcName)
349352
if err != nil {
350-
return nil, err
353+
return job, err
351354
}
352355
if exist {
353356
continue
@@ -356,37 +359,40 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) (*vkv1.Job, error) {
356359
needUpdate = true
357360
break
358361
}
359-
} else {
360-
exist, err := cc.checkPVCExist(job, vcName)
361-
if err != nil {
362-
return nil, err
363-
}
364-
nameExist = exist
365-
}
366-
367-
if !nameExist {
368-
if job.Status.ControlledResources == nil {
369-
job.Status.ControlledResources = make(map[string]string)
370-
}
371362
if volume.VolumeClaim != nil {
372363
if err := cc.createPVC(job, vcName, volume.VolumeClaim); err != nil {
373-
return nil, err
364+
return job, err
374365
}
375366
job.Status.ControlledResources["volume-pvc-"+vcName] = vcName
376367
} else {
377368
job.Status.ControlledResources["volume-emptyDir-"+vcName] = vcName
378369
}
370+
} else {
371+
if job.Status.ControlledResources["volume-emptyDir-"+vcName] == vcName || job.Status.ControlledResources["volume-pvc-"+vcName] == vcName {
372+
continue
373+
}
374+
exist, err := cc.checkPVCExist(job, vcName)
375+
if err != nil {
376+
return job, err
377+
}
378+
if exist {
379+
job.Status.ControlledResources["volume-pvc-"+vcName] = vcName
380+
} else {
381+
msg := fmt.Sprintf("pvc %s is not found, the job will be in the Pending state until the PVC is created", vcName)
382+
glog.Error(msg)
383+
return job, errors.New(msg)
384+
}
379385
}
380386
}
381387
if needUpdate {
382388
newJob, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job)
383389
if err != nil {
384390
glog.Errorf("Failed to update Job %v/%v for volume claim name: %v ",
385391
job.Namespace, job.Name, err)
386-
return nil, err
392+
return job, err
387393
}
388-
newJob.Status = job.Status
389394

395+
newJob.Status = job.Status
390396
return newJob, err
391397
}
392398
return job, nil

pkg/controllers/job/job_controller_actions_test.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package job
1818

1919
import (
20+
"errors"
2021
"fmt"
2122
"testing"
2223

@@ -295,7 +296,7 @@ func TestCreateJobIOIfNotExistFunc(t *testing.T) {
295296
ExpextVal error
296297
}{
297298
{
298-
Name: "Create Job IO success case",
299+
Name: "Create Job IO case",
299300
Job: &v1alpha1.Job{
300301
ObjectMeta: metav1.ObjectMeta{
301302
Name: "job1",
@@ -309,7 +310,7 @@ func TestCreateJobIOIfNotExistFunc(t *testing.T) {
309310
},
310311
},
311312
},
312-
ExpextVal: nil,
313+
ExpextVal: errors.New("pvc pvc1 is not found, the job will be in the Pending state until the PVC is created"),
313314
},
314315
}
315316

@@ -319,8 +320,14 @@ func TestCreateJobIOIfNotExistFunc(t *testing.T) {
319320
fakeController := newFakeController()
320321

321322
job, err := fakeController.createJobIOIfNotExist(testcase.Job)
322-
if err != testcase.ExpextVal {
323-
t.Errorf("Expected Return value to be : %s, but got: %s in testcase %d", testcase.ExpextVal, err, i)
323+
if testcase.ExpextVal == nil {
324+
if err != nil {
325+
t.Errorf("Expected Return value to be : %v, but got: %v in testcase %d", testcase.ExpextVal, err, i)
326+
}
327+
} else {
328+
if err == nil || err.Error() != testcase.ExpextVal.Error() {
329+
t.Errorf("Expected Return value to be : %v, but got: %v in testcase %d", testcase.ExpextVal.Error(), err.Error(), i)
330+
}
324331
}
325332

326333
if len(job.Spec.Volumes) == 0 {

pkg/controllers/job/job_controller_util.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,32 +59,35 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod {
5959
pod.Spec.SchedulerName = job.Spec.SchedulerName
6060
}
6161

62-
volumeMap := make(map[string]bool)
62+
volumeMap := make(map[string]string)
6363
for _, volume := range job.Spec.Volumes {
6464
vcName := volume.VolumeClaimName
65+
name := fmt.Sprintf("%s-%s", job.Name, vkjobhelpers.GenRandomStr(12))
6566
if _, ok := volumeMap[vcName]; !ok {
66-
if _, ok := job.Status.ControlledResources["volume-emptyDir-"+vcName]; ok && volume.VolumeClaim == nil {
67+
if _, ok := job.Status.ControlledResources["volume-emptyDir-"+vcName]; ok {
6768
volume := v1.Volume{
68-
Name: vcName,
69+
Name: name,
6970
}
7071
volume.EmptyDir = &v1.EmptyDirVolumeSource{}
7172
pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
7273
} else {
7374
volume := v1.Volume{
74-
Name: vcName,
75+
Name: name,
7576
}
7677
volume.PersistentVolumeClaim = &v1.PersistentVolumeClaimVolumeSource{
7778
ClaimName: vcName,
7879
}
7980
pod.Spec.Volumes = append(pod.Spec.Volumes, volume)
8081
}
81-
volumeMap[vcName] = true
82+
volumeMap[vcName] = name
83+
} else {
84+
name = volumeMap[vcName]
8285
}
8386

8487
for i, c := range pod.Spec.Containers {
8588
vm := v1.VolumeMount{
8689
MountPath: volume.MountPath,
87-
Name: vcName,
90+
Name: name,
8891
}
8992
pod.Spec.Containers[i].VolumeMounts = append(c.VolumeMounts, vm)
9093
}

test/e2e/job_controlled_resource.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,17 @@ var _ = Describe("Job E2E Test: Test Job PVCs", func() {
125125

126126
_, err1 := context.kubeclient.CoreV1().PersistentVolumeClaims(namespace).Create(&pvc)
127127
Expect(err1).NotTo(HaveOccurred(), "pvc creation")
128+
129+
pvSpec := &v12.PersistentVolumeClaimSpec{
130+
Resources: v12.ResourceRequirements{
131+
Requests: v12.ResourceList{
132+
v12.ResourceName(v12.ResourceStorage): resource.MustParse("1Gi"),
133+
},
134+
},
135+
AccessModes: []v12.PersistentVolumeAccessMode{
136+
v12.ReadWriteOnce,
137+
},
138+
}
128139
job := createJob(context, &jobSpec{
129140
namespace: namespace,
130141
name: jobName,
@@ -139,9 +150,12 @@ var _ = Describe("Job E2E Test: Test Job PVCs", func() {
139150
},
140151
volumes: []v1alpha1.VolumeSpec{
141152
{
142-
MountPath: "/mounttwo",
153+
MountPath: "/mountone",
143154
VolumeClaimName: pvcName,
144-
VolumeClaim: &pvc.Spec,
155+
},
156+
{
157+
MountPath: "/mounttwo",
158+
VolumeClaim: pvSpec,
145159
},
146160
},
147161
})
@@ -152,12 +166,12 @@ var _ = Describe("Job E2E Test: Test Job PVCs", func() {
152166
job, err = context.vcclient.BatchV1alpha1().Jobs(namespace).Get(jobName, metav1.GetOptions{})
153167
Expect(err).NotTo(HaveOccurred())
154168

155-
Expect(len(job.Spec.Volumes)).To(Equal(1),
169+
Expect(len(job.Spec.Volumes)).To(Equal(2),
156170
" volume should be created")
157-
for _, volume := range job.Spec.Volumes {
158-
Expect(volume.VolumeClaimName).Should((Equal(pvcName)),
159-
"PVC name should not be generated .")
160-
}
171+
Expect(job.Spec.Volumes[0].VolumeClaimName).Should(Equal(pvcName),
172+
"volume 1 PVC name should not be generated .")
173+
Expect(job.Spec.Volumes[1].VolumeClaimName).Should(Not(Equal("")),
174+
"volume 0 PVC name should be generated.")
161175
})
162176

163177
It("Generate PodGroup and valid minResource when creating job", func() {

0 commit comments

Comments
 (0)