@@ -15,6 +15,8 @@ import (
1515 "sync"
1616 "time"
1717
18+ "github.com/sony/gobreaker/v2"
19+
1820 "github.com/contentsquare/chproxy/cache"
1921 "github.com/contentsquare/chproxy/config"
2022 "github.com/contentsquare/chproxy/internal/topology"
@@ -51,8 +53,46 @@ type reverseProxy struct {
5153 maxErrorReasonSize int64
5254}
5355
56+ type circuitBreakingTransport struct {
57+ transport http.RoundTripper
58+ cb * gobreaker.CircuitBreaker [* http.Response ]
59+ }
60+
61+ var (
62+ // errCircuitBreaking is an error instance indicating a circuit breaker has been triggered.
63+ errCircuitBreaking = errors .New ("circuit breaking status code" )
64+
65+ // clickhouseBadStatusForCircuitBreaking is a map that associates HTTP status codes with an empty struct.
66+ // This map is used to quickly check if a ClickHouse operation resulted in a status code that indicates
67+ // a condition severe enough to trigger circuit breaking.
68+ clickhouseBadStatusForCircuitBreaking = map [int ]struct {}{
69+ 201 : {}, // QUOTA_EXCEEDED
70+ 241 : {}, // MEMORY_LIMIT_EXCEEDED
71+ 439 : {}, // CANNOT_SCHEDULE_TASK
72+ }
73+ )
74+
75+ func (b * circuitBreakingTransport ) RoundTrip (r * http.Request ) (* http.Response , error ) {
76+ return b .cb .Execute (func () (* http.Response , error ) {
77+ resp , err := b .transport .RoundTrip (r )
78+ if err != nil {
79+ return nil , err
80+ }
81+
82+ if _ , ok := clickhouseBadStatusForCircuitBreaking [resp .StatusCode ]; ok {
83+ return nil , errCircuitBreaking
84+ }
85+
86+ if resp .StatusCode >= http .StatusInternalServerError {
87+ return nil , errCircuitBreaking
88+ }
89+
90+ return resp , err
91+ })
92+ }
93+
5494func newReverseProxy (cfgCp * config.ConnectionPool ) * reverseProxy {
55- transport : = & http.Transport {
95+ var transport http. RoundTripper = & http.Transport {
5696 Proxy : http .ProxyFromEnvironment ,
5797 DialContext : func (ctx context.Context , network , addr string ) (net.Conn , error ) {
5898 dialer := & net.Dialer {
@@ -69,6 +109,33 @@ func newReverseProxy(cfgCp *config.ConnectionPool) *reverseProxy {
69109 ExpectContinueTimeout : 1 * time .Second ,
70110 }
71111
112+ if cfgCp .BreakerOn {
113+ var st gobreaker.Settings
114+ st .Name = "ch-proxy"
115+ st .MaxRequests = cfgCp .BreakerMaxRequests
116+ st .Interval = cfgCp .BreakerInterval
117+ st .Timeout = cfgCp .BreakerTimeout
118+
119+ errorRequests := uint32 (3 )
120+ if cfgCp .BreakerErrorRequests != 0 {
121+ errorRequests = cfgCp .BreakerErrorRequests
122+ }
123+ failureRatio := 0.6
124+ if cfgCp .BreakerFailureRatio != 0 {
125+ failureRatio = cfgCp .BreakerFailureRatio
126+ }
127+
128+ st .ReadyToTrip = func (counts gobreaker.Counts ) bool {
129+ fr := float64 (counts .TotalFailures ) / float64 (counts .Requests )
130+ return counts .Requests >= errorRequests && fr >= failureRatio
131+ }
132+
133+ transport = & circuitBreakingTransport {
134+ transport : transport ,
135+ cb : gobreaker.NewCircuitBreaker [* http.Response ](st ), //nolint:bodyclose
136+ }
137+ }
138+
72139 return & reverseProxy {
73140 rp : & httputil.ReverseProxy {
74141 Director : func (* http.Request ) {},
0 commit comments