Skip to content

Fix/challenge log #400

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions code/go/0chain.net/blobbercore/allocation/newfilechange.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ type NewFileChange struct {
//client side:
Path string `json:"filepath" validation:"required"`
//client side:
ActualHash string `json:"actual_hash,omitempty" validation:"required"`
ActualHash string `json:"actual_hash,omitempty" `
//client side:
ActualSize int64 `json:"actual_size,omitempty" validation:"required"`
ActualSize int64 `json:"actual_size,omitempty"`
//client side:
ActualThumbnailSize int64 `json:"actual_thumb_size"`
//client side:
Expand Down
35 changes: 21 additions & 14 deletions code/go/0chain.net/blobbercore/challenge/challenge.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/chain"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/lock"
"github.com/0chain/blobber/code/go/0chain.net/core/node"
"github.com/0chain/blobber/code/go/0chain.net/core/transaction"
Expand Down Expand Up @@ -40,7 +41,7 @@ func syncOpenChallenges(ctx context.Context) {
retBytes, err := transaction.MakeSCRestAPICall(transaction.STORAGE_CONTRACT_ADDRESS, "/openchallenges", params, chain.GetServerChain())

if err != nil {
logging.Logger.Error("Error getting the open challenges from the blockchain", zap.Error(err))
logging.Logger.Error("[challenge]open: ", zap.Error(err))
} else {

bytesReader := bytes.NewBuffer(retBytes)
Expand All @@ -50,11 +51,11 @@ func syncOpenChallenges(ctx context.Context) {
errd := d.Decode(&blobberChallenges)

if errd != nil {
logging.Logger.Error("Error in unmarshal of the sharder response", zap.Error(errd))
logging.Logger.Error("[challenge]json: ", zap.Error(errd))
} else {
for _, challengeObj := range blobberChallenges.Challenges {
if challengeObj == nil || len(challengeObj.ChallengeID) == 0 {
logging.Logger.Info("No challenge entity from the challenge map")
logging.Logger.Info("[challenge]open: No challenge entity from the challenge map")
continue
}

Expand All @@ -69,7 +70,7 @@ func syncOpenChallenges(ctx context.Context) {

if err != nil {
if !errors.Is(err, gorm.ErrRecordNotFound) {
logging.Logger.Info("Error in load challenge entity from database ", zap.Error(err))
logging.Logger.Error("[challenge]db: ", zap.Error(err))
continue
}
}
Expand All @@ -78,13 +79,16 @@ func syncOpenChallenges(ctx context.Context) {
isNextChallengeOnChain := latestChallenge == nil || latestChallenge.ChallengeID == challengeObj.PrevChallengeID

if isFirstChallengeInDatabase || isNextChallengeOnChain {
logging.Logger.Info("Adding new challenge found from blockchain", zap.String("challenge", challengeObj.ChallengeID))
logging.Logger.Info("[challenge]add: ", zap.String("challenge_id", challengeObj.ChallengeID))
challengeObj.Status = Accepted
challengeObj.CreatedAt = common.ToTime(challengeObj.Created)
challengeObj.UpdatedAt = challengeObj.CreatedAt

if err := challengeObj.Save(tx); err != nil {
logging.Logger.Error("ChallengeEntity_Save", zap.String("challenge_id", challengeObj.ChallengeID), zap.Error(err))
logging.Logger.Error("[challenge]db: ", zap.String("challenge_id", challengeObj.ChallengeID), zap.Error(err))
}
} else {
logging.Logger.Error("Challenge chain is not valid")
logging.Logger.Error("[challenge]Challenge chain is not valid")
}

}
Expand All @@ -110,26 +114,29 @@ func processAccepted(ctx context.Context) {
if len(openchallenges) > 0 {
swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers)
for _, openchallenge := range openchallenges {
logging.Logger.Info("Processing the challenge", zap.Any("challenge_id", openchallenge.ChallengeID), zap.Any("openchallenge", openchallenge))
logging.Logger.Info("[challenge]process: ", zap.String("challenge_id", openchallenge.ChallengeID))
err := openchallenge.UnmarshalFields()
if err != nil {
logging.Logger.Error("Error unmarshaling challenge entity.", zap.Error(err))
logging.Logger.Error("[challenge]json: ", zap.Error(err))
continue
}
swg.Add()
go func(redeemCtx context.Context, challengeEntity *ChallengeEntity) {
defer swg.Done()
redeemCtx = datastore.GetStore().CreateTransaction(redeemCtx)
defer redeemCtx.Done()
err := loadValidationTickets(redeemCtx, challengeEntity)
if err != nil {
logging.Logger.Error("Getting validation tickets failed", zap.Any("challenge_id", challengeEntity.ChallengeID), zap.Error(err))
logging.Logger.Error("[challenge]validate: ", zap.Any("challenge_id", challengeEntity.ChallengeID), zap.Error(err))
return
}
db := datastore.GetStore().GetTransaction(redeemCtx)
err = db.Commit().Error
if err != nil {
logging.Logger.Error("Error commiting the readmarker redeem", zap.Error(err))
logging.Logger.Error("[challenge]db: ", zap.Any("challenge_id", challengeEntity.ChallengeID), zap.Error(err))
return
}
swg.Done()

}(ctx, openchallenge)
}
swg.Wait()
Expand All @@ -145,13 +152,13 @@ func loadValidationTickets(ctx context.Context, challengeObj *ChallengeEntity) e

defer func() {
if r := recover(); r != nil {
logging.Logger.Error("[recover] LoadValidationTickets", zap.Any("err", r))
logging.Logger.Error("[recover]LoadValidationTickets", zap.Any("err", r))
}
}()

err := challengeObj.LoadValidationTickets(ctx)
if err != nil {
logging.Logger.Error("Error getting the validation tickets", zap.Error(err), zap.String("challenge_id", challengeObj.ChallengeID))
logging.Logger.Error("[challenge]load: ", zap.String("challenge_id", challengeObj.ChallengeID), zap.Error(err))
}

return err
Expand Down
4 changes: 4 additions & 0 deletions code/go/0chain.net/blobbercore/challenge/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
Expand Down Expand Up @@ -76,6 +77,9 @@ type ChallengeEntity struct {
ObjectPathString datatypes.JSON `json:"-" gorm:"column:object_path"`
ObjectPath *reference.ObjectPath `json:"object_path" gorm:"-"`
Created common.Timestamp `json:"created" gorm:"-"`

CreatedAt time.Time `gorm:"created_at"`
UpdatedAt time.Time `gorm:"updated_at"`
}

func (ChallengeEntity) TableName() string {
Expand Down
49 changes: 31 additions & 18 deletions code/go/0chain.net/blobbercore/challenge/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,23 @@ func (cr *ChallengeEntity) SubmitChallengeToBC(ctx context.Context) (*transactio

func (cr *ChallengeEntity) ErrorChallenge(ctx context.Context, err error) {
cr.StatusMessage = err.Error()
cr.UpdatedAt = time.Now().UTC()

if err := cr.Save(ctx); err != nil {
Logger.Error("ChallengeEntity_Save", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
}
}

// LoadValidationTickets load validation tickets
func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
if len(cr.Validators) == 0 {
cr.StatusMessage = "No validators assigned to the challange"
cr.StatusMessage = "No validators assigned to the challenge"
cr.UpdatedAt = time.Now().UTC()

if err := cr.Save(ctx); err != nil {
Logger.Error("ChallengeEntity_Save", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
}
return common.NewError("no_validators", "No validators assigned to the challange")
return common.NewError("no_validators", "No validators assigned to the challenge")
}

allocationObj, err := allocation.GetAllocationByID(ctx, cr.AllocationID)
Expand All @@ -97,16 +101,19 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
r := rand.New(rand.NewSource(cr.RandomNumber))
blockNum = r.Int63n(rootRef.NumBlocks)
blockNum = blockNum + 1
cr.BlockNum = blockNum
} else {
Logger.Error("Got a challenge for a blank allocation")
err = common.NewError("allocation_is_blank", "Got a challenge for a blank allocation")
cr.ErrorChallenge(ctx, err)
return err
}

cr.BlockNum = blockNum
if err != nil {
cr.ErrorChallenge(ctx, err)
return err
}
Logger.Info("blockNum for challenge", zap.Any("rootRef.NumBlocks", rootRef.NumBlocks), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber))

Logger.Info("[challenge]rand: ", zap.Any("rootRef.NumBlocks", rootRef.NumBlocks), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber))
objectPath, err := reference.GetObjectPath(ctx, cr.AllocationID, blockNum)
if err != nil {
cr.ErrorChallenge(ctx, err)
Expand Down Expand Up @@ -165,7 +172,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {

postDataBytes, err := json.Marshal(postData)
if err != nil {
Logger.Error("Error in marshalling the post data for validation. " + err.Error())
Logger.Error("[db]form: " + err.Error())
cr.ErrorChallenge(ctx, err)
return err
}
Expand All @@ -185,23 +192,23 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {

resp, err := util.SendPostRequest(url, postDataBytes, nil)
if err != nil {
Logger.Info("Got error from the validator.", zap.Any("error", err.Error()))
Logger.Info("[challenge]post: ", zap.Any("error", err.Error()))
delete(responses, validator.ID)
cr.ValidationTickets[i] = nil
continue
}
var validationTicket ValidationTicket
err = json.Unmarshal(resp, &validationTicket)
if err != nil {
Logger.Info("Got error decoding from the validator response .", zap.Any("resp", string(resp)), zap.Any("error", err.Error()))
Logger.Error("[challenge]resp: ", zap.String("validator", validator.ID), zap.Any("resp", string(resp)), zap.Any("error", err.Error()))
delete(responses, validator.ID)
cr.ValidationTickets[i] = nil
continue
}
Logger.Info("Got response from the validator.", zap.Any("validator_response", validationTicket))
Logger.Info("[challenge]resp: Got response from the validator.", zap.Any("validator_response", validationTicket))
verified, err := validationTicket.VerifySign()
if err != nil || !verified {
Logger.Info("Validation ticket from validator could not be verified.")
Logger.Error("[challenge]ticket: Validation ticket from validator could not be verified.", zap.String("validator", validator.ID))
delete(responses, validator.ID)
cr.ValidationTickets[i] = nil
continue
Expand All @@ -219,22 +226,25 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error {
if vt.Result {
numSuccess++
} else {
Logger.Error("[challenge]ticket: "+vt.Message, zap.String("validator", vt.ValidatorID))
numFailure++
}
numValidatorsResponded++
}
}

Logger.Info("validator response stats", zap.Any("challenge_id", cr.ChallengeID), zap.Any("validator_responses", responses))
Logger.Info("[challenge]validator response stats", zap.Any("challenge_id", cr.ChallengeID), zap.Any("validator_responses", responses))
if numSuccess > (len(cr.Validators)/2) || numFailure > (len(cr.Validators)/2) || numValidatorsResponded == len(cr.Validators) {
if numSuccess > (len(cr.Validators) / 2) {
cr.Result = ChallengeSuccess
} else {
cr.Result = ChallengeFailure
//Logger.Error("Challenge failed by the validators", zap.Any("block_num", cr.BlockNum), zap.Any("object_path", objectPath), zap.Any("challenge", cr))

Logger.Error("[challenge]validate: ", zap.String("challenge_id", cr.ChallengeID), zap.Any("block_num", cr.BlockNum), zap.Any("object_path", objectPath))
}

cr.Status = Processed
cr.UpdatedAt = time.Now().UTC()
} else {
cr.ErrorChallenge(ctx, common.NewError("no_consensus_challenge", "No Consensus on the challenge result. Erroring out the challenge"))
return common.NewError("no_consensus_challenge", "No Consensus on the challenge result. Erroring out the challenge")
Expand All @@ -247,19 +257,20 @@ func (cr *ChallengeEntity) CommitChallenge(ctx context.Context, verifyOnly bool)

if len(cr.LastCommitTxnIDs) > 0 {
for _, lastTxn := range cr.LastCommitTxnIDs {
Logger.Info("Verifying the transaction : " + lastTxn)
Logger.Info("[challenge]commit: Verifying the transaction : " + lastTxn)
t, err := transaction.VerifyTransaction(lastTxn, chain.GetServerChain())
if err == nil {
cr.Status = Committed
cr.StatusMessage = t.TransactionOutput
cr.CommitTxnID = t.Hash
cr.UpdatedAt = time.Now().UTC()
if err := cr.Save(ctx); err != nil {
Logger.Error("ChallengeEntity_Save", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
}
FileChallenged(ctx, cr.RefID, cr.Result, cr.CommitTxnID)
return nil
}
Logger.Error("Error verifying the txn from BC."+lastTxn, zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
Logger.Error("[challenge]trans: Error verifying the txn from BC."+lastTxn, zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
}
}

Expand All @@ -272,14 +283,16 @@ func (cr *ChallengeEntity) CommitChallenge(ctx context.Context, verifyOnly bool)
if t != nil {
cr.CommitTxnID = t.Hash
cr.LastCommitTxnIDs = append(cr.LastCommitTxnIDs, t.Hash)
cr.UpdatedAt = time.Now().UTC()
}
cr.ErrorChallenge(ctx, err)
Logger.Error("Error while submitting challenge to BC.", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
Logger.Error("[challenge]submit: Error while submitting challenge to BC.", zap.String("challenge_id", cr.ChallengeID), zap.Error(err))
} else {
cr.Status = Committed
cr.StatusMessage = t.TransactionOutput
cr.CommitTxnID = t.Hash
cr.LastCommitTxnIDs = append(cr.LastCommitTxnIDs, t.Hash)
cr.UpdatedAt = time.Now().UTC()
}
err = cr.Save(ctx)
FileChallenged(ctx, cr.RefID, cr.Result, cr.CommitTxnID)
Expand Down
14 changes: 5 additions & 9 deletions code/go/0chain.net/blobbercore/challenge/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,36 @@ func SetupWorkers(ctx context.Context) {
}

func startCommitProcessed(ctx context.Context) {
ticker := time.NewTicker(time.Duration(config.Configuration.ChallengeResolveFreq) * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
case <-time.After(time.Duration(config.Configuration.ChallengeResolveFreq) * time.Second):
commitProcessed(ctx)
}
}
}

func startProcessAccepted(ctx context.Context) {
ticker := time.NewTicker(time.Duration(config.Configuration.ChallengeResolveFreq) * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
case <-time.After(time.Duration(config.Configuration.ChallengeResolveFreq) * time.Second):
processAccepted(ctx)
}
}
}

// startSyncOpen
func startSyncOpen(ctx context.Context) {
ticker := time.NewTicker(time.Duration(config.Configuration.ChallengeResolveFreq) * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
case <-time.After(time.Duration(config.Configuration.ChallengeResolveFreq) * time.Second):
syncOpenChallenges(ctx)
}
}
Expand Down
Loading