Skip to content

Commit 07a3c24

Browse files
committed
Admission refactor.
Signed-off-by: Klaus Ma <klaus1982.cn@gmail.com>
1 parent cbe3c00 commit 07a3c24

File tree

16 files changed

+347
-243
lines changed

16 files changed

+347
-243
lines changed

cmd/admission/app/options/options.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type Config struct {
4646
ValidateWebhookConfigName string
4747
ValidateWebhookName string
4848
PrintVersion bool
49+
PrintYAML bool
4950
AdmissionServiceName string
5051
AdmissionServiceNamespace string
5152
SchedulerName string
@@ -76,6 +77,7 @@ func (c *Config) AddFlags(fs *pflag.FlagSet) {
7677
fs.StringVar(&c.ValidateWebhookName, "validate-webhook-name", "",
7778
"Name of the webhook entry in the webhook config. [Deprecated]: it will be generated when not specified")
7879
fs.BoolVar(&c.PrintVersion, "version", false, "Show version and quit")
80+
fs.BoolVar(&c.PrintYAML, "yaml", false, "Show the yaml of all webhook")
7981
fs.StringVar(&c.AdmissionServiceNamespace, "webhook-namespace", "default", "The namespace of this webhook")
8082
fs.StringVar(&c.AdmissionServiceName, "webhook-service-name", "admission-service", "The name of this admission service")
8183
fs.StringVar(&c.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name")
@@ -235,7 +237,7 @@ func registerMutateWebhook(client admissionregistrationv1beta1.MutatingWebhookCo
235237
return err
236238
}
237239
if err == nil && existing != nil {
238-
klog.Infof("Updating MutatingWebhookConfiguration %v", hook)
240+
klog.V(4).Infof("Updating MutatingWebhookConfiguration %v", hook)
239241
existing.Webhooks = hook.Webhooks
240242
if _, err := client.Update(existing); err != nil {
241243
return err
@@ -259,12 +261,12 @@ func registerValidateWebhook(client admissionregistrationv1beta1.ValidatingWebho
259261
}
260262
if err == nil && existing != nil {
261263
existing.Webhooks = hook.Webhooks
262-
klog.Infof("Updating ValidatingWebhookConfiguration %v", hook)
264+
klog.V(4).Infof("Updating ValidatingWebhookConfiguration %v", hook)
263265
if _, err := client.Update(existing); err != nil {
264266
return err
265267
}
266268
} else {
267-
klog.Infof("Creating ValidatingWebhookConfiguration %v", hook)
269+
klog.V(4).Infof("Creating ValidatingWebhookConfiguration %v", hook)
268270
if _, err := client.Create(&hook); err != nil {
269271
return err
270272
}

cmd/admission/app/server.go

Lines changed: 87 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,99 @@ package app
1818

1919
import (
2020
"crypto/tls"
21+
"fmt"
22+
"io/ioutil"
23+
"net/http"
24+
"os"
25+
"os/signal"
26+
"strconv"
27+
"syscall"
2128

2229
"k8s.io/client-go/kubernetes"
2330
"k8s.io/client-go/rest"
31+
"k8s.io/client-go/tools/clientcmd"
2432
"k8s.io/klog"
25-
33+
2634
"volcano.sh/volcano/cmd/admission/app/options"
35+
"volcano.sh/volcano/pkg/admission/router"
2736
"volcano.sh/volcano/pkg/client/clientset/versioned"
37+
"volcano.sh/volcano/pkg/version"
2838
)
2939

40+
// Run start the service of admission controller.
41+
func Run(config *options.Config) error {
42+
if config.PrintVersion {
43+
version.PrintVersionAndExit()
44+
return nil
45+
}
46+
47+
if config.PrintYAML {
48+
router.ForEachAdmission(func(service *router.AdmissionService) {
49+
fmt.Print("---\n")
50+
fmt.Printf("%s\n", service.Path)
51+
})
52+
return nil
53+
}
54+
55+
restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig)
56+
if err != nil {
57+
return fmt.Errorf("unable to build k8s config: %v", err)
58+
}
59+
60+
vClient := getVolcanoClient(restConfig)
61+
router.ForEachAdmission(func(service *router.AdmissionService) {
62+
if service.Config != nil {
63+
service.Config.VolcanoClient = vClient
64+
service.Config.SchedulerName = config.SchedulerName
65+
}
66+
67+
klog.V(3).Infof("Registered '%s' as webhook.", service.Path)
68+
http.HandleFunc(service.Path, service.Handler)
69+
})
70+
71+
caBundle, err := ioutil.ReadFile(config.CaCertFile)
72+
if err != nil {
73+
return fmt.Errorf("unable to read cacert file: %v", err)
74+
}
75+
76+
err = options.RegisterWebhooks(config, getClient(restConfig), caBundle)
77+
if err != nil {
78+
return fmt.Errorf("unable to register webhook configs: %v", err)
79+
}
80+
81+
klog.V(3).Info("Registered all webhooks to the api-server.")
82+
83+
webhookServeError := make(chan struct{})
84+
stopChannel := make(chan os.Signal)
85+
signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT)
86+
87+
server := &http.Server{
88+
Addr: ":" + strconv.Itoa(config.Port),
89+
TLSConfig: configTLS(config, restConfig),
90+
}
91+
go func() {
92+
err = server.ListenAndServeTLS("", "")
93+
if err != nil && err != http.ErrServerClosed {
94+
klog.Fatalf("ListenAndServeTLS for admission webhook failed: %v", err)
95+
close(webhookServeError)
96+
}
97+
98+
klog.Info("Volcano Webhook manager started.")
99+
}()
100+
101+
select {
102+
case <-stopChannel:
103+
if err := server.Close(); err != nil {
104+
return fmt.Errorf("close admission server failed: %v", err)
105+
}
106+
return nil
107+
case <-webhookServeError:
108+
return fmt.Errorf("unknown webhook server error")
109+
}
110+
}
111+
30112
// GetClient Get a clientset with restConfig.
31-
func GetClient(restConfig *rest.Config) *kubernetes.Clientset {
113+
func getClient(restConfig *rest.Config) *kubernetes.Clientset {
32114
clientset, err := kubernetes.NewForConfig(restConfig)
33115
if err != nil {
34116
klog.Fatal(err)
@@ -37,18 +119,18 @@ func GetClient(restConfig *rest.Config) *kubernetes.Clientset {
37119
}
38120

39121
// GetVolcanoClient get a clientset for volcano
40-
func GetVolcanoClient(restConfig *rest.Config) *versioned.Clientset {
122+
func getVolcanoClient(restConfig *rest.Config) *versioned.Clientset {
41123
clientset, err := versioned.NewForConfig(restConfig)
42124
if err != nil {
43125
klog.Fatal(err)
44126
}
45127
return clientset
46128
}
47129

48-
// ConfigTLS is a helper function that generate tls certificates from directly defined tls config or kubeconfig
130+
// configTLS is a helper function that generate tls certificates from directly defined tls config or kubeconfig
49131
// These are passed in as command line for cluster certification. If tls config is passed in, we use the directly
50132
// defined tls config, else use that defined in kubeconfig
51-
func ConfigTLS(config *options.Config, restConfig *rest.Config) *tls.Config {
133+
func configTLS(config *options.Config, restConfig *rest.Config) *tls.Config {
52134
if len(config.CertFile) != 0 && len(config.KeyFile) != 0 {
53135
sCert, err := tls.LoadX509KeyPair(config.CertFile, config.KeyFile)
54136
if err != nil {

cmd/admission/main.go

Lines changed: 8 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,24 @@ limitations under the License.
1616
package main
1717

1818
import (
19-
"io/ioutil"
20-
"net/http"
19+
"fmt"
2120
"os"
22-
"os/signal"
2321
"runtime"
24-
"strconv"
25-
"syscall"
2622
"time"
2723

2824
"github.com/spf13/pflag"
2925

3026
"k8s.io/apimachinery/pkg/util/wait"
3127
"k8s.io/apiserver/pkg/util/flag"
32-
"k8s.io/client-go/tools/clientcmd"
3328
"k8s.io/klog"
3429

3530
"volcano.sh/volcano/cmd/admission/app"
3631
"volcano.sh/volcano/cmd/admission/app/options"
37-
"volcano.sh/volcano/pkg/admission"
38-
"volcano.sh/volcano/pkg/version"
39-
)
40-
41-
func serveJobs(w http.ResponseWriter, r *http.Request) {
42-
admission.Serve(w, r, admission.AdmitJobs)
43-
}
4432

45-
func serveMutateJobs(w http.ResponseWriter, r *http.Request) {
46-
admission.Serve(w, r, admission.MutateJobs)
47-
}
33+
_ "volcano.sh/volcano/pkg/admission/jobs/mutate"
34+
_ "volcano.sh/volcano/pkg/admission/jobs/validate"
35+
_ "volcano.sh/volcano/pkg/admission/pods"
36+
)
4837

4938
var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes")
5039

@@ -57,73 +46,15 @@ func main() {
5746

5847
flag.InitFlags()
5948

60-
if config.PrintVersion {
61-
version.PrintVersionAndExit()
62-
}
63-
6449
go wait.Until(klog.Flush, *logFlushFreq, wait.NeverStop)
6550
defer klog.Flush()
6651

67-
http.HandleFunc(admission.AdmitJobPath, serveJobs)
68-
http.HandleFunc(admission.MutateJobPath, serveMutateJobs)
69-
7052
if err := config.CheckPortOrDie(); err != nil {
7153
klog.Fatalf("Configured port is invalid: %v", err)
7254
}
73-
addr := ":" + strconv.Itoa(config.Port)
74-
75-
restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig)
76-
if err != nil {
77-
klog.Fatalf("Unable to build k8s config: %v", err)
78-
}
79-
80-
admission.VolcanoClientSet = app.GetVolcanoClient(restConfig)
81-
82-
servePods(config)
8355

84-
caBundle, err := ioutil.ReadFile(config.CaCertFile)
85-
if err != nil {
86-
klog.Fatalf("Unable to read cacert file: %v", err)
56+
if err := app.Run(config); err != nil {
57+
fmt.Fprintf(os.Stderr, "%v\n", err)
58+
os.Exit(1)
8759
}
88-
89-
err = options.RegisterWebhooks(config, app.GetClient(restConfig), caBundle)
90-
if err != nil {
91-
klog.Fatalf("Unable to register webhook configs: %v", err)
92-
}
93-
94-
stopChannel := make(chan os.Signal)
95-
signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT)
96-
97-
server := &http.Server{
98-
Addr: addr,
99-
TLSConfig: app.ConfigTLS(config, restConfig),
100-
}
101-
webhookServeError := make(chan struct{})
102-
go func() {
103-
err = server.ListenAndServeTLS("", "")
104-
if err != nil && err != http.ErrServerClosed {
105-
klog.Fatalf("ListenAndServeTLS for admission webhook failed: %v", err)
106-
close(webhookServeError)
107-
}
108-
}()
109-
110-
select {
111-
case <-stopChannel:
112-
if err := server.Close(); err != nil {
113-
klog.Fatalf("Close admission server failed: %v", err)
114-
}
115-
return
116-
case <-webhookServeError:
117-
return
118-
}
119-
}
120-
121-
func servePods(config *options.Config) {
122-
admController := &admission.Controller{
123-
VcClients: admission.VolcanoClientSet,
124-
SchedulerName: config.SchedulerName,
125-
}
126-
http.HandleFunc(admission.AdmitPodPath, admController.ServerPods)
127-
128-
return
12960
}

hack/.golint_failures

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
volcano.sh/volcano/pkg/admission/jobs/mutate
2+
volcano.sh/volcano/pkg/admission/router
3+
volcano.sh/volcano/pkg/admission/schema
14
volcano.sh/volcano/pkg/apis/scheduling
25
volcano.sh/volcano/pkg/apis/scheduling/v1alpha1
36
volcano.sh/volcano/pkg/apis/scheduling/v1alpha2
Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package admission
17+
package mutate
1818

1919
import (
2020
"encoding/json"
@@ -25,6 +25,9 @@ import (
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/klog"
2727

28+
"volcano.sh/volcano/pkg/admission/router"
29+
"volcano.sh/volcano/pkg/admission/schema"
30+
"volcano.sh/volcano/pkg/admission/util"
2831
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
2932
)
3033

@@ -33,6 +36,15 @@ const (
3336
DefaultQueue = "default"
3437
)
3538

39+
func init() {
40+
router.RegisterAdmission(service)
41+
}
42+
43+
var service = &router.AdmissionService{
44+
Path: "/mutating-jobs",
45+
Func: MutateJobs,
46+
}
47+
3648
type patchOperation struct {
3749
Op string `json:"op"`
3850
Path string `json:"path"`
@@ -43,9 +55,9 @@ type patchOperation struct {
4355
func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
4456
klog.V(3).Infof("mutating jobs")
4557

46-
job, err := DecodeJob(ar.Request.Object, ar.Request.Resource)
58+
job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource)
4759
if err != nil {
48-
return ToAdmissionResponse(err)
60+
return util.ToAdmissionResponse(err)
4961
}
5062

5163
reviewResponse := v1beta1.AdmissionResponse{}
@@ -58,7 +70,7 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
5870
break
5971
default:
6072
err = fmt.Errorf("expect operation to be 'CREATE' ")
61-
return ToAdmissionResponse(err)
73+
return util.ToAdmissionResponse(err)
6274
}
6375

6476
if err != nil {
@@ -73,7 +85,7 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
7385
return &reviewResponse
7486
}
7587

76-
func createPatch(job v1alpha1.Job) ([]byte, error) {
88+
func createPatch(job *v1alpha1.Job) ([]byte, error) {
7789
var patch []patchOperation
7890
pathQueue := patchDefaultQueue(job)
7991
if pathQueue != nil {
@@ -86,7 +98,7 @@ func createPatch(job v1alpha1.Job) ([]byte, error) {
8698
return json.Marshal(patch)
8799
}
88100

89-
func patchDefaultQueue(job v1alpha1.Job) *patchOperation {
101+
func patchDefaultQueue(job *v1alpha1.Job) *patchOperation {
90102
//Add default queue if not specified.
91103
if job.Spec.Queue == "" {
92104
return &patchOperation{Op: "add", Path: "/spec/queue", Value: DefaultQueue}

pkg/admission/mutate_job_test.go renamed to pkg/admission/jobs/mutate/mutate_job_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package admission
17+
package mutate
1818

1919
import (
2020
"testing"

0 commit comments

Comments
 (0)