Skip to content

Commit f45671d

Browse files
committed
add checker and fix lease create bug
1 parent ebf9586 commit f45671d

File tree

16 files changed

+110
-32
lines changed

16 files changed

+110
-32
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ The design architecture of this project is based on [openkruise/controllermesh](
1616
3. **Circuit breaker and rate limiter**: Not only Kubernetes operation requests, but also other external operation requests.
1717
4. **Multicluster routing and sharding**: This feature is supported by [kusionstack/kaera(karbour)]()
1818

19-
<p align="center"><img width="800" src="./docs/img/img2.png"/></p>
19+
<p align="center"><img width="800" src="./docs/img/img4.png"/></p>
2020

2121
## Quick Start
2222
Visit [Quick Start]().

artifacts/images/manager.Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Build the manager binary
2-
FROM golang:1.19 as builder
2+
FROM golang:1.20 as builder
33

44
WORKDIR /workspace
55

artifacts/images/proxy.Dockerfile

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM golang:1.19 as builder
1+
FROM golang:1.20 as builder
22

33
WORKDIR /workspace
44

@@ -33,6 +33,6 @@ RUN useradd -m --uid 1359 kridge-proxy && \
3333
echo "kridge-proxy ALL=NOPASSWD: ALL" >> /etc/sudoers
3434
WORKDIR /
3535
COPY artifacts/scripts/proxy-poststart.sh /poststart.sh
36-
36+
RUN mkdir /kridge && chmod 777 /kridge
3737
COPY --from=builder /workspace/kridge-proxy .
3838
ENTRYPOINT ["/kridge-proxy"]

docs/installation.md

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
2+
## Installation
3+
4+
**Install by helm**
5+
```shell
6+
# Firstly add charts repository if you haven't do this.
7+
$ helm repo add kusionstack https://kusionstack.github.io/charts/
8+
9+
# [Optional]
10+
$ helm repo update
11+
12+
# Install the latest version.
13+
$ helm install kridge kusionstack/kridge --version 0.0.x
14+
15+
# Uninstall
16+
$ helm uninstall kridge
17+
```
18+

e2e/customoperator/app/controller/pod_recorder.go

+15-6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
v1 "k8s.io/api/core/v1"
2525
"k8s.io/client-go/util/workqueue"
26+
"k8s.io/klog/v2"
2627
ctrl "sigs.k8s.io/controller-runtime"
2728
"sigs.k8s.io/controller-runtime/pkg/client"
2829
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -37,6 +38,7 @@ var (
3738
// ManagerStateReconciler reconciles a ManagerState object
3839
type PodReconciler struct {
3940
client.Client
41+
DirectorClient client.Client
4042
}
4143

4244
func (r *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, err error) {
@@ -50,8 +52,8 @@ func (r *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
5052
// default handler.EnqueueRequestForObject
5153
return ctrl.NewControllerManagedBy(mgr).
5254
For(&v1.Pod{}).
53-
Watches(&source.Kind{Type: &v1.Pod{}}, &enqueueHandler{Client: r.Client, kind: "Pod"}).
54-
Watches(&source.Kind{Type: &v1.ConfigMap{}}, &enqueueHandler{Client: r.Client, kind: "ConfigMap"}).
55+
Watches(&source.Kind{Type: &v1.Pod{}}, &enqueueHandler{Client: r.DirectorClient, kind: "Pod"}).
56+
Watches(&source.Kind{Type: &v1.ConfigMap{}}, &enqueueHandler{Client: r.DirectorClient, kind: "ConfigMap"}).
5557
Complete(r)
5658
}
5759

@@ -61,17 +63,24 @@ type enqueueHandler struct {
6163
}
6264

6365
func (e *enqueueHandler) Create(event event.CreateEvent, q workqueue.RateLimitingInterface) {
64-
add(e.Client, event.Object.GetNamespace(), e.kind)
66+
Add(e.Client, event.Object.GetNamespace(), event.Object.GetName(), e.kind)
6567
}
6668

6769
func (e *enqueueHandler) Update(event event.UpdateEvent, q workqueue.RateLimitingInterface) {
68-
add(e.Client, event.ObjectNew.GetNamespace(), e.kind)
70+
Add(e.Client, event.ObjectNew.GetNamespace(), event.ObjectNew.GetName(), e.kind)
6971
}
7072

7173
func (e *enqueueHandler) Delete(event event.DeleteEvent, q workqueue.RateLimitingInterface) {
72-
add(e.Client, event.Object.GetNamespace(), e.kind)
74+
Add(e.Client, event.Object.GetNamespace(), event.Object.GetName(), e.kind)
7375
}
7476

7577
func (e *enqueueHandler) Generic(event event.GenericEvent, q workqueue.RateLimitingInterface) {
76-
add(e.Client, event.Object.GetNamespace(), e.kind)
78+
Add(e.Client, event.Object.GetNamespace(), event.Object.GetName(), e.kind)
79+
}
80+
81+
func Add(c client.Client, namespace, name, kind string) {
82+
klog.Infof("handle event %s %s/%s", kind, namespace, name)
83+
if err := add(c, namespace, kind); err != nil {
84+
klog.Errorf("fail to record event %s %s/%s, %v", kind, namespace, name, err)
85+
}
7786
}

e2e/customoperator/app/controller/util.go

+15-7
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import (
88
"time"
99

1010
v1 "k8s.io/api/core/v1"
11-
"k8s.io/apimachinery/pkg/api/errors"
1211
"k8s.io/apimachinery/pkg/types"
1312
"k8s.io/apimachinery/pkg/util/sets"
13+
"k8s.io/apimachinery/pkg/util/wait"
1414
"k8s.io/client-go/util/retry"
1515
"k8s.io/klog/v2"
1616
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -25,21 +25,29 @@ func initConfigMap(c client.Client) *v1.ConfigMap {
2525
cm := &v1.ConfigMap{}
2626
cm.Name = cmName
2727
cm.Namespace = podNamespace
28-
c.Create(context.TODO(), cm)
28+
cm.Data = map[string]string{}
29+
if err := c.Create(context.TODO(), cm); err != nil {
30+
klog.Errorf("Failed to create cm %v", err)
31+
}
2932
return cm
3033
}
3134

35+
var Retry = wait.Backoff{
36+
Steps: 50,
37+
Duration: 10 * time.Millisecond,
38+
Factor: 1.0,
39+
Jitter: 0.1,
40+
}
41+
3242
func add(c client.Client, namespace, kind string) error {
3343
if namespace == "" || kind == "" {
3444
return nil
3545
}
3646
key := podName
37-
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
47+
return retry.RetryOnConflict(Retry, func() error {
3848
cm := &v1.ConfigMap{}
3949
if err := c.Get(context.TODO(), types.NamespacedName{Name: cmName, Namespace: podNamespace}, cm); err != nil {
40-
if !errors.IsNotFound(err) {
41-
return err
42-
}
50+
klog.Errorf("Failed to get cm %v, try init", err)
4351
cm = initConfigMap(c)
4452
}
4553
if cm.Data == nil {
@@ -78,7 +86,7 @@ func Checker(ctx context.Context, c client.Client) {
7886
}
7987

8088
if err := c.Get(context.TODO(), types.NamespacedName{Name: cmName, Namespace: podNamespace}, cm); err != nil {
81-
klog.Errorf("fail to get configMap %s", cmName)
89+
klog.Errorf("fail to get configMap %s, %v", cmName, err)
8290
continue
8391
}
8492
if cm.Data == nil {

e2e/customoperator/app/main.go

+19-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package main
1818

1919
import (
2020
"flag"
21+
"fmt"
2122
"math/rand"
2223
_ "net/http/pprof"
2324
"os"
@@ -28,10 +29,12 @@ import (
2829
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2930
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3031
_ "k8s.io/client-go/plugin/pkg/client/auth"
32+
"k8s.io/client-go/rest"
3133
"k8s.io/client-go/tools/leaderelection/resourcelock"
3234
"k8s.io/klog/v2"
3335
"k8s.io/klog/v2/klogr"
3436
ctrl "sigs.k8s.io/controller-runtime"
37+
"sigs.k8s.io/controller-runtime/pkg/client"
3538
"sigs.k8s.io/controller-runtime/pkg/healthz"
3639
"sigs.k8s.io/controller-runtime/pkg/manager"
3740

@@ -85,11 +88,12 @@ func main() {
8588
os.Exit(1)
8689
}
8790

88-
go func() {
89-
setupLog.Info("wait webhook ready")
91+
directorClient := NewDirectorClientFromManager(mgr, "checker")
9092

93+
go func() {
9194
if err = (&appcontroller.PodReconciler{
92-
Client: mgr.GetClient(),
95+
Client: mgr.GetClient(),
96+
DirectorClient: directorClient,
9397
}).SetupWithManager(mgr); err != nil {
9498
setupLog.Error(err, "unable to create controller", "controller", "Recorder")
9599
os.Exit(1)
@@ -98,11 +102,22 @@ func main() {
98102

99103
go func() {
100104
<-time.After(20 * time.Second)
101-
appcontroller.Checker(ctx, mgr.GetClient())
105+
appcontroller.Checker(ctx, directorClient)
102106
}()
103107
setupLog.Info("starting manager")
104108
if err := mgr.Start(ctx); err != nil {
105109
setupLog.Error(err, "problem running manager")
106110
os.Exit(1)
107111
}
108112
}
113+
114+
func NewDirectorClientFromManager(mgr manager.Manager, name string) client.Client {
115+
cfg := rest.CopyConfig(mgr.GetConfig())
116+
cfg.UserAgent = fmt.Sprintf("director-client/%s", name)
117+
118+
c, err := client.New(cfg, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
119+
if err != nil {
120+
panic(err)
121+
}
122+
return c
123+
}

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/KusionStack/kridge
22

3-
go 1.19
3+
go 1.20
44

55
require (
66
github.com/go-logr/zapr v1.2.3

pkg/cmd/proxy/main.go

+1
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ func main() {
8282
{
8383
opts := proxyapiserver.NewOptions()
8484
opts.Config = rest.CopyConfig(cfg)
85+
// Certs generated by proxy-init.sh
8586
opts.SecureServingOptions.ServerCert.CertKey.KeyFile = "/var/run/secrets/kubernetes.io/serviceaccount/kridge/tls.key"
8687
opts.SecureServingOptions.ServerCert.CertKey.CertFile = "/var/run/secrets/kubernetes.io/serviceaccount/kridge/tls.crt"
8788
opts.SecureServingOptions.BindAddress = net.ParseIP("127.0.0.1")

pkg/manager/controllers/patchrunnable/labelpatch_runnable.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ func (p *PatchRunnable) Start(ctx context.Context) error {
140140
klog.Errorf("fail to load group version kind, %v", err)
141141
return err
142142
}
143+
klog.Infof("load group version kind %s", util.DumpJSON(gvks))
143144
p.groupVersionKinds = gvks
144145
p.invalid = sets.Set[string]{}
145146

@@ -170,6 +171,7 @@ func (p *PatchRunnable) Start(ctx context.Context) error {
170171
}()
171172
}
172173
wg.Wait()
174+
klog.Infof("finished patching label")
173175
return nil
174176
}
175177

@@ -191,7 +193,14 @@ func (p *PatchRunnable) getAllResources(namespaceList *v1.NamespaceList, itemCh
191193
}()
192194
}
193195
wg.Wait()
194-
close(itemCh)
196+
defer klog.Infof("finish collect resources")
197+
for {
198+
if len(itemCh) == 0 {
199+
close(itemCh)
200+
return
201+
}
202+
<-time.After(5 * time.Second)
203+
}
195204
}
196205

197206
func (p *PatchRunnable) getResourceInNamespace(namespace *v1.Namespace, itemCh chan<- patchItem) {

pkg/manager/controllers/patchrunnable/resource.go

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func LoadGroupVersionKind(c client.Client, discoveryClient *discovery.DiscoveryC
4949
cm := &v1.ConfigMap{}
5050
if err := c.Get(context.TODO(), types.NamespacedName{Name: *configMapName, Namespace: configMapNamespace}, cm); err != nil {
5151
if errors.IsNotFound(err) {
52+
klog.Infof("ConfigMap %s/%s not found", configMapNamespace, *configMapName)
5253
return result, nil
5354
}
5455
return result, err

pkg/proxy/apiserver/handler.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import (
5151
var (
5252
upgradeSubresources = sets.NewString("exec", "attach")
5353
enableIpTable = os.Getenv(constants.EnvIPTable) == "true"
54+
enableWebhookProxy = os.Getenv(constants.EnvEnableWebHookProxy) == "true"
5455
)
5556

5657
type Proxy struct {
@@ -62,8 +63,10 @@ type Proxy struct {
6263

6364
func NewProxy(opts *Options) (*Proxy, error) {
6465
var servingInfo *server.SecureServingInfo
65-
if err := opts.ApplyTo(&servingInfo); err != nil {
66-
return nil, fmt.Errorf("error apply options %s: %v", utils.DumpJSON(opts), err)
66+
if enableIpTable {
67+
if err := opts.ApplyTo(&servingInfo); err != nil {
68+
return nil, fmt.Errorf("error apply options %s: %v", utils.DumpJSON(opts), err)
69+
}
6770
}
6871

6972
tp, err := rest.TransportFor(opts.Config)
@@ -131,10 +134,15 @@ type handler struct {
131134
electionHandler leaderelection.Handler
132135
}
133136

137+
func getReqInfoStr(r *apirequest.RequestInfo) string {
138+
return fmt.Sprintf("RequestInfo: { Path: %s, APIGroup: %s, Resource: %s, Subresource: %s, Verb: %s, Namespace: %s, Name: %s, APIVersion: %s }", r.Path, r.APIGroup, r.Resource, r.Subresource, r.Verb, r.Namespace, r.Name, r.APIVersion)
139+
}
140+
134141
func (h *handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
135142
startTime := time.Now()
136-
137143
requestInfo, ok := apirequest.RequestInfoFrom(r.Context())
144+
klog.Infof("handle http req %s", r.URL.String())
145+
klog.Infof(getReqInfoStr(requestInfo))
138146
if !ok {
139147
klog.Errorf("%s %s %s, no request info in context", r.Method, r.Header.Get("Content-Type"), r.URL)
140148
http.Error(rw, "no request info in context", http.StatusBadRequest)

pkg/proxy/leaderelection/handler.go

+1
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ func (h *handler) Handle(req *request.RequestInfo, r *http.Request) (handled boo
9393
}
9494

9595
if adp.GetName() != h.lockName {
96+
adp.EncodeInto(r)
9697
return false, nil, nil
9798
}
9899

pkg/proxy/proto/strorage.go

+6
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ type storage struct {
4040
currentSpecFile *os.File
4141
}
4242

43+
func init() {
44+
if err := os.MkdirAll("/home/kridge-proxy/kridge", 0777); err != nil {
45+
klog.Error(err)
46+
}
47+
}
48+
4349
func newStorage() (*storage, error) {
4450
var err error
4551
s := &storage{}

pkg/utils/http/reverse_proxy.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -429,7 +429,9 @@ func (p *ReverseProxy) flushInterval(req *http.Request, res *http.Response) time
429429
if resCT == "text/event-stream" {
430430
return -1 // negative means immediately
431431
}
432-
432+
if res.ContentLength == -1 {
433+
return -1
434+
}
433435
// TODO: more specific cases? e.g. res.ContentLength == -1?
434436
return p.FlushInterval
435437
}

pkg/webhook/pod/injector.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@ func (h *MutatingHandler) injectByShardingConfig(ctx context.Context, pod *v1.Po
142142
Name: constants.ProxyContainerName,
143143
Image: *proxyImage,
144144
ImagePullPolicy: imagePullPolicy,
145-
Args: []string{
146-
"--v=" + strconv.Itoa(int(*proxyLogLevel)),
145+
Args: []string{
146+
//"--v=" + strconv.Itoa(int(*proxyLogLevel)),
147147
},
148148
Env: []v1.EnvVar{
149149
{Name: constants.EnvPodName, ValueFrom: &v1.EnvVarSource{FieldRef: &v1.ObjectFieldSelector{FieldPath: "metadata.name"}}},
@@ -172,9 +172,9 @@ func (h *MutatingHandler) injectByShardingConfig(ctx context.Context, pod *v1.Po
172172
},
173173
},
174174
SecurityContext: &v1.SecurityContext{
175-
Privileged: utilpointer.Bool(true), // This can be false, but true help us debug more easier.
176-
ReadOnlyRootFilesystem: utilpointer.Bool(true),
177-
RunAsUser: utilpointer.Int64(int64(constants.ProxyUserID)),
175+
Privileged: utilpointer.Bool(true), // This can be false, but true help us debug more easier.
176+
//ReadOnlyRootFilesystem: utilpointer.Bool(true),
177+
RunAsUser: utilpointer.Int64(int64(constants.ProxyUserID)),
178178
},
179179
}
180180

0 commit comments

Comments
 (0)