Skip to content

Commit 45d386b

Browse files
spring-youngYuyao
authored and
Yuyao
committed
fix:fix circuit and fault store update
1 parent 2a3fa0e commit 45d386b

File tree

5 files changed

+45
-42
lines changed

5 files changed

+45
-42
lines changed

pkg/apis/ctrlmesh/constants/constants.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ const (
6363
EnvEnableSim = "ENABLE_SIM"
6464

6565
EnvDisableCircuitBreaker = "DISABLE_CIRCUIT_BREAKER"
66-
EnvDisableFaultInjection = "DISABLE_FAULT_INJECTION"
66+
EnvEnableFaultInjection = "ENABLE_FAULT_INJECTION"
6767
EnvEnableApiServerCircuitBreaker = "ENABLE_API_SERVER_BREAKER"
6868
EnvEnableRestCircuitBreaker = "ENABLE_REST_BREAKER"
6969
EnvEnableRestFaultInjection = "ENABLE_REST_FAULT_INJECTION"
@@ -77,6 +77,7 @@ func AllProxySyncEnvKey() []string {
7777
EnvIPTable,
7878
EnvEnableWebHookProxy,
7979
EnvDisableCircuitBreaker,
80+
EnvEnableFaultInjection,
8081
EnvEnableApiServerCircuitBreaker,
8182
EnvEnableRestCircuitBreaker,
8283
EnvEnableRestFaultInjection,

pkg/proxy/apiserver/handler.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ var (
5353
enableIpTable = os.Getenv(constants.EnvIPTable) == "true"
5454

5555
disableCircuitBreaker = os.Getenv(constants.EnvDisableCircuitBreaker) == "true"
56-
disableFaultInjection = os.Getenv(constants.EnvDisableCircuitBreaker) == "true"
56+
enableFaultInjection = os.Getenv(constants.EnvEnableFaultInjection) == "true"
5757
)
5858

5959
type Proxy struct {
@@ -91,7 +91,7 @@ func NewProxy(opts *Options) (*Proxy, error) {
9191
if opts.BreakerWrapperFunc != nil && !disableCircuitBreaker {
9292
handler = opts.BreakerWrapperFunc(handler)
9393
}
94-
if opts.FaultInjectionWrapperFunc != nil && !disableFaultInjection {
94+
if opts.FaultInjectionWrapperFunc != nil && enableFaultInjection {
9595
handler = opts.FaultInjectionWrapperFunc(handler)
9696
}
9797
handler = genericfilters.WithWaitGroup(handler, opts.LongRunningFunc, opts.HandlerChainWaitGroup)

pkg/proxy/circuitbreaker/manager.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ func (m *manager) Sync(config *ctrlmeshproto.CircuitBreaker) (*ctrlmeshproto.Con
7979
LimitingSnapshot: m.snapshot(config.Name),
8080
}, nil
8181
} else {
82+
if ok{
83+
m.unregisterRules(cb.Name)
84+
}
8285
m.breakerMap[config.Name] = config
8386
m.registerRules(config)
8487
var msg string
@@ -163,9 +166,6 @@ type ValidateResult struct {
163166
// RegisterRules register a circuit breaker to the local limiter store
164167
func (m *manager) registerRules(cb *ctrlmeshproto.CircuitBreaker) {
165168
logger.Info("register rule", "circuit-breaker", cb.Name)
166-
if _, ok := m.breakerMap[cb.Name]; ok {
167-
m.unregisterRules(cb.Name)
168-
}
169169

170170
for _, limiting := range cb.RateLimitings {
171171
key := fmt.Sprintf("%s:%s", cb.Name, limiting.Name)

pkg/proxy/faultinjection/manager.go

+21-25
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"encoding/json"
2222
"errors"
2323
"fmt"
24+
2425
"math/rand"
2526
"net/http"
2627
"net/url"
@@ -30,9 +31,7 @@ import (
3031

3132
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
3233
apirequest "k8s.io/apiserver/pkg/endpoints/request"
33-
logf "sigs.k8s.io/controller-runtime/pkg/log"
34-
35-
// pkgfi "github.com/KusionStack/controller-mesh/circuitbreaker"
34+
"k8s.io/klog/v2"
3635

3736
ctrlmeshproto "github.com/KusionStack/controller-mesh/pkg/apis/ctrlmesh/proto"
3837
"github.com/KusionStack/controller-mesh/pkg/utils"
@@ -41,7 +40,6 @@ import (
4140
const timeLayout = "15:04:05"
4241

4342
var (
44-
logger = logf.Log.WithName("fault-injection-manager")
4543
randNum = rand.New(rand.NewSource(time.Now().UnixNano()))
4644
)
4745

@@ -89,6 +87,9 @@ func (m *manager) Sync(config *ctrlmeshproto.FaultInjection) (*ctrlmeshproto.Fau
8987
Message: fmt.Sprintf("faultInjection spec hash not updated, hash %s", fi.ConfigHash),
9088
}, nil
9189
} else {
90+
if ok{
91+
m.unregisterRules(fi.Name)
92+
}
9293
m.faultInjectionMap[config.Name] = config
9394
m.registerRules(config)
9495
var msg string
@@ -117,16 +118,16 @@ func (m *manager) Sync(config *ctrlmeshproto.FaultInjection) (*ctrlmeshproto.Fau
117118
}, nil
118119
}
119120
case ctrlmeshproto.FaultInjection_CHECK:
120-
cb, ok := m.faultInjectionMap[config.Name]
121+
fi, ok := m.faultInjectionMap[config.Name]
121122
if !ok {
122123
return &ctrlmeshproto.FaultInjectConfigResp{
123124
Success: false,
124-
Message: fmt.Sprintf("fault injection config %s not found", cb.Name),
125+
Message: fmt.Sprintf("fault injection config %s not found", fi.Name),
125126
}, nil
126-
} else if config.ConfigHash != cb.ConfigHash {
127+
} else if config.ConfigHash != fi.ConfigHash {
127128
return &ctrlmeshproto.FaultInjectConfigResp{
128129
Success: false,
129-
Message: fmt.Sprintf("unequal fault injection %s hash, old %s, new %s", cb.Name, cb.ConfigHash, config.ConfigHash),
130+
Message: fmt.Sprintf("unequal fault injection %s hash, old %s, new %s", fi.Name, fi.ConfigHash, config.ConfigHash),
130131
}, nil
131132
}
132133
return &ctrlmeshproto.FaultInjectConfigResp{
@@ -150,17 +151,14 @@ func (m *manager) HandlerWrapper() func(http.Handler) http.Handler {
150151

151152
func (m *manager) FaultInjectionRest(URL string, method string) (result *FaultInjectionResult) {
152153
now := time.Now()
153-
defer func() {
154-
logger.Info("validate rest", "URL", URL, "method", method, "result", result, "cost time", time.Since(now).String())
155-
}()
156-
157154
urls := generateWildcardUrls(URL, method)
158155
for _, url := range urls {
159156
faultInjections, states := m.faultInjectionStore.byIndex(IndexRest, url)
160157
if len(faultInjections) == 0 {
161158
continue
162159
}
163160
result = m.doFaultInjection(faultInjections, states)
161+
klog.Infof("validate rest, URL: %s, method:%s, result: %v, cost time: %v ", URL, method, result, time.Since(now).String())
164162
return result
165163
}
166164
result = &FaultInjectionResult{Abort: false, Reason: "No rule match"}
@@ -173,7 +171,7 @@ func generateWildcardUrls(URL string, method string) []string {
173171
URL = strings.TrimSuffix(URL, "/")
174172
u, err := url.Parse(URL)
175173
if err != nil {
176-
logger.Error(err, "failed to url", "URL", URL, "method", method)
174+
klog.Errorf("failed to url, URL: %s, method: %s,err: %v", URL, method, err)
177175
return result
178176
}
179177
if len(u.Path) > 0 {
@@ -189,16 +187,14 @@ func generateWildcardUrls(URL string, method string) []string {
189187

190188
func (m *manager) FaultInjectionResource(namespace, apiGroup, resource, verb string) (result *FaultInjectionResult) {
191189
now := time.Now()
192-
defer func() {
193-
logger.Info("validate resource", "namespace", namespace, "apiGroup", apiGroup, "resource", resource, "verb", verb, "result", result, "cost time", time.Since(now).String())
194-
}()
195190
seeds := generateWildcardSeeds(namespace, apiGroup, resource, verb)
196191
for _, seed := range seeds {
197192
faultInjections, states := m.faultInjectionStore.byIndex(IndexResource, seed)
198193
if len(faultInjections) == 0 {
199194
continue
200195
}
201196
result = m.doFaultInjection(faultInjections, states)
197+
klog.Infof("validate resource, namespace: %s, apiGroup: %s, resource: %s, verb: %s, result: %v, cost time: %v, ", namespace, apiGroup, resource, verb, result, time.Since(now).String())
202198
return result
203199
}
204200
result = &FaultInjectionResult{Abort: false, Reason: "No rule match"}
@@ -253,8 +249,12 @@ func withFaultInjection(injector FaultInjector, handler http.Handler) http.Handl
253249
if apiErr.Code != http.StatusOK {
254250
w.Header().Set("Content-Type", "application/json")
255251
w.WriteHeader(int(apiErr.Code))
256-
json.NewEncoder(w).Encode(apiErr)
257-
logger.Info("faultinjection rule", fmt.Sprintf("fault injection, %s, %s,%d", result.Reason, result.Message, result.ErrCode))
252+
if err := json.NewEncoder(w).Encode(apiErr); err != nil {
253+
// Error encoding the JSON response, at this point the headers are already written.
254+
klog.Errorf("failed to write api error response: %v", err)
255+
return
256+
}
257+
klog.Infof("faultinjection rule: %s", fmt.Sprintf("fault injection, %s, %s,%d", apiErr.Reason, apiErr.Message, apiErr.Code))
258258
return
259259
}
260260
}
@@ -278,7 +278,7 @@ func (m *manager) doFaultInjection(faultInjections []*ctrlmeshproto.HTTPFaultInj
278278
if isInpercentRange(faultInjections[idx].Delay.Percent) {
279279
delay := faultInjections[idx].Delay.GetFixedDelay()
280280
delayDuration := delay.AsDuration()
281-
logger.Info("Delaying time ", "for", delayDuration)
281+
klog.Infof("Delaying time: %v ", delayDuration)
282282
time.Sleep(delayDuration)
283283
}
284284
}
@@ -398,11 +398,7 @@ func isEffectiveTimeRange(timeRange *ctrlmeshproto.EffectiveTimeRange) bool {
398398

399399
// RegisterRules register a fault injection to the local store
400400
func (m *manager) registerRules(fi *ctrlmeshproto.FaultInjection) {
401-
logger.Info("register rule", "faultInjection", fi.Name)
402-
if _, ok := m.faultInjectionMap[fi.Name]; ok {
403-
m.unregisterRules(fi.Name)
404-
}
405-
401+
klog.Infof("register rule, faultInjection: %s", fi.Name)
406402
for _, faultInjection := range fi.HttpFaultInjections {
407403
key := fmt.Sprintf("%s:%s", fi.Name, faultInjection.Name)
408404
m.faultInjectionStore.createOrUpdateRule(
@@ -413,7 +409,7 @@ func (m *manager) registerRules(fi *ctrlmeshproto.FaultInjection) {
413409

414410
// UnregisterRules unregister a fault injection to the local store
415411
func (m *manager) unregisterRules(fiName string) {
416-
logger.Info("unregister rule", "faultInjection", fiName)
412+
klog.Infof("unregister rule, faultInjection: %s", fiName)
417413
fi, ok := m.faultInjectionMap[fiName]
418414
if !ok {
419415
return

pkg/proxy/http/http_proxy.go

+17-11
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,10 @@ import (
3737
)
3838

3939
var (
40-
enableRestBreaker = os.Getenv(constants.EnvEnableRestCircuitBreaker) == "true"
41-
logger = logf.Log.WithName("http-proxy")
40+
enableRestBreaker = os.Getenv(constants.EnvEnableRestCircuitBreaker) == "true"
41+
enableRestFaultInjection = os.Getenv(constants.EnvEnableRestFaultInjection) == "true"
42+
43+
logger = logf.Log.WithName("http-proxy")
4244
)
4345

4446
type ITProxy interface {
@@ -86,16 +88,20 @@ func (t *tproxy) handleHTTP(resp http.ResponseWriter, req *http.Request) {
8688
}
8789
klog.Infof("handel http request, url: %s ", realEndPointUrl.String())
8890
// faultinjection
89-
result := t.FaultInjector.FaultInjectionRest(req.Header.Get(meshhttp.HeaderMeshRealEndpoint), req.Method)
90-
if result.Abort {
91-
apiErr := utils.HttpToAPIError(int(result.ErrCode), req.Method, result.Message)
92-
resp.Header().Set("Content-Type", "application/json")
93-
resp.WriteHeader(int(apiErr.Code))
94-
if err := json.NewEncoder(resp).Encode(apiErr); err != nil {
95-
http.Error(resp, fmt.Sprintf("fail to inject fault %v", err), http.StatusInternalServerError)
91+
if enableRestFaultInjection {
92+
result := t.FaultInjector.FaultInjectionRest(req.Header.Get(meshhttp.HeaderMeshRealEndpoint), req.Method)
93+
if result.Abort {
94+
apiErr := utils.HttpToAPIError(int(result.ErrCode), req.Method, result.Message)
95+
if apiErr.Code != http.StatusOK {
96+
resp.Header().Set("Content-Type", "application/json")
97+
resp.WriteHeader(int(apiErr.Code))
98+
if err := json.NewEncoder(resp).Encode(apiErr); err != nil {
99+
http.Error(resp, fmt.Sprintf("fail to inject fault %v", err), http.StatusInternalServerError)
100+
}
101+
klog.Infof("faultInjection rule, rule: %s", fmt.Sprintf("fault injection, %s, %s,%d", result.Reason, result.Message, result.ErrCode))
102+
return
103+
}
96104
}
97-
klog.Infof("faultInjection rule, rule: %s", fmt.Sprintf("fault injection, %s, %s,%d", result.Reason, result.Message, result.ErrCode))
98-
return
99105
}
100106

101107
// circuitbreaker

0 commit comments

Comments
 (0)