@@ -17,23 +17,9 @@ func SetupWorkers(ctx context.Context) {
17
17
}
18
18
19
19
func redeemWriterMarkersForAllocation (allocationObj * allocation.Allocation ) {
20
- ctx := datastore .GetStore ().CreateTransaction (context .TODO ())
21
- db := datastore .GetStore ().GetTransaction (ctx )
22
- var err error
23
20
24
- done := false
25
-
26
- defer func () {
27
-
28
- if ! done {
29
- if err := db .Rollback ().Error ; err != nil {
30
- logging .Logger .Error ("Error rollbacking the writemarker redeem" ,
31
- zap .Any ("allocation" , allocationObj .ID ),
32
- zap .Error (err ))
33
- }
34
- }
35
- ctx .Done ()
36
- }()
21
+ db := datastore .GetStore ().GetDB ()
22
+ var err error
37
23
38
24
var writemarkers []* WriteMarkerEntity
39
25
@@ -53,37 +39,68 @@ func redeemWriterMarkersForAllocation(allocationObj *allocation.Allocation) {
53
39
startredeem = true
54
40
}
55
41
if startredeem || allocationObj .LatestRedeemedWM == "" {
56
- err = wm .RedeemMarker (ctx )
42
+
43
+ err = redeemWriteMarker (allocationObj , wm )
57
44
if err != nil {
58
- logging .Logger .Error ("Error redeeming the write marker." ,
59
- zap .Any ("wm" , wm .WM .AllocationID ), zap .Any ("error" , err ))
60
45
return
61
46
}
62
- err = db .Model (allocationObj ).Updates (allocation.Allocation {LatestRedeemedWM : wm .WM .AllocationRoot }).Error
63
- if err != nil {
64
- logging .Logger .Error ("Error redeeming the write marker. Allocation latest wm redeemed update failed" ,
65
- zap .Any ("wm" , wm .WM .AllocationRoot ), zap .Any ("error" , err ))
66
47
67
- return
68
- }
69
- allocationObj .LatestRedeemedWM = wm .WM .AllocationRoot
70
- logging .Logger .Info ("Success Redeeming the write marker" , zap .Any ("wm" , wm .WM .AllocationRoot ), zap .Any ("txn" , wm .CloseTxnID ))
71
48
}
72
49
}
50
+
73
51
if allocationObj .LatestRedeemedWM == allocationObj .AllocationRoot {
74
52
db .Model (allocationObj ).
75
53
Where ("allocation_root = ? AND allocation_root = latest_redeemed_write_marker" , allocationObj .AllocationRoot ).
76
54
Update ("is_redeem_required" , false )
77
55
}
78
56
57
+ }
58
+
59
+ func redeemWriteMarker (allocationObj * allocation.Allocation , wm * WriteMarkerEntity ) error {
60
+ ctx := datastore .GetStore ().CreateTransaction (context .TODO ())
61
+ db := datastore .GetStore ().GetTransaction (ctx )
62
+
63
+ shouldRollback := false
64
+
65
+ defer func () {
66
+ if shouldRollback {
67
+ if err := db .Rollback (); err != nil {
68
+ logging .Logger .Error ("Error rollback on redeeming the write marker." ,
69
+ zap .Any ("wm" , wm .WM .AllocationID ), zap .Any ("error" , err ))
70
+ }
71
+ }
72
+ }()
73
+
74
+ err := wm .RedeemMarker (ctx )
75
+ if err != nil {
76
+ logging .Logger .Error ("Error redeeming the write marker." ,
77
+ zap .Any ("wm" , wm .WM .AllocationID ), zap .Any ("error" , err ))
78
+
79
+ shouldRollback = true
80
+
81
+ return err
82
+ }
83
+ err = db .Model (allocationObj ).Updates (allocation.Allocation {LatestRedeemedWM : wm .WM .AllocationRoot }).Error
84
+ if err != nil {
85
+ logging .Logger .Error ("Error redeeming the write marker. Allocation latest wm redeemed update failed" ,
86
+ zap .Any ("wm" , wm .WM .AllocationRoot ), zap .Any ("error" , err ))
87
+ shouldRollback = true
88
+
89
+ return err
90
+ }
91
+ allocationObj .LatestRedeemedWM = wm .WM .AllocationRoot
92
+ logging .Logger .Info ("Success Redeeming the write marker" , zap .Any ("wm" , wm .WM .AllocationRoot ), zap .Any ("txn" , wm .CloseTxnID ))
93
+
79
94
err = db .Commit ().Error
80
95
if err != nil {
81
96
logging .Logger .Error ("Error committing the writemarker redeem" ,
82
97
zap .Any ("allocation" , allocationObj .ID ),
83
98
zap .Error (err ))
99
+ shouldRollback = true
100
+ return err
84
101
}
85
102
86
- done = true
103
+ return nil
87
104
}
88
105
89
106
func startRedeemWriteMarkers (ctx context.Context ) {
0 commit comments