Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/server"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/pools"
_ "github.com/cectc/dbpack/third_party/types/parser_driver"
)
Expand Down Expand Up @@ -154,12 +155,6 @@ var (
}
}

// temporarily turn off tracer output
//tracingMgr, err := tracing.NewTracer(Version, "console")
//if err != nil {
// log.Fatalf("could not setup tracing manager: %s", err.Error())
//}

ctx, cancel := context.WithCancel(context.Background())
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
Expand All @@ -172,7 +167,6 @@ var (
cancel()
}()
<-c
//_ = tracingMgr.Shutdown(ctx)
os.Exit(1) // second signal. Exit directly.
}()

Expand All @@ -189,8 +183,13 @@ var (
if lisErr != nil {
log.Fatalf("unable init metrics server: %+v", lisErr)
}

go initServer(ctx, lis)

if conf.Trace != nil {
go initTracing(ctx, conf.Trace.JaegerEndpoint)
}

dbpack.Start(ctx)
},
}
Expand All @@ -202,6 +201,18 @@ func init() {
rootCommand.AddCommand(startCommand)
}

func initTracing(ctx context.Context, jaegerEndpoint string) {
traceCtl, err := tracing.NewTracer(Version, jaegerEndpoint)
if err != nil {
log.Fatalf("could not setup tracing manager: %s", err.Error())
}

go func() {
<-ctx.Done()
traceCtl.Shutdown(ctx)
}()
}

