Skip to content

Commit cd3cb03

Browse files
committed
Admission refactor.
Signed-off-by: Klaus Ma <klaus1982.cn@gmail.com>
1 parent 9077081 commit cd3cb03

File tree

13 files changed

+262
-194
lines changed

13 files changed

+262
-194
lines changed

cmd/admission/main.go

Lines changed: 26 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ limitations under the License.
1616
package main
1717

1818
import (
19-
"io/ioutil"
2019
"net/http"
2120
"os"
2221
"os/signal"
@@ -34,17 +33,13 @@ import (
3433

3534
"volcano.sh/volcano/cmd/admission/app"
3635
"volcano.sh/volcano/cmd/admission/app/options"
37-
"volcano.sh/volcano/pkg/admission"
36+
"volcano.sh/volcano/pkg/admission/router"
3837
"volcano.sh/volcano/pkg/version"
39-
)
40-
41-
func serveJobs(w http.ResponseWriter, r *http.Request) {
42-
admission.Serve(w, r, admission.AdmitJobs)
43-
}
4438

45-
func serveMutateJobs(w http.ResponseWriter, r *http.Request) {
46-
admission.Serve(w, r, admission.MutateJobs)
47-
}
39+
_ "volcano.sh/volcano/pkg/admission/jobs/mutate"
40+
_ "volcano.sh/volcano/pkg/admission/jobs/validate"
41+
_ "volcano.sh/volcano/pkg/admission/pods"
42+
)
4843

4944
var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes")
5045

@@ -64,41 +59,43 @@ func main() {
6459
go wait.Until(klog.Flush, *logFlushFreq, wait.NeverStop)
6560
defer klog.Flush()
6661

67-
http.HandleFunc(admission.AdmitJobPath, serveJobs)
68-
http.HandleFunc(admission.MutateJobPath, serveMutateJobs)
69-
7062
if err := config.CheckPortOrDie(); err != nil {
7163
klog.Fatalf("Configured port is invalid: %v", err)
7264
}
73-
addr := ":" + strconv.Itoa(config.Port)
7465

7566
restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig)
7667
if err != nil {
7768
klog.Fatalf("Unable to build k8s config: %v", err)
7869
}
7970

80-
admission.VolcanoClientSet = app.GetVolcanoClient(restConfig)
81-
82-
servePods(config)
83-
84-
caBundle, err := ioutil.ReadFile(config.CaCertFile)
85-
if err != nil {
86-
klog.Fatalf("Unable to read cacert file: %v", err)
87-
}
88-
89-
err = options.RegisterWebhooks(config, app.GetClient(restConfig), caBundle)
90-
if err != nil {
91-
klog.Fatalf("Unable to register webhook configs: %v", err)
92-
}
71+
vClient := app.GetVolcanoClient(restConfig)
72+
router.ForEachAdmission(func(service *router.AdmissionService) {
73+
if service.Config != nil {
74+
service.Config.VolcanoClient = vClient
75+
service.Config.SchedulerName = config.SchedulerName
76+
}
77+
http.HandleFunc(service.Path, service.Handler)
78+
})
79+
80+
//
81+
//caBundle, err := ioutil.ReadFile(config.CaCertFile)
82+
//if err != nil {
83+
// klog.Fatalf("Unable to read cacert file: %v", err)
84+
//}
85+
////
86+
////err = options.RegisterWebhooks(config, app.GetClient(restConfig), caBundle)
87+
////if err != nil {
88+
//// klog.Fatalf("Unable to register webhook configs: %v", err)
89+
////}
9390

91+
webhookServeError := make(chan struct{})
9492
stopChannel := make(chan os.Signal)
9593
signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT)
9694

