Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
233 changes: 17 additions & 216 deletions cmd/admission/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,6 @@ import (
"fmt"

"github.com/spf13/pflag"

"k8s.io/api/admissionregistration/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
admissionregistrationv1beta1 "k8s.io/client-go/kubernetes/typed/admissionregistration/v1beta1"
"k8s.io/klog"
)

const (
Expand All @@ -35,20 +28,17 @@ const (

// Config admission-controller server config.
type Config struct {
Master string
Kubeconfig string
CertFile string
KeyFile string
CaCertFile string
Port int
MutateWebhookConfigName string
MutateWebhookName string
ValidateWebhookConfigName string
ValidateWebhookName string
PrintVersion bool
AdmissionServiceName string
AdmissionServiceNamespace string
SchedulerName string
Master string
Kubeconfig string
CertFile string
KeyFile string
CaCertFile string
Port int
PrintVersion bool
WebhookName string
WebhookNamespace string
SchedulerName string
WebhookURL string
}

// NewConfig create new config
Expand All @@ -65,210 +55,21 @@ func (c *Config) AddFlags(fs *pflag.FlagSet) {
"File containing the default x509 Certificate for HTTPS. (CA cert, if any, concatenated "+
"after server cert).")
fs.StringVar(&c.KeyFile, "tls-private-key-file", c.KeyFile, "File containing the default x509 private key matching --tls-cert-file.")
fs.StringVar(&c.CaCertFile, "ca-cert-file", c.CaCertFile, "File containing the x509 Certificate for HTTPS.")
fs.IntVar(&c.Port, "port", 443, "the port used by admission-controller-server.")
fs.StringVar(&c.MutateWebhookConfigName, "mutate-webhook-config-name", "",
"Name of the mutatingwebhookconfiguration resource in Kubernetes [Deprecated]: it will be generated when not specified.")
fs.StringVar(&c.MutateWebhookName, "mutate-webhook-name", "",
"Name of the webhook entry in the webhook config. [Deprecated]: it will be generated when not specified")
fs.StringVar(&c.ValidateWebhookConfigName, "validate-webhook-config-name", "",
"Name of the mutatingwebhookconfiguration resource in Kubernetes. [Deprecated]: it will be generated when not specified")
fs.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "",
"Name of the webhook entry in the webhook config. [Deprecated]: it will be generated when not specified")
fs.BoolVar(&c.PrintVersion, "version", false, "Show version and quit")
fs.StringVar(&c.AdmissionServiceNamespace, "webhook-namespace", "default", "The namespace of this webhook")
fs.StringVar(&c.AdmissionServiceName, "webhook-service-name", "admission-service", "The name of this admission service")

fs.StringVar(&c.CaCertFile, "ca-cert-file", c.CaCertFile, "File containing the x509 Certificate for HTTPS.")
fs.StringVar(&c.WebhookNamespace, "webhook-namespace", "", "The namespace of this webhook")
fs.StringVar(&c.WebhookName, "webhook-service-name", "", "The name of this webhook")
fs.StringVar(&c.WebhookURL, "webhook-url", "", "The url of this webhook")

fs.StringVar(&c.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name")
}

const (
// ValidateConfigName ValidatingWebhookConfiguration name format
ValidateConfigName = "%s-validate-job"
// MutateConfigName MutatingWebhookConfiguration name format
MutateConfigName = "%s-mutate-job"
// ValidateHookName Default name for webhooks in ValidatingWebhookConfiguration
ValidateHookName = "validatejob.volcano.sh"
// MutateHookName Default name for webhooks in MutatingWebhookConfiguration
MutateHookName = "mutatejob.volcano.sh"
// ValidatePodConfigName ValidatingWebhookPodConfiguration name format
ValidatePodConfigName = "%s-validate-pod"
// ValidatePodHookName Default name for webhooks in ValidatingWebhookPodConfiguration
ValidatePodHookName = "validatepod.volcano.sh"
)

// CheckPortOrDie check valid port range
func (c *Config) CheckPortOrDie() error {
if c.Port < 1 || c.Port > 65535 {
return fmt.Errorf("the port should be in the range of 1 and 65535")
}
return nil
}

func useGeneratedNameIfRequired(configured, generated string) string {
if configured != "" {
return configured
}
return generated
}

// RegisterWebhooks register webhooks for admission service
func RegisterWebhooks(c *Config, clienset *kubernetes.Clientset, cabundle []byte) error {
ignorePolicy := v1beta1.Ignore

//Prepare validate webhooks
path := "/jobs"
JobValidateHooks := v1beta1.ValidatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: useGeneratedNameIfRequired(c.ValidateWebhookConfigName,
fmt.Sprintf(ValidateConfigName, c.AdmissionServiceName)),
},
Webhooks: []v1beta1.Webhook{{
Name: useGeneratedNameIfRequired(c.ValidateWebhookName, ValidateHookName),
Rules: []v1beta1.RuleWithOperations{
{
Operations: []v1beta1.OperationType{v1beta1.Create, v1beta1.Update},
Rule: v1beta1.Rule{
APIGroups: []string{"batch.volcano.sh"},
APIVersions: []string{"v1alpha1"},
Resources: []string{"jobs"},
},
},
},
ClientConfig: v1beta1.WebhookClientConfig{
Service: &v1beta1.ServiceReference{
Name: c.AdmissionServiceName,
Namespace: c.AdmissionServiceNamespace,
Path: &path,
},
CABundle: cabundle,
},
FailurePolicy: &ignorePolicy,
}},
}

