Skip to content

Commit 9658eb1

Browse files
authored
fix(challenge): load challenges one by one from db to prevent oomkilled (#779)
1 parent b0bcbb3 commit 9658eb1

File tree

7 files changed

+203
-67
lines changed

7 files changed

+203
-67
lines changed

.github/workflows/build-&-publish-docker-image.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ env:
2020

2121
jobs:
2222
blobber:
23-
runs-on: [self-hosted, build]
23+
runs-on: [self-hosted, docker-builds]
2424
steps:
2525
- name: Set docker image tag
2626
run: |
@@ -64,7 +64,7 @@ jobs:
6464
./docker.local/bin/build.base.sh && ./docker.local/bin/build.blobber.sh
6565
6666
validator:
67-
runs-on: [self-hosted, build]
67+
runs-on: [self-hosted, docker-builds]
6868
steps:
6969
- name: Set docker image tag
7070
run: |

code/go/0chain.net/blobber/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ func setupConfig(configDir string, deploymentMode int) {
134134
config.Configuration.Geolocation.Latitude = viper.GetFloat64("geolocation.latitude")
135135
config.Configuration.Geolocation.Longitude = viper.GetFloat64("geolocation.longitude")
136136

137+
config.Configuration.ChallengeCompletionTime = viper.GetDuration("challenge_completion_time")
138+
137139
fmt.Print(" [OK]\n")
138140
}
139141

code/go/0chain.net/blobbercore/challenge/challenge.go

Lines changed: 184 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"encoding/json"
7+
"fmt"
78
"time"
89

910
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/config"
@@ -121,6 +122,7 @@ func saveNewChallenge(c *ChallengeEntity, ctx context.Context) {
121122
logging.Logger.Info("[challenge]elapsed:add ",
122123
zap.String("challenge_id", c.ChallengeID),
123124
zap.Time("created", c.CreatedAt),
125+
zap.Time("start", startTime),
124126
zap.String("delay", startTime.Sub(c.CreatedAt).String()),
125127
zap.String("save", time.Since(startTime).String()))
126128

@@ -135,64 +137,127 @@ func processAccepted(ctx context.Context) {
135137
}()
136138

137139
db := datastore.GetStore().GetDB()
138-
challenges := make([]*ChallengeEntity, 0)
139-
db.Where(ChallengeEntity{Status: Accepted}).Find(&challenges)
140-
if len(challenges) > 0 {
141-
142-
startTime := time.Now()
143-
144-
swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers)
145-
for _, c := range challenges {
146-
logging.Logger.Info("[challenge]process: ",
147-
zap.String("challenge_id", c.ChallengeID),
148-
zap.Time("created", c.CreatedAt))
149-
err := c.UnmarshalFields()
150-
if err != nil {
151-
logging.Logger.Error("[challenge]process: ",
152-
zap.String("challenge_id", c.ChallengeID),
153-
zap.Time("created", c.CreatedAt),
154-
zap.String("validators", string(c.ValidatorsString)),
155-
zap.String("lastCommitTxnList", string(c.LastCommitTxnList)),
156-
zap.String("validationTickets", string(c.ValidationTicketsString)),
157-
zap.String("ObjectPath", string(c.ObjectPathString)),
158-
zap.Error(err))
159-
continue
160-
}
161-
swg.Add()
162-
go validateChallenge(&swg, c)
140+
141+
rows, err := db.Model(&ChallengeEntity{}).
142+
Where("status = ?", Accepted).
143+
Select("challenge_id", "created_at").Rows()
144+
145+
if err != nil {
146+
logging.Logger.Error("[challenge]process: ",
147+
zap.Error(err))
148+
return
149+
}
150+
151+
defer rows.Close()
152+
153+
startTime := time.Now()
154+
swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers)
155+
count := 0
156+
for rows.Next() {
157+
count++
158+
now := time.Now()
159+
160+
var challengeID string
161+
var createdAt time.Time
162+
163+
err := rows.Scan(&challengeID, &createdAt)
164+
if err != nil {
165+
logging.Logger.Error("[challenge]process: ",
166+
zap.Error(err))
167+
continue
163168
}
164-
swg.Wait()
165169

166-
logging.Logger.Info("[challenge]elapsed:process ",
167-
zap.Int("count", len(challenges)),
168-
zap.String("save", time.Since(startTime).String()))
170+
if time.Since(createdAt) > config.Configuration.ChallengeCompletionTime {
171+
172+
db.Model(&ChallengeEntity{}).
173+
Where("challenge_id =? and status =? ", challengeID, Accepted).
174+
Updates(map[string]interface{}{
175+
"status": Cancelled,
176+
"result": ChallengeFailure,
177+
"status_message": fmt.Sprintf("created: %s, start: %s , delay: %s, cct: %s", createdAt, now, now.Sub(createdAt).String(), config.Configuration.ChallengeCompletionTime.String()),
178+
})
179+
180+
logging.Logger.Error("[challenge]process: timeout ",
181+
zap.Any("challenge_id", challengeID),
182+
zap.Time("created", createdAt),
183+
zap.Time("start", now),
184+
zap.String("delay", now.Sub(createdAt).String()),
185+
zap.String("cct", config.Configuration.ChallengeCompletionTime.String()),
186+
zap.Error(err))
187+
continue
188+
}
189+
190+
swg.Add()
191+
go func(id string) {
192+
defer swg.Done()
193+
validateChallenge(id)
194+
}(challengeID)
195+
169196
}
170-
}
171197

172-
func validateChallenge(swg *sizedwaitgroup.SizedWaitGroup, c *ChallengeEntity) {
173-
defer swg.Done()
198+
swg.Wait()
199+
200+
logging.Logger.Info("[challenge]elapsed:process ",
201+
zap.Int("count", count),
202+
zap.String("save", time.Since(startTime).String()))
203+
204+
}
174205

206+
func validateChallenge(id string) {
175207
startTime := time.Now()
176208

177209
ctx := datastore.GetStore().CreateTransaction(context.TODO())
178210
defer ctx.Done()
179211

180-
db := datastore.GetStore().GetTransaction(ctx)
212+
var c *ChallengeEntity
213+
214+
tx := datastore.GetStore().GetTransaction(ctx)
215+
216+
if err := tx.Model(&ChallengeEntity{}).
217+
Where("challenge_id = ? and status = ?", id, Accepted).
218+
Find(c).Error; err != nil {
219+
220+
logging.Logger.Error("[challenge]validate: ",
221+
zap.Any("challenge_id", id),
222+
zap.Error(err))
223+
224+
tx.Rollback()
225+
return
226+
}
227+
228+
logging.Logger.Info("[challenge]validate: ",
229+
zap.String("challenge_id", c.ChallengeID),
230+
zap.Time("created", c.CreatedAt))
231+
232+
err := c.UnmarshalFields()
233+
if err != nil {
234+
logging.Logger.Error("[challenge]validate: ",
235+
zap.String("challenge_id", c.ChallengeID),
236+
zap.Time("created", c.CreatedAt),
237+
zap.String("validators", string(c.ValidatorsString)),
238+
zap.String("lastCommitTxnList", string(c.LastCommitTxnList)),
239+
zap.String("validationTickets", string(c.ValidationTicketsString)),
240+
zap.String("ObjectPath", string(c.ObjectPathString)),
241+
zap.Error(err))
242+
tx.Rollback()
243+
return
244+
}
245+
181246
if err := c.LoadValidationTickets(ctx); err != nil {
182247
logging.Logger.Error("[challenge]validate: ",
183248
zap.Any("challenge_id", c.ChallengeID),
184249
zap.Time("created", c.CreatedAt),
185250
zap.Error(err))
186-
db.Rollback()
251+
tx.Rollback()
187252
return
188253
}
189254

190-
if err := db.Commit().Error; err != nil {
255+
if err := tx.Commit().Error; err != nil {
191256
logging.Logger.Error("[challenge]validate(Commit): ",
192257
zap.Any("challenge_id", c.ChallengeID),
193258
zap.Time("created", c.CreatedAt),
194259
zap.Error(err))
195-
db.Rollback()
260+
tx.Rollback()
196261
return
197262
}
198263

@@ -203,6 +268,7 @@ func validateChallenge(swg *sizedwaitgroup.SizedWaitGroup, c *ChallengeEntity) {
203268
logging.Logger.Info("[challenge]elapsed:validate ",
204269
zap.String("challenge_id", c.ChallengeID),
205270
zap.Time("created", c.CreatedAt),
271+
zap.Time("start", startTime),
206272
zap.String("delay", startTime.Sub(c.CreatedAt).String()),
207273
zap.String("save", time.Since(startTime).String()))
208274
}
@@ -215,33 +281,91 @@ func commitProcessed(ctx context.Context) {
215281
}()
216282

217283
db := datastore.GetStore().GetDB()
218-
var challenges []*ChallengeEntity
284+
count := 0
219285

220-
db.Where(ChallengeEntity{Status: Processed}).
221-
Order("sequence").
222-
Find(&challenges)
286+
rows, err := db.Model(&ChallengeEntity{}).
287+
Where("status = ?", Processed).
288+
Select("challenge_id", "created_at").Rows()
223289

224-
if len(challenges) > 0 {
290+
if err != nil {
291+
logging.Logger.Error("[challenge]commit: ",
292+
zap.Error(err))
293+
return
294+
}
295+
defer rows.Close()
225296

226-
startTime := time.Now()
297+
startTime := time.Now()
298+
swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers)
299+
300+
for rows.Next() {
301+
count++
302+
now := time.Now()
303+
304+
var challengeID string
305+
var createdAt time.Time
227306

228-
swg := sizedwaitgroup.New(config.Configuration.ChallengeResolveNumWorkers)
229-
for _, challenge := range challenges {
230-
swg.Add()
231-
go func(c *ChallengeEntity) {
232-
defer swg.Done()
233-
commitChallenge(c)
234-
}(challenge)
307+
err := rows.Scan(&challengeID, &createdAt)
308+
if err != nil {
309+
logging.Logger.Error("[challenge]commit: ",
310+
zap.Error(err))
311+
continue
235312
}
236-
swg.Wait()
237313

238-
logging.Logger.Info("[challenge]elapsed:commit ",
239-
zap.Int("count", len(challenges)),
240-
zap.String("save", time.Since(startTime).String()))
314+
if time.Since(createdAt) > config.Configuration.ChallengeCompletionTime {
315+
316+
db.Model(&ChallengeEntity{}).
317+
Where("challenge_id =? and status =? ", challengeID, Accepted).
318+
Updates(map[string]interface{}{
319+
"status": Cancelled,
320+
"result": ChallengeFailure,
321+
"status_message": fmt.Sprintf("created: %s, start: %s , delay: %s, cct: %s", createdAt, now, now.Sub(createdAt).String(), config.Configuration.ChallengeCompletionTime.String()),
322+
})
323+
324+
logging.Logger.Error("[challenge]commit: timeout ",
325+
zap.Any("challenge_id", challengeID),
326+
zap.Time("created", createdAt),
327+
zap.Time("start", now),
328+
zap.String("delay", now.Sub(createdAt).String()),
329+
zap.String("cct", config.Configuration.ChallengeCompletionTime.String()),
330+
zap.Error(err))
331+
continue
332+
}
333+
334+
swg.Add()
335+
go func(id string) {
336+
defer swg.Done()
337+
commitChallenge(id)
338+
}(challengeID)
339+
241340
}
341+
342+
swg.Wait()
343+
344+
logging.Logger.Info("[challenge]elapsed:commit ",
345+
zap.Int("count", count),
346+
zap.String("save", time.Since(startTime).String()))
242347
}
243348

244-
func commitChallenge(c *ChallengeEntity) {
349+
func commitChallenge(id string) {
350+
351+
ctx := datastore.GetStore().CreateTransaction(context.TODO())
352+
defer ctx.Done()
353+
354+
tx := datastore.GetStore().GetTransaction(ctx)
355+
356+
var c *ChallengeEntity
357+
358+
if err := tx.Model(&ChallengeEntity{}).
359+
Where("challenge_id = ? and status = ?", id, Processed).
360+
Find(c).Error; err != nil {
361+
362+
logging.Logger.Error("[challenge]commit: ",
363+
zap.Any("challenge_id", id),
364+
zap.Error(err))
365+
366+
tx.Rollback()
367+
return
368+
}
245369

246370
startTime := time.Now()
247371

@@ -259,28 +383,25 @@ func commitChallenge(c *ChallengeEntity) {
259383
zap.String("validationTickets", string(c.ValidationTicketsString)),
260384
zap.String("ObjectPath", string(c.ObjectPathString)),
261385
zap.Error(err))
386+
tx.Rollback()
387+
return
262388
}
263389

264-
ctx := datastore.GetStore().CreateTransaction(context.TODO())
265-
defer ctx.Done()
266-
267-
db := datastore.GetStore().GetTransaction(ctx)
268-
269390
if err := c.CommitChallenge(ctx, false); err != nil {
270391
logging.Logger.Error("[challenge]commit",
271392
zap.String("challenge_id", c.ChallengeID),
272393
zap.Time("created", c.CreatedAt),
273394
zap.Error(err))
274-
db.Rollback()
395+
tx.Rollback()
275396
return
276397
}
277398

278-
if err := db.Commit().Error; err != nil {
399+
if err := tx.Commit().Error; err != nil {
279400
logging.Logger.Warn("[challenge]commit",
280401
zap.Any("challenge_id", c.ChallengeID),
281402
zap.Time("created", c.CreatedAt),
282403
zap.Error(err))
283-
db.Rollback()
404+
tx.Rollback()
284405
return
285406
}
286407

@@ -293,6 +414,7 @@ func commitChallenge(c *ChallengeEntity) {
293414
logging.Logger.Info("[challenge]elapsed:commit ",
294415
zap.String("challenge_id", c.ChallengeID),
295416
zap.Time("created", c.CreatedAt),
417+
zap.Time("start", startTime),
296418
zap.String("delay", startTime.Sub(c.CreatedAt).String()),
297419
zap.String("save", time.Since(startTime).String()))
298420

0 commit comments

Comments
 (0)