diff --git a/cmd/admission/app/options/options.go b/cmd/admission/app/options/options.go index 98dd1bd8f01..6c533279a9d 100644 --- a/cmd/admission/app/options/options.go +++ b/cmd/admission/app/options/options.go @@ -20,13 +20,6 @@ import ( "fmt" "github.com/spf13/pflag" - - "k8s.io/api/admissionregistration/v1beta1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - admissionregistrationv1beta1 "k8s.io/client-go/kubernetes/typed/admissionregistration/v1beta1" - "k8s.io/klog" ) const ( @@ -35,20 +28,17 @@ const ( // Config admission-controller server config. type Config struct { - Master string - Kubeconfig string - CertFile string - KeyFile string - CaCertFile string - Port int - MutateWebhookConfigName string - MutateWebhookName string - ValidateWebhookConfigName string - ValidateWebhookName string - PrintVersion bool - AdmissionServiceName string - AdmissionServiceNamespace string - SchedulerName string + Master string + Kubeconfig string + CertFile string + KeyFile string + CaCertFile string + Port int + PrintVersion bool + WebhookName string + WebhookNamespace string + SchedulerName string + WebhookURL string } // NewConfig create new config @@ -65,37 +55,17 @@ func (c *Config) AddFlags(fs *pflag.FlagSet) { "File containing the default x509 Certificate for HTTPS. (CA cert, if any, concatenated "+ "after server cert).") fs.StringVar(&c.KeyFile, "tls-private-key-file", c.KeyFile, "File containing the default x509 private key matching --tls-cert-file.") - fs.StringVar(&c.CaCertFile, "ca-cert-file", c.CaCertFile, "File containing the x509 Certificate for HTTPS.") fs.IntVar(&c.Port, "port", 443, "the port used by admission-controller-server.") - fs.StringVar(&c.MutateWebhookConfigName, "mutate-webhook-config-name", "", - "Name of the mutatingwebhookconfiguration resource in Kubernetes [Deprecated]: it will be generated when not specified.") - fs.StringVar(&c.MutateWebhookName, "mutate-webhook-name", "", - "Name of the webhook entry in the webhook config. [Deprecated]: it will be generated when not specified") - fs.StringVar(&c.ValidateWebhookConfigName, "validate-webhook-config-name", "", - "Name of the mutatingwebhookconfiguration resource in Kubernetes. [Deprecated]: it will be generated when not specified") - fs.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "", - "Name of the webhook entry in the webhook config. [Deprecated]: it will be generated when not specified") fs.BoolVar(&c.PrintVersion, "version", false, "Show version and quit") - fs.StringVar(&c.AdmissionServiceNamespace, "webhook-namespace", "default", "The namespace of this webhook") - fs.StringVar(&c.AdmissionServiceName, "webhook-service-name", "admission-service", "The name of this admission service") + + fs.StringVar(&c.CaCertFile, "ca-cert-file", c.CaCertFile, "File containing the x509 Certificate for HTTPS.") + fs.StringVar(&c.WebhookNamespace, "webhook-namespace", "", "The namespace of this webhook") + fs.StringVar(&c.WebhookName, "webhook-service-name", "", "The name of this webhook") + fs.StringVar(&c.WebhookURL, "webhook-url", "", "The url of this webhook") + fs.StringVar(&c.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name") } -const ( - // ValidateConfigName ValidatingWebhookConfiguration name format - ValidateConfigName = "%s-validate-job" - // MutateConfigName MutatingWebhookConfiguration name format - MutateConfigName = "%s-mutate-job" - // ValidateHookName Default name for webhooks in ValidatingWebhookConfiguration - ValidateHookName = "validatejob.volcano.sh" - // MutateHookName Default name for webhooks in MutatingWebhookConfiguration - MutateHookName = "mutatejob.volcano.sh" - // ValidatePodConfigName ValidatingWebhookPodConfiguration name format - ValidatePodConfigName = "%s-validate-pod" - // ValidatePodHookName Default name for webhooks in ValidatingWebhookPodConfiguration - ValidatePodHookName = "validatepod.volcano.sh" -) - // CheckPortOrDie check valid port range func (c *Config) CheckPortOrDie() error { if c.Port < 1 || c.Port > 65535 { @@ -103,172 +73,3 @@ func (c *Config) CheckPortOrDie() error { } return nil } - -func useGeneratedNameIfRequired(configured, generated string) string { - if configured != "" { - return configured - } - return generated -} - -// RegisterWebhooks register webhooks for admission service -func RegisterWebhooks(c *Config, clienset *kubernetes.Clientset, cabundle []byte) error { - ignorePolicy := v1beta1.Ignore - - //Prepare validate webhooks - path := "/jobs" - JobValidateHooks := v1beta1.ValidatingWebhookConfiguration{ - ObjectMeta: metav1.ObjectMeta{ - Name: useGeneratedNameIfRequired(c.ValidateWebhookConfigName, - fmt.Sprintf(ValidateConfigName, c.AdmissionServiceName)), - }, - Webhooks: []v1beta1.Webhook{{ - Name: useGeneratedNameIfRequired(c.ValidateWebhookName, ValidateHookName), - Rules: []v1beta1.RuleWithOperations{ - { - Operations: []v1beta1.OperationType{v1beta1.Create, v1beta1.Update}, - Rule: v1beta1.Rule{ - APIGroups: []string{"batch.volcano.sh"}, - APIVersions: []string{"v1alpha1"}, - Resources: []string{"jobs"}, - }, - }, - }, - ClientConfig: v1beta1.WebhookClientConfig{ - Service: &v1beta1.ServiceReference{ - Name: c.AdmissionServiceName, - Namespace: c.AdmissionServiceNamespace, - Path: &path, - }, - CABundle: cabundle, - }, - FailurePolicy: &ignorePolicy, - }}, - } - - if err := registerValidateWebhook(clienset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(), - []v1beta1.ValidatingWebhookConfiguration{JobValidateHooks}); err != nil { - return err - } - - //Prepare mutate jobs - path = "/mutating-jobs" - JobMutateHooks := v1beta1.MutatingWebhookConfiguration{ - ObjectMeta: metav1.ObjectMeta{ - Name: useGeneratedNameIfRequired(c.MutateWebhookConfigName, - fmt.Sprintf(MutateConfigName, c.AdmissionServiceName)), - }, - Webhooks: []v1beta1.Webhook{{ - Name: useGeneratedNameIfRequired(c.MutateWebhookName, MutateHookName), - Rules: []v1beta1.RuleWithOperations{ - { - Operations: []v1beta1.OperationType{v1beta1.Create}, - Rule: v1beta1.Rule{ - APIGroups: []string{"batch.volcano.sh"}, - APIVersions: []string{"v1alpha1"}, - Resources: []string{"jobs"}, - }, - }, - }, - ClientConfig: v1beta1.WebhookClientConfig{ - Service: &v1beta1.ServiceReference{ - Name: c.AdmissionServiceName, - Namespace: c.AdmissionServiceNamespace, - Path: &path, - }, - CABundle: cabundle, - }, - FailurePolicy: &ignorePolicy, - }}, - } - - if err := registerMutateWebhook(clienset.AdmissionregistrationV1beta1().MutatingWebhookConfigurations(), - []v1beta1.MutatingWebhookConfiguration{JobMutateHooks}); err != nil { - return err - } - - // Prepare validate pods - path = "/pods" - PodValidateHooks := v1beta1.ValidatingWebhookConfiguration{ - ObjectMeta: metav1.ObjectMeta{ - Name: useGeneratedNameIfRequired("", - fmt.Sprintf(ValidatePodConfigName, c.AdmissionServiceName)), - }, - Webhooks: []v1beta1.Webhook{{ - Name: useGeneratedNameIfRequired("", ValidatePodHookName), - Rules: []v1beta1.RuleWithOperations{ - { - Operations: []v1beta1.OperationType{v1beta1.Create}, - Rule: v1beta1.Rule{ - APIGroups: []string{""}, - APIVersions: []string{"v1"}, - Resources: []string{"pods"}, - }, - }, - }, - ClientConfig: v1beta1.WebhookClientConfig{ - Service: &v1beta1.ServiceReference{ - Name: c.AdmissionServiceName, - Namespace: c.AdmissionServiceNamespace, - Path: &path, - }, - CABundle: cabundle, - }, - FailurePolicy: &ignorePolicy, - }}, - } - - if err := registerValidateWebhook(clienset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(), - []v1beta1.ValidatingWebhookConfiguration{PodValidateHooks}); err != nil { - return err - } - - return nil - -} - -func registerMutateWebhook(client admissionregistrationv1beta1.MutatingWebhookConfigurationInterface, - webhooks []v1beta1.MutatingWebhookConfiguration) error { - for _, hook := range webhooks { - existing, err := client.Get(hook.Name, metav1.GetOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - if err == nil && existing != nil { - klog.Infof("Updating MutatingWebhookConfiguration %v", hook) - existing.Webhooks = hook.Webhooks - if _, err := client.Update(existing); err != nil { - return err - } - } else { - klog.Infof("Creating MutatingWebhookConfiguration %v", hook) - if _, err := client.Create(&hook); err != nil { - return err - } - } - } - return nil -} - -func registerValidateWebhook(client admissionregistrationv1beta1.ValidatingWebhookConfigurationInterface, - webhooks []v1beta1.ValidatingWebhookConfiguration) error { - for _, hook := range webhooks { - existing, err := client.Get(hook.Name, metav1.GetOptions{}) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - if err == nil && existing != nil { - existing.Webhooks = hook.Webhooks - klog.Infof("Updating ValidatingWebhookConfiguration %v", hook) - if _, err := client.Update(existing); err != nil { - return err - } - } else { - klog.Infof("Creating ValidatingWebhookConfiguration %v", hook) - if _, err := client.Create(&hook); err != nil { - return err - } - } - } - return nil -} diff --git a/cmd/admission/app/server.go b/cmd/admission/app/server.go index 14e4b045c15..c54acc23cb8 100644 --- a/cmd/admission/app/server.go +++ b/cmd/admission/app/server.go @@ -17,60 +17,83 @@ limitations under the License. package app import ( - "crypto/tls" + "fmt" + "io/ioutil" + "net/http" + "os" + "os/signal" + "strconv" + "syscall" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" "k8s.io/klog" "volcano.sh/volcano/cmd/admission/app/options" - "volcano.sh/volcano/pkg/client/clientset/versioned" + "volcano.sh/volcano/pkg/admission/router" + "volcano.sh/volcano/pkg/version" ) -// GetClient Get a clientset with restConfig. -func GetClient(restConfig *rest.Config) *kubernetes.Clientset { - clientset, err := kubernetes.NewForConfig(restConfig) +// Run start the service of admission controller. +func Run(config *options.Config) error { + if config.PrintVersion { + version.PrintVersionAndExit() + return nil + } + + if config.WebhookURL == "" && config.WebhookNamespace == "" && config.WebhookName == "" { + return fmt.Errorf("failed to start webhooks as both 'url' and 'namespace/name' of webhook are empty") + } + + restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig) if err != nil { - klog.Fatal(err) + return fmt.Errorf("unable to build k8s config: %v", err) } - return clientset -} -// GetVolcanoClient get a clientset for volcano -func GetVolcanoClient(restConfig *rest.Config) *versioned.Clientset { - clientset, err := versioned.NewForConfig(restConfig) + caBundle, err := ioutil.ReadFile(config.CaCertFile) if err != nil { - klog.Fatal(err) + return fmt.Errorf("unable to read cacert file (%s): %v", config.CaCertFile, err) } - return clientset -} -// ConfigTLS is a helper function that generate tls certificates from directly defined tls config or kubeconfig -// These are passed in as command line for cluster certification. If tls config is passed in, we use the directly -// defined tls config, else use that defined in kubeconfig -func ConfigTLS(config *options.Config, restConfig *rest.Config) *tls.Config { - if len(config.CertFile) != 0 && len(config.KeyFile) != 0 { - sCert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile) - if err != nil { - klog.Fatal(err) + vClient := getVolcanoClient(restConfig) + kubeClient := getKubeClient(restConfig) + router.ForEachAdmission(func(service *router.AdmissionService) { + if service.Config != nil { + service.Config.VolcanoClient = vClient + service.Config.SchedulerName = config.SchedulerName } - return &tls.Config{ - Certificates: []tls.Certificate{sCert}, - } - } + klog.V(3).Infof("Registered '%s' as webhook.", service.Path) + http.HandleFunc(service.Path, service.Handler) + + klog.V(3).Infof("Registered configuration for webhook <%s>", service.Path) + registerWebhookConfig(kubeClient, config, service, caBundle) + }) + + webhookServeError := make(chan struct{}) + stopChannel := make(chan os.Signal) + signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT) - if len(restConfig.CertData) != 0 && len(restConfig.KeyData) != 0 { - sCert, err := tls.X509KeyPair(restConfig.CertData, restConfig.KeyData) - if err != nil { - klog.Fatal(err) + server := &http.Server{ + Addr: ":" + strconv.Itoa(config.Port), + TLSConfig: configTLS(config, restConfig), + } + go func() { + err = server.ListenAndServeTLS("", "") + if err != nil && err != http.ErrServerClosed { + klog.Fatalf("ListenAndServeTLS for admission webhook failed: %v", err) + close(webhookServeError) } - return &tls.Config{ - Certificates: []tls.Certificate{sCert}, + klog.Info("Volcano Webhook manager started.") + }() + + select { + case <-stopChannel: + if err := server.Close(); err != nil { + return fmt.Errorf("close admission server failed: %v", err) } + return nil + case <-webhookServeError: + return fmt.Errorf("unknown webhook server error") } - - klog.Fatal("tls: failed to find any tls config data") - return &tls.Config{} } diff --git a/cmd/admission/app/util.go b/cmd/admission/app/util.go new file mode 100644 index 00000000000..132e88ff4ed --- /dev/null +++ b/cmd/admission/app/util.go @@ -0,0 +1,185 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "crypto/tls" + "regexp" + "strings" + + "k8s.io/api/admissionregistration/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/klog" + + "volcano.sh/volcano/cmd/admission/app/options" + "volcano.sh/volcano/pkg/admission/router" + "volcano.sh/volcano/pkg/client/clientset/versioned" +) + +func registerWebhookConfig(kubeClient *kubernetes.Clientset, config *options.Config, service *router.AdmissionService, caBundle []byte) { + clientConfig := v1beta1.WebhookClientConfig{ + CABundle: caBundle, + } + if config.WebhookURL != "" { + url := config.WebhookURL + service.Path + clientConfig.URL = &url + klog.Infof("The URL of webhook manager is <%s>.", url) + } + if config.WebhookName != "" && config.WebhookNamespace != "" { + clientConfig.Service = &v1beta1.ServiceReference{ + Name: config.WebhookName, + Namespace: config.WebhookNamespace, + Path: &service.Path, + } + klog.Infof("The service of webhook manager is <%s/%s/%s>.", + config.WebhookName, config.WebhookNamespace, service.Path) + } + if service.MutatingConfig != nil { + for i := range service.MutatingConfig.Webhooks { + service.MutatingConfig.Webhooks[i].ClientConfig = clientConfig + } + + service.MutatingConfig.ObjectMeta.Name = webhookConfigName(config.WebhookName, service.Path) + + if err := registerMutateWebhook(kubeClient, service.MutatingConfig); err != nil { + klog.Errorf("Failed to register mutating admission webhook (%s): %v", + service.Path, err) + } else { + klog.V(3).Infof("Registered mutating webhook for path <%s>.", service.Path) + } + } + if service.ValidatingConfig != nil { + for i := range service.ValidatingConfig.Webhooks { + service.ValidatingConfig.Webhooks[i].ClientConfig = clientConfig + } + + service.ValidatingConfig.ObjectMeta.Name = webhookConfigName(config.WebhookName, service.Path) + + if err := registerValidateWebhook(kubeClient, service.ValidatingConfig); err != nil { + klog.Errorf("Failed to register validating admission webhook (%s): %v", + service.Path, err) + } else { + klog.V(3).Infof("Registered validating webhook for path <%s>.", service.Path) + } + } +} + +// getKubeClient Get a clientset with restConfig. +func getKubeClient(restConfig *rest.Config) *kubernetes.Clientset { + clientset, err := kubernetes.NewForConfig(restConfig) + if err != nil { + klog.Fatal(err) + } + return clientset +} + +// GetVolcanoClient get a clientset for volcano +func getVolcanoClient(restConfig *rest.Config) *versioned.Clientset { + clientset, err := versioned.NewForConfig(restConfig) + if err != nil { + klog.Fatal(err) + } + return clientset +} + +// configTLS is a helper function that generate tls certificates from directly defined tls config or kubeconfig +// These are passed in as command line for cluster certification. If tls config is passed in, we use the directly +// defined tls config, else use that defined in kubeconfig +func configTLS(config *options.Config, restConfig *rest.Config) *tls.Config { + if len(config.CertFile) != 0 && len(config.KeyFile) != 0 { + sCert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile) + if err != nil { + klog.Fatal(err) + } + + return &tls.Config{ + Certificates: []tls.Certificate{sCert}, + } + } + + if len(restConfig.CertData) != 0 && len(restConfig.KeyData) != 0 { + sCert, err := tls.X509KeyPair(restConfig.CertData, restConfig.KeyData) + if err != nil { + klog.Fatal(err) + } + + return &tls.Config{ + Certificates: []tls.Certificate{sCert}, + } + } + + klog.Fatal("tls: failed to find any tls config data") + return &tls.Config{} +} + +func registerMutateWebhook(clientset *kubernetes.Clientset, hook *v1beta1.MutatingWebhookConfiguration) error { + client := clientset.AdmissionregistrationV1beta1().MutatingWebhookConfigurations() + existing, err := client.Get(hook.Name, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + if err == nil && existing != nil { + klog.V(4).Infof("Updating MutatingWebhookConfiguration %v", hook) + existing.Webhooks = hook.Webhooks + if _, err := client.Update(existing); err != nil { + return err + } + } else { + klog.V(4).Infof("Creating MutatingWebhookConfiguration %v", hook) + if _, err := client.Create(hook); err != nil { + return err + } + } + + return nil +} + +func registerValidateWebhook(clientset *kubernetes.Clientset, hook *v1beta1.ValidatingWebhookConfiguration) error { + client := clientset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations() + + existing, err := client.Get(hook.Name, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + if err == nil && existing != nil { + existing.Webhooks = hook.Webhooks + klog.V(4).Infof("Updating ValidatingWebhookConfiguration %v", hook) + if _, err := client.Update(existing); err != nil { + return err + } + } else { + klog.V(4).Infof("Creating ValidatingWebhookConfiguration %v", hook) + if _, err := client.Create(hook); err != nil { + return err + } + } + + return nil +} + +func webhookConfigName(name, path string) string { + if name == "" { + name = "webhook" + } + + re := regexp.MustCompile(`-+`) + raw := strings.Join([]string{name, strings.ReplaceAll(path, "/", "-")}, "-") + return re.ReplaceAllString(raw, "-") +} diff --git a/cmd/admission/main.go b/cmd/admission/main.go index 37bc9e2ff4f..13ca613565f 100644 --- a/cmd/admission/main.go +++ b/cmd/admission/main.go @@ -16,35 +16,24 @@ limitations under the License. package main import ( - "io/ioutil" - "net/http" + "fmt" "os" - "os/signal" "runtime" - "strconv" - "syscall" "time" "github.com/spf13/pflag" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/util/flag" - "k8s.io/client-go/tools/clientcmd" "k8s.io/klog" "volcano.sh/volcano/cmd/admission/app" "volcano.sh/volcano/cmd/admission/app/options" - "volcano.sh/volcano/pkg/admission" - "volcano.sh/volcano/pkg/version" -) - -func serveJobs(w http.ResponseWriter, r *http.Request) { - admission.Serve(w, r, admission.AdmitJobs) -} -func serveMutateJobs(w http.ResponseWriter, r *http.Request) { - admission.Serve(w, r, admission.MutateJobs) -} + _ "volcano.sh/volcano/pkg/admission/jobs/mutate" + _ "volcano.sh/volcano/pkg/admission/jobs/validate" + _ "volcano.sh/volcano/pkg/admission/pods" +) var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes") @@ -57,73 +46,15 @@ func main() { flag.InitFlags() - if config.PrintVersion { - version.PrintVersionAndExit() - } - go wait.Until(klog.Flush, *logFlushFreq, wait.NeverStop) defer klog.Flush() - http.HandleFunc(admission.AdmitJobPath, serveJobs) - http.HandleFunc(admission.MutateJobPath, serveMutateJobs) - if err := config.CheckPortOrDie(); err != nil { klog.Fatalf("Configured port is invalid: %v", err) } - addr := ":" + strconv.Itoa(config.Port) - - restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig) - if err != nil { - klog.Fatalf("Unable to build k8s config: %v", err) - } - - admission.VolcanoClientSet = app.GetVolcanoClient(restConfig) - - servePods(config) - caBundle, err := ioutil.ReadFile(config.CaCertFile) - if err != nil { - klog.Fatalf("Unable to read cacert file: %v", err) + if err := app.Run(config); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) } - - err = options.RegisterWebhooks(config, app.GetClient(restConfig), caBundle) - if err != nil { - klog.Fatalf("Unable to register webhook configs: %v", err) - } - - stopChannel := make(chan os.Signal) - signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT) - - server := &http.Server{ - Addr: addr, - TLSConfig: app.ConfigTLS(config, restConfig), - } - webhookServeError := make(chan struct{}) - go func() { - err = server.ListenAndServeTLS("", "") - if err != nil && err != http.ErrServerClosed { - klog.Fatalf("ListenAndServeTLS for admission webhook failed: %v", err) - close(webhookServeError) - } - }() - - select { - case <-stopChannel: - if err := server.Close(); err != nil { - klog.Fatalf("Close admission server failed: %v", err) - } - return - case <-webhookServeError: - return - } -} - -func servePods(config *options.Config) { - admController := &admission.Controller{ - VcClients: admission.VolcanoClientSet, - SchedulerName: config.SchedulerName, - } - http.HandleFunc(admission.AdmitPodPath, admController.ServerPods) - - return } diff --git a/hack/.golint_failures b/hack/.golint_failures index 6a566edd03c..b8ea2a469b6 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -1,3 +1,6 @@ +volcano.sh/volcano/pkg/admission/jobs/mutate +volcano.sh/volcano/pkg/admission/router +volcano.sh/volcano/pkg/admission/schema volcano.sh/volcano/pkg/apis/scheduling volcano.sh/volcano/pkg/apis/scheduling/v1alpha1 volcano.sh/volcano/pkg/apis/scheduling/v1alpha2 diff --git a/pkg/admission/mutate_job.go b/pkg/admission/jobs/mutate/mutate_job.go similarity index 71% rename from pkg/admission/mutate_job.go rename to pkg/admission/jobs/mutate/mutate_job.go index 8fba26f7e0d..f94647ea2ea 100644 --- a/pkg/admission/mutate_job.go +++ b/pkg/admission/jobs/mutate/mutate_job.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package mutate import ( "encoding/json" @@ -22,9 +22,13 @@ import ( "strconv" "k8s.io/api/admission/v1beta1" + whv1beta1 "k8s.io/api/admissionregistration/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog" + "volcano.sh/volcano/pkg/admission/router" + "volcano.sh/volcano/pkg/admission/schema" + "volcano.sh/volcano/pkg/admission/util" "volcano.sh/volcano/pkg/apis/batch/v1alpha1" ) @@ -33,6 +37,31 @@ const ( DefaultQueue = "default" ) +func init() { + router.RegisterAdmission(service) +} + +var service = &router.AdmissionService{ + Path: "/mutating-jobs", + Func: MutateJobs, + + MutatingConfig: &whv1beta1.MutatingWebhookConfiguration{ + Webhooks: []whv1beta1.Webhook{{ + Name: "mutatejob.volcano.sh", + Rules: []whv1beta1.RuleWithOperations{ + { + Operations: []whv1beta1.OperationType{whv1beta1.Create}, + Rule: whv1beta1.Rule{ + APIGroups: []string{"batch.volcano.sh"}, + APIVersions: []string{"v1alpha1"}, + Resources: []string{"jobs"}, + }, + }, + }, + }}, + }, +} + type patchOperation struct { Op string `json:"op"` Path string `json:"path"` @@ -43,9 +72,9 @@ type patchOperation struct { func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { klog.V(3).Infof("mutating jobs") - job, err := DecodeJob(ar.Request.Object, ar.Request.Resource) + job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource) if err != nil { - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } reviewResponse := v1beta1.AdmissionResponse{} @@ -58,7 +87,7 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { break default: err = fmt.Errorf("expect operation to be 'CREATE' ") - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } if err != nil { @@ -73,7 +102,7 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { return &reviewResponse } -func createPatch(job v1alpha1.Job) ([]byte, error) { +func createPatch(job *v1alpha1.Job) ([]byte, error) { var patch []patchOperation pathQueue := patchDefaultQueue(job) if pathQueue != nil { @@ -86,7 +115,7 @@ func createPatch(job v1alpha1.Job) ([]byte, error) { return json.Marshal(patch) } -func patchDefaultQueue(job v1alpha1.Job) *patchOperation { +func patchDefaultQueue(job *v1alpha1.Job) *patchOperation { //Add default queue if not specified. if job.Spec.Queue == "" { return &patchOperation{Op: "add", Path: "/spec/queue", Value: DefaultQueue} diff --git a/pkg/admission/mutate_job_test.go b/pkg/admission/jobs/mutate/mutate_job_test.go similarity index 99% rename from pkg/admission/mutate_job_test.go rename to pkg/admission/jobs/mutate/mutate_job_test.go index f9ba79a9e6d..e6fa8820779 100644 --- a/pkg/admission/mutate_job_test.go +++ b/pkg/admission/jobs/mutate/mutate_job_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package mutate import ( "testing" diff --git a/pkg/admission/admit_job.go b/pkg/admission/jobs/validate/admit_job.go similarity index 77% rename from pkg/admission/admit_job.go rename to pkg/admission/jobs/validate/admit_job.go index a34e8a3bf36..4d69ed360bf 100644 --- a/pkg/admission/admit_job.go +++ b/pkg/admission/jobs/validate/admit_job.go @@ -14,13 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package validate import ( "fmt" "strings" "k8s.io/api/admission/v1beta1" + whv1beta1 "k8s.io/api/admissionregistration/v1beta1" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/validation" @@ -30,23 +31,49 @@ import ( k8scorev1 "k8s.io/kubernetes/pkg/apis/core/v1" k8scorevalid "k8s.io/kubernetes/pkg/apis/core/validation" + "volcano.sh/volcano/pkg/admission/router" + "volcano.sh/volcano/pkg/admission/schema" + "volcano.sh/volcano/pkg/admission/util" "volcano.sh/volcano/pkg/apis/batch/v1alpha1" - vcclientset "volcano.sh/volcano/pkg/client/clientset/versioned" "volcano.sh/volcano/pkg/controllers/job/plugins" ) -// VolcanoClientSet is volcano clientset -// TODO: make it as package local var. -var VolcanoClientSet vcclientset.Interface +func init() { + router.RegisterAdmission(service) +} + +var service = &router.AdmissionService{ + Path: "/jobs", + Func: AdmitJobs, + + Config: config, + + ValidatingConfig: &whv1beta1.ValidatingWebhookConfiguration{ + Webhooks: []whv1beta1.Webhook{{ + Name: "validatejob.volcano.sh", + Rules: []whv1beta1.RuleWithOperations{ + { + Operations: []whv1beta1.OperationType{whv1beta1.Create, whv1beta1.Update}, + Rule: whv1beta1.Rule{ + APIGroups: []string{"batch.volcano.sh"}, + APIVersions: []string{"v1alpha1"}, + Resources: []string{"jobs"}, + }, + }, + }, + }}, + }, +} + +var config = &router.AdmissionServiceConfig{} // AdmitJobs is to admit jobs and return response func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { - klog.V(3).Infof("admitting jobs -- %s", ar.Request.Operation) - job, err := DecodeJob(ar.Request.Object, ar.Request.Resource) + job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource) if err != nil { - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } var msg string reviewResponse := v1beta1.AdmissionResponse{} @@ -57,14 +84,14 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { msg = validateJob(job, &reviewResponse) break case v1beta1.Update: - _, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource) + _, err := schema.DecodeJob(ar.Request.OldObject, ar.Request.Resource) if err != nil { - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } break default: err := fmt.Errorf("expect operation to be 'CREATE' or 'UPDATE'") - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } if !reviewResponse.Allowed { @@ -73,8 +100,7 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { return &reviewResponse } -func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) string { - +func validateJob(job *v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) string { var msg string taskNames := map[string]string{} var totalReplicas int32 @@ -151,9 +177,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st } // Check whether Queue already present or not - if _, err := VolcanoClientSet.SchedulingV1alpha2().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil { + if _, err := config.VolcanoClient.SchedulingV1alpha2().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil { // TODO: deprecate v1alpha1 - if _, err := VolcanoClientSet.SchedulingV1alpha1().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil { + if _, err := config.VolcanoClient.SchedulingV1alpha1().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil { msg = msg + fmt.Sprintf(" unable to find job queue: %v", err) } } @@ -165,7 +191,7 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st return msg } -func validateTaskTemplate(task v1alpha1.TaskSpec, job v1alpha1.Job, index int) string { +func validateTaskTemplate(task v1alpha1.TaskSpec, job *v1alpha1.Job, index int) string { var v1PodTemplate v1.PodTemplate v1PodTemplate.Template = *task.Template.DeepCopy() k8scorev1.SetObjectDefaults_PodTemplate(&v1PodTemplate) diff --git a/pkg/admission/admit_job_test.go b/pkg/admission/jobs/validate/admit_job_test.go similarity index 99% rename from pkg/admission/admit_job_test.go rename to pkg/admission/jobs/validate/admit_job_test.go index f1e82439250..fdba61d2e1d 100644 --- a/pkg/admission/admit_job_test.go +++ b/pkg/admission/jobs/validate/admit_job_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package validate import ( "strings" @@ -1046,15 +1046,15 @@ func TestValidateExecution(t *testing.T) { }, } // create fake volcano clientset - VolcanoClientSet = fakeclient.NewSimpleClientset() + config.VolcanoClient = fakeclient.NewSimpleClientset() //create default queue - _, err := VolcanoClientSet.SchedulingV1alpha2().Queues().Create(&defaultqueue) + _, err := config.VolcanoClient.SchedulingV1alpha2().Queues().Create(&defaultqueue) if err != nil { t.Error("Queue Creation Failed") } - ret := validateJob(testCase.Job, &testCase.reviewResponse) + ret := validateJob(&testCase.Job, &testCase.reviewResponse) //fmt.Printf("test-case name:%s, ret:%v testCase.reviewResponse:%v \n", testCase.Name, ret,testCase.reviewResponse) if testCase.ExpectErr == true && ret == "" { t.Errorf("Expect error msg :%s, but got nil.", testCase.ret) diff --git a/pkg/admission/admission_controller.go b/pkg/admission/jobs/validate/util.go similarity index 71% rename from pkg/admission/admission_controller.go rename to pkg/admission/jobs/validate/util.go index c7f9794607c..3c986bb555f 100644 --- a/pkg/admission/admission_controller.go +++ b/pkg/admission/jobs/validate/util.go @@ -14,55 +14,19 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package validate import ( "fmt" "github.com/hashicorp/go-multierror" - "k8s.io/api/admission/v1beta1" - admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/validation/field" - "k8s.io/klog" "k8s.io/kubernetes/pkg/apis/core/validation" batchv1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" - vcclientset "volcano.sh/volcano/pkg/client/clientset/versioned" ) -const ( - // AdmitJobPath is the pattern for the jobs admission - AdmitJobPath = "/jobs" - // MutateJobPath is the pattern for the mutating jobs - MutateJobPath = "/mutating-jobs" - // AdmitPodPath is the pattern for the pods admission - AdmitPodPath = "/pods" - // CONTENTTYPE http content-type - CONTENTTYPE = "Content-Type" - // APPLICATIONJSON json content - APPLICATIONJSON = "application/json" -) - -//The AdmitFunc returns response -type AdmitFunc func(v1beta1.AdmissionReview) *v1beta1.AdmissionResponse - -// Controller the Admission Controller type -type Controller struct { - VcClients vcclientset.Interface - SchedulerName string -} - -var scheme = runtime.NewScheme() - -//Codecs is for retrieving serializers for the supported wire formats -//and conversion wrappers to define preferred internal and external versions. -var Codecs = serializer.NewCodecFactory(scheme) - // policyEventMap defines all policy events and whether to allow external use var policyEventMap = map[batchv1alpha1.Event]bool{ batchv1alpha1.AnyEvent: true, @@ -86,45 +50,6 @@ var policyActionMap = map[batchv1alpha1.Action]bool{ batchv1alpha1.EnqueueAction: false, } -func init() { - addToScheme(scheme) -} - -func addToScheme(scheme *runtime.Scheme) { - corev1.AddToScheme(scheme) - admissionregistrationv1beta1.AddToScheme(scheme) -} - -//ToAdmissionResponse updates the admission response with the input error -func ToAdmissionResponse(err error) *v1beta1.AdmissionResponse { - klog.Error(err) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: err.Error(), - }, - } -} - -//DecodeJob decodes the job using deserializer from the raw object -func DecodeJob(object runtime.RawExtension, resource metav1.GroupVersionResource) (batchv1alpha1.Job, error) { - jobResource := metav1.GroupVersionResource{Group: batchv1alpha1.SchemeGroupVersion.Group, Version: batchv1alpha1.SchemeGroupVersion.Version, Resource: "jobs"} - raw := object.Raw - job := batchv1alpha1.Job{} - - if resource != jobResource { - err := fmt.Errorf("expect resource to be %s", jobResource) - return job, err - } - - deserializer := Codecs.UniversalDeserializer() - if _, _, err := deserializer.Decode(raw, nil, &job); err != nil { - return job, err - } - klog.V(3).Infof("the job struct is %+v", job) - - return job, nil -} - func validatePolicies(policies []batchv1alpha1.LifecyclePolicy, fldPath *field.Path) error { var err error policyEvents := map[batchv1alpha1.Event]struct{}{} diff --git a/pkg/admission/admit_pod.go b/pkg/admission/pods/admit_pod.go similarity index 56% rename from pkg/admission/admit_pod.go rename to pkg/admission/pods/admit_pod.go index 09d891f498c..ee7b638fda3 100644 --- a/pkg/admission/admit_pod.go +++ b/pkg/admission/pods/admit_pod.go @@ -14,38 +14,64 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package pods import ( "fmt" - "net/http" "strings" "k8s.io/api/admission/v1beta1" + whv1beta1 "k8s.io/api/admissionregistration/v1beta1" "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog" + "volcano.sh/volcano/pkg/admission/router" + "volcano.sh/volcano/pkg/admission/schema" + "volcano.sh/volcano/pkg/admission/util" "volcano.sh/volcano/pkg/apis/helpers" "volcano.sh/volcano/pkg/apis/scheduling/v1alpha1" "volcano.sh/volcano/pkg/apis/scheduling/v1alpha2" ) -// ServerPods is to server pods -func (c *Controller) ServerPods(w http.ResponseWriter, r *http.Request) { - Serve(w, r, c.AdmitPods) +func init() { + router.RegisterAdmission(service) } +var service = &router.AdmissionService{ + Path: "/pods", + Func: AdmitPods, + + Config: config, + + ValidatingConfig: &whv1beta1.ValidatingWebhookConfiguration{ + Webhooks: []whv1beta1.Webhook{{ + Name: "validatepod.volcano.sh", + Rules: []whv1beta1.RuleWithOperations{ + { + Operations: []whv1beta1.OperationType{whv1beta1.Create}, + Rule: whv1beta1.Rule{ + APIGroups: []string{""}, + APIVersions: []string{"v1"}, + Resources: []string{"pods"}, + }, + }, + }, + }}, + }, +} + +var config = &router.AdmissionServiceConfig{} + // AdmitPods is to admit pods and return response -func (c *Controller) AdmitPods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { +func AdmitPods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { klog.V(3).Infof("admitting pods -- %s", ar.Request.Operation) - pod, err := decodePod(ar.Request.Object, ar.Request.Resource) + pod, err := schema.DecodePod(ar.Request.Object, ar.Request.Resource) if err != nil { - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } var msg string @@ -54,11 +80,11 @@ func (c *Controller) AdmitPods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionRes switch ar.Request.Operation { case v1beta1.Create: - msg = c.validatePod(pod, &reviewResponse) + msg = validatePod(pod, &reviewResponse) break default: err := fmt.Errorf("expect operation to be 'CREATE'") - return ToAdmissionResponse(err) + return util.ToAdmissionResponse(err) } if !reviewResponse.Allowed { @@ -67,31 +93,12 @@ func (c *Controller) AdmitPods(ar v1beta1.AdmissionReview) *v1beta1.AdmissionRes return &reviewResponse } -func decodePod(object runtime.RawExtension, resource metav1.GroupVersionResource) (v1.Pod, error) { - podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} - raw := object.Raw - pod := v1.Pod{} - - if resource != podResource { - err := fmt.Errorf("expect resource to be %s", podResource) - return pod, err - } - - deserializer := Codecs.UniversalDeserializer() - if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil { - return pod, err - } - klog.V(3).Infof("the pod struct is %+v", pod) - - return pod, nil -} - // allow pods to create when // 1. schedulerName of pod isn't volcano // 2. pod has Podgroup whose phase isn't Pending // 3. normal pods whose schedulerName is volcano don't have podgroup -func (c *Controller) validatePod(pod v1.Pod, reviewResponse *v1beta1.AdmissionResponse) string { - if pod.Spec.SchedulerName != c.SchedulerName { +func validatePod(pod *v1.Pod, reviewResponse *v1beta1.AdmissionResponse) string { + if pod.Spec.SchedulerName != config.SchedulerName { return "" } @@ -103,7 +110,7 @@ func (c *Controller) validatePod(pod v1.Pod, reviewResponse *v1beta1.AdmissionRe pgName = pod.Annotations[v1alpha2.GroupNameAnnotationKey] } if pgName != "" { - if err := c.checkPGPhase(pod, pgName, true); err != nil { + if err := checkPGPhase(pod, pgName, true); err != nil { msg = err.Error() reviewResponse.Allowed = false } @@ -111,8 +118,8 @@ func (c *Controller) validatePod(pod v1.Pod, reviewResponse *v1beta1.AdmissionRe } // normal pod, SN == volcano - pgName = helpers.GeneratePodgroupName(&pod) - if err := c.checkPGPhase(pod, pgName, false); err != nil { + pgName = helpers.GeneratePodgroupName(pod) + if err := checkPGPhase(pod, pgName, false); err != nil { msg = err.Error() reviewResponse.Allowed = false } @@ -120,13 +127,13 @@ func (c *Controller) validatePod(pod v1.Pod, reviewResponse *v1beta1.AdmissionRe return msg } -func (c *Controller) checkPGPhase(pod v1.Pod, pgName string, isVCJob bool) error { - pg, err := c.VcClients.SchedulingV1alpha2().PodGroups(pod.Namespace).Get(pgName, metav1.GetOptions{}) +func checkPGPhase(pod *v1.Pod, pgName string, isVCJob bool) error { + pg, err := config.VolcanoClient.SchedulingV1alpha2().PodGroups(pod.Namespace).Get(pgName, metav1.GetOptions{}) if err != nil { - pg, err := c.VcClients.SchedulingV1alpha1().PodGroups(pod.Namespace).Get(pgName, metav1.GetOptions{}) + pg, err := config.VolcanoClient.SchedulingV1alpha1().PodGroups(pod.Namespace).Get(pgName, metav1.GetOptions{}) if err != nil { if isVCJob || (!isVCJob && !apierrors.IsNotFound(err)) { - return fmt.Errorf("Failed to get PodGroup for pod <%s/%s>: %v", pod.Namespace, pod.Name, err) + return fmt.Errorf("failed to get PodGroup for pod <%s/%s>: %v", pod.Namespace, pod.Name, err) } return nil } @@ -137,6 +144,6 @@ func (c *Controller) checkPGPhase(pod v1.Pod, pgName string, isVCJob bool) error if pg.Status.Phase != v1alpha2.PodGroupPending { return nil } - return fmt.Errorf("Failed to create pod <%s/%s>, because the podgroup phase is Pending", + return fmt.Errorf("failed to create pod <%s/%s> as the podgroup phase is Pending", pod.Namespace, pod.Name) } diff --git a/pkg/admission/admit_pod_test.go b/pkg/admission/pods/admit_pod_test.go similarity index 88% rename from pkg/admission/admit_pod_test.go rename to pkg/admission/pods/admit_pod_test.go index b9b42f1fb04..f71f0cf30f5 100644 --- a/pkg/admission/admit_pod_test.go +++ b/pkg/admission/pods/admit_pod_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package pods import ( "strings" @@ -84,7 +84,7 @@ func TestValidatePod(t *testing.T) { }, reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, - ret: "Failed to create pod , because the podgroup phase is Pending", + ret: "failed to create pod as the podgroup phase is Pending", ExpectErr: true, }, // validate volcano pod with volcano scheduler @@ -106,7 +106,7 @@ func TestValidatePod(t *testing.T) { }, reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, - ret: "Failed to create pod , because the podgroup phase is Pending", + ret: "failed to create pod as the podgroup phase is Pending", ExpectErr: true, }, // validate volcano pod with volcano scheduler when get pg failed @@ -128,7 +128,7 @@ func TestValidatePod(t *testing.T) { }, reviewResponse: v1beta1.AdmissionResponse{Allowed: false}, - ret: `Failed to get PodGroup for pod : podgroups.scheduling "podgroup-p1" not found`, + ret: `failed to get PodGroup for pod : podgroups.scheduling "podgroup-p1" not found`, ExpectErr: true, disabledPG: true, }, @@ -150,21 +150,17 @@ func TestValidatePod(t *testing.T) { } // create fake volcano clientset - VolcanoClientSet = vcclient.NewSimpleClientset() + config.VolcanoClient = vcclient.NewSimpleClientset() + config.SchedulerName = "volcano" if !testCase.disabledPG { - _, err := VolcanoClientSet.SchedulingV1alpha2().PodGroups(namespace).Create(pg) + _, err := config.VolcanoClient.SchedulingV1alpha2().PodGroups(namespace).Create(pg) if err != nil { t.Error("PG Creation Failed") } } - c := Controller{ - VcClients: VolcanoClientSet, - SchedulerName: "volcano", - } - - ret := c.validatePod(testCase.Pod, &testCase.reviewResponse) + ret := validatePod(&testCase.Pod, &testCase.reviewResponse) if testCase.ExpectErr == true && ret == "" { t.Errorf("%s: test case Expect error msg :%s, but got nil.", testCase.Name, testCase.ret) diff --git a/pkg/admission/router/factory.go b/pkg/admission/router/factory.go new file mode 100644 index 00000000000..6461fefea31 --- /dev/null +++ b/pkg/admission/router/factory.go @@ -0,0 +1,36 @@ +package router + +import ( + "fmt" + "net/http" + "sync" +) + +type AdmissionHandler func(w http.ResponseWriter, r *http.Request) + +var admissionMap = make(map[string]*AdmissionService) +var admissionMutex sync.Mutex + +func RegisterAdmission(service *AdmissionService) error { + admissionMutex.Lock() + defer admissionMutex.Unlock() + + if _, found := admissionMap[service.Path]; found { + return fmt.Errorf("duplicated admission service for %s", service.Path) + } + + // Also register handler to the service. + service.Handler = func(w http.ResponseWriter, r *http.Request) { + Serve(w, r, service.Func) + } + + admissionMap[service.Path] = service + + return nil +} + +func ForEachAdmission(handler func(*AdmissionService)) { + for _, f := range admissionMap { + handler(f) + } +} diff --git a/pkg/admission/router/interface.go b/pkg/admission/router/interface.go new file mode 100644 index 00000000000..050ddb276b0 --- /dev/null +++ b/pkg/admission/router/interface.go @@ -0,0 +1,29 @@ +package router + +import ( + "k8s.io/api/admission/v1beta1" + whv1beta1 "k8s.io/api/admissionregistration/v1beta1" + "k8s.io/client-go/kubernetes" + + "volcano.sh/volcano/pkg/client/clientset/versioned" +) + +//The AdmitFunc returns response +type AdmitFunc func(v1beta1.AdmissionReview) *v1beta1.AdmissionResponse + +type AdmissionServiceConfig struct { + SchedulerName string + KubeClient kubernetes.Interface + VolcanoClient versioned.Interface +} + +type AdmissionService struct { + Path string + Func AdmitFunc + Handler AdmissionHandler + + ValidatingConfig *whv1beta1.ValidatingWebhookConfiguration + MutatingConfig *whv1beta1.MutatingWebhookConfiguration + + Config *AdmissionServiceConfig +} diff --git a/pkg/admission/server.go b/pkg/admission/router/server.go similarity index 85% rename from pkg/admission/server.go rename to pkg/admission/router/server.go index f61a8d9d66c..5132606a928 100644 --- a/pkg/admission/server.go +++ b/pkg/admission/router/server.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package admission +package router import ( "encoding/json" @@ -24,8 +24,17 @@ import ( "k8s.io/api/admission/v1beta1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog" + + "volcano.sh/volcano/pkg/admission/schema" + "volcano.sh/volcano/pkg/admission/util" ) +// CONTENTTYPE http content-type +var CONTENTTYPE = "Content-Type" + +// APPLICATIONJSON json content +var APPLICATIONJSON = "application/json" + // Serve the http request func Serve(w http.ResponseWriter, r *http.Request, admit AdmitFunc) { var body []byte @@ -44,9 +53,9 @@ func Serve(w http.ResponseWriter, r *http.Request, admit AdmitFunc) { var reviewResponse *v1beta1.AdmissionResponse ar := v1beta1.AdmissionReview{} - deserializer := Codecs.UniversalDeserializer() + deserializer := schema.Codecs.UniversalDeserializer() if _, _, err := deserializer.Decode(body, nil, &ar); err != nil { - reviewResponse = ToAdmissionResponse(err) + reviewResponse = util.ToAdmissionResponse(err) } else { reviewResponse = admit(ar) } diff --git a/pkg/admission/schema/schema.go b/pkg/admission/schema/schema.go new file mode 100644 index 00000000000..20d60005bbd --- /dev/null +++ b/pkg/admission/schema/schema.go @@ -0,0 +1,69 @@ +package schema + +import ( + "fmt" + + "k8s.io/api/admission/v1beta1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/klog" + corev1 "k8s.io/kubernetes/pkg/apis/core/v1" + + batchv1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" +) + +func init() { + addToScheme(scheme) +} + +var scheme = runtime.NewScheme() + +//Codecs is for retrieving serializers for the supported wire formats +//and conversion wrappers to define preferred internal and external versions. +var Codecs = serializer.NewCodecFactory(scheme) + +func addToScheme(scheme *runtime.Scheme) { + corev1.AddToScheme(scheme) + v1beta1.AddToScheme(scheme) +} + +//DecodeJob decodes the job using deserializer from the raw object +func DecodeJob(object runtime.RawExtension, resource metav1.GroupVersionResource) (*batchv1alpha1.Job, error) { + jobResource := metav1.GroupVersionResource{Group: batchv1alpha1.SchemeGroupVersion.Group, Version: batchv1alpha1.SchemeGroupVersion.Version, Resource: "jobs"} + raw := object.Raw + job := batchv1alpha1.Job{} + + if resource != jobResource { + err := fmt.Errorf("expect resource to be %s", jobResource) + return &job, err + } + + deserializer := Codecs.UniversalDeserializer() + if _, _, err := deserializer.Decode(raw, nil, &job); err != nil { + return &job, err + } + klog.V(3).Infof("the job struct is %+v", job) + + return &job, nil +} + +func DecodePod(object runtime.RawExtension, resource metav1.GroupVersionResource) (*v1.Pod, error) { + podResource := metav1.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} + raw := object.Raw + pod := v1.Pod{} + + if resource != podResource { + err := fmt.Errorf("expect resource to be %s", podResource) + return &pod, err + } + + deserializer := Codecs.UniversalDeserializer() + if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil { + return &pod, err + } + klog.V(3).Infof("the pod struct is %+v", pod) + + return &pod, nil +} diff --git a/pkg/admission/util/util.go b/pkg/admission/util/util.go new file mode 100644 index 00000000000..388039806a8 --- /dev/null +++ b/pkg/admission/util/util.go @@ -0,0 +1,15 @@ +package util + +import "k8s.io/api/admission/v1beta1" +import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +import "k8s.io/klog" + +//ToAdmissionResponse updates the admission response with the input error +func ToAdmissionResponse(err error) *v1beta1.AdmissionResponse { + klog.Error(err) + return &v1beta1.AdmissionResponse{ + Result: &metav1.Status{ + Message: err.Error(), + }, + } +} diff --git a/test/e2e/admission.go b/test/e2e/admission.go index 02531eecb72..28ce91dfa05 100644 --- a/test/e2e/admission.go +++ b/test/e2e/admission.go @@ -225,6 +225,6 @@ var _ = Describe("Job E2E Test: Test Admission service", func() { Expect(err).NotTo(HaveOccurred()) _, err = context.kubeclient.CoreV1().Pods(namespace).Create(pod) - Expect(err.Error()).Should(ContainSubstring(`Failed to create pod , because the podgroup phase is Pending`)) + Expect(err.Error()).Should(ContainSubstring(`failed to create pod as the podgroup phase is Pending`)) }) })