Skip to content

Commit d09483e

Browse files
committed
chore: resolve pr issue
1 parent 165176e commit d09483e

14 files changed

Lines changed: 135 additions & 88 deletions

pkg/filter/dt/exec/prepare_delete.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cectc/dbpack/pkg/meta"
2828
"github.com/cectc/dbpack/pkg/misc"
2929
"github.com/cectc/dbpack/pkg/resource"
30+
"github.com/cectc/dbpack/pkg/tracing"
3031
"github.com/cectc/dbpack/third_party/parser/ast"
3132
"github.com/cectc/dbpack/third_party/parser/format"
3233
)
@@ -49,8 +50,11 @@ func NewPrepareDeleteExecutor(
4950
}
5051

5152
func (executor *prepareDeleteExecutor) BeforeImage(ctx context.Context) (*schema.TableRecords, error) {
52-
tableMeta, err := executor.GetTableMeta(ctx)
53+
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchBeforeImage)
54+
defer span.End()
55+
tableMeta, err := executor.GetTableMeta(newCtx)
5356
if err != nil {
57+
tracing.RecordErrorSpan(span, err)
5458
return nil, err
5559
}
5660
sql := executor.buildBeforeImageSql(tableMeta)
@@ -64,6 +68,7 @@ func (executor *prepareDeleteExecutor) BeforeImage(ctx context.Context) (*schema
6468

6569
result, _, err := executor.conn.PrepareQueryArgs(sql, args)
6670
if err != nil {
71+
tracing.RecordErrorSpan(span, err)
6772
return nil, err
6873
}
6974
return schema.BuildBinaryRecords(tableMeta, result), nil

pkg/filter/dt/exec/prepare_insert.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/cectc/dbpack/pkg/misc"
3030
"github.com/cectc/dbpack/pkg/proto"
3131
"github.com/cectc/dbpack/pkg/resource"
32+
"github.com/cectc/dbpack/pkg/tracing"
3233
"github.com/cectc/dbpack/third_party/parser/ast"
3334
"github.com/cectc/dbpack/third_party/parser/format"
3435
)
@@ -58,19 +59,23 @@ func (executor *prepareInsertExecutor) BeforeImage(ctx context.Context) (*schema
5859
}
5960

6061
func (executor *prepareInsertExecutor) AfterImage(ctx context.Context) (*schema.TableRecords, error) {
62+
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchAfterImage)
63+
defer span.End()
6164
var afterImage *schema.TableRecords
6265
var err error
6366
pkValues, err := executor.getPKValuesByColumn(ctx)
6467
if err != nil {
68+
tracing.RecordErrorSpan(span, err)
6569
return nil, err
6670
}
6771
if executor.getPKIndex(ctx) >= 0 {
68-
afterImage, err = executor.buildTableRecords(ctx, pkValues)
72+
afterImage, err = executor.buildTableRecords(newCtx, pkValues)
6973
} else {
7074
pk, _ := executor.result.LastInsertId()
71-
afterImage, err = executor.buildTableRecords(ctx, []interface{}{pk})
75+
afterImage, err = executor.buildTableRecords(newCtx, []interface{}{pk})
7276
}
7377
if err != nil {
78+
tracing.RecordErrorSpan(span, err)
7479
return nil, err
7580
}
7681
return afterImage, nil

pkg/filter/dt/exec/prepare_select_for_update.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"strings"
2222
"time"
2323

24+
"github.com/cectc/dbpack/pkg/tracing"
25+
2426
"github.com/cectc/dbpack/pkg/driver"
2527
"github.com/cectc/dbpack/pkg/dt"
2628
"github.com/cectc/dbpack/pkg/dt/schema"
@@ -54,7 +56,10 @@ func NewPrepareSelectForUpdateExecutor(
5456
}
5557

