diff --git a/.github/workflows/build-&-publish-docker-image.yml b/.github/workflows/build-&-publish-docker-image.yml index c87559549..8225c6acc 100644 --- a/.github/workflows/build-&-publish-docker-image.yml +++ b/.github/workflows/build-&-publish-docker-image.yml @@ -20,7 +20,7 @@ env: jobs: blobber: - runs-on: [self-hosted, build] + runs-on: [self-hosted, docker-builds] steps: - name: Set docker image tag run: | @@ -64,7 +64,7 @@ jobs: ./docker.local/bin/build.base.sh && ./docker.local/bin/build.blobber.sh validator: - runs-on: [self-hosted, build] + runs-on: [self-hosted, docker-builds] steps: - name: Set docker image tag run: | diff --git a/code/go/0chain.net/blobber/config.go b/code/go/0chain.net/blobber/config.go index 173cc482c..8579e462e 100644 --- a/code/go/0chain.net/blobber/config.go +++ b/code/go/0chain.net/blobber/config.go @@ -134,6 +134,8 @@ func setupConfig(configDir string, deploymentMode int) { config.Configuration.Geolocation.Latitude = viper.GetFloat64("geolocation.latitude") config.Configuration.Geolocation.Longitude = viper.GetFloat64("geolocation.longitude") + config.Configuration.ChallengeCompletionTime = viper.GetDuration("challenge_completion_time") + fmt.Print(" [OK]\n") } diff --git a/code/go/0chain.net/blobbercore/challenge/challenge.go b/code/go/0chain.net/blobbercore/challenge/challenge.go index c922b5e25..099cbcdb9 100644 --- a/code/go/0chain.net/blobbercore/challenge/challenge.go +++ b/code/go/0chain.net/blobbercore/challenge/challenge.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "time" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/config" @@ -121,6 +122,7 @@ func saveNewChallenge(c *ChallengeEntity, ctx context.Context) { logging.Logger.Info("[challenge]elapsed:add ", zap.String("challenge_id", c.ChallengeID), zap.Time("created", c.CreatedAt), + zap.Time("start", startTime), zap.String("delay", startTime.Sub(c.CreatedAt).String()), zap.String("save", time.Since(startTime).String())) @@ -135,64 +137,127 @@ func processAccepted(ctx context.Context) { }() db := datastore.GetStore().GetDB() - challenges := make([]*ChallengeEntity, 0) - db.Where(ChallengeEntity{Status: Accepted}).Find(&challenges) - if len(challenges) > 0 { - - startTime := time.Now() - - swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers) - for _, c := range challenges { - logging.Logger.Info("[challenge]process: ", - zap.String("challenge_id", c.ChallengeID), - zap.Time("created", c.CreatedAt)) - err := c.UnmarshalFields() - if err != nil { - logging.Logger.Error("[challenge]process: ", - zap.String("challenge_id", c.ChallengeID), - zap.Time("created", c.CreatedAt), - zap.String("validators", string(c.ValidatorsString)), - zap.String("lastCommitTxnList", string(c.LastCommitTxnList)), - zap.String("validationTickets", string(c.ValidationTicketsString)), - zap.String("ObjectPath", string(c.ObjectPathString)), - zap.Error(err)) - continue - } - swg.Add() - go validateChallenge(&swg, c) + + rows, err := db.Model(&ChallengeEntity{}). + Where("status = ?", Accepted). + Select("challenge_id", "created_at").Rows() + + if err != nil { + logging.Logger.Error("[challenge]process: ", + zap.Error(err)) + return + } + + defer rows.Close() + + startTime := time.Now() + swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers) + count := 0 + for rows.Next() { + count++ + now := time.Now() + + var challengeID string + var createdAt time.Time + + err := rows.Scan(&challengeID, &createdAt) + if err != nil { + logging.Logger.Error("[challenge]process: ", + zap.Error(err)) + continue } - swg.Wait() - logging.Logger.Info("[challenge]elapsed:process ", - zap.Int("count", len(challenges)), - zap.String("save", time.Since(startTime).String())) + if time.Since(createdAt) > config.Configuration.ChallengeCompletionTime { + + db.Model(&ChallengeEntity{}). + Where("challenge_id =? and status =? ", challengeID, Accepted). + Updates(map[string]interface{}{ + "status": Cancelled, + "result": ChallengeFailure, + "status_message": fmt.Sprintf("created: %s, start: %s , delay: %s, cct: %s", createdAt, now, now.Sub(createdAt).String(), config.Configuration.ChallengeCompletionTime.String()), + }) + + logging.Logger.Error("[challenge]process: timeout ", + zap.Any("challenge_id", challengeID), + zap.Time("created", createdAt), + zap.Time("start", now), + zap.String("delay", now.Sub(createdAt).String()), + zap.String("cct", config.Configuration.ChallengeCompletionTime.String()), + zap.Error(err)) + continue + } + + swg.Add() + go func(id string) { + defer swg.Done() + validateChallenge(id) + }(challengeID) + } -} -func validateChallenge(swg *sizedwaitgroup.SizedWaitGroup, c *ChallengeEntity) { - defer swg.Done() + swg.Wait() + + logging.Logger.Info("[challenge]elapsed:process ", + zap.Int("count", count), + zap.String("save", time.Since(startTime).String())) + +} +func validateChallenge(id string) { startTime := time.Now() ctx := datastore.GetStore().CreateTransaction(context.TODO()) defer ctx.Done() - db := datastore.GetStore().GetTransaction(ctx) + var c *ChallengeEntity + + tx := datastore.GetStore().GetTransaction(ctx) + + if err := tx.Model(&ChallengeEntity{}). + Where("challenge_id = ? and status = ?", id, Accepted). + Find(c).Error; err != nil { + + logging.Logger.Error("[challenge]validate: ", + zap.Any("challenge_id", id), + zap.Error(err)) + + tx.Rollback() + return + } + + logging.Logger.Info("[challenge]validate: ", + zap.String("challenge_id", c.ChallengeID), + zap.Time("created", c.CreatedAt)) + + err := c.UnmarshalFields() + if err != nil { + logging.Logger.Error("[challenge]validate: ", + zap.String("challenge_id", c.ChallengeID), + zap.Time("created", c.CreatedAt), + zap.String("validators", string(c.ValidatorsString)), + zap.String("lastCommitTxnList", string(c.LastCommitTxnList)), + zap.String("validationTickets", string(c.ValidationTicketsString)), + zap.String("ObjectPath", string(c.ObjectPathString)), + zap.Error(err)) + tx.Rollback() + return + } + if err := c.LoadValidationTickets(ctx); err != nil { logging.Logger.Error("[challenge]validate: ", zap.Any("challenge_id", c.ChallengeID), zap.Time("created", c.CreatedAt), zap.Error(err)) - db.Rollback() + tx.Rollback() return } - if err := db.Commit().Error; err != nil { + if err := tx.Commit().Error; err != nil { logging.Logger.Error("[challenge]validate(Commit): ", zap.Any("challenge_id", c.ChallengeID), zap.Time("created", c.CreatedAt), zap.Error(err)) - db.Rollback() + tx.Rollback() return } @@ -203,6 +268,7 @@ func validateChallenge(swg *sizedwaitgroup.SizedWaitGroup, c *ChallengeEntity) { logging.Logger.Info("[challenge]elapsed:validate ", zap.String("challenge_id", c.ChallengeID), zap.Time("created", c.CreatedAt), + zap.Time("start", startTime), zap.String("delay", startTime.Sub(c.CreatedAt).String()), zap.String("save", time.Since(startTime).String())) } @@ -215,33 +281,91 @@ func commitProcessed(ctx context.Context) { }() db := datastore.GetStore().GetDB() - var challenges []*ChallengeEntity + count := 0 - db.Where(ChallengeEntity{Status: Processed}). - Order("sequence"). - Find(&challenges) + rows, err := db.Model(&ChallengeEntity{}). + Where("status = ?", Processed). + Select("challenge_id", "created_at").Rows() - if len(challenges) > 0 { + if err != nil { + logging.Logger.Error("[challenge]commit: ", + zap.Error(err)) + return + } + defer rows.Close() - startTime := time.Now() + startTime := time.Now() + swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers) + + for rows.Next() { + count++ + now := time.Now() + + var challengeID string + var createdAt time.Time - swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers) - for _, challenge := range challenges { - swg.Add() - go func(c *ChallengeEntity) { - defer swg.Done() - commitChallenge(c) - }(challenge) + err := rows.Scan(&challengeID, &createdAt) + if err != nil { + logging.Logger.Error("[challenge]commit: ", + zap.Error(err)) + continue } - swg.Wait() - logging.Logger.Info("[challenge]elapsed:commit ", - zap.Int("count", len(challenges)), - zap.String("save", time.Since(startTime).String())) + if time.Since(createdAt) > config.Configuration.ChallengeCompletionTime { + + db.Model(&ChallengeEntity{}). + Where("challenge_id =? and status =? ", challengeID, Accepted). + Updates(map[string]interface{}{ + "status": Cancelled, + "result": ChallengeFailure, + "status_message": fmt.Sprintf("created: %s, start: %s , delay: %s, cct: %s", createdAt, now, now.Sub(createdAt).String(), config.Configuration.ChallengeCompletionTime.String()), + }) + + logging.Logger.Error("[challenge]commit: timeout ", + zap.Any("challenge_id", challengeID), + zap.Time("created", createdAt), + zap.Time("start", now), + zap.String("delay", now.Sub(createdAt).String()), + zap.String("cct", config.Configuration.ChallengeCompletionTime.String()), + zap.Error(err)) + continue + } + + swg.Add() + go func(id string) { + defer swg.Done() + commitChallenge(id) + }(challengeID) + } + + swg.Wait() + + logging.Logger.Info("[challenge]elapsed:commit ", + zap.Int("count", count), + zap.String("save", time.Since(startTime).String())) } -func commitChallenge(c *ChallengeEntity) { +func commitChallenge(id string) { + + ctx := datastore.GetStore().CreateTransaction(context.TODO()) + defer ctx.Done() + + tx := datastore.GetStore().GetTransaction(ctx) + + var c *ChallengeEntity + + if err := tx.Model(&ChallengeEntity{}). + Where("challenge_id = ? and status = ?", id, Processed). + Find(c).Error; err != nil { + + logging.Logger.Error("[challenge]commit: ", + zap.Any("challenge_id", id), + zap.Error(err)) + + tx.Rollback() + return + } startTime := time.Now() @@ -259,28 +383,25 @@ func commitChallenge(c *ChallengeEntity) { zap.String("validationTickets", string(c.ValidationTicketsString)), zap.String("ObjectPath", string(c.ObjectPathString)), zap.Error(err)) + tx.Rollback() + return } - ctx := datastore.GetStore().CreateTransaction(context.TODO()) - defer ctx.Done() - - db := datastore.GetStore().GetTransaction(ctx) - if err := c.CommitChallenge(ctx, false); err != nil { logging.Logger.Error("[challenge]commit", zap.String("challenge_id", c.ChallengeID), zap.Time("created", c.CreatedAt), zap.Error(err)) - db.Rollback() + tx.Rollback() return } - if err := db.Commit().Error; err != nil { + if err := tx.Commit().Error; err != nil { logging.Logger.Warn("[challenge]commit", zap.Any("challenge_id", c.ChallengeID), zap.Time("created", c.CreatedAt), zap.Error(err)) - db.Rollback() + tx.Rollback() return } @@ -293,6 +414,7 @@ func commitChallenge(c *ChallengeEntity) { logging.Logger.Info("[challenge]elapsed:commit ", zap.String("challenge_id", c.ChallengeID), zap.Time("created", c.CreatedAt), + zap.Time("start", startTime), zap.String("delay", startTime.Sub(c.CreatedAt).String()), zap.String("save", time.Since(startTime).String())) diff --git a/code/go/0chain.net/blobbercore/challenge/entity.go b/code/go/0chain.net/blobbercore/challenge/entity.go index 06e82a37e..b5e86d883 100644 --- a/code/go/0chain.net/blobbercore/challenge/entity.go +++ b/code/go/0chain.net/blobbercore/challenge/entity.go @@ -25,6 +25,7 @@ const ( Accepted ChallengeStatus = iota + 1 Processed Committed + Cancelled ) func (s ChallengeStatus) String() string { @@ -35,6 +36,8 @@ func (s ChallengeStatus) String() string { return "Processed" case Committed: return "Committed" + case Cancelled: + return "Cancelled" default: return fmt.Sprintf("%d", int(s)) } @@ -77,7 +80,7 @@ type ChallengeEntity struct { AllocationID string `gorm:"column:allocation_id;size64;not null" json:"allocation_id"` AllocationRoot string `gorm:"column:allocation_root;size:64" json:"allocation_root"` RespondedAllocationRoot string `gorm:"column:responded_allocation_root;size:64" json:"responded_allocation_root"` - Status ChallengeStatus `gorm:"column:status;type:integer;not null;default:0" json:"status"` + Status ChallengeStatus `gorm:"column:status;type:integer;not null;default:0;index:idx_status" json:"status"` Result ChallengeResult `gorm:"column:result;type:integer;not null;default:0" json:"result"` StatusMessage string `gorm:"column:status_message" json:"status_message"` CommitTxnID string `gorm:"column:commit_txn_id;size:64" json:"commit_txn_id"` @@ -204,6 +207,7 @@ func GetChallengeEntity(ctx context.Context, challengeID string) (*ChallengeEnti } // getStatus check challenge if exists in db +// nolint func getStatus(db *gorm.DB, challengeID string) *ChallengeStatus { var status []int diff --git a/code/go/0chain.net/blobbercore/config/config.go b/code/go/0chain.net/blobbercore/config/config.go index 0eb937f56..72cae86af 100644 --- a/code/go/0chain.net/blobbercore/config/config.go +++ b/code/go/0chain.net/blobbercore/config/config.go @@ -24,6 +24,7 @@ func SetupDefaultConfig() { viper.SetDefault("challenge_response.frequency", 10) viper.SetDefault("challenge_response.num_workers", 5) viper.SetDefault("challenge_response.max_retries", 10) + viper.SetDefault("challenge_completion_time", "3m") viper.SetDefault("healthcheck.frequency", "60s") @@ -169,6 +170,8 @@ type Config struct { // AutomacitUpdate Whether to automatically update blobber updates to blockchain AutomaticUpdate bool BlobberUpdateInterval time.Duration + + ChallengeCompletionTime time.Duration } /*Configuration of the system */ diff --git a/code/go/0chain.net/blobbercore/writemarker/worker.go b/code/go/0chain.net/blobbercore/writemarker/worker.go index 688228236..03f36153c 100644 --- a/code/go/0chain.net/blobbercore/writemarker/worker.go +++ b/code/go/0chain.net/blobbercore/writemarker/worker.go @@ -109,13 +109,16 @@ func startRedeemWriteMarkers(ctx context.Context) { var ticker = time.NewTicker( time.Duration(config.Configuration.WMRedeemFreq) * time.Second, ) + + logging.Logger.Info("Redeem writemarkers", + zap.Any("numOfWorkers", config.Configuration.WMRedeemNumWorkers)) + for { select { case <-ctx.Done(): return case <-ticker.C: - logging.Logger.Info("Trying to redeem writemarkers.", - zap.Any("numOfWorkers", config.Configuration.WMRedeemNumWorkers)) + redeemWriteMarkers() } } diff --git a/config/0chain_blobber.yaml b/config/0chain_blobber.yaml index 9b0988071..2b09aec1b 100755 --- a/config/0chain_blobber.yaml +++ b/config/0chain_blobber.yaml @@ -67,6 +67,8 @@ min_confirmation: 50 block_worker: http://198.18.0.98:9091 +challenge_completion_time: 3m + handlers: rate_limit: 0 # 10 per second . it can't too small one if a large file is download with blocks file_rate_limit: 100 # 100 files per second