Skip to content

Commit 45832d9

Browse files
committed
Admission refactor.
Signed-off-by: Klaus Ma <klaus1982.cn@gmail.com>
1 parent cbe3c00 commit 45832d9

15 files changed

Lines changed: 314 additions & 230 deletions

File tree

cmd/admission/app/server.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,78 @@ package app
1818

1919
import (
2020
"crypto/tls"
21+
"fmt"
22+
"io/ioutil"
23+
"net/http"
24+
"os"
25+
"os/signal"
26+
"strconv"
27+
"syscall"
2128

2229
"k8s.io/client-go/kubernetes"
2330
"k8s.io/client-go/rest"
31+
"k8s.io/client-go/tools/clientcmd"
2432
"k8s.io/klog"
2533

2634
"volcano.sh/volcano/cmd/admission/app/options"
35+
"volcano.sh/volcano/pkg/admission/router"
2736
"volcano.sh/volcano/pkg/client/clientset/versioned"
2837
)
2938

39+
// Run start the service of admission controller.
40+
func Run(config *options.Config) error {
41+
42+
restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig)
43+
if err != nil {
44+
return fmt.Errorf("unable to build k8s config: %v", err)
45+
}
46+
47+
vClient := GetVolcanoClient(restConfig)
48+
router.ForEachAdmission(func(service *router.AdmissionService) {
49+
if service.Config != nil {
50+
service.Config.VolcanoClient = vClient
51+
service.Config.SchedulerName = config.SchedulerName
52+
}
53+
http.HandleFunc(service.Path, service.Handler)
54+
})
55+
56+
caBundle, err := ioutil.ReadFile(config.CaCertFile)
57+
if err != nil {
58+
return fmt.Errorf("unable to read cacert file: %v", err)
59+
}
60+
61+
err = options.RegisterWebhooks(config, GetClient(restConfig), caBundle)
62+
if err != nil {
63+
return fmt.Errorf("unable to register webhook configs: %v", err)
64+
}
65+
66+
webhookServeError := make(chan struct{})
67+
stopChannel := make(chan os.Signal)
68+
signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT)
69+
70+
server := &http.Server{
71+
Addr: ":" + strconv.Itoa(config.Port),
72+
TLSConfig: ConfigTLS(config, restConfig),
73+
}
74+
go func() {
75+
err = server.ListenAndServeTLS("", "")
76+
if err != nil && err != http.ErrServerClosed {
77+
klog.Fatalf("ListenAndServeTLS for admission webhook failed: %v", err)
78+
close(webhookServeError)
79+
}
80+
}()
81+
82+
select {
83+
case <-stopChannel:
84+
if err := server.Close(); err != nil {
85+
return fmt.Errorf("close admission server failed: %v", err)
86+
}
87+
return nil
88+
case <-webhookServeError:
89+
return fmt.Errorf("unknown webhook server error")
90+
}
91+
}
92+
3093
// GetClient Get a clientset with restConfig.
3194
func GetClient(restConfig *rest.Config) *kubernetes.Clientset {
3295
clientset, err := kubernetes.NewForConfig(restConfig)

cmd/admission/main.go

Lines changed: 8 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,25 @@ limitations under the License.
1616
package main
1717

1818
import (
19-
"io/ioutil"
20-
"net/http"
19+
"fmt"
2120
"os"
22-
"os/signal"
2321
"runtime"
24-
"strconv"
25-
"syscall"
2622
"time"
2723

2824
"github.com/spf13/pflag"
2925

3026
"k8s.io/apimachinery/pkg/util/wait"
3127
"k8s.io/apiserver/pkg/util/flag"
32-
"k8s.io/client-go/tools/clientcmd"
3328
"k8s.io/klog"
3429

3530
"volcano.sh/volcano/cmd/admission/app"
3631
"volcano.sh/volcano/cmd/admission/app/options"
37-
"volcano.sh/volcano/pkg/admission"
3832
"volcano.sh/volcano/pkg/version"
39-
)
40-
41-
func serveJobs(w http.ResponseWriter, r *http.Request) {
42-
admission.Serve(w, r, admission.AdmitJobs)
43-
}
4433

45-
func serveMutateJobs(w http.ResponseWriter, r *http.Request) {
46-
admission.Serve(w, r, admission.MutateJobs)
47-
}
34+
_ "volcano.sh/volcano/pkg/admission/jobs/mutate"
35+
_ "volcano.sh/volcano/pkg/admission/jobs/validate"
36+
_ "volcano.sh/volcano/pkg/admission/pods"
37+
)
4838

4939
var logFlushFreq = pflag.Duration("log-flush-frequency", 5*time.Second, "Maximum number of seconds between log flushes")
5040

