Skip to content

Commit 6fec6e4

Browse files
committed
enable circuitbreaker and fix ut
1 parent bb35637 commit 6fec6e4

File tree

14 files changed

+444
-121
lines changed

14 files changed

+444
-121
lines changed

config/crd/bases/ctrlmesh.kusionstack.io_circuitbreakers.yaml

-2
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ spec:
123123
type: array
124124
required:
125125
- apiGroups
126-
- namespaces
127126
- resources
128127
- verbs
129128
type: object
@@ -155,7 +154,6 @@ spec:
155154
required:
156155
- bucket
157156
- name
158-
- recoverPolicy
159157
- triggerPolicy
160158
type: object
161159
type: array

pkg/apis/ctrlmesh/utils/conv/conv.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,17 @@ func ConvertLimiting(limit *ctrlmeshv1alpha1.Limiting) *ctrlmeshproto.RateLimiti
103103
case ctrlmeshv1alpha1.TriggerPolicyForceOpened:
104104
protoLimit.TriggerPolicy = ctrlmeshproto.RateLimiting_TRIGGER_POLICY_FORCE_OPENED
105105
}
106-
protoLimit.RecoverPolicy = &ctrlmeshproto.RateLimiting_RecoverPolicy{}
107-
switch limit.RecoverPolicy.RecoverType {
108-
case ctrlmeshv1alpha1.RecoverPolicyManual:
109-
protoLimit.RecoverPolicy.Type = ctrlmeshproto.RateLimiting_RECOVER_POLICY_MANUAL
110-
case ctrlmeshv1alpha1.RecoverPolicySleepingWindow:
111-
protoLimit.RecoverPolicy.Type = ctrlmeshproto.RateLimiting_RECOVER_POLICY_SLEEPING_WINDOW
112-
}
113-
if limit.RecoverPolicy.SleepingWindowSize != nil {
114-
protoLimit.RecoverPolicy.SleepingWindowSize = *limit.RecoverPolicy.SleepingWindowSize
106+
if limit.RecoverPolicy != nil {
107+
protoLimit.RecoverPolicy = &ctrlmeshproto.RateLimiting_RecoverPolicy{}
108+
switch limit.RecoverPolicy.RecoverType {
109+
case ctrlmeshv1alpha1.RecoverPolicyManual:
110+
protoLimit.RecoverPolicy.Type = ctrlmeshproto.RateLimiting_RECOVER_POLICY_MANUAL
111+
case ctrlmeshv1alpha1.RecoverPolicySleepingWindow:
112+
protoLimit.RecoverPolicy.Type = ctrlmeshproto.RateLimiting_RECOVER_POLICY_SLEEPING_WINDOW
113+
}
114+
if limit.RecoverPolicy.SleepingWindowSize != nil {
115+
protoLimit.RecoverPolicy.SleepingWindowSize = *limit.RecoverPolicy.SleepingWindowSize
116+
}
115117
}
116118
if limit.Properties != nil {
117119
protoLimit.Properties = make(map[string]string, len(limit.Properties))

pkg/apis/ctrlmesh/v1alpha1/throttlingconfig_types.go

+7-26
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type ResourceRule struct {
3131
// Verb is a list of kubernetes resource API verbs, like: get, list, watch, create, update, delete, proxy. "*" means all.
3232
Verbs []string `json:"verbs"`
3333
// Namespaces is a list of namespaces the rule applies to. "*" means all.
34-
Namespaces []string `json:"namespaces"`
34+
Namespaces []string `json:"namespaces,omitempty"`
3535
}
3636

3737
// RestRule defines the target rest resource of the limiting policy
@@ -45,7 +45,7 @@ type RestRule struct {
4545
// TriggerPolicy defines how the circuit-breaking policy triggered from 'Closed' to 'Opened'
4646
type TriggerPolicy string
4747

48-
var (
48+
const (
4949
TriggerPolicyNormal TriggerPolicy = "Normal"
5050
TriggerPolicyLimiterOnly TriggerPolicy = "LimiterOnly"
5151
TriggerPolicyForceOpened TriggerPolicy = "ForceOpened"
@@ -60,33 +60,27 @@ type RecoverPolicy struct {
6060

6161
type RecoverType string
6262

63-
var (
63+
const (
6464
RecoverPolicyManual RecoverType = "Manual"
6565
RecoverPolicySleepingWindow RecoverType = "SleepingWindow"
6666
)
6767

6868
// InterceptType defines how the circuit-breaking traffic intercept from 'White' to 'Black'
6969
type InterceptType string
7070

71-
var (
71+
const (
7272
InterceptTypeWhitelist InterceptType = "Whitelist"
7373
InterceptTypeBlacklist InterceptType = "Blacklist"
7474
)
7575

7676
// ContentType defines how the circuit-breaking traffic intercept content type from 'Normal' to 'Regexp'
7777
type ContentType string
7878

79-
var (
79+
const (
8080
ContentTypeNormal ContentType = "Normal"
8181
ContentTypeRegexp ContentType = "Regexp"
8282
)
8383

84-
type ValidatePolicy string
85-
86-
var (
87-
AfterHttpSuccess ValidatePolicy = "AfterHttpSuccess"
88-
)
89-
9084
// Bucket defines the whole token bucket of the policy
9185
type Bucket struct {
9286
// Burst is the max token number of the bucket
@@ -110,7 +104,7 @@ type Limiting struct {
110104
// TriggerPolicy defines how the circuit-breaking policy triggered from 'Closed' to 'Opened'
111105
TriggerPolicy TriggerPolicy `json:"triggerPolicy"`
112106
// RecoverPolicy defines how the circuit-breaking policy recovered from 'Opened' to 'Closed'
113-
RecoverPolicy RecoverPolicy `json:"recoverPolicy"`
107+
RecoverPolicy *RecoverPolicy `json:"recoverPolicy,omitempty"`
114108
// ValidatePolicy determine the opportunity to validate req
115109
//ValidatePolicy ValidatePolicy `json:"validatePolicy,omitempty"`
116110
// Properties defines the additional properties of the policy, like: SleepingWindowSize
@@ -162,7 +156,7 @@ type LimitingSnapshot struct {
162156
// CircuitBreakerStatus defines the observed state of CircuitBreaker
163157
type CircuitBreakerStatus struct {
164158
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
165-
LastUpdatedTime metav1.Time `json:"lastUpdatedTime,omitempty"`
159+
LastUpdatedTime *metav1.Time `json:"lastUpdatedTime,omitempty"`
166160
CurrentSpecHash string `json:"currentSpecHash,omitempty"`
167161
TargetStatus []*TargetStatus `json:"targetStatus,omitempty"`
168162
}
@@ -202,16 +196,3 @@ type CircuitBreakerList struct {
202196
func init() {
203197
SchemeBuilder.Register(&CircuitBreaker{}, &CircuitBreakerList{})
204198
}
205-
206-
//func (in *CircuitBreakerStatus) UpdateSnapshots(podIp string, snapshot []*LimitingSnapshot) {
207-
// var newSnapshot []*LimitingSnapshot
208-
// for _, sp := range snapshot {
209-
// newSnapshot = append(newSnapshot, sp.DeepCopy())
210-
// }
211-
// for _, sp := range in.LimitingSnapshots {
212-
// if sp.Endpoint != podIp {
213-
// newSnapshot = append(newSnapshot, sp.DeepCopy())
214-
// }
215-
// }
216-
// in.LimitingSnapshots = newSnapshot
217-
//}

pkg/apis/ctrlmesh/v1alpha1/zz_generated.deepcopy.go

+9-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/cmd/manager/main.go

+7
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/KusionStack/controller-mesh/pkg/cachelimiter"
4040
"github.com/KusionStack/controller-mesh/pkg/client"
4141
"github.com/KusionStack/controller-mesh/pkg/grpcregistry"
42+
"github.com/KusionStack/controller-mesh/pkg/manager/controllers/circuitbreaker"
4243
"github.com/KusionStack/controller-mesh/pkg/manager/controllers/managerstate"
4344
"github.com/KusionStack/controller-mesh/pkg/manager/controllers/patchrunnable"
4445
"github.com/KusionStack/controller-mesh/pkg/manager/controllers/shardingconfigserver"
@@ -154,6 +155,12 @@ func main() {
154155
setupLog.Error(err, "unable to create controller", "controller", "ManagerState")
155156
os.Exit(1)
156157
}
158+
if err = (&circuitbreaker.CircuitBreakerReconciler{
159+
Client: mgr.GetClient(),
160+
}).SetupWithManager(mgr); err != nil {
161+
setupLog.Error(err, "unable to create controller", "controller", "CircuitBreaker")
162+
os.Exit(1)
163+
}
157164

158165
if err = patchrunnable.SetupWithManager(mgr); err != nil {
159166
setupLog.Error(err, "unable to create runnable", "runnable", "PatchRunnable")

pkg/manager/controllers/circuitbreaker/cache.go

+96-63
Original file line numberDiff line numberDiff line change
@@ -17,89 +17,122 @@ limitations under the License.
1717
package circuitbreaker
1818

1919
import (
20+
"context"
2021
"sync"
22+
"time"
23+
24+
v1 "k8s.io/api/core/v1"
25+
"k8s.io/apimachinery/pkg/types"
26+
"k8s.io/apimachinery/pkg/util/sets"
27+
"k8s.io/client-go/util/workqueue"
28+
"sigs.k8s.io/controller-runtime/pkg/client"
29+
30+
"github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/proto"
2131
)
2232

2333
var (
24-
defaultPodCache = &podCache{
25-
currentHash: make(map[string]string),
34+
defaultPodConfigCache = &podCache{
35+
currentHash: make(map[string]sets.Set[string]),
2636
}
27-
//defaultHashCache = &hashCache{
28-
// store: make(map[string]sets.String),
29-
//}
3037
)
3138

3239
type podCache struct {
33-
currentHash map[string]string
40+
currentHash map[string]sets.Set[string]
3441
mu sync.RWMutex
3542
}
3643

37-
func cacheKey(namespace, podName, cbName string) string {
38-
return namespace + "/" + podName + "/" + cbName
44+
func cacheKey(namespace, podName string) string {
45+
return namespace + "/" + podName
3946
}
4047

41-
func (c *podCache) Get(key string) string {
42-
c.mu.RLock()
43-
defer c.mu.RUnlock()
44-
return c.currentHash[key]
48+
func (c *podCache) Has(namespace, podName, cbHash string) bool {
49+
key := cacheKey(namespace, podName)
50+
configs, ok := c.currentHash[key]
51+
if !ok {
52+
return false
53+
}
54+
return configs.Has(cbHash)
4555
}
4656

47-
func (c *podCache) Update(key string, hash string) {
57+
func (c *podCache) Add(namespace, podName, cbHash string) {
4858
c.mu.Lock()
4959
defer c.mu.Unlock()
50-
c.currentHash[key] = hash
60+
key := cacheKey(namespace, podName)
61+
_, ok := c.currentHash[key]
62+
if !ok {
63+
c.currentHash[key] = sets.New[string](cbHash)
64+
return
65+
}
66+
c.currentHash[key].Insert(cbHash)
5167
}
5268

53-
func (c *podCache) Delete(key string) {
69+
func (c *podCache) Delete(namespace, podName string, cbHash ...string) {
5470
c.mu.Lock()
5571
defer c.mu.Unlock()
56-
delete(c.currentHash, key)
72+
key := cacheKey(namespace, podName)
73+
if len(cbHash) == 0 {
74+
delete(c.currentHash, key)
75+
return
76+
}
77+
_, ok := c.currentHash[key]
78+
if !ok {
79+
return
80+
}
81+
c.currentHash[key].Delete(cbHash...)
82+
}
83+
84+
func newDeleteProcessor(c client.Client, ctx context.Context) *deleteProcessor {
85+
processor := &deleteProcessor{
86+
q: workqueue.NewNamedDelayingQueue("delete-processor"),
87+
ctx: ctx,
88+
processFunc: func(item deleteItem) error {
89+
po := &v1.Pod{}
90+
if err := c.Get(ctx, types.NamespacedName{
91+
Namespace: item.Namespace,
92+
Name: item.PodName,
93+
}, po); err != nil {
94+
return client.IgnoreNotFound(err)
95+
}
96+
if po.DeletionTimestamp != nil {
97+
return nil
98+
}
99+
return deletePodConfig(ctx, &proto.CircuitBreaker{Name: item.ConfigName}, po.Status.PodIP)
100+
},
101+
}
102+
go processor.processLoop()
103+
return processor
104+
}
105+
106+
type deleteProcessor struct {
107+
q workqueue.DelayingInterface
108+
processFunc func(deleteItem) error
109+
ctx context.Context
110+
}
111+
112+
func (d *deleteProcessor) Add(namespace, podName, config string) {
113+
d.q.Add(deleteItem{Namespace: namespace, PodName: podName, ConfigName: config})
114+
}
115+
116+
func (d *deleteProcessor) processLoop() {
117+
go func() {
118+
for {
119+
item, shutdown := d.q.Get()
120+
if shutdown {
121+
break
122+
}
123+
val := item.(deleteItem)
124+
if err := d.processFunc(val); err != nil {
125+
d.q.AddAfter(item, 5*time.Second)
126+
}
127+
d.q.Done(item)
128+
}
129+
}()
130+
<-d.ctx.Done()
131+
d.q.ShutDown()
57132
}
58133

59-
//
60-
//type hashCache struct {
61-
// store map[string]sets.String
62-
// mu sync.RWMutex
63-
//}
64-
//
65-
//func (h *hashCache) Get(hash string) sets.String {
66-
// h.mu.RLock()
67-
// defer h.mu.RUnlock()
68-
// val, ok := h.store[hash]
69-
// if !ok {
70-
// return sets.NewString()
71-
// }
72-
// return val.Clone()
73-
//}
74-
//
75-
//func (h *hashCache) Add(hash string, items ...string) {
76-
// h.mu.Lock()
77-
// defer h.mu.Unlock()
78-
// val, ok := h.store[hash]
79-
// if !ok {
80-
// h.store[hash] = sets.NewString(items...)
81-
// } else {
82-
// val.Insert(items...)
83-
// }
84-
//}
85-
//
86-
//func (h *hashCache) Replace(hash string, items ...string) {
87-
// h.mu.Lock()
88-
// defer h.mu.Unlock()
89-
// h.store[hash] = sets.NewString(items...)
90-
//}
91-
//
92-
//func (h *hashCache) DeleteHash(hash string) {
93-
// h.mu.Lock()
94-
// defer h.mu.Unlock()
95-
// delete(h.store, hash)
96-
//}
97-
//
98-
//func (h *hashCache) DeleteVal(hash string, items ...string) {
99-
// h.mu.Lock()
100-
// defer h.mu.Unlock()
101-
// val, ok := h.store[hash]
102-
// if ok {
103-
// val.Delete(items...)
104-
// }
105-
//}
134+
type deleteItem struct {
135+
Namespace string
136+
PodName string
137+
ConfigName string
138+
}

0 commit comments

Comments
 (0)