Skip to content

Commit d94dbb6

Browse files
author
v.raskin
committed
add circuit breaking
1 parent 65c7fac commit d94dbb6

File tree

5 files changed

+145
-2
lines changed

5 files changed

+145
-2
lines changed

config/config.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,28 @@ type ConnectionPool struct {
10521052
// Maximum number of idle connections between chproxy and particuler ClickHouse instance
10531053
MaxIdleConnsPerHost int `yaml:"max_idle_conns_per_host,omitempty"`
10541054

1055+
// BreakerOn switch on CircuitBreaker for clickhouse http client.
1056+
BreakerOn bool
1057+
// BreakerMaxRequests is the maximum number of requests allowed to pass through
1058+
// when the CircuitBreaker is half-open.
1059+
// If BreakerMaxRequests is 0, the CircuitBreaker allows only 1 request.
1060+
BreakerMaxRequests uint32
1061+
// BreakerInterval is the cyclic period of the closed state
1062+
// for the CircuitBreaker to clear the internal Counts.
1063+
// If BreakerInterval is less than or equal to 0, the CircuitBreaker doesn't clear internal Counts during the closed state.
1064+
BreakerInterval time.Duration
1065+
// BreakerTimeout is the period of the open state,
1066+
// after which the state of the CircuitBreaker becomes half-open.
1067+
// If BreakerTimeout is less than or equal to 0, the timeout value of the CircuitBreaker is set to 60 seconds.
1068+
BreakerTimeout time.Duration
1069+
// BreakerFailureRatio is a threshold for determining whether a system is considered ready to handle requests based on its recent failure rate
1070+
// Default value is 0.6.
1071+
BreakerFailureRatio float64
1072+
// BreakerErrorRequests is a variable that represents a threshold for the total number of failed requests
1073+
// that should be considered significant enough to trigger some action or state change within the system
1074+
// Default value is 3.
1075+
BreakerErrorRequests uint32
1076+
10551077
// Catches all undefined fields
10561078
XXX map[string]interface{} `yaml:",inline"`
10571079
}

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ require (
1111
github.com/pierrec/lz4 v2.4.0+incompatible
1212
github.com/prometheus/client_golang v1.11.1
1313
github.com/redis/go-redis/v9 v9.0.2
14-
github.com/stretchr/testify v1.8.1
14+
github.com/sony/gobreaker/v2 v2.0.0
15+
github.com/stretchr/testify v1.8.4
1516
golang.org/x/crypto v0.21.0
1617
golang.org/x/time v0.3.0
1718
gopkg.in/yaml.v2 v2.4.0

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ github.com/redis/go-redis/v9 v9.0.2/go.mod h1:/xDTe9EF1LM61hek62Poq2nzQSGj0xSrEt
115115
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
116116
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
117117
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
118+
github.com/sony/gobreaker/v2 v2.0.0 h1:23AaR4JQ65y4rz8JWMzgXw2gKOykZ/qfqYunll4OwJ4=
119+
github.com/sony/gobreaker/v2 v2.0.0/go.mod h1:8JnRUz80DJ1/ne8M8v7nmTs2713i58nIt4s7XcGe/DI=
118120
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
119121
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
120122
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
@@ -126,6 +128,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
126128
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
127129
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
128130
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
131+
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
132+
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
129133
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 h1:k/gmLsJDWwWqbLCur2yWnJzwQEKRcAHXo6seXGuSwWw=
130134
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9/go.mod h1:E1AXubJBdNmFERAOucpDIxNzeGfLzg0mYh+UfMWdChA=
131135
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=

proxy.go

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
"sync"
1616
"time"
1717

18+
"github.com/sony/gobreaker/v2"
19+
1820
"github.com/contentsquare/chproxy/cache"
1921
"github.com/contentsquare/chproxy/config"
2022
"github.com/contentsquare/chproxy/internal/topology"
@@ -51,8 +53,46 @@ type reverseProxy struct {
5153
maxErrorReasonSize int64
5254
}
5355

56+
type circuitBreakingTransport struct {
57+
transport http.RoundTripper
58+
cb *gobreaker.CircuitBreaker[*http.Response]
59+
}
60+
61+
var (
62+
// errCircuitBreaking is an error instance indicating a circuit breaker has been triggered.
63+
errCircuitBreaking = errors.New("circuit breaking status code")
64+
65+
// clickhouseBadStatusForCircuitBreaking is a map that associates HTTP status codes with an empty struct.
66+
// This map is used to quickly check if a ClickHouse operation resulted in a status code that indicates
67+
// a condition severe enough to trigger circuit breaking.
68+
clickhouseBadStatusForCircuitBreaking = map[int]struct{}{
69+
201: {}, // QUOTA_EXCEEDED
70+
241: {}, // MEMORY_LIMIT_EXCEEDED
71+
439: {}, // CANNOT_SCHEDULE_TASK
72+
}
73+
)
74+
75+
func (b *circuitBreakingTransport) RoundTrip(r *http.Request) (*http.Response, error) {
76+
return b.cb.Execute(func() (*http.Response, error) {
77+
resp, err := b.transport.RoundTrip(r)
78+
if err != nil {
79+
return nil, err
80+
}
81+
82+
if _, ok := clickhouseBadStatusForCircuitBreaking[resp.StatusCode]; ok {
83+
return nil, errCircuitBreaking
84+
}
85+
86+
if resp.StatusCode >= http.StatusInternalServerError {
87+
return nil, errCircuitBreaking
88+
}
89+
90+
return resp, err
91+
})
92+
}
93+
5494
func newReverseProxy(cfgCp *config.ConnectionPool) *reverseProxy {
55-
transport := &http.Transport{
95+
var transport http.RoundTripper = &http.Transport{
5696
Proxy: http.ProxyFromEnvironment,
5797
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
5898
dialer := &net.Dialer{
@@ -69,6 +109,33 @@ func newReverseProxy(cfgCp *config.ConnectionPool) *reverseProxy {
69109
ExpectContinueTimeout: 1 * time.Second,
70110
}
71111

112+
if cfgCp.BreakerOn {
113+
var st gobreaker.Settings
114+
st.Name = "ch-proxy"
115+
st.MaxRequests = cfgCp.BreakerMaxRequests
116+
st.Interval = cfgCp.BreakerInterval
117+
st.Timeout = cfgCp.BreakerTimeout
118+
119+
errorRequests := uint32(3)
120+
if cfgCp.BreakerErrorRequests != 0 {
121+
errorRequests = cfgCp.BreakerErrorRequests
122+
}
123+
failureRatio := 0.6
124+
if cfgCp.BreakerFailureRatio != 0 {
125+
failureRatio = cfgCp.BreakerFailureRatio
126+
}
127+
128+
st.ReadyToTrip = func(counts gobreaker.Counts) bool {
129+
fr := float64(counts.TotalFailures) / float64(counts.Requests)
130+
return counts.Requests >= errorRequests && fr >= failureRatio
131+
}
132+
133+
transport = &circuitBreakingTransport{
134+
transport: transport,
135+
cb: gobreaker.NewCircuitBreaker[*http.Response](st), //nolint:bodyclose
136+
}
137+
}
138+
72139
return &reverseProxy{
73140
rp: &httputil.ReverseProxy{
74141
Director: func(*http.Request) {},

proxy_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,43 @@ var goodCfgWithCacheAndMaxErrorReasonSize = &config.Config{
160160
},
161161
},
162162
}
163+
var goodCfgWithBreaker = &config.Config{
164+
Server: config.Server{
165+
Metrics: config.Metrics{
166+
Namespace: "proxy_test"},
167+
},
168+
Clusters: []config.Cluster{
169+
{
170+
Name: "cluster",
171+
Scheme: "http",
172+
Replicas: []config.Replica{
173+
{
174+
Nodes: []string{"localhost:8123"},
175+
},
176+
},
177+
ClusterUsers: []config.ClusterUser{
178+
{
179+
Name: "web",
180+
},
181+
},
182+
},
183+
},
184+
Users: []config.User{
185+
{
186+
Name: defaultUsername,
187+
ToCluster: "cluster",
188+
ToUser: "web",
189+
},
190+
},
191+
ParamGroups: []config.ParamGroup{
192+
{Name: "param_test", Params: []config.Param{{Key: "param_key", Value: "param_value"}}},
193+
},
194+
ConnectionPool: config.ConnectionPool{
195+
BreakerOn: true,
196+
BreakerErrorRequests: 1,
197+
BreakerFailureRatio: 0.1,
198+
},
199+
}
163200

164201
func newConfiguredProxy(cfg *config.Config) (*reverseProxy, error) {
165202
p := newReverseProxy(&cfg.ConnectionPool)
@@ -795,6 +832,17 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
795832
},
796833
transactionFailReason: "unknown error reason",
797834
},
835+
{
836+
cfg: goodCfgWithBreaker,
837+
name: "error with breaker on",
838+
expResponse: badGatewayResponse,
839+
expStatusCode: http.StatusBadGateway,
840+
f: func(p *reverseProxy) *http.Response {
841+
req := httptest.NewRequest("GET", fmt.Sprintf("%s/badGateway?query=%s", fakeServer.URL, query), nil)
842+
return makeCustomRequest(p, req)
843+
},
844+
transactionFailReason: "unknown error reason1",
845+
},
798846
}
799847

800848
for _, tc := range testCases {
@@ -807,6 +855,7 @@ func TestReverseProxy_ServeHTTP1(t *testing.T) {
807855
resp := tc.f(proxy)
808856
b := bbToString(t, resp.Body)
809857
resp.Body.Close()
858+
810859
if len(tc.cfg.Caches) != 0 {
811860
compareTransactionFailReason(t, proxy, tc.cfg.Clusters[0].ClusterUsers[0], query, tc.transactionFailReason)
812861
}

0 commit comments

Comments
 (0)