@@ -64,66 +54,12 @@ func main() {
6454
go wait.Until(klog.Flush, *logFlushFreq, wait.NeverStop)
6555
defer klog.Flush()
6656

67-
http.HandleFunc(admission.AdmitJobPath, serveJobs)
68-
http.HandleFunc(admission.MutateJobPath, serveMutateJobs)
69-
7057
if err := config.CheckPortOrDie(); err != nil {
7158
klog.Fatalf("Configured port is invalid: %v", err)
7259
}
73-
addr := ":" + strconv.Itoa(config.Port)
74-
75-
restConfig, err := clientcmd.BuildConfigFromFlags(config.Master, config.Kubeconfig)
76-
if err != nil {
77-
klog.Fatalf("Unable to build k8s config: %v", err)
78-
}
79-
80-
admission.VolcanoClientSet = app.GetVolcanoClient(restConfig)
81-
82-
servePods(config)
83-
84-
caBundle, err := ioutil.ReadFile(config.CaCertFile)
85-
if err != nil {
86-
klog.Fatalf("Unable to read cacert file: %v", err)
87-
}
88-
89-
err = options.RegisterWebhooks(config, app.GetClient(restConfig), caBundle)
90-
if err != nil {
91-
klog.Fatalf("Unable to register webhook configs: %v", err)
92-
}
93-
94-
stopChannel := make(chan os.Signal)
95-
signal.Notify(stopChannel, syscall.SIGTERM, syscall.SIGINT)
96-
97-
server := &http.Server{
98-
Addr: addr,
99-
TLSConfig: app.ConfigTLS(config, restConfig),
100-
}
101-
webhookServeError := make(chan struct{})
102-
go func() {
103-
err = server.ListenAndServeTLS("", "")
104-
if err != nil && err != http.ErrServerClosed {
105-
klog.Fatalf("ListenAndServeTLS for admission webhook failed: %v", err)
106-
close(webhookServeError)
107-
}
108-
}()
109-
110-
select {
111-
case <-stopChannel:
112-
if err := server.Close(); err != nil {
113-
klog.Fatalf("Close admission server failed: %v", err)
114-
}
115-
return
116-
case <-webhookServeError:
117-
return
118-
}
119-
}
12060

