Skip to content

Commit 7a1a977

Browse files
authored
feat: support circuit breaker (#228)
1 parent 8d6877d commit 7a1a977

12 files changed

Lines changed: 351 additions & 68 deletions

File tree

cmd/cmd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/cectc/dbpack/pkg/executor"
3838
"github.com/cectc/dbpack/pkg/filter"
3939
_ "github.com/cectc/dbpack/pkg/filter/audit_log"
40+
_ "github.com/cectc/dbpack/pkg/filter/breaker"
4041
_ "github.com/cectc/dbpack/pkg/filter/crypto"
4142
_ "github.com/cectc/dbpack/pkg/filter/dt"
4243
_ "github.com/cectc/dbpack/pkg/filter/metrics"

go.mod

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,25 +34,25 @@ require (
3434
go.etcd.io/etcd/api/v3 v3.5.0-alpha.0
3535
go.etcd.io/etcd/client/v3 v3.5.0-alpha.0
3636
go.uber.org/atomic v1.9.0
37-
go.uber.org/goleak v1.1.11
37+
go.uber.org/goleak v1.1.12
3838
go.uber.org/ratelimit v0.2.1-0.20220713224938-b62b799bc9a5
3939
go.uber.org/zap v1.21.0
4040
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
4141
golang.org/x/text v0.3.7
42-
google.golang.org/grpc v1.40.0
42+
google.golang.org/grpc v1.43.0
4343
gopkg.in/yaml.v3 v3.0.0
4444
k8s.io/client-go v0.23.5
4545
vimagination.zapto.org/byteio v1.0.1
4646
)
4747

4848
require (
4949
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
50-
github.com/Microsoft/go-winio v0.4.17 // indirect
51-
github.com/Microsoft/hcsshim v0.8.23 // indirect
50+
github.com/Microsoft/go-winio v0.5.1 // indirect
51+
github.com/Microsoft/hcsshim v0.9.4 // indirect
5252
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
53-
github.com/containerd/cgroups v1.0.1 // indirect
54-
github.com/containerd/containerd v1.5.9 // indirect
55-
github.com/docker/distribution v2.7.1+incompatible // indirect
53+
github.com/containerd/cgroups v1.0.4 // indirect
54+
github.com/containerd/containerd v1.6.8 // indirect
55+
github.com/docker/distribution v2.8.1+incompatible // indirect
5656
github.com/docker/docker v20.10.11+incompatible // indirect
5757
github.com/docker/go-connections v0.4.0 // indirect
5858
github.com/docker/go-units v0.4.0 // indirect
@@ -71,10 +71,10 @@ require (
7171
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
7272
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
7373
github.com/modern-go/reflect2 v1.0.2 // indirect
74-
github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c // indirect
74+
github.com/morikuni/aec v1.0.0 // indirect
7575
github.com/opencontainers/go-digest v1.0.0 // indirect
76-
github.com/opencontainers/image-spec v1.0.2 // indirect
77-
github.com/opencontainers/runc v1.0.2 // indirect
76+
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
77+
github.com/opencontainers/runc v1.1.3 // indirect
7878
github.com/sirupsen/logrus v1.8.1 // indirect
7979
github.com/ugorji/go/codec v1.1.7 // indirect
8080
go.opencensus.io v0.23.0 // indirect
@@ -105,7 +105,7 @@ require (
105105
github.com/gobuffalo/packr v1.30.1 // indirect
106106
github.com/golang/protobuf v1.5.2 // indirect
107107
github.com/gorilla/mux v1.8.0
108-
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 // indirect
108+
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
109109
github.com/inconshreveable/mousetrap v1.0.0 // indirect
110110
github.com/joho/godotenv v1.3.0 // indirect
111111
github.com/klauspost/compress v1.15.0 // indirect
@@ -141,7 +141,7 @@ require (
141141
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
142142
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
143143
golang.org/x/tools v0.1.10 // indirect
144-
google.golang.org/genproto v0.0.0-20210402141018-6c239bbf2bb1 // indirect
144+
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
145145
google.golang.org/protobuf v1.27.1
146146
gopkg.in/natefinch/lumberjack.v2 v2.0.0
147147
k8s.io/apimachinery v0.23.5 // indirect

go.sum

Lines changed: 78 additions & 18 deletions
Large diffs are not rendered by default.

pkg/executor/read_write_splitting.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -163,12 +163,9 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(
163163
defer func() {
164164
if err == nil {
165165
result, err = decodeResult(result)
166-
if err != nil {
167-
span.RecordError(err)
168-
return
169-
}
170-
err = executor.doPostFilter(spanCtx, result)
171-
} else {
166+
}
167+
err = executor.doPostFilter(spanCtx, result, err)
168+
if err != nil {
172169
span.RecordError(err)
173170
}
174171
}()
@@ -307,12 +304,9 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComStmtExecute(
307304
defer func() {
308305
if err == nil {
309306
result, err = decodeResult(result)
310-
if err != nil {
311-
span.RecordError(err)
312-
return
313-
}
314-
err = executor.doPostFilter(spanCtx, result)
315-
} else {
307+
}
308+
err = executor.doPostFilter(spanCtx, result, err)
309+
if err != nil {
316310
span.RecordError(err)
317311
}
318312
}()
@@ -371,10 +365,10 @@ func (executor *ReadWriteSplittingExecutor) doPreFilter(ctx context.Context) err
371365
return nil
372366
}
373367

374-
func (executor *ReadWriteSplittingExecutor) doPostFilter(ctx context.Context, result proto.Result) error {
368+
func (executor *ReadWriteSplittingExecutor) doPostFilter(ctx context.Context, result proto.Result, err error) error {
375369
for i := 0; i < len(executor.PostFilters); i++ {
376370
f := executor.PostFilters[i]
377-
err := f.PostHandle(ctx, result)
371+
err := f.PostHandle(ctx, result, err)
378372
if err != nil {
379373
return err
380374
}

pkg/executor/sharding.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,7 @@ func (executor *ShardingExecutor) ExecutorComStmtExecute(
256256
return nil, 0, err
257257
}
258258
defer func() {
259-
if err == nil {
260-
err = executor.doPostFilter(ctx, result)
261-
}
259+
err = executor.doPostFilter(ctx, result, err)
262260
}()
263261

264262
var (
@@ -306,10 +304,10 @@ func (executor *ShardingExecutor) doPreFilter(ctx context.Context) error {
306304
return nil
307305
}
308306

309-
func (executor *ShardingExecutor) doPostFilter(ctx context.Context, result proto.Result) error {
307+
func (executor *ShardingExecutor) doPostFilter(ctx context.Context, result proto.Result, err error) error {
310308
for i := 0; i < len(executor.PostFilters); i++ {
311309
f := executor.PostFilters[i]
312-
err := f.PostHandle(ctx, result)
310+
err := f.PostHandle(ctx, result, err)
313311
if err != nil {
314312
return err
315313
}

pkg/executor/single_db.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,9 @@ func (executor *SingleDBExecutor) ExecutorComQuery(
136136
defer func() {
137137
if err == nil {
138138
result, err = decodeResult(result)
139-
if err != nil {
140-
span.RecordError(err)
141-
return
142-
}
143-
err = executor.doPostFilter(spanCtx, result)
144-
} else {
139+
}
140+
err = executor.doPostFilter(spanCtx, result, err)
141+
if err != nil {
145142
span.RecordError(err)
146143
}
147144
}()
@@ -238,12 +235,9 @@ func (executor *SingleDBExecutor) ExecutorComStmtExecute(
238235
defer func() {
239236
if err == nil {
240237
result, err = decodeResult(result)
241-
if err != nil {
242-
span.RecordError(err)
243-
return
244-
}
245-
err = executor.doPostFilter(spanCtx, result)
246-
} else {
238+
}
239+
err = executor.doPostFilter(spanCtx, result, err)
240+
if err != nil {
247241
span.RecordError(err)
248242
}
249243
}()
@@ -283,10 +277,10 @@ func (executor *SingleDBExecutor) doPreFilter(ctx context.Context) error {
283277
return nil
284278
}
285279

286-
func (executor *SingleDBExecutor) doPostFilter(ctx context.Context, result proto.Result) error {
280+
func (executor *SingleDBExecutor) doPostFilter(ctx context.Context, result proto.Result, err error) error {
287281
for i := 0; i < len(executor.PostFilters); i++ {
288282
f := executor.PostFilters[i]
289-
err := f.PostHandle(ctx, result)
283+
err := f.PostHandle(ctx, result, err)
290284
if err != nil {
291285
return err
292286
}

pkg/filter/breaker/breaker.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright 2022 CECTC, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package breaker
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"sync"
23+
"sync/atomic"
24+
"time"
25+
26+
"github.com/pkg/errors"
27+
28+
"github.com/cectc/dbpack/pkg/filter"
29+
"github.com/cectc/dbpack/pkg/log"
30+
"github.com/cectc/dbpack/pkg/proto"
31+
)
32+
33+
const circuitBreakFilter = "CircuitBreakerFilter"
34+
35+
// ErrBreakerOpen is the error returned from PreHandle() when the function is not executed
36+
// because the breaker is currently open.
37+
var ErrBreakerOpen = errors.New("circuit breaker is open")
38+
39+
const (
40+
closed uint32 = iota
41+
open
42+
halfOpen
43+
)
44+
45+
type _factory struct{}
46+
47+
func (factory *_factory) NewFilter(_ string, config map[string]interface{}) (proto.Filter, error) {
48+
var (
49+
err error
50+
content []byte
51+
conf *CircuitBreakerConfig
52+
)
53+
if content, err = json.Marshal(config); err != nil {
54+
return nil, errors.Wrap(err, "marshal circuit breaker filter config failed.")
55+
}
56+
if err = json.Unmarshal(content, &conf); err != nil {
57+
log.Errorf("unmarshal circuit breaker filter failed, %v", err)
58+
return nil, err
59+
}
60+
61+
return &_filter{
62+
errorThreshold: conf.ErrorThreshold,
63+
successThreshold: conf.SuccessThreshold,
64+
timeout: time.Duration(conf.Timeout) * time.Second,
65+
}, nil
66+
}
67+
68+
type _filter struct {
69+
errorThreshold int
70+
successThreshold int
71+
timeout time.Duration
72+
73+
lock sync.Mutex
74+
state uint32
75+
errors int
76+
successes int
77+
lastError time.Time
78+
}
79+
80+
type CircuitBreakerConfig struct {
81+
ErrorThreshold int
82+
SuccessThreshold int
83+
Timeout int
84+
}
85+
86+
func (f *_filter) GetKind() string {
87+
return circuitBreakFilter
88+
}
89+
90+
func (f *_filter) PreHandle(ctx context.Context) error {
91+
state := atomic.LoadUint32(&f.state)
92+
93+
if state == open {
94+
return ErrBreakerOpen
95+
}
96+
97+
return nil
98+
}
99+
100+
func (f *_filter) PostHandle(ctx context.Context, result proto.Result, err error) error {
101+
state := atomic.LoadUint32(&f.state)
102+
if err == nil && state == closed {
103+
return nil
104+
}
105+
106+
f.lock.Lock()
107+
defer f.lock.Unlock()
108+
109+
if err == nil {
110+
if f.state == halfOpen {
111+
f.successes++
112+
if f.successes == f.successThreshold {
113+
f.closeBreaker()
114+
}
115+
}
116+
} else {
117+
if f.errors > 0 {
118+
expiry := f.lastError.Add(f.timeout)
119+
if time.Now().After(expiry) {
120+
f.errors = 0
121+
}
122+
}
123+
124+
switch f.state {
125+
case closed:
126+
f.errors++
127+
if f.errors == f.errorThreshold {
128+
f.openBreaker()
129+
} else {
130+
f.lastError = time.Now()
131+
}
132+
case halfOpen:
133+
f.openBreaker()
134+
}
135+
}
136+
return err
137+
}
138+
139+
func (f *_filter) openBreaker() {
140+
f.changeState(open)
141+
go f.timer()
142+
}
143+
144+
func (f *_filter) closeBreaker() {
145+
f.changeState(closed)
146+
}
147+
148+
func (f *_filter) timer() {
149+
time.Sleep(f.timeout)
150+
151+
f.lock.Lock()
152+
defer f.lock.Unlock()
153+
154+
f.changeState(halfOpen)
155+
}
156+
157+
func (f *_filter) changeState(newState uint32) {
158+
f.errors = 0
159+
f.successes = 0
160+
atomic.StoreUint32(&f.state, newState)
161+
}
162+
163+
func init() {
164+
filter.RegistryFilterFactory(circuitBreakFilter, &_factory{})
165+
}

0 commit comments

Comments
 (0)