-
Notifications
You must be signed in to change notification settings - Fork 23
fix(writemarker):improved ReddemWriteMarkers to fix #624 #647
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
40c842f
e358b54
490018d
ee7306d
e69c130
be5d9cf
019d3fc
239e104
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ import ( | |
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" | ||
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" | ||
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" | ||
"github.com/0chain/blobber/code/go/0chain.net/core/logging" | ||
. "github.com/0chain/blobber/code/go/0chain.net/core/logging" | ||
|
||
"go.uber.org/zap" | ||
|
@@ -16,41 +17,61 @@ func SetupWorkers(ctx context.Context) { | |
go startRedeemWriteMarkers(ctx) | ||
} | ||
|
||
func RedeemMarkersForAllocation(ctx context.Context, allocationObj *allocation.Allocation) error { | ||
rctx := datastore.GetStore().CreateTransaction(ctx) | ||
db := datastore.GetStore().GetTransaction(rctx) | ||
func redeemWriterMarkersForAllocation(allocationObj *allocation.Allocation) { | ||
defer func() { | ||
err := db.Commit().Error | ||
if err != nil { | ||
Logger.Error("Error committing the writemarker redeem", zap.Error(err)) | ||
if r := recover(); r != nil { | ||
cnlangzi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
logging.Logger.Error("[recover] redeemWriterMarkersForAllocation", zap.Any("err", r)) | ||
} | ||
rctx.Done() | ||
}() | ||
|
||
writemarkers := make([]*WriteMarkerEntity, 0) | ||
ctx := datastore.GetStore().CreateTransaction(context.TODO()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cnlangzi This For example, if there were 10 writemarkers to redeem for an allocation. You redeemed 9 writemarkers and I think we should use separate db transaction for each writemarker updates so that once redeemed writemarker has There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Above There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it happens on all blobber transaction and 0chain transaction whatever it is 1 or more transactions. I are not sure if 0chain transaction is idempotent. that is problem on all distributed transaction design. that is why we need it is idempotent, or 2-steps transactions. cc @peterlimg There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @cnlangzi The other blobber transactions I think is a different story. We just need to create new transaction and commit with it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let me explain it more
what happens on blobber side on next round? do you think it is the same story ? that is why I said we should raise an issue to discuss and confirm it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. anyway, updated code as per your suggestion. |
||
db := datastore.GetStore().GetTransaction(ctx) | ||
var err error | ||
|
||
err := db.Not(WriteMarkerEntity{Status: Committed}). | ||
done := false | ||
cnlangzi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
defer func() { | ||
|
||
if !done { | ||
if err := db.Rollback().Error; err != nil { | ||
Logger.Error("Error rollbacking the writemarker redeem", | ||
zap.Any("allocation", allocationObj.ID), | ||
zap.Error(err)) | ||
} | ||
} | ||
ctx.Done() | ||
}() | ||
|
||
var writemarkers []*WriteMarkerEntity | ||
|
||
err = db.Not(WriteMarkerEntity{Status: Committed}). | ||
Where(WriteMarker{AllocationID: allocationObj.ID}). | ||
Order("sequence"). | ||
Find(&writemarkers).Error | ||
if err != nil { | ||
return err | ||
Logger.Error("Error redeeming the write marker. failed to load allocation's writemarker ", | ||
zap.Any("allocation", allocationObj.ID), | ||
zap.Any("error", err)) | ||
return | ||
} | ||
startredeem := false | ||
for _, wm := range writemarkers { | ||
if wm.WM.PreviousAllocationRoot == allocationObj.LatestRedeemedWM && !startredeem { | ||
startredeem = true | ||
} | ||
if startredeem || allocationObj.LatestRedeemedWM == "" { | ||
err := wm.RedeemMarker(rctx) | ||
err = wm.RedeemMarker(ctx) | ||
if err != nil { | ||
Logger.Error("Error redeeming the write marker.", zap.Any("wm", wm.WM.AllocationID), zap.Any("error", err)) | ||
continue | ||
Logger.Error("Error redeeming the write marker.", | ||
zap.Any("wm", wm.WM.AllocationID), zap.Any("error", err)) | ||
return | ||
} | ||
err = db.Model(allocationObj).Updates(allocation.Allocation{LatestRedeemedWM: wm.WM.AllocationRoot}).Error | ||
if err != nil { | ||
Logger.Error("Error redeeming the write marker. Allocation latest wm redeemed update failed", zap.Any("wm", wm.WM.AllocationRoot), zap.Any("error", err)) | ||
return err | ||
Logger.Error("Error redeeming the write marker. Allocation latest wm redeemed update failed", | ||
zap.Any("wm", wm.WM.AllocationRoot), zap.Any("error", err)) | ||
|
||
return | ||
} | ||
allocationObj.LatestRedeemedWM = wm.WM.AllocationRoot | ||
Logger.Info("Success Redeeming the write marker", zap.Any("wm", wm.WM.AllocationRoot), zap.Any("txn", wm.CloseTxnID)) | ||
|
@@ -61,7 +82,15 @@ func RedeemMarkersForAllocation(ctx context.Context, allocationObj *allocation.A | |
Where("allocation_root = ? AND allocation_root = latest_redeemed_write_marker", allocationObj.AllocationRoot). | ||
Update("is_redeem_required", false) | ||
} | ||
return nil | ||
|
||
err = db.Commit().Error | ||
if err != nil { | ||
Logger.Error("Error committing the writemarker redeem", | ||
zap.Any("allocation", allocationObj.ID), | ||
zap.Error(err)) | ||
} | ||
|
||
done = true | ||
} | ||
|
||
func startRedeemWriteMarkers(ctx context.Context) { | ||
|
@@ -73,9 +102,9 @@ func startRedeemWriteMarkers(ctx context.Context) { | |
case <-ctx.Done(): | ||
return | ||
case <-ticker.C: | ||
// Logger.Info("Trying to redeem writemarkers.", | ||
// zap.Any("numOfWorkers", numOfWorkers)) | ||
redeemWriteMarker(ctx) | ||
Logger.Info("Trying to redeem writemarkers.", | ||
zap.Any("numOfWorkers", config.Configuration.WMRedeemNumWorkers)) | ||
redeemWriteMarkers() | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,42 +1,27 @@ | ||
package writemarker | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" | ||
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" | ||
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" | ||
"github.com/0chain/blobber/code/go/0chain.net/core/logging" | ||
"github.com/remeh/sizedwaitgroup" | ||
"go.uber.org/zap" | ||
) | ||
|
||
func redeemWriteMarker(ctx context.Context) { | ||
defer func() { | ||
if r := recover(); r != nil { | ||
logging.Logger.Error("[recover] redeemWriteMarker", zap.Any("err", r)) | ||
} | ||
}() | ||
func redeemWriteMarkers() { | ||
|
||
rctx := datastore.GetStore().CreateTransaction(ctx) | ||
db := datastore.GetStore().GetTransaction(rctx) | ||
allocations := make([]*allocation.Allocation, 0) | ||
alloc := &allocation.Allocation{IsRedeemRequired: true} | ||
db.Where(alloc).Find(&allocations) | ||
db := datastore.GetStore().GetDB() | ||
var allocations []*allocation.Allocation | ||
db.Where(&allocation.Allocation{IsRedeemRequired: true}). | ||
Find(&allocations) | ||
if len(allocations) > 0 { | ||
swg := sizedwaitgroup.New(config.Configuration.WMRedeemNumWorkers) | ||
for _, allocationObj := range allocations { | ||
swg.Add() | ||
go func(redeemCtx context.Context, allocationObj *allocation.Allocation) { | ||
err := RedeemMarkersForAllocation(redeemCtx, allocationObj) | ||
if err != nil { | ||
logging.Logger.Error("Error redeeming the write marker for allocation.", zap.Any("allocation", allocationObj.ID), zap.Error(err)) | ||
} | ||
go func(allocationObj *allocation.Allocation) { | ||
redeemWriterMarkersForAllocation(allocationObj) | ||
swg.Done() | ||
}(ctx, allocationObj) | ||
}(allocationObj) | ||
} | ||
swg.Wait() | ||
} | ||
db.Rollback() | ||
rctx.Done() | ||
} |
Uh oh!
There was an error while loading. Please reload this page.