121-
func servePods(config *options.Config) {
122-
admController := &admission.Controller{
123-
VcClients: admission.VolcanoClientSet,
124-
SchedulerName: config.SchedulerName,
61+
if err := app.Run(config); err != nil {
62+
fmt.Fprintf(os.Stderr, "%v\n", err)
63+
os.Exit(1)
12564
}
126-
http.HandleFunc(admission.AdmitPodPath, admController.ServerPods)
127-
128-
return
12965
}

hack/.golint_failures

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
volcano.sh/volcano/pkg/admission/jobs/mutate
2+
volcano.sh/volcano/pkg/admission/router
3+
volcano.sh/volcano/pkg/admission/schema
14
volcano.sh/volcano/pkg/apis/scheduling
25
volcano.sh/volcano/pkg/apis/scheduling/v1alpha1
36
volcano.sh/volcano/pkg/apis/scheduling/v1alpha2
Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package admission
17+
package mutate
1818

1919
import (
2020
"encoding/json"
@@ -25,6 +25,9 @@ import (
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/klog"
2727

28+
"volcano.sh/volcano/pkg/admission/router"
29+
"volcano.sh/volcano/pkg/admission/schema"
30+
"volcano.sh/volcano/pkg/admission/util"
2831
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
2932
)
3033

@@ -33,6 +36,11 @@ const (
3336
DefaultQueue = "default"
3437
)
3538

39+
var Service = &router.AdmissionService{
40+
Path: "/mutating-jobs",
41+
Func: MutateJobs,
42+
}
43+
3644
type patchOperation struct {
3745
Op string `json:"op"`
3846
Path string `json:"path"`
@@ -43,9 +51,9 @@ type patchOperation struct {
4351
func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
4452
klog.V(3).Infof("mutating jobs")
4553

46-
job, err := DecodeJob(ar.Request.Object, ar.Request.Resource)
54+
job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource)
4755
if err != nil {
48-
return ToAdmissionResponse(err)
56+
return util.ToAdmissionResponse(err)
4957
}
5058

5159
reviewResponse := v1beta1.AdmissionResponse{}
@@ -58,7 +66,7 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
5866
break
5967
default:
6068
err = fmt.Errorf("expect operation to be 'CREATE' ")
61-
return ToAdmissionResponse(err)
69+
return util.ToAdmissionResponse(err)
6270
}
6371

6472
if err != nil {
@@ -73,7 +81,7 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
7381
return &reviewResponse
7482
}
7583

76-
func createPatch(job v1alpha1.Job) ([]byte, error) {
84+
func createPatch(job *v1alpha1.Job) ([]byte, error) {
7785
var patch []patchOperation
7886
pathQueue := patchDefaultQueue(job)
7987
if pathQueue != nil {
@@ -86,7 +94,7 @@ func createPatch(job v1alpha1.Job) ([]byte, error) {
8694
return json.Marshal(patch)
8795
}
8896

89-
func patchDefaultQueue(job v1alpha1.Job) *patchOperation {
97+
func patchDefaultQueue(job *v1alpha1.Job) *patchOperation {
9098
//Add default queue if not specified.
9199
if job.Spec.Queue == "" {
92100
return &patchOperation{Op: "add", Path: "/spec/queue", Value: DefaultQueue}

pkg/admission/mutate_job_test.go renamed to pkg/admission/jobs/mutate/mutate_job_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package admission
17+
package mutate
1818

1919
import (
2020
"testing"
Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package admission
17+
package validate
1818

1919
import (
2020
"fmt"
@@ -30,23 +30,33 @@ import (
3030
k8scorev1 "k8s.io/kubernetes/pkg/apis/core/v1"
3131
k8scorevalid "k8s.io/kubernetes/pkg/apis/core/validation"
3232

33+
"volcano.sh/volcano/pkg/admission/router"
34+
"volcano.sh/volcano/pkg/admission/schema"
35+
"volcano.sh/volcano/pkg/admission/util"
3336
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
34-
vcclientset "volcano.sh/volcano/pkg/client/clientset/versioned"
3537
"volcano.sh/volcano/pkg/controllers/job/plugins"
3638
)
3739

38-
// VolcanoClientSet is volcano clientset
39-
// TODO: make it as package local var.
40-
var VolcanoClientSet vcclientset.Interface
40+
func init() {
41+
router.RegisterAdmission(service)
42+
}
43+
44+
var service = &router.AdmissionService{
45+
Path: "/jobs",
46+
Func: AdmitJobs,
47+
48+
Config: config,
49+
}
50+
51+
var config = &router.AdmissionServiceConfig{}
4152

4253
// AdmitJobs is to admit jobs and return response
4354
func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
44-
4555
klog.V(3).Infof("admitting jobs -- %s", ar.Request.Operation)
4656

47-
job, err := DecodeJob(ar.Request.Object, ar.Request.Resource)
57+
job, err := schema.DecodeJob(ar.Request.Object, ar.Request.Resource)
4858
if err != nil {
49-
return ToAdmissionResponse(err)
59+
return util.ToAdmissionResponse(err)
5060
}
5161
var msg string
5262
reviewResponse := v1beta1.AdmissionResponse{}
@@ -57,14 +67,14 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
5767
msg = validateJob(job, &reviewResponse)
5868
break
5969
case v1beta1.Update:
60-
_, err := DecodeJob(ar.Request.OldObject, ar.Request.Resource)
70+
_, err := schema.DecodeJob(ar.Request.OldObject, ar.Request.Resource)
6171
if err != nil {
62-
return ToAdmissionResponse(err)
72+
return util.ToAdmissionResponse(err)
6373
}
6474
break
6575
default:
6676
err := fmt.Errorf("expect operation to be 'CREATE' or 'UPDATE'")
67-
return ToAdmissionResponse(err)
77+
return util.ToAdmissionResponse(err)
6878
}
6979

7080
if !reviewResponse.Allowed {
@@ -73,8 +83,7 @@ func AdmitJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
7383
return &reviewResponse
7484
}
7585

76-
func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) string {
77-
86+
func validateJob(job *v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) string {
7887
var msg string
7988
taskNames := map[string]string{}
8089
var totalReplicas int32
@@ -151,9 +160,9 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
151160
}
152161

153162
// Check whether Queue already present or not
154-
if _, err := VolcanoClientSet.SchedulingV1alpha2().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil {
163+
if _, err := config.VolcanoClient.SchedulingV1alpha2().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil {
155164
// TODO: deprecate v1alpha1
156-
if _, err := VolcanoClientSet.SchedulingV1alpha1().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil {
165+
if _, err := config.VolcanoClient.SchedulingV1alpha1().Queues().Get(job.Spec.Queue, metav1.GetOptions{}); err != nil {
157166
msg = msg + fmt.Sprintf(" unable to find job queue: %v", err)
158167
}
159168
}
@@ -165,7 +174,7 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
165174
return msg
166175
}
167176

168-
func validateTaskTemplate(task v1alpha1.TaskSpec, job v1alpha1.Job, index int) string {
177+
func validateTaskTemplate(task v1alpha1.TaskSpec, job *v1alpha1.Job, index int) string {
169178
var v1PodTemplate v1.PodTemplate
170179
v1PodTemplate.Template = *task.Template.DeepCopy()
171180
k8scorev1.SetObjectDefaults_PodTemplate(&v1PodTemplate)

0 commit comments

Comments
 (0)