9795
server := &http.Server{
98-
Addr: addr,
96+
Addr: ":" + strconv.Itoa(config.Port),
9997
TLSConfig: app.ConfigTLS(config, restConfig),
10098
}
101-
webhookServeError := make(chan struct{})
10299
go func() {
103100
err = server.ListenAndServeTLS("", "")
104101
if err != nil && err != http.ErrServerClosed {
@@ -117,13 +114,3 @@ func main() {
117114
return
118115
}
119116
}
120-
121-
func servePods(config *options.Config) {
122-
admController := &admission.Controller{
123-
VcClients: admission.VolcanoClientSet,
124-
SchedulerName: config.SchedulerName,
125-
}
126-
http.HandleFunc(admission.AdmitPodPath, admController.ServerPods)
127-
128-
return
129-
}
Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package admission
17+
package mutate
1818

1919
import (
2020
"encoding/json"
@@ -25,6 +25,9 @@ import (
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/klog"
2727

28+
"volcano.sh/volcano/pkg/admission/router"
29+
"volcano.sh/volcano/pkg/admission/schema"
30+
"volcano.sh/volcano/pkg/admission/util"
2831
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
2932
)
3033

@@ -33,6 +36,11 @@ const (
3336
DefaultQueue = "default"
3437
)
3538

39+
var Service = &router.AdmissionService{
40+
Path: "/mutating-jobs",
41+
Func: MutateJobs,
42+
}
43+
3644
type patchOperation struct {
3745
Op string `json:"op"`
3846
Path string `json:"path"`
@@ -43,9 +51,9 @@ type patchOperation struct {
4351
func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
4452
klog.V(3).Infof("mutating jobs")
4553

46-
job, err := DecodeJob(ar.Request.Object, ar.Request.Resource)
54+
job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource)
4755
if err != nil {
48-
return ToAdmissionResponse(err)
56+
return util.ToAdmissionResponse(err)
4957
}
5058

5159
reviewResponse := v1beta1.AdmissionResponse{}
@@ -58,7 +66,7 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
5866
break
5967
default:
6068
err = fmt.Errorf("expect operation to be 'CREATE' ")
61-
return ToAdmissionResponse(err)
69+
return util.ToAdmissionResponse(err)
6270
}
6371

6472
if err != nil {
@@ -73,7 +81,7 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
7381
return &reviewResponse
7482
}
7583

76-
func createPatch(job v1alpha1.Job) ([]byte, error) {
84+
func createPatch(job *v1alpha1.Job) ([]byte, error) {
7785
var patch []patchOperation
7886
pathQueue := patchDefaultQueue(job)
7987
if pathQueue != nil {
@@ -86,7 +94,7 @@ func createPatch(job v1alpha1.Job) ([]byte, error) {
8694
return json.Marshal(patch)
8795
}
8896

89-
func patchDefaultQueue(job v1alpha1.Job) *patchOperation {
97+
func patchDefaultQueue(job *v1alpha1.Job) *patchOperation {
9098
//Add default queue if not specified.
9199
if job.Spec.Queue == "" {
92100
return &patchOperation{Op: "add", Path: "/spec/queue", Value: DefaultQueue}

pkg/admission/mutate_job_test.go renamed to pkg/admission/jobs/mutate/mutate_job_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package admission
17+
package mutate
1818

1919
import (
2020
"testing"
Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package admission
17+
package validate
1818

1919
import (
2020
"fmt"
@@ -30,23 +30,33 @@ import (
3030
k8scorev1 "k8s.io/kubernetes/pkg/apis/core/v1"
3131
k8scorevalid "k8s.io/kubernetes/pkg/apis/core/validation"
3232

33+
"volcano.sh/volcano/pkg/admission/router"
34+
"volcano.sh/volcano/pkg/admission/schema"
35+
"volcano.sh/volcano/pkg/admission/util"
3336
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
34-
vcclientset "volcano.sh/volcano/pkg/client/clientset/versioned"
3537
"volcano.sh/volcano/pkg/controllers/job/plugins"
3638
)
3739

38-
// VolcanoClientSet is volcano clientset
39-
// TODO: make it as package local var.
40-
var VolcanoClientSet vcclientset.Interface
40+
func init() {
41+
router.RegisterAdmission(service)
42+
}
43+
44+
var service = &router.AdmissionService{
45+
Path: "/jobs",
46+
Func: AdmitJobs,
47+
48+
Config: config,
49+
}
50+
51+
var config = &router.AdmissionServiceConfig{}
4152

4253
// AdmitJobs is to admit jobs and return response
4354
func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
44-
4555
klog.V(3).Infof("admitting jobs -- %s", ar.Request.Operation)
4656

47-
job, err := DecodeJob(ar.Request.Object, ar.Request.Resource)
57+
job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource)
4858
if err != nil {
49-
return ToAdmissionResponse(err)
59+
return util.ToAdmissionResponse(err)
5060
}
5161
var msg string
5262
reviewResponse := v1beta1.AdmissionResponse{}
@@ -57,14 +67,14 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
5767
msg = validateJob(job, &reviewResponse)
5868
break
5969
case v1beta1.Update:
60-
_, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource)
70+
_, err := schema.DecodeJob(ar.Request.OldObject, ar.Request.Resource)
6171
if err != nil {
62-
return ToAdmissionResponse(err)
72+
return util.ToAdmissionResponse(err)
6373
}
6474
break
6575
default:
6676
err := fmt.Errorf("expect operation to be 'CREATE' or 'UPDATE'")
67-
return ToAdmissionResponse(err)
77+
return util.ToAdmissionResponse(err)
6878
}
6979

7080
if !reviewResponse.Allowed {
@@ -73,8 +83,7 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
7383
return &reviewResponse
7484
}
7585

76-
func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) string {
77-
86+
func validateJob(job *v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) string {
7887
var msg string
7988
taskNames := map[string]string{}
8089
var totalReplicas int32
@@ -151,9 +160,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
151160
}
152161

153162
// Check whether Queue already present or not
154-
if _, err := VolcanoClientSet.SchedulingV1alpha2().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil {
163+
if _, err := config.VolcanoClient.SchedulingV1alpha2().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil {
155164
// TODO: deprecate v1alpha1
156-
if _, err := VolcanoClientSet.SchedulingV1alpha1().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil {
165+
if _, err := config.VolcanoClient.SchedulingV1alpha1().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil {
157166
msg = msg + fmt.Sprintf(" unable to find job queue: %v", err)
158167
}
159168
}
@@ -165,7 +174,7 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
165174
return msg
166175
}
167176

168-
func validateTaskTemplate(task v1alpha1.TaskSpec, job v1alpha1.Job, index int) string {
177+
func validateTaskTemplate(task v1alpha1.TaskSpec, job *v1alpha1.Job, index int) string {
169178
var v1PodTemplate v1.PodTemplate
170179
v1PodTemplate.Template = *task.Template.DeepCopy()
171180
k8scorev1.SetObjectDefaults_PodTemplate(&v1PodTemplate)

pkg/admission/admit_job_test.go renamed to pkg/admission/jobs/validate/admit_job_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package admission
17+
package validate
1818

1919
import (
2020
"strings"
@@ -1046,15 +1046,15 @@ func TestValidateExecution(t *testing.T) {
10461046
},
10471047
}
10481048
// create fake volcano clientset
1049-
VolcanoClientSet = fakeclient.NewSimpleClientset()
1049+
config.VolcanoClient = fakeclient.NewSimpleClientset()
10501050

10511051
//create default queue
1052-
_, err := VolcanoClientSet.SchedulingV1alpha2().Queues().Create(&defaultqueue)
1052+
_, err := config.VolcanoClient.SchedulingV1alpha2().Queues().Create(&defaultqueue)
10531053
if err != nil {
10541054
t.Error("Queue Creation Failed")
10551055
}
10561056

1057-
ret := validateJob(testCase.Job, &testCase.reviewResponse)
1057+
ret := validateJob(&testCase.Job, &testCase.reviewResponse)
10581058
//fmt.Printf("test-case name:%s, ret:%v testCase.reviewResponse:%v \n", testCase.Name, ret,testCase.reviewResponse)
10591059
if testCase.ExpectErr == true && ret == "" {
10601060
t.Errorf("Expect error msg :%s, but got nil.", testCase.ret)

0 commit comments

Comments
 (0)