diff --git a/code/go/0chain.net/blobbercore/allocation/newfilechange.go b/code/go/0chain.net/blobbercore/allocation/newfilechange.go index 23d33fbe8..0428f6409 100644 --- a/code/go/0chain.net/blobbercore/allocation/newfilechange.go +++ b/code/go/0chain.net/blobbercore/allocation/newfilechange.go @@ -22,9 +22,9 @@ type NewFileChange struct { //client side: Path string `json:"filepath" validation:"required"` //client side: - ActualHash string `json:"actual_hash,omitempty" validation:"required"` + ActualHash string `json:"actual_hash,omitempty" ` //client side: - ActualSize int64 `json:"actual_size,omitempty" validation:"required"` + ActualSize int64 `json:"actual_size,omitempty"` //client side: ActualThumbnailSize int64 `json:"actual_thumb_size"` //client side: diff --git a/code/go/0chain.net/blobbercore/challenge/challenge.go b/code/go/0chain.net/blobbercore/challenge/challenge.go index 7bd02c41e..dbe081921 100644 --- a/code/go/0chain.net/blobbercore/challenge/challenge.go +++ b/code/go/0chain.net/blobbercore/challenge/challenge.go @@ -9,6 +9,7 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/chain" + "github.com/0chain/blobber/code/go/0chain.net/core/common" "github.com/0chain/blobber/code/go/0chain.net/core/lock" "github.com/0chain/blobber/code/go/0chain.net/core/node" "github.com/0chain/blobber/code/go/0chain.net/core/transaction" @@ -40,7 +41,7 @@ func syncOpenChallenges(ctx context.Context) { retBytes, err := transaction.MakeSCRestAPICall(transaction.STORAGE_CONTRACT_ADDRESS, "/openchallenges", params, chain.GetServerChain()) if err != nil { - logging.Logger.Error("Error getting the open challenges from the blockchain", zap.Error(err)) + logging.Logger.Error("[challenge]open: ", zap.Error(err)) } else { bytesReader := bytes.NewBuffer(retBytes) @@ -50,11 +51,11 @@ func syncOpenChallenges(ctx context.Context) { errd := d.Decode(&blobberChallenges) if errd != nil { - logging.Logger.Error("Error in unmarshal of the sharder response", zap.Error(errd)) + logging.Logger.Error("[challenge]json: ", zap.Error(errd)) } else { for _, challengeObj := range blobberChallenges.Challenges { if challengeObj == nil || len(challengeObj.ChallengeID) == 0 { - logging.Logger.Info("No challenge entity from the challenge map") + logging.Logger.Info("[challenge]open: No challenge entity from the challenge map") continue } @@ -69,7 +70,7 @@ func syncOpenChallenges(ctx context.Context) { if err != nil { if !errors.Is(err, gorm.ErrRecordNotFound) { - logging.Logger.Info("Error in load challenge entity from database ", zap.Error(err)) + logging.Logger.Error("[challenge]db: ", zap.Error(err)) continue } } @@ -78,13 +79,16 @@ func syncOpenChallenges(ctx context.Context) { isNextChallengeOnChain := latestChallenge == nil || latestChallenge.ChallengeID == challengeObj.PrevChallengeID if isFirstChallengeInDatabase || isNextChallengeOnChain { - logging.Logger.Info("Adding new challenge found from blockchain", zap.String("challenge", challengeObj.ChallengeID)) + logging.Logger.Info("[challenge]add: ", zap.String("challenge_id", challengeObj.ChallengeID)) challengeObj.Status = Accepted + challengeObj.CreatedAt = common.ToTime(challengeObj.Created) + challengeObj.UpdatedAt = challengeObj.CreatedAt + if err := challengeObj.Save(tx); err != nil { - logging.Logger.Error("ChallengeEntity_Save", zap.String("challenge_id", challengeObj.ChallengeID), zap.Error(err)) + logging.Logger.Error("[challenge]db: ", zap.String("challenge_id", challengeObj.ChallengeID), zap.Error(err)) } } else { - logging.Logger.Error("Challenge chain is not valid") + logging.Logger.Error("[challenge]Challenge chain is not valid") } } @@ -110,26 +114,29 @@ func processAccepted(ctx context.Context) { if len(openchallenges) > 0 { swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers) for _, openchallenge := range openchallenges { - logging.Logger.Info("Processing the challenge", zap.Any("challenge_id", openchallenge.ChallengeID), zap.Any("openchallenge", openchallenge)) + logging.Logger.Info("[challenge]process: ", zap.String("challenge_id", openchallenge.ChallengeID)) err := openchallenge.UnmarshalFields() if err != nil { - logging.Logger.Error("Error unmarshaling challenge entity.", zap.Error(err)) + logging.Logger.Error("[challenge]json: ", zap.Error(err)) continue } swg.Add() go func(redeemCtx context.Context, challengeEntity *ChallengeEntity) { + defer swg.Done() redeemCtx = datastore.GetStore().CreateTransaction(redeemCtx) defer redeemCtx.Done() err := loadValidationTickets(redeemCtx, challengeEntity) if err != nil { - logging.Logger.Error("Getting validation tickets failed", zap.Any("challenge_id", challengeEntity.ChallengeID), zap.Error(err)) + logging.Logger.Error("[challenge]validate: ", zap.Any("challenge_id", challengeEntity.ChallengeID), zap.Error(err)) + return } db := datastore.GetStore().GetTransaction(redeemCtx) err = db.Commit().Error if err != nil { - logging.Logger.Error("Error commiting the readmarker redeem", zap.Error(err)) + logging.Logger.Error("[challenge]db: ", zap.Any("challenge_id", challengeEntity.ChallengeID), zap.Error(err)) + return } - swg.Done() + }(ctx, openchallenge) } swg.Wait() @@ -145,13 +152,13 @@ func loadValidationTickets(ctx context.Context, challengeObj *ChallengeEntity) e defer func() { if r := recover(); r != nil { - logging.Logger.Error("[recover] LoadValidationTickets", zap.Any("err", r)) + logging.Logger.Error("[recover]LoadValidationTickets", zap.Any("err", r)) } }() err := challengeObj.LoadValidationTickets(ctx) if err != nil { - logging.Logger.Error("Error getting the validation tickets", zap.Error(err), zap.String("challenge_id", challengeObj.ChallengeID)) + logging.Logger.Error("[challenge]load: ", zap.String("challenge_id", challengeObj.ChallengeID), zap.Error(err)) } return err diff --git a/code/go/0chain.net/blobbercore/challenge/entity.go b/code/go/0chain.net/blobbercore/challenge/entity.go index 7b6fcd7a4..bea780f62 100644 --- a/code/go/0chain.net/blobbercore/challenge/entity.go +++ b/code/go/0chain.net/blobbercore/challenge/entity.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "time" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" @@ -76,6 +77,9 @@ type ChallengeEntity struct { ObjectPathString datatypes.JSON `json:"-" gorm:"column:object_path"` ObjectPath *reference.ObjectPath `json:"object_path" gorm:"-"` Created common.Timestamp `json:"created" gorm:"-"` + + CreatedAt time.Time `gorm:"created_at"` + UpdatedAt time.Time `gorm:"updated_at"` } func (ChallengeEntity) TableName() string { diff --git a/code/go/0chain.net/blobbercore/challenge/protocol.go b/code/go/0chain.net/blobbercore/challenge/protocol.go index b578868b9..e3f9a5b0f 100644 --- a/code/go/0chain.net/blobbercore/challenge/protocol.go +++ b/code/go/0chain.net/blobbercore/challenge/protocol.go @@ -63,19 +63,23 @@ func (cr *ChallengeEntity) SubmitChallengeToBC(ctx context.Context) (*transactio func (cr *ChallengeEntity) ErrorChallenge(ctx context.Context, err error) { cr.StatusMessage = err.Error() + cr.UpdatedAt = time.Now().UTC() + if err := cr.Save(ctx); err != nil { - Logger.Error("ChallengeEntity_Save", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) + Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) } } // LoadValidationTickets load validation tickets func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { if len(cr.Validators) == 0 { - cr.StatusMessage = "No validators assigned to the challange" + cr.StatusMessage = "No validators assigned to the challenge" + cr.UpdatedAt = time.Now().UTC() + if err := cr.Save(ctx); err != nil { - Logger.Error("ChallengeEntity_Save", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) + Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) } - return common.NewError("no_validators", "No validators assigned to the challange") + return common.NewError("no_validators", "No validators assigned to the challenge") } allocationObj, err := allocation.GetAllocationByID(ctx, cr.AllocationID) @@ -97,16 +101,19 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { r := rand.New(rand.NewSource(cr.RandomNumber)) blockNum = r.Int63n(rootRef.NumBlocks) blockNum = blockNum + 1 + cr.BlockNum = blockNum } else { - Logger.Error("Got a challenge for a blank allocation") + err = common.NewError("allocation_is_blank", "Got a challenge for a blank allocation") + cr.ErrorChallenge(ctx, err) + return err } - cr.BlockNum = blockNum if err != nil { cr.ErrorChallenge(ctx, err) return err } - Logger.Info("blockNum for challenge", zap.Any("rootRef.NumBlocks", rootRef.NumBlocks), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber)) + + Logger.Info("[challenge]rand: ", zap.Any("rootRef.NumBlocks", rootRef.NumBlocks), zap.Any("blockNum", blockNum), zap.Any("challenge_id", cr.ChallengeID), zap.Any("random_seed", cr.RandomNumber)) objectPath, err := reference.GetObjectPath(ctx, cr.AllocationID, blockNum) if err != nil { cr.ErrorChallenge(ctx, err) @@ -165,7 +172,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { postDataBytes, err := json.Marshal(postData) if err != nil { - Logger.Error("Error in marshalling the post data for validation. " + err.Error()) + Logger.Error("[db]form: " + err.Error()) cr.ErrorChallenge(ctx, err) return err } @@ -185,7 +192,7 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { resp, err := util.SendPostRequest(url, postDataBytes, nil) if err != nil { - Logger.Info("Got error from the validator.", zap.Any("error", err.Error())) + Logger.Info("[challenge]post: ", zap.Any("error", err.Error())) delete(responses, validator.ID) cr.ValidationTickets[i] = nil continue @@ -193,15 +200,15 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { var validationTicket ValidationTicket err = json.Unmarshal(resp, &validationTicket) if err != nil { - Logger.Info("Got error decoding from the validator response .", zap.Any("resp", string(resp)), zap.Any("error", err.Error())) + Logger.Error("[challenge]resp: ", zap.String("validator", validator.ID), zap.Any("resp", string(resp)), zap.Any("error", err.Error())) delete(responses, validator.ID) cr.ValidationTickets[i] = nil continue } - Logger.Info("Got response from the validator.", zap.Any("validator_response", validationTicket)) + Logger.Info("[challenge]resp: Got response from the validator.", zap.Any("validator_response", validationTicket)) verified, err := validationTicket.VerifySign() if err != nil || !verified { - Logger.Info("Validation ticket from validator could not be verified.") + Logger.Error("[challenge]ticket: Validation ticket from validator could not be verified.", zap.String("validator", validator.ID)) delete(responses, validator.ID) cr.ValidationTickets[i] = nil continue @@ -219,22 +226,25 @@ func (cr *ChallengeEntity) LoadValidationTickets(ctx context.Context) error { if vt.Result { numSuccess++ } else { + Logger.Error("[challenge]ticket: "+vt.Message, zap.String("validator", vt.ValidatorID)) numFailure++ } numValidatorsResponded++ } } - Logger.Info("validator response stats", zap.Any("challenge_id", cr.ChallengeID), zap.Any("validator_responses", responses)) + Logger.Info("[challenge]validator response stats", zap.Any("challenge_id", cr.ChallengeID), zap.Any("validator_responses", responses)) if numSuccess > (len(cr.Validators)/2) || numFailure > (len(cr.Validators)/2) || numValidatorsResponded == len(cr.Validators) { if numSuccess > (len(cr.Validators) / 2) { cr.Result = ChallengeSuccess } else { cr.Result = ChallengeFailure - //Logger.Error("Challenge failed by the validators", zap.Any("block_num", cr.BlockNum), zap.Any("object_path", objectPath), zap.Any("challenge", cr)) + + Logger.Error("[challenge]validate: ", zap.String("challenge_id", cr.ChallengeID), zap.Any("block_num", cr.BlockNum), zap.Any("object_path", objectPath)) } cr.Status = Processed + cr.UpdatedAt = time.Now().UTC() } else { cr.ErrorChallenge(ctx, common.NewError("no_consensus_challenge", "No Consensus on the challenge result. Erroring out the challenge")) return common.NewError("no_consensus_challenge", "No Consensus on the challenge result. Erroring out the challenge") @@ -247,19 +257,20 @@ func (cr *ChallengeEntity) CommitChallenge(ctx context.Context, verifyOnly bool) if len(cr.LastCommitTxnIDs) > 0 { for _, lastTxn := range cr.LastCommitTxnIDs { - Logger.Info("Verifying the transaction : " + lastTxn) + Logger.Info("[challenge]commit: Verifying the transaction : " + lastTxn) t, err := transaction.VerifyTransaction(lastTxn, chain.GetServerChain()) if err == nil { cr.Status = Committed cr.StatusMessage = t.TransactionOutput cr.CommitTxnID = t.Hash + cr.UpdatedAt = time.Now().UTC() if err := cr.Save(ctx); err != nil { - Logger.Error("ChallengeEntity_Save", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) + Logger.Error("[challenge]db: ", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) } FileChallenged(ctx, cr.RefID, cr.Result, cr.CommitTxnID) return nil } - Logger.Error("Error verifying the txn from BC."+lastTxn, zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) + Logger.Error("[challenge]trans: Error verifying the txn from BC."+lastTxn, zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) } } @@ -272,14 +283,16 @@ func (cr *ChallengeEntity) CommitChallenge(ctx context.Context, verifyOnly bool) if t != nil { cr.CommitTxnID = t.Hash cr.LastCommitTxnIDs = append(cr.LastCommitTxnIDs, t.Hash) + cr.UpdatedAt = time.Now().UTC() } cr.ErrorChallenge(ctx, err) - Logger.Error("Error while submitting challenge to BC.", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) + Logger.Error("[challenge]submit: Error while submitting challenge to BC.", zap.String("challenge_id", cr.ChallengeID), zap.Error(err)) } else { cr.Status = Committed cr.StatusMessage = t.TransactionOutput cr.CommitTxnID = t.Hash cr.LastCommitTxnIDs = append(cr.LastCommitTxnIDs, t.Hash) + cr.UpdatedAt = time.Now().UTC() } err = cr.Save(ctx) FileChallenged(ctx, cr.RefID, cr.Result, cr.CommitTxnID) diff --git a/code/go/0chain.net/blobbercore/challenge/worker.go b/code/go/0chain.net/blobbercore/challenge/worker.go index 95ce9baa5..10e6ec5c9 100644 --- a/code/go/0chain.net/blobbercore/challenge/worker.go +++ b/code/go/0chain.net/blobbercore/challenge/worker.go @@ -15,26 +15,23 @@ func SetupWorkers(ctx context.Context) { } func startCommitProcessed(ctx context.Context) { - ticker := time.NewTicker(time.Duration(config.Configuration.ChallengeResolveFreq) * time.Second) - defer ticker.Stop() for { select { case <-ctx.Done(): return - case <-ticker.C: + case <-time.After(time.Duration(config.Configuration.ChallengeResolveFreq) * time.Second): commitProcessed(ctx) } } } func startProcessAccepted(ctx context.Context) { - ticker := time.NewTicker(time.Duration(config.Configuration.ChallengeResolveFreq) * time.Second) - defer ticker.Stop() + for { select { case <-ctx.Done(): return - case <-ticker.C: + case <-time.After(time.Duration(config.Configuration.ChallengeResolveFreq) * time.Second): processAccepted(ctx) } } @@ -42,13 +39,12 @@ func startProcessAccepted(ctx context.Context) { // startSyncOpen func startSyncOpen(ctx context.Context) { - ticker := time.NewTicker(time.Duration(config.Configuration.ChallengeResolveFreq) * time.Second) - defer ticker.Stop() + for { select { case <-ctx.Done(): return - case <-ticker.C: + case <-time.After(time.Duration(config.Configuration.ChallengeResolveFreq) * time.Second): syncOpenChallenges(ctx) } } diff --git a/code/go/0chain.net/core/transaction/http.go b/code/go/0chain.net/core/transaction/http.go index d610c4616..c6ea69311 100644 --- a/code/go/0chain.net/core/transaction/http.go +++ b/code/go/0chain.net/core/transaction/http.go @@ -2,10 +2,9 @@ package transaction import ( "context" - "crypto/sha1" "encoding/hex" + "hash/fnv" "math" - "strconv" "fmt" "io" @@ -16,13 +15,11 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/core/chain" "github.com/0chain/blobber/code/go/0chain.net/core/common" - . "github.com/0chain/blobber/code/go/0chain.net/core/logging" + "github.com/0chain/errors" "github.com/0chain/gosdk/core/resty" "github.com/0chain/gosdk/core/util" "github.com/0chain/gosdk/zcncore" - - "go.uber.org/zap" ) const TXN_SUBMIT_URL = "v1/transaction/put" @@ -122,26 +119,23 @@ func makeSCRestAPICall(scAddress string, relativePath string, params map[string] url := req.URL.String() if resp.StatusCode != http.StatusOK { - resBody, _ := ioutil.ReadAll(resp.Body) resp.Body.Close() + errorMsg := "[sharder]" + resp.Status + ": " + url + msgList = append(msgList, errorMsg) - Logger.Error("[sharder]"+resp.Status, zap.String("url", req.URL.String()), zap.String("response", string(resBody))) - - msgList = append(msgList, url+": ["+strconv.Itoa(resp.StatusCode)+"] "+string(resBody)) - - return errors.Throw(ErrBadRequest, req.URL.String()+" "+resp.Status) + return errors.Throw(ErrBadRequest, errorMsg) } - hash := sha1.New() + hash := fnv.New32() //use fnv for better performance teeReader := io.TeeReader(resp.Body, hash) resBody, err := ioutil.ReadAll(teeReader) resp.Body.Close() if err != nil { - Logger.Error("[sharder]"+err.Error(), zap.String("url", req.URL.String()), zap.String("response", string(resBody))) - msgList = append(msgList, url+": "+err.Error()) - return errors.Throw(ErrBadRequest, req.URL.String()+" "+err.Error()) + errorMsg := "[sharder]body: " + url + " " + err.Error() + msgList = append(msgList, errorMsg) + return errors.Throw(ErrBadRequest, errorMsg) } diff --git a/go.mod b/go.mod index 0ad973545..751e0f50f 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/0chain/errors v1.0.3 - github.com/0chain/gosdk v1.2.88 + github.com/0chain/gosdk v1.3.0-beta.2 github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/didip/tollbooth/v6 v6.1.1 github.com/go-ini/ini v1.55.0 // indirect @@ -37,4 +37,4 @@ require ( nhooyr.io/websocket v1.8.7 // indirect ) -// replace github.com/0chain/gosdk => ../gosdk +//replace github.com/0chain/gosdk => ../gosdk diff --git a/go.sum b/go.sum index b786db258..9f3f7e3f4 100644 --- a/go.sum +++ b/go.sum @@ -50,8 +50,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7 github.com/0chain/errors v1.0.2/go.mod h1:5t76jLb56TKfg/K2VD+eUMmNZJ42QsIRI8KzWuztwU4= github.com/0chain/errors v1.0.3 h1:QQZPFxTfnMcRdt32DXbzRQIfGWmBsKoEdszKQDb0rRM= github.com/0chain/errors v1.0.3/go.mod h1:xymD6nVgrbgttWwkpSCfLLEJbFO6iHGQwk/yeSuYkIc= -github.com/0chain/gosdk v1.2.88 h1:rllzNvDvLP9iDXI6jYT9v3aBgeucGvb1uOpA9Af2AjQ= -github.com/0chain/gosdk v1.2.88/go.mod h1:JtvcqYYWRdOVFm0pvjdKO5pCiItc/Is2f5wTuuA8F4M= +github.com/0chain/gosdk v1.3.0-beta.2 h1:+exlUvGFp1iG2UF+3Hken4CEfoYdHsBav+70be096IY= +github.com/0chain/gosdk v1.3.0-beta.2/go.mod h1:JtvcqYYWRdOVFm0pvjdKO5pCiItc/Is2f5wTuuA8F4M= github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4= github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= github.com/Azure/azure-storage-blob-go v0.7.0/go.mod h1:f9YQKtsG1nMisotuTPpO0tjNuEjKRYAcJU8/ydDI++4=