Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ require (
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mgechev/revive v1.2.1 // indirect
github.com/soheilhy/cmux v0.1.5-0.20210205191134-5ec6847320e5 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
Expand Down
24 changes: 0 additions & 24 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tj
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chavacava/garif v0.0.0-20220316182200-5cad0b5181d4 h1:tFXjAxje9thrTF4h57Ckik+scJjTWdwAtZqZPtOT48M=
github.com/chavacava/garif v0.0.0-20220316182200-5cad0b5181d4/go.mod h1:W8EnPSQ8Nv4fUjc/v1/8tHFqhuOJXnRub0dTfuAQktU=
github.com/cheggaaa/pb/v3 v3.0.8/go.mod h1:UICbiLec/XO6Hw6k+BHEtHeQFzzBH4i2/qk/ow1EJTA=
github.com/cheynewallace/tabby v1.1.1/go.mod h1:Pba/6cUL8uYqvOc9RkyvFbHGrQ9wShyrn6/S/1OYVys=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
Expand Down Expand Up @@ -227,10 +225,7 @@ github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M=
github.com/fatih/structtag v1.2.0 h1:/OdNE99OxoI/PqaW/SuSK9uxxT3f/tcSZgon/ssNSx4=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA=
Expand Down Expand Up @@ -565,24 +560,17 @@ github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaO
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-runewidth v0.0.12/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk=
github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU=
github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mattn/go-shellwords v1.0.3/go.mod h1:3xCvwCdWdlDJUrvuMn7Wuy9eWs4pE8vqg+NOMyg4B2o=
github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
github.com/mattn/go-sqlite3 v2.0.1+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
Expand All @@ -592,17 +580,12 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/mediocregopher/mediocre-go-lib v0.0.0-20181029021733-cb65787f37ed/go.mod h1:dSsfyI2zABAdhcbvkXqgxOxrCsbYeHCPgrZkku60dSg=
github.com/mediocregopher/radix/v3 v3.3.0/go.mod h1:EmfVyvspXz1uZEyPBMyGK+kjWiKQGvsUt6O3Pj+LDCQ=
github.com/mgechev/dots v0.0.0-20190921121421-c36f7dcfbb81/go.mod h1:KQ7+USdGKfpPjXk4Ga+5XxQM4Lm4e3gAogrreFAYpOg=
github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517 h1:zpIH83+oKzcpryru8ceC6BxnoG8TBrhgAvRg8obzup0=
github.com/mgechev/dots v0.0.0-20210922191527-e955255bf517/go.mod h1:KQ7+USdGKfpPjXk4Ga+5XxQM4Lm4e3gAogrreFAYpOg=
github.com/mgechev/revive v1.0.2/go.mod h1:rb0dQy1LVAxW9SWy5R3LPUjevzUbUS316U5MFySA2lo=
github.com/mgechev/revive v1.2.1 h1:GjFml7ZsoR0IrQ2E2YIvWFNS5GPDV7xNwvA5GM1HZC4=
github.com/mgechev/revive v1.2.1/go.mod h1:+Ro3wqY4vakcYNtkBWdZC7dBg1xSB6sp054wWwmeFm0=
github.com/microcosm-cc/bluemonday v1.0.2/go.mod h1:iVP4YcDBq+n/5fb23BhYFvIMq/leAFZyRl6bYmGDlGc=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/minio/sio v0.3.0/go.mod h1:8b0yPp2avGThviy/+OCJBI6OMpvxoUuiLvE6F1lebhw=
github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc=
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
Expand Down Expand Up @@ -640,8 +623,6 @@ github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn
github.com/oleiade/reflections v1.0.1/go.mod h1:rdFxbxq4QXVZWj0F+e9jqjDkc7dbp97vkRixKo2JR60=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/olekukonko/tablewriter v0.0.4/go.mod h1:zq6QwlOf5SlnkVbMSr5EoBv3636FWnp+qbPhuoO21uA=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
Expand Down Expand Up @@ -767,7 +748,6 @@ github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qq
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down Expand Up @@ -1191,13 +1171,10 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210831042530-f4d43177bf5e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220519141025-dcacdad47464 h1:MpIuURY70f0iKp/oooEFtB2oENcHITo/z1b6u41pKCw=
golang.org/x/sys v0.0.0-20220519141025-dcacdad47464/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down Expand Up @@ -1460,7 +1437,6 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0 h1:hjy8E9ON/egN1tAYqKb61G10WtihqetD4sz2H+8nIeA=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Expand Down
45 changes: 35 additions & 10 deletions pkg/dt/distributed_transaction_manger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/go-resty/resty/v2"
"github.com/pingcap/errors"
"k8s.io/client-go/util/workqueue"

"github.com/cectc/dbpack/pkg/config"
Expand Down Expand Up @@ -164,9 +165,28 @@ func (manager *DistributedTransactionManager) IsLockable(ctx context.Context, re
}

func (manager *DistributedTransactionManager) branchCommit(bs *api.BranchSession) (api.BranchSession_BranchStatus, error) {
if bs.Type == api.TCC {
return manager.tccBranchCommit(bs)
var (
status api.BranchSession_BranchStatus
err error
)
switch bs.Type {
case api.TCC:
status, err = manager.tccBranchCommit(bs)
case api.AT:
status, err = manager._branchCommit(bs)
default:
return bs.Status, errors.New("should never happen!")
}
if status == api.Complete {
if err := manager.storageDriver.DeleteBranchSession(context.Background(), bs.BranchID); err != nil {
log.Error(err)
}
log.Debugf("branch session committed, branch id: %s, lock key: %s", bs.BranchID, bs.LockKey)
}
return status, err
}

func (manager *DistributedTransactionManager) _branchCommit(bs *api.BranchSession) (api.BranchSession_BranchStatus, error) {
db := resource.GetDBManager().GetDB(bs.ResourceID)
if db == nil {
return 0, fmt.Errorf("DB resource is not exist, db name: %s", bs.ResourceID)
Expand All @@ -175,18 +195,23 @@ func (manager *DistributedTransactionManager) branchCommit(bs *api.BranchSession
if err := GetUndoLogManager().DeleteUndoLogByXID(db, bs.XID); err != nil {
return api.PhaseTwoCommitting, err
}
if err := manager.storageDriver.DeleteBranchSession(context.Background(), bs.BranchID); err != nil {
log.Error(err)
}
log.Debugf("branch session committed, branch id: %s, lock key: %s", bs.BranchID, bs.LockKey)
return api.Complete, nil
}

func (manager *DistributedTransactionManager) branchRollback(bs *api.BranchSession) (api.BranchSession_BranchStatus, error) {
if bs.Type == api.TCC {
return manager.tccBranchRollback(bs)
var (
status api.BranchSession_BranchStatus
lockKeys []string
err error
)
switch bs.Type {
case api.TCC:
status, err = manager.tccBranchRollback(bs)
case api.AT:
status, lockKeys, err = manager._branchRollback(bs)
default:
return bs.Status, errors.New("should never happen!")
}
status, lockKeys, err := manager._branchRollback(bs)
if len(lockKeys) > 0 {
if _, err := manager.storageDriver.ReleaseLockKeys(context.Background(), bs.ResourceID, lockKeys); err != nil {
log.Errorf("release lock and remove branch session failed, xid = %s, resource_id = %s, lockKeys = %s",
Expand All @@ -197,6 +222,7 @@ func (manager *DistributedTransactionManager) branchRollback(bs *api.BranchSessi
if err := manager.storageDriver.DeleteBranchSession(context.Background(), bs.BranchID); err != nil {
log.Error(err)
}
log.Debugf("branch session rollbacked, branch id: %s, lock key: %s", bs.BranchID, bs.LockKey)
}
return status, err
}
Expand All @@ -219,7 +245,6 @@ func (manager *DistributedTransactionManager) _branchRollback(bs *api.BranchSess
return bs.Status, nil, err
}
}
log.Debugf("branch session rollbacked, branch id: %s, lock key: %s", bs.BranchID, bs.LockKey)
return api.Complete, lockKeys, nil
}

Expand Down
67 changes: 38 additions & 29 deletions pkg/dt/storage/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,22 +167,29 @@ func (s *store) GlobalCommit(ctx context.Context, xid string) (api.GlobalSession
if err != nil {
return gs.Status, err
}
_, err = s.client.Put(ctx, xid, string(data))

var ops []clientv3.Op
ops = append(ops, clientv3.OpPut(xid, string(data)))

branchKeys, err := s.GetBranchSessionKeys(ctx, xid)
if err != nil {
return api.Begin, err
return gs.Status, err
}
err = s.commitBranchSessions(ctx, branchKeys, &ops)
if err != nil {
return gs.Status, err
}

go func() {
branchKeys, err := s.GetBranchSessionKeys(ctx, xid)
if err != nil {
log.Error(err)
}
txn := s.client.Txn(ctx)
txn.Then(ops...)
txnResp, err := txn.Commit()
if err != nil {
return gs.Status, err
}
if !txnResp.Succeeded {
return gs.Status, errors.Errorf("update status to committing failed, xid %s", xid)
}

err = s.commitBranchSessions(ctx, branchKeys)
if err != nil {
log.Error(err)
}
}()
return api.Committing, nil
}

Expand All @@ -202,19 +209,27 @@ func (s *store) GlobalRollback(ctx context.Context, xid string) (api.GlobalSessi
if err != nil {
return gs.Status, err
}
_, err = s.client.Put(ctx, xid, string(data))
if err != nil {
return api.Begin, err
}

var ops []clientv3.Op
ops = append(ops, clientv3.OpPut(xid, string(data)))

branchKeys, err := s.GetBranchSessionKeys(ctx, xid)
if err != nil {
log.Error(err)
return gs.Status, err
}
err = s.rollbackBranchSessions(ctx, branchKeys, &ops)
if err != nil {
return gs.Status, err
}

err = s.rollbackBranchSessions(ctx, branchKeys)
txn := s.client.Txn(ctx)
txn.Then(ops...)
txnResp, err := txn.Commit()
if err != nil {
log.Error(err)
return gs.Status, err
}
if !txnResp.Succeeded {
return gs.Status, errors.Errorf("update status to rollbacking failed, xid %s", xid)
}

return api.Rollbacking, nil
Expand Down Expand Up @@ -377,7 +392,7 @@ func (s *store) releaseGlobalLocks(ctx context.Context, xid string) (bool, error
return true, nil
}

func (s *store) commitBranchSessions(ctx context.Context, branchSessionKeys []string) error {
func (s *store) commitBranchSessions(ctx context.Context, branchSessionKeys []string, ops *[]clientv3.Op) error {
for _, key := range branchSessionKeys {
bs, err := s.GetBranchSession(ctx, key)
if err != nil {
Expand All @@ -389,16 +404,13 @@ func (s *store) commitBranchSessions(ctx context.Context, branchSessionKeys []st
if err != nil {
return err
}
_, err = s.client.Put(ctx, key, string(data))
if err != nil {
return err
}
*ops = append(*ops, clientv3.OpPut(key, string(data)))
}
}
return nil
}

func (s *store) rollbackBranchSessions(ctx context.Context, branchSessionKeys []string) error {
func (s *store) rollbackBranchSessions(ctx context.Context, branchSessionKeys []string, ops *[]clientv3.Op) error {
for _, key := range branchSessionKeys {
bs, err := s.GetBranchSession(ctx, key)
if err != nil {
Expand All @@ -410,10 +422,7 @@ func (s *store) rollbackBranchSessions(ctx context.Context, branchSessionKeys []
if err != nil {
return err
}
_, err = s.client.Put(ctx, key, string(data))
if err != nil {
return err
}
*ops = append(*ops, clientv3.OpPut(key, string(data)))
}
}
return nil
Expand Down