Skip to content

Commit 2e08670

Browse files
committed
fix ut
1 parent a58a624 commit 2e08670

9 files changed

+69
-23
lines changed

pkg/manager/controllers/circuitbreaker/circuitbreaker_controller.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (r *CircuitBreakerReconciler) Reconcile(ctx context.Context, req ctrl.Reque
7373
}
7474

7575
defer func() {
76-
if res.RequeueAfter == 0 {
76+
if res.RequeueAfter == 0 && reconcileErr == nil {
7777
res.RequeueAfter = defaultRequeueTime
7878
}
7979
}()
@@ -284,8 +284,10 @@ func protoClient(podIp string) protoconnect.ThrottlingClient {
284284
return protoconnect.NewThrottlingClient(proto.DefaultHttpClient, podAddr(podIp))
285285
}
286286

287+
var proxyGRPCServerPort = constants.ProxyGRPCServerPort
288+
287289
func podAddr(podIp string) string {
288-
return fmt.Sprintf("https://%s:%d", podIp, constants.ProxyGRPCServerPort)
290+
return fmt.Sprintf("https://%s:%d", podIp, proxyGRPCServerPort)
289291
}
290292

291293
func isProxyAvailable(po *v1.Pod) bool {

pkg/manager/controllers/circuitbreaker/circuitbreaker_controller_suit_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"path/filepath"
2323
"sync"
2424
"testing"
25-
"time"
2625

2726
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2827
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@@ -82,7 +81,6 @@ func TestMain(m *testing.M) {
8281
panic(err)
8382
}
8483
}()
85-
<-time.After(time.Second * 3)
8684
code := m.Run()
8785
env.Stop()
8886
os.Exit(code)

pkg/manager/controllers/circuitbreaker/circuitbreaker_controller_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ func TestCircuitBreaker(t *testing.T) {
131131
g.Expect(c.Status().Update(ctx, testPod)).Should(gomega.BeNil())
132132
g.Eventually(func() string {
133133
g.Expect(c.Get(ctx, types.NamespacedName{Name: "testcb", Namespace: "default"}, cb)).Should(gomega.BeNil())
134+
g.Expect(c.Get(ctx, types.NamespacedName{Name: testPod.Name, Namespace: "default"}, testPod)).Should(gomega.BeNil())
134135
if cb.Status.TargetStatus != nil {
135136
return cb.Status.TargetStatus[0].PodIP
136137
}
@@ -181,8 +182,13 @@ var syncCount int
181182
func RunMockServer() {
182183
breakerMgr := circuitbreaker.NewManager(ctx)
183184
breakerManager = breakerMgr
184-
proxyServer := grpcserver.GrpcServer{BreakerMgr: &mockBreakerManager{breakerMgr}}
185-
go proxyServer.Start(ctx)
185+
proxyGRPCServerPort = 5455
186+
proxyServer := grpcserver.GrpcServer{BreakerMgr: &mockBreakerManager{breakerMgr}, Port: 5455}
187+
wg.Add(1)
188+
go func() {
189+
proxyServer.Start(ctx)
190+
wg.Done()
191+
}()
186192
<-time.After(2 * time.Second)
187193
}
188194

pkg/manager/controllers/circuitbreaker/event_handler.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,19 @@ func (h *podEventHandler) Update(e event.UpdateEvent, q workqueue.RateLimitingIn
5959
if !sidecarInjected(oldPod) {
6060
return
6161
}
62-
if !isProxyAvailable(newPod) || isProxyAvailable(oldPod) {
62+
if !isProxyAvailable(newPod) {
6363
return
6464
}
65+
if !isProxyAvailable(oldPod) {
66+
breakers, err := effectiveBreakers(h.reader, newPod)
67+
if err != nil {
68+
klog.Errorf("fail to get effective CircuitBreakers by pod %s/%s, %v", newPod.Namespace, newPod.Name, err)
69+
return
70+
}
71+
add(q, breakers)
72+
return
73+
}
74+
// all available
6575
breakers, err := matchChangedBreakers(h.reader, oldPod, newPod)
6676
if err != nil {
6777
klog.Errorf("fail to get effective CircuitBreakers by pod %s/%s, %v", newPod.Namespace, newPod.Name, err)

pkg/manager/controllers/faultinjection/event_handler.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,16 @@ func (h *podEventHandler) Update(e event.UpdateEvent, q workqueue.RateLimitingIn
5959
if !sidecarInjected(oldPod) {
6060
return
6161
}
62-
if !isProxyAvailable(newPod) || isProxyAvailable(oldPod) {
62+
if !isProxyAvailable(newPod) {
63+
return
64+
}
65+
if !isProxyAvailable(oldPod) {
66+
breakers, err := effectiveFaults(h.reader, newPod)
67+
if err != nil {
68+
klog.Errorf("fail to get effective CircuitBreakers by pod %s/%s, %v", newPod.Namespace, newPod.Name, err)
69+
return
70+
}
71+
add(q, breakers)
6372
return
6473
}
6574
faults, err := matchChangedFaults(h.reader, oldPod, newPod)

pkg/manager/controllers/faultinjection/faultinject_controller.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (r *FaultInjectionReconciler) Reconcile(ctx context.Context, req ctrl.Reque
6969
}
7070

7171
defer func() {
72-
if res.RequeueAfter == 0 {
72+
if res.RequeueAfter == 0 && reconcileErr == nil {
7373
res.RequeueAfter = defaultRequeueTime
7474
}
7575
}()
@@ -278,8 +278,10 @@ func protoClient(podIp string) protoconnect.FaultInjectClient {
278278
return protoconnect.NewFaultInjectClient(proto.DefaultHttpClient, podAddr(podIp))
279279
}
280280

281+
var proxyGRPCServerPort = constants.ProxyGRPCServerPort
282+
281283
func podAddr(podIp string) string {
282-
return fmt.Sprintf("https://%s:%d", podIp, constants.ProxyGRPCServerPort)
284+
return fmt.Sprintf("https://%s:%d", podIp, proxyGRPCServerPort)
283285
}
284286

285287
// isProxyAvailable check whether the proxy container is available

pkg/manager/controllers/faultinjection/faultinjection_controller_test.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ var faultInjection = &ctrlmeshv1alpha1.FaultInjection{
107107
func TestFaultInjection(t *testing.T) {
108108
g := gomega.NewGomegaWithT(t)
109109
defer Stop()
110-
grpcserver.GrpcServerPort = 5455
110+
wg.Add(1)
111111
RunMockServer()
112112
testPod := mockPod.DeepCopy()
113113
testFaultInjection := faultInjection.DeepCopy()
@@ -206,8 +206,12 @@ var faultManager faultinjection.ManagerInterface
206206
func RunMockServer() {
207207
faultInjectionMgr := faultinjection.NewManager(ctx)
208208
faultManager = faultInjectionMgr
209-
proxyServer := grpcserver.GrpcServer{FaultInjectionMgr: &mockFaultInjectionManager{faultInjectionMgr}}
210-
go proxyServer.Start(ctx)
209+
proxyGRPCServerPort = 5456
210+
proxyServer := grpcserver.GrpcServer{FaultInjectionMgr: &mockFaultInjectionManager{faultInjectionMgr}, Port: 5456}
211+
go func() {
212+
proxyServer.Start(ctx)
213+
wg.Done()
214+
}()
211215
<-time.After(2 * time.Second)
212216
}
213217

pkg/proxy/grpcserver/server.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,15 @@ import (
3535
)
3636

3737
var (
38-
GrpcServerPort = constants.ProxyGRPCServerPort
38+
grpcServerPort = constants.ProxyGRPCServerPort
3939
)
4040

4141
func init() {
4242
envConfig := os.Getenv(constants.EnvProxyGRPCServerPort)
4343
if envConfig != "" {
4444
p, err := strconv.Atoi(envConfig)
4545
if err != nil {
46-
GrpcServerPort = p
46+
grpcServerPort = p
4747
}
4848
}
4949
}
@@ -52,19 +52,25 @@ type GrpcServer struct {
5252
BreakerMgr circuitbreaker.ManagerInterface
5353
FaultInjectionMgr faultinjection.ManagerInterface
5454

55-
mux *http.ServeMux
55+
Port int
56+
mux *http.ServeMux
5657
}
5758

5859
func (s *GrpcServer) Start(ctx context.Context) {
60+
if s.Port == 0 {
61+
s.Port = grpcServerPort
62+
}
5963
s.mux = http.NewServeMux()
6064
s.mux.Handle(protoconnect.NewThrottlingHandler(&grpcThrottlingHandler{mgr: s.BreakerMgr}, connect.WithSendMaxBytes(1024*1024*64)))
6165
s.mux.Handle(protoconnect.NewFaultInjectHandler(&grpcFaultInjectHandler{mgr: s.FaultInjectionMgr}, connect.WithSendMaxBytes(1024*1024*64)))
62-
addr := fmt.Sprintf(":%d", GrpcServerPort)
66+
addr := fmt.Sprintf(":%d", s.Port)
67+
// Use h2c so we can serve HTTP/2 without TLS.
68+
server := &http.Server{Addr: addr, Handler: h2c.NewHandler(s.mux, &http2.Server{})}
6369
go func() {
64-
// Use h2c so we can serve HTTP/2 without TLS.
65-
if err := http.ListenAndServe(addr, h2c.NewHandler(s.mux, &http2.Server{})); err != nil {
70+
if err := server.ListenAndServe(); err != nil {
6671
klog.Errorf("serve gRPC error %v", err)
6772
}
6873
}()
6974
<-ctx.Done()
75+
server.Close()
7076
}

pkg/proxy/grpcserver/server_test.go

+13-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package grpcserver
1919
import (
2020
"context"
2121
"fmt"
22+
"sync"
2223
"testing"
2324
"time"
2425

@@ -35,14 +36,22 @@ import (
3536

3637
func TestServer(t *testing.T) {
3738
g := gomega.NewGomegaWithT(t)
38-
ctx := context.TODO()
39-
GrpcServerPort = 8889
39+
ctx, cancel := context.WithCancel(context.TODO())
40+
wg := sync.WaitGroup{}
41+
defer func() {
42+
cancel()
43+
wg.Wait()
44+
}()
4045
breakerMgr := circuitbreaker.NewManager(ctx)
4146
proxyServer := &GrpcServer{BreakerMgr: breakerMgr}
42-
go proxyServer.Start(ctx)
47+
wg.Add(1)
48+
go func() {
49+
proxyServer.Start(ctx)
50+
wg.Done()
51+
}()
4352
<-time.After(2 * time.Second)
4453
fmt.Println(proto.TrafficInterceptRule_NORMAL.String())
45-
grpcClient := protoconnect.NewThrottlingClient(proto.DefaultHttpClient, "http://127.0.0.1:8889")
54+
grpcClient := protoconnect.NewThrottlingClient(proto.DefaultHttpClient, "http://127.0.0.1:5453")
4655

4756
cb := &ctrlmeshv1alpha1.CircuitBreaker{
4857
ObjectMeta: metav1.ObjectMeta{

0 commit comments

Comments
 (0)