@@ -29,6 +29,7 @@ import (
2929
3030 "github.com/cectc/dbpack/pkg/config"
3131 "github.com/cectc/dbpack/pkg/dt/api"
32+ "github.com/cectc/dbpack/pkg/dt/metrics"
3233 "github.com/cectc/dbpack/pkg/dt/storage"
3334 "github.com/cectc/dbpack/pkg/log"
3435 "github.com/cectc/dbpack/pkg/misc"
@@ -57,6 +58,7 @@ func InitDistributedTransactionManager(conf *config.DistributedTransaction, stor
5758 if conf .RetryDeadThreshold == 0 {
5859 conf .RetryDeadThreshold = DefaultRetryDeadThreshold
5960 }
61+
6062 manager = & DistributedTransactionManager {
6163 applicationID : conf .ApplicationID ,
6264 storageDriver : storageDriver ,
@@ -110,6 +112,7 @@ func (manager *DistributedTransactionManager) Begin(ctx context.Context, transac
110112 if err := manager .storageDriver .AddGlobalSession (ctx , gt ); err != nil {
111113 return "" , err
112114 }
115+ metrics .GlobalTransactionCounter .WithLabelValues (manager .applicationID , transactionName , metrics .TransactionStatusActive ).Inc ()
113116 manager .globalSessionQueue .AddAfter (gt , time .Duration (timeout )* time .Millisecond )
114117 log .Infof ("successfully begin global transaction xid = {%s}" , gt .XID )
115118 return xid , nil
@@ -144,6 +147,7 @@ func (manager *DistributedTransactionManager) BranchRegister(ctx context.Context
144147 if err := manager .storageDriver .AddBranchSession (ctx , bs ); err != nil {
145148 return "" , 0 , err
146149 }
150+ metrics .BranchTransactionCounter .WithLabelValues (manager .applicationID , in .ResourceID , metrics .TransactionStatusActive ).Inc ()
147151 return branchID , branchSessionID , nil
148152}
149153
@@ -220,14 +224,15 @@ func (manager *DistributedTransactionManager) _branchRollback(bs *api.BranchSess
220224}
221225
222226func (manager * DistributedTransactionManager ) processGlobalSessions () error {
223- globalSessions , err := manager .storageDriver .ListGlobalSession (context .Background (), manager .applicationID )
227+ ctx := context .Background ()
228+ globalSessions , err := manager .storageDriver .ListGlobalSession (ctx , manager .applicationID )
224229 if err != nil {
225230 return err
226231 }
227232 for _ , gs := range globalSessions {
228233 if gs .Status == api .Begin {
229234 if isGlobalSessionTimeout (gs ) {
230- if _ , err := manager .Rollback (context . Background () , gs .XID ); err != nil {
235+ if _ , err := manager .Rollback (ctx , gs .XID ); err != nil {
231236 return err
232237 }
233238 }
@@ -244,11 +249,21 @@ func (manager *DistributedTransactionManager) processGlobalSessions() error {
244249 if err != nil {
245250 return err
246251 }
252+ // branch session has been committed or rollbacked
247253 if len (bsKeys ) == 0 {
248254 if err := manager .storageDriver .DeleteGlobalSession (context .Background (), gs .XID ); err != nil {
249255 return err
250256 }
251257 log .Debugf ("global session finished, key: %s" , gs .XID )
258+ switch gs .Status {
259+ case api .Committing :
260+ manager .recordGlobalTransactionMetric (gs .TransactionName , metrics .TransactionStatusCommitted )
261+ case api .Rollbacking :
262+ manager .recordGlobalTransactionMetric (gs .TransactionName , metrics .TransactionStatusRollbacked )
263+ }
264+ } else {
265+ // global transaction timeout
266+ manager .recordGlobalTransactionMetric (gs .TransactionName , metrics .TransactionStatusTimeout )
252267 }
253268 }
254269 }
@@ -283,7 +298,7 @@ func (manager *DistributedTransactionManager) processNextGlobalSession(ctx conte
283298 }
284299 if newGlobalSession .Status == api .Begin {
285300 if isGlobalSessionTimeout (newGlobalSession ) {
286- _ , err : = manager .Rollback (context .Background (), newGlobalSession .XID )
301+ _ , err = manager .Rollback (context .Background (), newGlobalSession .XID )
287302 if err != nil {
288303 log .Error (err )
289304 }
@@ -299,6 +314,15 @@ func (manager *DistributedTransactionManager) processNextGlobalSession(ctx conte
299314 log .Error (err )
300315 }
301316 log .Debugf ("global session finished, key: %s" , newGlobalSession .XID )
317+ switch newGlobalSession .Status {
318+ case api .Committing :
319+ manager .recordGlobalTransactionMetric (gs .TransactionName , metrics .TransactionStatusCommitted )
320+ case api .Rollbacking :
321+ manager .recordGlobalTransactionMetric (gs .TransactionName , metrics .TransactionStatusRollbacked )
322+ }
323+ } else {
324+ // global transaction timeout.
325+ manager .recordGlobalTransactionMetric (gs .TransactionName , metrics .TransactionStatusRollbacked )
302326 }
303327 }
304328 return true
@@ -320,6 +344,7 @@ func (manager *DistributedTransactionManager) processBranchSessions() error {
320344 manager .branchSessionQueue .Add (bs )
321345 case api .PhaseTwoRollbacking :
322346 if manager .IsRollingBackDead (bs ) {
347+ metrics .BranchTransactionCounter .WithLabelValues (bs .ApplicationID , bs .ResourceID , metrics .TransactionStatusTimeout )
323348 log .Debugf ("branch session rollback dead, key: %s, lock key: %s" , bs .BranchID , bs .LockKey )
324349 if manager .rollbackRetryTimeoutUnlockEnable {
325350 log .Debugf ("branch id: %d, lock key: %s released" , bs .BranchID , bs .LockKey )
@@ -356,8 +381,14 @@ func (manager *DistributedTransactionManager) processNextBranchSession(ctx conte
356381 defer manager .branchSessionQueue .Done (obj )
357382
358383 bs := obj .(* api.BranchSession )
384+ var (
385+ status api.BranchSession_BranchStatus
386+ transactionStatus string
387+ err error
388+ )
359389 if bs .Status == api .PhaseTwoCommitting {
360- status , err := manager .branchCommit (bs )
390+ transactionStatus = metrics .TransactionStatusCommitted
391+ status , err = manager .branchCommit (bs )
361392 if err != nil {
362393 log .Error (err )
363394 manager .branchSessionQueue .Add (obj )
@@ -367,14 +398,16 @@ func (manager *DistributedTransactionManager) processNextBranchSession(ctx conte
367398 }
368399 }
369400 if bs .Status == api .PhaseTwoRollbacking {
401+ transactionStatus = metrics .TransactionStatusRollbacked
370402 if manager .IsRollingBackDead (bs ) {
403+ metrics .BranchTransactionCounter .WithLabelValues (bs .ApplicationID , bs .ResourceID , metrics .TransactionStatusTimeout )
371404 if manager .rollbackRetryTimeoutUnlockEnable {
372- if _ , err := manager .storageDriver .ReleaseLockKeys (context . Background () , bs .ResourceID , []string {bs .LockKey }); err != nil {
405+ if _ , err := manager .storageDriver .ReleaseLockKeys (ctx , bs .ResourceID , []string {bs .LockKey }); err != nil {
373406 log .Error (err )
374407 }
375408 }
376409 } else {
377- status , err : = manager .branchRollback (bs )
410+ status , err = manager .branchRollback (bs )
378411 if err != nil {
379412 log .Error (err )
380413 manager .branchSessionQueue .Add (obj )
@@ -384,6 +417,14 @@ func (manager *DistributedTransactionManager) processNextBranchSession(ctx conte
384417 }
385418 }
386419 }
420+
421+ if status == api .Complete {
422+ metrics .BranchTransactionTimer .WithLabelValues (manager .applicationID , bs .ResourceID , transactionStatus ).Observe (
423+ float64 (int64 (misc .CurrentTimeMillis ()) - bs .BeginTime ))
424+ metrics .BranchTransactionCounter .WithLabelValues (manager .applicationID , bs .ResourceID , metrics .TransactionStatusActive ).Desc ()
425+ metrics .BranchTransactionCounter .WithLabelValues (manager .applicationID , bs .ResourceID , transactionStatus ).Inc ()
426+ }
427+
387428 return true
388429}
389430
@@ -395,6 +436,11 @@ func (manager *DistributedTransactionManager) watchBranchSession() {
395436 }
396437}
397438
439+ func (manager * DistributedTransactionManager ) recordGlobalTransactionMetric (transactionName string , transactionStatus string ) {
440+ metrics .GlobalTransactionCounter .WithLabelValues (manager .applicationID , transactionName , metrics .TransactionStatusActive ).Desc ()
441+ metrics .GlobalTransactionCounter .WithLabelValues (manager .applicationID , transactionName , transactionStatus ).Inc ()
442+ }
443+
398444func isGlobalSessionTimeout (gs * api.GlobalSession ) bool {
399445 return (misc .CurrentTimeMillis () - uint64 (gs .BeginTime )) > uint64 (gs .Timeout )
400446}
0 commit comments