if err := registerValidateWebhook(clienset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(),
[]v1beta1.ValidatingWebhookConfiguration{JobValidateHooks}); err != nil {
return err
}

//Prepare mutate jobs
path = "/mutating-jobs"
JobMutateHooks := v1beta1.MutatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: useGeneratedNameIfRequired(c.MutateWebhookConfigName,
fmt.Sprintf(MutateConfigName, c.AdmissionServiceName)),
},
Webhooks: []v1beta1.Webhook{{
Name: useGeneratedNameIfRequired(c.MutateWebhookName, MutateHookName),
Rules: []v1beta1.RuleWithOperations{
{
Operations: []v1beta1.OperationType{v1beta1.Create},
Rule: v1beta1.Rule{
APIGroups: []string{"batch.volcano.sh"},
APIVersions: []string{"v1alpha1"},
Resources: []string{"jobs"},
},
},
},
ClientConfig: v1beta1.WebhookClientConfig{
Service: &v1beta1.ServiceReference{
Name: c.AdmissionServiceName,
Namespace: c.AdmissionServiceNamespace,
Path: &path,
},
CABundle: cabundle,
},
FailurePolicy: &ignorePolicy,
}},
}

if err := registerMutateWebhook(clienset.AdmissionregistrationV1beta1().MutatingWebhookConfigurations(),
[]v1beta1.MutatingWebhookConfiguration{JobMutateHooks}); err != nil {
return err
}

// Prepare validate pods
path = "/pods"
PodValidateHooks := v1beta1.ValidatingWebhookConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: useGeneratedNameIfRequired("",
fmt.Sprintf(ValidatePodConfigName, c.AdmissionServiceName)),
},
Webhooks: []v1beta1.Webhook{{
Name: useGeneratedNameIfRequired("", ValidatePodHookName),
Rules: []v1beta1.RuleWithOperations{
{
Operations: []v1beta1.OperationType{v1beta1.Create},
Rule: v1beta1.Rule{
APIGroups: []string{""},
APIVersions: []string{"v1"},
Resources: []string{"pods"},
},
},
},
ClientConfig: v1beta1.WebhookClientConfig{
Service: &v1beta1.ServiceReference{
Name: c.AdmissionServiceName,
Namespace: c.AdmissionServiceNamespace,
Path: &path,
},
CABundle: cabundle,
},
FailurePolicy: &ignorePolicy,
}},
}

if err := registerValidateWebhook(clienset.AdmissionregistrationV1beta1().ValidatingWebhookConfigurations(),
[]v1beta1.ValidatingWebhookConfiguration{PodValidateHooks}); err != nil {
return err
}

return nil

}