func initServer(ctx context.Context, lis net.Listener) {
go func() {
<-ctx.Done()
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20200824191128-ae9734ed278b // indirect
go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0 // indirect
go.opencensus.io v0.23.0
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/exporters/jaeger v1.7.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0
go.opentelemetry.io/otel/sdk v1.7.0
go.opentelemetry.io/otel/trace v1.7.0
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
Expand Down Expand Up @@ -807,6 +808,7 @@ github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5q
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
Expand Down Expand Up @@ -915,9 +917,12 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel/exporters/jaeger v1.7.0 h1:wXgjiRldljksZkZrldGVe6XrG9u3kYDyQmkZwmm5dI0=
go.opentelemetry.io/otel/exporters/jaeger v1.7.0/go.mod h1:PwQAOqBgqbLQRKlj466DuD2qyMjbtcPpfPfj+AqbSBs=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0 h1:8hPcgCg0rUJiKE6VWahRvjgLUrNl7rW2hffUEPKXVEM=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.7.0/go.mod h1:K4GDXPY6TjUiwbOh+DkKaEdCF8y+lvMoM6SeAPyfCCM=
go.opentelemetry.io/otel/sdk v1.7.0 h1:4OmStpcKVOfvDOgCt7UriAPtKolwIhxpnSNI/yK+1B0=
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type Configuration struct {
TerminationDrainDuration time.Duration `yaml:"termination_drain_duration" json:"termination_drain_duration"`

HTTPListenPort *int `yaml:"http_listen_port"`

Trace *Trace `yaml:"trace"`
}

type (
Expand Down Expand Up @@ -112,6 +114,10 @@ type (
Timeout time.Duration `yaml:"timeout" json:"-"`
PermitWithoutStream bool `yaml:"permit_without_stream"`
}

Trace struct {
JaegerEndpoint string `yaml:"jaeger_endpoint"`
}
)

const (
Expand Down
1 change: 1 addition & 0 deletions pkg/dt/mysql_undo_log_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (manager MysqlUndoLogManager) DeleteUndoLogByXID(db proto.DB, xid string) e
}

func (manager MysqlUndoLogManager) DeleteUndoLogByLogCreated(db proto.DB, logCreated time.Time, limitRows int) error {
// TODO pass ctx.
result, _, err := db.ExecuteSql(context.Background(), DeleteUndoLogByCreateSql, logCreated, limitRows)
if err != nil {
return err
Expand Down
20 changes: 11 additions & 9 deletions pkg/executor/read_write_splitting.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,31 +272,33 @@ func (executor *ReadWriteSplittingExecutor) ExecutorComQuery(ctx context.Context
}

func (executor *ReadWriteSplittingExecutor) ExecutorComStmtExecute(ctx context.Context, stmt *proto.Stmt) (proto.Result, uint16, error) {
connectionID := proto.ConnectionID(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, "executor_com_stmt_execute")
defer span.End()
connectionID := proto.ConnectionID(newCtx)
txi, ok := executor.localTransactionMap.Load(connectionID)
if ok {
// in local transaction
tx := txi.(proto.Tx)
return tx.ExecuteStmt(ctx, stmt)
return tx.ExecuteStmt(newCtx, stmt)
}
switch st := stmt.StmtNode.(type) {
case *ast.InsertStmt, *ast.DeleteStmt, *ast.UpdateStmt:
db := executor.masters.Next(proto.WithMaster(ctx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithMaster(ctx), stmt)
db := executor.masters.Next(proto.WithMaster(newCtx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithMaster(newCtx), stmt)
case *ast.SelectStmt:
var db *DataSourceBrief
if has, dsName := hasUseDBHint(st.TableHints); has {
protoDB := resource.GetDBManager().GetDB(dsName)
if protoDB == nil {
log.Debugf("data source %d not found", dsName)
db = executor.reads.Next(proto.WithSlave(ctx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithSlave(ctx), stmt)
db = executor.reads.Next(proto.WithSlave(newCtx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithSlave(newCtx), stmt)
} else {
return protoDB.ExecuteStmt(proto.WithSlave(ctx), stmt)
return protoDB.ExecuteStmt(proto.WithSlave(newCtx), stmt)
}
}
db = executor.reads.Next(proto.WithSlave(ctx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithSlave(ctx), stmt)
db = executor.reads.Next(proto.WithSlave(newCtx)).(*DataSourceBrief)
return db.DB.ExecuteStmt(proto.WithSlave(newCtx), stmt)
default:
return nil, 0, errors.Errorf("unsupported %t statement", stmt.StmtNode)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/filter/dt/exec/prepare_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cectc/dbpack/pkg/meta"
"github.com/cectc/dbpack/pkg/misc"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/parser/ast"
"github.com/cectc/dbpack/third_party/parser/format"
)
Expand All @@ -49,8 +50,11 @@ func NewPrepareDeleteExecutor(
}

func (executor *prepareDeleteExecutor) BeforeImage(ctx context.Context) (*schema.TableRecords, error) {
tableMeta, err := executor.GetTableMeta(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchBeforeImage)
defer span.End()
tableMeta, err := executor.GetTableMeta(newCtx)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
sql := executor.buildBeforeImageSql(tableMeta)
Expand All @@ -64,6 +68,7 @@ func (executor *prepareDeleteExecutor) BeforeImage(ctx context.Context) (*schema

result, _, err := executor.conn.PrepareQueryArgs(sql, args)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
return schema.BuildBinaryRecords(tableMeta, result), nil
Expand Down
9 changes: 7 additions & 2 deletions pkg/filter/dt/exec/prepare_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cectc/dbpack/pkg/misc"
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/parser/ast"
"github.com/cectc/dbpack/third_party/parser/format"
)
Expand Down Expand Up @@ -58,19 +59,23 @@ func (executor *prepareInsertExecutor) BeforeImage(ctx context.Context) (*schema
}

func (executor *prepareInsertExecutor) AfterImage(ctx context.Context) (*schema.TableRecords, error) {
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchAfterImage)
defer span.End()
var afterImage *schema.TableRecords
var err error
pkValues, err := executor.getPKValuesByColumn(ctx)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
if executor.getPKIndex(ctx) >= 0 {
afterImage, err = executor.buildTableRecords(ctx, pkValues)
afterImage, err = executor.buildTableRecords(newCtx, pkValues)
} else {
pk, _ := executor.result.LastInsertId()
afterImage, err = executor.buildTableRecords(ctx, []interface{}{pk})
afterImage, err = executor.buildTableRecords(newCtx, []interface{}{pk})
}
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
return afterImage, nil
Expand Down
9 changes: 7 additions & 2 deletions pkg/filter/dt/exec/prepare_select_for_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strings"
"time"

"github.com/cectc/dbpack/pkg/tracing"

"github.com/cectc/dbpack/pkg/driver"
"github.com/cectc/dbpack/pkg/dt"
"github.com/cectc/dbpack/pkg/dt/schema"
Expand Down Expand Up @@ -54,7 +56,10 @@ func NewPrepareSelectForUpdateExecutor(
}

func (executor *prepareSelectForUpdateExecutor) Executable(ctx context.Context, xid string, lockRetryInterval time.Duration, lockRetryTimes int) (bool, error) {
tableMeta, err := executor.GetTableMeta(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, "executable")
defer span.End()

tableMeta, err := executor.GetTableMeta(newCtx)
if err != nil {
return false, err
}
Expand All @@ -70,7 +75,7 @@ func (executor *prepareSelectForUpdateExecutor) Executable(ctx context.Context,
err error
)
for i := 0; i < lockRetryTimes; i++ {
lockable, err = dt.GetDistributedTransactionManager().IsLockableWithXID(ctx,
lockable, err = dt.GetDistributedTransactionManager().IsLockableWithXID(newCtx,
executor.conn.DataSourceName(), lockKeys, xid)
if lockable && err == nil {
break
Expand Down
14 changes: 11 additions & 3 deletions pkg/filter/dt/exec/prepare_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cectc/dbpack/pkg/meta"
"github.com/cectc/dbpack/pkg/misc"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/parser/ast"
"github.com/cectc/dbpack/third_party/parser/format"
)
Expand All @@ -52,8 +53,11 @@ func NewPrepareUpdateExecutor(
}

func (executor *prepareUpdateExecutor) BeforeImage(ctx context.Context) (*schema.TableRecords, error) {
tableMeta, err := executor.GetTableMeta(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchBeforeImage)
defer span.End()
tableMeta, err := executor.GetTableMeta(newCtx)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
sql := executor.buildBeforeImageSql(tableMeta)
Expand All @@ -67,6 +71,7 @@ func (executor *prepareUpdateExecutor) BeforeImage(ctx context.Context) (*schema

result, _, err := executor.conn.PrepareQueryArgs(sql, args)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
return schema.BuildBinaryRecords(tableMeta, result), nil
Expand All @@ -76,9 +81,11 @@ func (executor *prepareUpdateExecutor) AfterImage(ctx context.Context) (*schema.
if executor.beforeImage == nil || len(executor.beforeImage.Rows) == 0 {
return nil, nil
}

tableMeta, err := executor.GetTableMeta(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchAfterImage)
defer span.End()
tableMeta, err := executor.GetTableMeta(newCtx)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}

Expand All @@ -89,6 +96,7 @@ func (executor *prepareUpdateExecutor) AfterImage(ctx context.Context) (*schema.
}
result, _, err := executor.conn.PrepareQueryArgs(afterImageSql, args)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
return schema.BuildBinaryRecords(tableMeta, result), nil
Expand Down
7 changes: 6 additions & 1 deletion pkg/filter/dt/exec/query_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cectc/dbpack/pkg/meta"
"github.com/cectc/dbpack/pkg/misc"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/parser/ast"
"github.com/cectc/dbpack/third_party/parser/format"
)
Expand All @@ -46,13 +47,17 @@ func NewQueryDeleteExecutor(
}

func (executor *queryDeleteExecutor) BeforeImage(ctx context.Context) (*schema.TableRecords, error) {
tableMeta, err := executor.GetTableMeta(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchBeforeImage)
defer span.End()
tableMeta, err := executor.GetTableMeta(newCtx)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
sql := executor.buildBeforeImageSql(tableMeta)
result, _, err := executor.conn.ExecuteWithWarningCount(sql, true)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
return schema.BuildTextRecords(tableMeta, result), nil
Expand Down
14 changes: 10 additions & 4 deletions pkg/filter/dt/exec/query_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cectc/dbpack/pkg/misc"
"github.com/cectc/dbpack/pkg/proto"
"github.com/cectc/dbpack/pkg/resource"
"github.com/cectc/dbpack/pkg/tracing"
"github.com/cectc/dbpack/third_party/parser/ast"
"github.com/cectc/dbpack/third_party/parser/format"
)
Expand Down Expand Up @@ -59,17 +60,22 @@ func (executor *queryInsertExecutor) AfterImage(ctx context.Context) (*schema.Ta
pkValues []interface{}
err error
)
pkValues, err = executor.getPKValuesByColumn(ctx)
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchAfterImage)
defer span.End()

pkValues, err = executor.getPKValuesByColumn(newCtx)
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
if executor.getPKIndex(ctx) >= 0 {
afterImage, err = executor.buildTableRecords(ctx, pkValues)
if executor.getPKIndex(newCtx) >= 0 {
afterImage, err = executor.buildTableRecords(newCtx, pkValues)
} else {
pk, _ := executor.result.LastInsertId()
afterImage, err = executor.buildTableRecords(ctx, []interface{}{pk})
afterImage, err = executor.buildTableRecords(newCtx, []interface{}{pk})
}
if err != nil {
tracing.RecordErrorSpan(span, err)
return nil, err
}
return afterImage, nil
Expand Down
Loading