Skip to content

fix(writemarker):improved ReddemWriteMarkers to fix #624 #647

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 25, 2022
101 changes: 71 additions & 30 deletions code/go/0chain.net/blobbercore/writemarker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
. "github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"

"go.uber.org/zap"
)
Expand All @@ -16,51 +16,92 @@ func SetupWorkers(ctx context.Context) {
go startRedeemWriteMarkers(ctx)
}

func RedeemMarkersForAllocation(ctx context.Context, allocationObj *allocation.Allocation) error {
rctx := datastore.GetStore().CreateTransaction(ctx)
db := datastore.GetStore().GetTransaction(rctx)
defer func() {
err := db.Commit().Error
if err != nil {
Logger.Error("Error committing the writemarker redeem", zap.Error(err))
}
rctx.Done()
}()
func redeemWriterMarkersForAllocation(allocationObj *allocation.Allocation) {

db := datastore.GetStore().GetDB()
var err error

writemarkers := make([]*WriteMarkerEntity, 0)
var writemarkers []*WriteMarkerEntity

err := db.Not(WriteMarkerEntity{Status: Committed}).
err = db.Not(WriteMarkerEntity{Status: Committed}).
Where(WriteMarker{AllocationID: allocationObj.ID}).
Order("sequence").
Find(&writemarkers).Error
if err != nil {
return err
logging.Logger.Error("Error redeeming the write marker. failed to load allocation's writemarker ",
zap.Any("allocation", allocationObj.ID),
zap.Any("error", err))
return
}
startredeem := false
for _, wm := range writemarkers {
if wm.WM.PreviousAllocationRoot == allocationObj.LatestRedeemedWM && !startredeem {
startredeem = true
}
if startredeem || allocationObj.LatestRedeemedWM == "" {
err := wm.RedeemMarker(rctx)
if err != nil {
Logger.Error("Error redeeming the write marker.", zap.Any("wm", wm.WM.AllocationID), zap.Any("error", err))
continue
}
err = db.Model(allocationObj).Updates(allocation.Allocation{LatestRedeemedWM: wm.WM.AllocationRoot}).Error
err = redeemWriteMarker(allocationObj, wm)
if err != nil {
Logger.Error("Error redeeming the write marker. Allocation latest wm redeemed update failed", zap.Any("wm", wm.WM.AllocationRoot), zap.Any("error", err))
return err
return
}
allocationObj.LatestRedeemedWM = wm.WM.AllocationRoot
Logger.Info("Success Redeeming the write marker", zap.Any("wm", wm.WM.AllocationRoot), zap.Any("txn", wm.CloseTxnID))
}
}

if allocationObj.LatestRedeemedWM == allocationObj.AllocationRoot {
db.Model(allocationObj).
Where("allocation_root = ? AND allocation_root = latest_redeemed_write_marker", allocationObj.AllocationRoot).
Update("is_redeem_required", false)
err = db.Exec("UPDATE allocations SET is_redeem_required=? WHERE allocation_id = ? ", false, allocationObj.ID).Error
if err != nil {
logging.Logger.Error("Error redeeming the write marker. failed to update allocation's is_redeem_required ",
zap.Any("allocation", allocationObj.ID),
zap.Any("error", err))
}
}
}

func redeemWriteMarker(allocationObj *allocation.Allocation, wm *WriteMarkerEntity) error {
ctx := datastore.GetStore().CreateTransaction(context.TODO())
db := datastore.GetStore().GetTransaction(ctx)

shouldRollback := false

defer func() {
if shouldRollback {
if err := db.Rollback(); err != nil {
logging.Logger.Error("Error rollback on redeeming the write marker.",
zap.Any("wm", wm.WM.AllocationID), zap.Any("error", err))
}
}
}()

err := wm.RedeemMarker(ctx)
if err != nil {
logging.Logger.Error("Error redeeming the write marker.",
zap.Any("wm", wm.WM.AllocationID), zap.Any("error", err))

shouldRollback = true

return err
}

err = db.Exec("UPDATE allocations SET latest_redeemed_write_marker=? WHERE id=?",
wm.WM.AllocationRoot, allocationObj.ID).Error
if err != nil {
logging.Logger.Error("Error redeeming the write marker. Allocation latest wm redeemed update failed",
zap.Any("wm", wm.WM.AllocationRoot), zap.Any("error", err))
shouldRollback = true
return err
}

err = db.Commit().Error
if err != nil {
logging.Logger.Error("Error committing the writemarker redeem",
zap.Any("allocation", allocationObj.ID),
zap.Error(err))
shouldRollback = true
return err
}

allocationObj.LatestRedeemedWM = wm.WM.AllocationRoot
logging.Logger.Info("Success Redeeming the write marker", zap.Any("wm", wm.WM.AllocationRoot), zap.Any("txn", wm.CloseTxnID))

return nil
}

Expand All @@ -73,9 +114,9 @@ func startRedeemWriteMarkers(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
// Logger.Info("Trying to redeem writemarkers.",
// zap.Any("numOfWorkers", numOfWorkers))
redeemWriteMarker(ctx)
logging.Logger.Info("Trying to redeem writemarkers.",
zap.Any("numOfWorkers", config.Configuration.WMRedeemNumWorkers))
redeemWriteMarkers()
}
}
}
31 changes: 8 additions & 23 deletions code/go/0chain.net/blobbercore/writemarker/writemarker.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,27 @@
package writemarker

import (
"context"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/remeh/sizedwaitgroup"
"go.uber.org/zap"
)

func redeemWriteMarker(ctx context.Context) {
defer func() {
if r := recover(); r != nil {
logging.Logger.Error("[recover] redeemWriteMarker", zap.Any("err", r))
}
}()
func redeemWriteMarkers() {

rctx := datastore.GetStore().CreateTransaction(ctx)
db := datastore.GetStore().GetTransaction(rctx)
allocations := make([]*allocation.Allocation, 0)
alloc := &allocation.Allocation{IsRedeemRequired: true}
db.Where(alloc).Find(&allocations)
db := datastore.GetStore().GetDB()
var allocations []*allocation.Allocation
db.Where(&allocation.Allocation{IsRedeemRequired: true}).
Find(&allocations)
if len(allocations) > 0 {
swg := sizedwaitgroup.New(config.Configuration.WMRedeemNumWorkers)
for _, allocationObj := range allocations {
swg.Add()
go func(redeemCtx context.Context, allocationObj *allocation.Allocation) {
err := RedeemMarkersForAllocation(redeemCtx, allocationObj)
if err != nil {
logging.Logger.Error("Error redeeming the write marker for allocation.", zap.Any("allocation", allocationObj.ID), zap.Error(err))
}
go func(allocationObj *allocation.Allocation) {
redeemWriterMarkersForAllocation(allocationObj)
swg.Done()
}(ctx, allocationObj)
}(allocationObj)
}
swg.Wait()
}
db.Rollback()
rctx.Done()
}