func registerMutateWebhook(client admissionregistrationv1beta1.MutatingWebhookConfigurationInterface,
webhooks []v1beta1.MutatingWebhookConfiguration) error {
for _, hook := range webhooks {
existing, err := client.Get(hook.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if err == nil && existing != nil {
klog.Infof("Updating MutatingWebhookConfiguration %v", hook)
existing.Webhooks = hook.Webhooks
if _, err := client.Update(existing); err != nil {
return err
}
} else {
klog.Infof("Creating MutatingWebhookConfiguration %v", hook)
if _, err := client.Create(&hook); err != nil {
return err
}
}
}
return nil
}

func registerValidateWebhook(client admissionregistrationv1beta1.ValidatingWebhookConfigurationInterface,
webhooks []v1beta1.ValidatingWebhookConfiguration) error {
for _, hook := range webhooks {
existing, err := client.Get(hook.Name, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
if err == nil && existing != nil {
existing.Webhooks = hook.Webhooks
klog.Infof("Updating ValidatingWebhookConfiguration %v", hook)
if _, err := client.Update(existing); err != nil {
return err
}
} else {
klog.Infof("Creating ValidatingWebhookConfiguration %v", hook)
if _, err := client.Create(&hook); err != nil {
return err
}
}
}
return nil
}
97 changes: 60 additions & 37 deletions cmd/admission/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,60 +17,83 @@ limitations under the License.
package app

import (
"crypto/tls"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/signal"
"strconv"
"syscall"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"

"volcano.sh/volcano/cmd/admission/app/options"
"volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/admission/router"
"volcano.sh/volcano/pkg/version"
)

// GetClient Get a clientset with restConfig.
func GetClient(restConfig *rest.Config) *kubernetes.Clientset {
clientset, err := kubernetes.NewForConfig(restConfig)
// Run start the service of admission controller.
func Run(config *options.Config) error {
if config.PrintVersion {
version.PrintVersionAndExit()
return nil
}

if config.WebhookURL == "" && config.WebhookNamespace == "" && config.WebhookName == "" {
return fmt.Errorf("failed to start webhooks as both 'url' and 'namespace/name' of webhook are empty")
}

restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig)
if err != nil {
klog.Fatal(err)
return fmt.Errorf("unable to build k8s config: %v", err)
}
return clientset
}

// GetVolcanoClient get a clientset for volcano
func GetVolcanoClient(restConfig *rest.Config) *versioned.Clientset {
clientset, err := versioned.NewForConfig(restConfig)
caBundle, err := ioutil.ReadFile(config.CaCertFile)
if err != nil {
klog.Fatal(err)
return fmt.Errorf("unable to read cacert file (%s): %v", config.CaCertFile, err)
}
return clientset
}

// ConfigTLS is a helper function that generate tls certificates from directly defined tls config or kubeconfig
// These are passed in as command line for cluster certification. If tls config is passed in, we use the directly
// defined tls config, else use that defined in kubeconfig
func ConfigTLS(config *options.Config, restConfig *rest.Config) *tls.Config {
if len(config.CertFile) != 0 && len(config.KeyFile) != 0 {
sCert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile)
if err != nil {
klog.Fatal(err)
vClient := getVolcanoClient(restConfig)
kubeClient := getKubeClient(restConfig)
router.ForEachAdmission(func(service *router.AdmissionService) {
if service.Config != nil {
service.Config.VolcanoClient = vClient
service.Config.SchedulerName = config.SchedulerName
}

return &tls.Config{
Certificates: []tls.Certificate{sCert},
}
}
klog.V(3).Infof("Registered '%s' as webhook.", service.Path)
http.HandleFunc(service.Path, service.Handler)

klog.V(3).Infof("Registered configuration for webhook <%s>", service.Path)
registerWebhookConfig(kubeClient, config, service, caBundle)
})

webhookServeError := make(chan struct{})
stopChannel := make(chan os.Signal)
signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT)

if len(restConfig.CertData) != 0 && len(restConfig.KeyData) != 0 {
sCert, err := tls.X509KeyPair(restConfig.CertData, restConfig.KeyData)
if err != nil {
klog.Fatal(err)
server := &http.Server{
Addr: ":" + strconv.Itoa(config.Port),
TLSConfig: configTLS(config, restConfig),
}
go func() {
err = server.ListenAndServeTLS("", "")
if err != nil && err != http.ErrServerClosed {
klog.Fatalf("ListenAndServeTLS for admission webhook failed: %v", err)
close(webhookServeError)
}

return &tls.Config{
Certificates: []tls.Certificate{sCert},
klog.Info("Volcano Webhook manager started.")
}()

select {
case <-stopChannel:
if err := server.Close(); err != nil {
return fmt.Errorf("close admission server failed: %v", err)
}
return nil
case <-webhookServeError:
return fmt.Errorf("unknown webhook server error")
}

klog.Fatal("tls: failed to find any tls config data")
return &tls.Config{}
}
Loading