5658
func (executor *prepareSelectForUpdateExecutor) Executable(ctx context.Context, xid string, lockRetryInterval time.Duration, lockRetryTimes int) (bool, error) {
57-
tableMeta, err := executor.GetTableMeta(ctx)
59+
newCtx, span := tracing.GetTraceSpan(ctx, "executable")
60+
defer span.End()
61+
62+
tableMeta, err := executor.GetTableMeta(newCtx)
5863
if err != nil {
5964
return false, err
6065
}
@@ -70,7 +75,7 @@ func (executor *prepareSelectForUpdateExecutor) Executable(ctx context.Context,
7075
err error
7176
)
7277
for i := 0; i < lockRetryTimes; i++ {
73-
lockable, err = dt.GetDistributedTransactionManager().IsLockableWithXID(ctx,
78+
lockable, err = dt.GetDistributedTransactionManager().IsLockableWithXID(newCtx,
7479
executor.conn.DataSourceName(), lockKeys, xid)
7580
if lockable && err == nil {
7681
break

pkg/filter/dt/exec/prepare_update.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cectc/dbpack/pkg/meta"
2828
"github.com/cectc/dbpack/pkg/misc"
2929
"github.com/cectc/dbpack/pkg/resource"
30+
"github.com/cectc/dbpack/pkg/tracing"
3031
"github.com/cectc/dbpack/third_party/parser/ast"
3132
"github.com/cectc/dbpack/third_party/parser/format"
3233
)
@@ -52,8 +53,11 @@ func NewPrepareUpdateExecutor(
5253
}
5354

5455
func (executor *prepareUpdateExecutor) BeforeImage(ctx context.Context) (*schema.TableRecords, error) {
55-
tableMeta, err := executor.GetTableMeta(ctx)
56+
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchBeforeImage)
57+
defer span.End()
58+
tableMeta, err := executor.GetTableMeta(newCtx)
5659
if err != nil {
60+
tracing.RecordErrorSpan(span, err)
5761
return nil, err
5862
}
5963
sql := executor.buildBeforeImageSql(tableMeta)
@@ -67,6 +71,7 @@ func (executor *prepareUpdateExecutor) BeforeImage(ctx context.Context) (*schema
6771

6872
result, _, err := executor.conn.PrepareQueryArgs(sql, args)
6973
if err != nil {
74+
tracing.RecordErrorSpan(span, err)
7075
return nil, err
7176
}
7277
return schema.BuildBinaryRecords(tableMeta, result), nil
@@ -76,9 +81,11 @@ func (executor *prepareUpdateExecutor) AfterImage(ctx context.Context) (*schema.
7681
if executor.beforeImage == nil || len(executor.beforeImage.Rows) == 0 {
7782
return nil, nil
7883
}
79-
80-
tableMeta, err := executor.GetTableMeta(ctx)
84+
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchAfterImage)
85+
defer span.End()
86+
tableMeta, err := executor.GetTableMeta(newCtx)
8187
if err != nil {
88+
tracing.RecordErrorSpan(span, err)
8289
return nil, err
8390
}
8491

@@ -89,6 +96,7 @@ func (executor *prepareUpdateExecutor) AfterImage(ctx context.Context) (*schema.
8996
}
9097
result, _, err := executor.conn.PrepareQueryArgs(afterImageSql, args)
9198
if err != nil {
99+
tracing.RecordErrorSpan(span, err)
92100
return nil, err
93101
}
94102
return schema.BuildBinaryRecords(tableMeta, result), nil

pkg/filter/dt/exec/query_delete.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cectc/dbpack/pkg/meta"
2828
"github.com/cectc/dbpack/pkg/misc"
2929
"github.com/cectc/dbpack/pkg/resource"
30+
"github.com/cectc/dbpack/pkg/tracing"
3031
"github.com/cectc/dbpack/third_party/parser/ast"
3132
"github.com/cectc/dbpack/third_party/parser/format"
3233
)
@@ -46,13 +47,17 @@ func NewQueryDeleteExecutor(
4647
}
4748

4849
func (executor *queryDeleteExecutor) BeforeImage(ctx context.Context) (*schema.TableRecords, error) {
49-
tableMeta, err := executor.GetTableMeta(ctx)
50+
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchBeforeImage)
51+
defer span.End()
52+
tableMeta, err := executor.GetTableMeta(newCtx)
5053
if err != nil {
54+
tracing.RecordErrorSpan(span, err)
5155
return nil, err
5256
}
5357
sql := executor.buildBeforeImageSql(tableMeta)
5458
result, _, err := executor.conn.ExecuteWithWarningCount(sql, true)
5559
if err != nil {
60+
tracing.RecordErrorSpan(span, err)
5661
return nil, err
5762
}
5863
return schema.BuildTextRecords(tableMeta, result), nil

pkg/filter/dt/exec/query_insert.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/cectc/dbpack/pkg/misc"
2929
"github.com/cectc/dbpack/pkg/proto"
3030
"github.com/cectc/dbpack/pkg/resource"
31+
"github.com/cectc/dbpack/pkg/tracing"
3132
"github.com/cectc/dbpack/third_party/parser/ast"
3233
"github.com/cectc/dbpack/third_party/parser/format"
3334
)
@@ -59,17 +60,22 @@ func (executor *queryInsertExecutor) AfterImage(ctx context.Context) (*schema.Ta
5960
pkValues []interface{}
6061
err error
6162
)
62-
pkValues, err = executor.getPKValuesByColumn(ctx)
63+
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchAfterImage)
64+
defer span.End()
65+
66+
pkValues, err = executor.getPKValuesByColumn(newCtx)
6367
if err != nil {
68+
tracing.RecordErrorSpan(span, err)
6469
return nil, err
6570
}
66-
if executor.getPKIndex(ctx) >= 0 {
67-
afterImage, err = executor.buildTableRecords(ctx, pkValues)
71+
if executor.getPKIndex(newCtx) >= 0 {
72+
afterImage, err = executor.buildTableRecords(newCtx, pkValues)
6873
} else {
6974
pk, _ := executor.result.LastInsertId()
70-
afterImage, err = executor.buildTableRecords(ctx, []interface{}{pk})
75+
afterImage, err = executor.buildTableRecords(newCtx, []interface{}{pk})
7176
}
7277
if err != nil {
78+
tracing.RecordErrorSpan(span, err)
7379
return nil, err
7480
}
7581
return afterImage, nil

pkg/filter/dt/exec/query_select_for_update.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/cectc/dbpack/pkg/mysql"
3030
"github.com/cectc/dbpack/pkg/proto"
3131
"github.com/cectc/dbpack/pkg/resource"
32+
"github.com/cectc/dbpack/pkg/tracing"
3233
"github.com/cectc/dbpack/third_party/parser/ast"
3334
"github.com/cectc/dbpack/third_party/parser/format"
3435
)
@@ -51,7 +52,9 @@ func NewQuerySelectForUpdateExecutor(
5152
}
5253

5354
func (executor *querySelectForUpdateExecutor) Executable(ctx context.Context, xid string, lockRetryInterval time.Duration, lockRetryTimes int) (bool, error) {
54-
tableMeta, err := executor.GetTableMeta(ctx)
55+
newCtx, span := tracing.GetTraceSpan(ctx, "executable")
56+
defer span.End()
57+
tableMeta, err := executor.GetTableMeta(newCtx)
5558
if err != nil {
5659
return false, err
5760
}
@@ -67,14 +70,15 @@ func (executor *querySelectForUpdateExecutor) Executable(ctx context.Context, xi
6770
err error
6871
)
6972
for i := 0; i < lockRetryTimes; i++ {
70-
lockable, err = dt.GetDistributedTransactionManager().IsLockableWithXID(ctx,
73+
lockable, err = dt.GetDistributedTransactionManager().IsLockableWithXID(newCtx,
7174
executor.conn.DataSourceName(), lockKeys, xid)
7275
if lockable && err == nil {
7376
break
7477
}
7578
time.Sleep(lockRetryInterval)
7679
}
7780
if err != nil {
81+
tracing.RecordErrorSpan(span, err)
7882
return false, err
7983
}
8084
}

pkg/filter/dt/exec/query_update.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/cectc/dbpack/pkg/meta"
2828
"github.com/cectc/dbpack/pkg/misc"
2929
"github.com/cectc/dbpack/pkg/resource"
30+
"github.com/cectc/dbpack/pkg/tracing"
3031
"github.com/cectc/dbpack/third_party/parser/ast"
3132
"github.com/cectc/dbpack/third_party/parser/format"
3233
)
@@ -49,13 +50,17 @@ func NewQueryUpdateExecutor(
4950
}
5051

5152
func (executor *queryUpdateExecutor) BeforeImage(ctx context.Context) (*schema.TableRecords, error) {
52-
tableMeta, err := executor.GetTableMeta(ctx)
53+
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchBeforeImage)
54+
defer span.End()
55+
tableMeta, err := executor.GetTableMeta(newCtx)
5356
if err != nil {
57+
tracing.RecordErrorSpan(span, err)
5458
return nil, err
5559
}
5660
sql := executor.buildBeforeImageSql(tableMeta)
5761
result, _, err := executor.conn.ExecuteWithWarningCount(sql, true)
5862
if err != nil {
63+
tracing.RecordErrorSpan(span, err)
5964
return nil, err
6065
}
6166
return schema.BuildTextRecords(tableMeta, result), nil
@@ -65,15 +70,18 @@ func (executor *queryUpdateExecutor) AfterImage(ctx context.Context) (*schema.Ta
6570
if executor.beforeImage == nil || len(executor.beforeImage.Rows) == 0 {
6671
return nil, nil
6772
}
68-
69-
tableMeta, err := executor.GetTableMeta(ctx)
73+
newCtx, span := tracing.GetTraceSpan(ctx, tracing.ExecutorFetchAfterImage)
74+
defer span.End()
75+
tableMeta, err := executor.GetTableMeta(newCtx)
7076
if err != nil {
77+
tracing.RecordErrorSpan(span, err)
7178
return nil, err
7279
}
7380

7481
afterImageSql := executor.buildAfterImageSql(tableMeta)
7582
result, _, err := executor.conn.ExecuteWithWarningCount(afterImageSql, true)
7683
if err != nil {
84+
tracing.RecordErrorSpan(span, err)
7785
return nil, err
7886
}
7987
return schema.BuildTextRecords(tableMeta, result), nil

pkg/filter/dt/filter_mysql.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,9 @@ func (f *_mysqlFilter) registerBranchTransaction(ctx context.Context, xid, resou
184184
branchID int64
185185
err error
186186
)
187+
newCtx, span := tracing.GetTraceSpan(ctx, tracing.BranchTransactionRegister)
188+
defer span.End()
189+
187190
br := &api.BranchRegisterRequest{
188191
XID: xid,
189192
ResourceID: resourceID,
@@ -192,7 +195,7 @@ func (f *_mysqlFilter) registerBranchTransaction(ctx context.Context, xid, resou
192195
ApplicationData: nil,
193196
}
194197
for retryCount := 0; retryCount < f.lockRetryTimes; retryCount++ {
195-
_, branchID, err = dt.GetDistributedTransactionManager().BranchRegister(ctx, br)
198+
_, branchID, err = dt.GetDistributedTransactionManager().BranchRegister(newCtx, br)
196199
if err == nil {
197200
break
198201
}

0 commit comments

Comments
 (0)