Skip to content

Commit ee7306d

Browse files
committed
Merge branch 'staging' into fix/panicking_on_wm
2 parents 490018d + 4b11189 commit ee7306d

File tree

17 files changed

+693
-473
lines changed

17 files changed

+693
-473
lines changed

code/go/0chain.net/blobbercore/allocation/entity.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,10 @@ func sizeInGB(size int64) float64 {
7676
}
7777

7878
// GetRequiredReadBalance Get tokens required to read the given size
79-
func (a *Allocation) GetRequiredReadBalance(blobberID string, readSize int64) (value float64) {
79+
func (a *Allocation) GetRequiredReadBalance(blobberID string, numBlocks int64) (value float64) {
8080
for _, d := range a.Terms {
8181
if d.BlobberID == blobberID {
82-
value = sizeInGB(readSize) * float64(d.ReadPrice)
82+
value = sizeInGB(numBlocks*CHUNK_SIZE) * float64(d.ReadPrice)
8383
break
8484
}
8585
}
@@ -101,8 +101,6 @@ type Pending struct {
101101
// ID of format client_id:allocation_id
102102
ID string `gorm:"column:id;primaryKey"`
103103
PendingWrite int64 `gorm:"column:pending_write;not null;default:0;"`
104-
PendingRead int64 `gorm:"column:pending_read;pending_read;not null;default:0"` // size
105-
106104
}
107105

108106
func (*Pending) TableName() string {
@@ -139,11 +137,11 @@ func GetPendingRead(db *gorm.DB, clientID, allocationID string) (pendingReadSize
139137
return
140138
}
141139

142-
func AddToPending(db *gorm.DB, clientID, allocationID string, pendingWrite, pendingRead int64) (err error) {
140+
func AddToPending(db *gorm.DB, clientID, allocationID string, pendingWrite int64) (err error) {
143141
key := clientID + ":" + allocationID
144142
// Lock is required because two process can simultaneously call this function and read pending data
145143
// thus giving same value leading to inconsistent data
146-
lock := pendingMapLock.GetLock(key)
144+
lock, _ := pendingMapLock.GetLock(key)
147145
lock.Lock()
148146
defer lock.Unlock()
149147

@@ -152,12 +150,10 @@ func AddToPending(db *gorm.DB, clientID, allocationID string, pendingWrite, pend
152150
switch {
153151
case err == nil:
154152
pending.PendingWrite += pendingWrite
155-
pending.PendingRead += pendingRead
156153
db.Save(pending)
157154
case errors.Is(err, gorm.ErrRecordNotFound):
158155
pending.ID = key
159156
pending.PendingWrite = pendingWrite
160-
pending.PendingRead = pendingRead
161157
db.Create(pending)
162158
default:
163159
return err

code/go/0chain.net/blobbercore/blobbergrpc/proto/blobber_contract.pb.go

Lines changed: 237 additions & 236 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

code/go/0chain.net/blobbercore/blobbergrpc/proto/blobber_contract.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ message ReadMarker {
206206
string allocation_id = 4;
207207
string owner_id = 5;
208208
int64 timestamp = 6;
209-
int64 read_size = 7;
209+
int64 read_counter = 7;
210210
string signature = 8;
211211
string payer_id = 9;
212212
bytes auth_ticket = 10;

code/go/0chain.net/blobbercore/convert/convert.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func ReadMarkerToReadMarkerGRPC(rm *readmarker.ReadMarker) *blobbergrpc.ReadMark
156156
AllocationId: rm.AllocationID,
157157
OwnerId: rm.OwnerID,
158158
Timestamp: int64(rm.Timestamp),
159-
ReadSize: rm.ReadSize,
159+
ReadCounter: rm.ReadCounter,
160160
Signature: rm.Signature,
161161
PayerId: rm.PayerID,
162162
AuthTicket: rm.AuthTicket,
@@ -175,7 +175,7 @@ func ReadMakerGRPCToReadMaker(rm *blobbergrpc.ReadMarker) *readmarker.ReadMarker
175175
AllocationID: rm.AllocationId,
176176
OwnerID: rm.OwnerId,
177177
Timestamp: common.Timestamp(rm.Timestamp),
178-
ReadSize: rm.ReadSize,
178+
ReadCounter: rm.ReadCounter,
179179
Signature: rm.Signature,
180180
PayerID: rm.PayerId,
181181
AuthTicket: rm.AuthTicket,

code/go/0chain.net/blobbercore/handler/download_integration_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func TestBlobberGRPCService_DownloadFile(t *testing.T) {
7373
ClientID: clientId,
7474
OwnerID: clientId,
7575
Timestamp: now,
76-
ReadSize: 64 * KB,
76+
ReadCounter: 1,
7777
}
7878

7979
rmSig, err := signScheme.Sign(encryption.Hash(rm.GetHashData()))

code/go/0chain.net/blobbercore/handler/handler.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -261,11 +261,7 @@ func FileStatsHandler(ctx context.Context, r *http.Request) (interface{}, error)
261261
func DownloadHandler(ctx context.Context, r *http.Request) (interface{}, error) {
262262

263263
ctx = setupHandlerContext(ctx, r)
264-
response, err := storageHandler.DownloadFile(ctx, r)
265-
if err != nil {
266-
return nil, err
267-
}
268-
return response, nil
264+
return storageHandler.DownloadFile(ctx, r)
269265
}
270266

271267
/*ListHandler is the handler to respond to upload requests fro clients*/

code/go/0chain.net/blobbercore/handler/handler_download_test.go

Lines changed: 152 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func TestHandlers_Download(t *testing.T) {
221221
rm.BlobberID = ""
222222
rm.AllocationID = alloc.ID
223223
rm.OwnerID = ownerClient.ClientID
224-
rm.ReadSize = 64 * KB
224+
rm.ReadCounter = 1
225225
rm.Signature, err = signHash(ownerClient, rm.GetHash())
226226
if err != nil {
227227
t.Fatal(err)
@@ -299,7 +299,7 @@ func TestHandlers_Download(t *testing.T) {
299299
rm.ClientPublicKey = ownerClient.ClientKey
300300
rm.BlobberID = ""
301301
rm.AllocationID = alloc.ID
302-
rm.ReadSize = 64 * KB
302+
rm.ReadCounter = 1
303303
rm.OwnerID = ownerClient.ClientID
304304
rm.Signature, err = signHash(ownerClient, rm.GetHash())
305305
if err != nil {
@@ -364,17 +364,125 @@ func TestHandlers_Download(t *testing.T) {
364364
WithArgs(ownerClient.ClientID).
365365
WillReturnError(gorm.ErrRecordNotFound)
366366

367+
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "read_markers" WHERE`)).
368+
WithArgs(ownerClient.ClientID).
369+
WillReturnRows(
370+
sqlmock.NewRows([]string{"client_id"}).
371+
AddRow(ownerClient.ClientID),
372+
)
373+
367374
aa := sqlmock.AnyArg()
368375

369-
mock.ExpectQuery(`INSERT INTO "read_markers"`).
370-
WithArgs(ownerClient.ClientID, ownerClient.ClientKey, alloc.ID, alloc.OwnerID, aa, aa, aa, aa, aa, aa, aa, aa, aa, aa).
371-
WillReturnRows(sqlmock.NewRows([]string{}))
376+
mock.ExpectExec(`UPDATE "read_markers"`).
377+
WithArgs(ownerClient.ClientKey, alloc.ID, alloc.OwnerID, aa, aa, aa, aa, aa).
378+
WillReturnResult(sqlmock.NewResult(0, 0))
372379

373380
mock.ExpectCommit()
374381
},
375382
wantCode: http.StatusOK,
376383
wantBody: "\"bW9jaw==\"\n", //base64encoded for mock string
377384
},
385+
{
386+
name: "DownloadFile_file_return_stale_readmarker",
387+
args: args{
388+
w: httptest.NewRecorder(),
389+
r: func() *http.Request {
390+
handlerName := handlers["/v1/file/download/{allocation}"]
391+
url, err := router.Get(handlerName).URL("allocation", alloc.Tx)
392+
if err != nil {
393+
t.Fatal()
394+
}
395+
396+
remotePath := "/file.txt"
397+
398+
rm := &marker.ReadMarker{}
399+
rm.ClientID = ownerClient.ClientID
400+
rm.ClientPublicKey = ownerClient.ClientKey
401+
rm.BlobberID = ""
402+
rm.AllocationID = alloc.ID
403+
rm.ReadCounter = 1
404+
rm.OwnerID = ownerClient.ClientID
405+
rm.Signature, err = signHash(ownerClient, rm.GetHash())
406+
if err != nil {
407+
t.Fatal(err)
408+
}
409+
rmData, err := json.Marshal(rm)
410+
require.NoError(t, err)
411+
r, err := http.NewRequest(http.MethodGet, url.String(), nil)
412+
if err != nil {
413+
t.Fatal(err)
414+
}
415+
416+
hash := encryption.Hash(alloc.Tx)
417+
sign, err := sch.Sign(hash)
418+
if err != nil {
419+
t.Fatal(err)
420+
}
421+
422+
r.Header.Set("X-Path-Hash", fileref.GetReferenceLookup(alloc.Tx, remotePath))
423+
r.Header.Set("X-Block-Num", fmt.Sprintf("%d", 1))
424+
r.Header.Set("X-Read-Marker", string(rmData))
425+
r.Header.Set(common.ClientSignatureHeader, sign)
426+
r.Header.Set(common.ClientHeader, alloc.OwnerID)
427+
r.Header.Set(common.ClientKeyHeader, alloc.OwnerPublicKey)
428+
429+
return r
430+
}(),
431+
},
432+
alloc: alloc,
433+
setupDbMock: func(mock sqlmock.Sqlmock) {
434+
mock.ExpectBegin()
435+
436+
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "allocations" WHERE`)).
437+
WithArgs(alloc.Tx).
438+
WillReturnRows(
439+
sqlmock.NewRows(
440+
[]string{
441+
"id", "tx", "expiration_date", "owner_public_key", "owner_id", "blobber_size",
442+
},
443+
).
444+
AddRow(
445+
alloc.ID, alloc.Tx, alloc.Expiration, alloc.OwnerPublicKey, alloc.OwnerID, int64(1<<30),
446+
),
447+
)
448+
449+
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "terms" WHERE`)).
450+
WithArgs(alloc.ID).
451+
WillReturnRows(
452+
sqlmock.NewRows([]string{"id", "allocation_id"}).
453+
AddRow(alloc.Terms[0].ID, alloc.Terms[0].AllocationID),
454+
)
455+
456+
filePathHash := fileref.GetReferenceLookup(alloc.Tx, "/file.txt")
457+
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "reference_objects" WHERE`)).
458+
WithArgs(alloc.ID, filePathHash).
459+
WillReturnRows(
460+
sqlmock.NewRows([]string{"path", "type", "lookup_hash", "content_hash"}).
461+
AddRow("/file.txt", "f", filePathHash, "abcd"),
462+
)
463+
464+
mock.ExpectQuery(regexp.QuoteMeta(`SELECT count(*) FROM "collaborators" WHERE`)).
465+
WithArgs(ownerClient.ClientID).
466+
WillReturnError(gorm.ErrRecordNotFound)
467+
468+
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "read_markers" WHERE`)).
469+
WithArgs(ownerClient.ClientID).
470+
WillReturnRows(
471+
sqlmock.NewRows([]string{"client_id", "counter"}).
472+
AddRow(ownerClient.ClientID, 23),
473+
)
474+
475+
aa := sqlmock.AnyArg()
476+
477+
mock.ExpectExec(`UPDATE "read_markers"`).
478+
WithArgs(ownerClient.ClientKey, alloc.ID, alloc.OwnerID, aa, aa, aa, aa, aa).
479+
WillReturnResult(sqlmock.NewResult(0, 0))
480+
481+
mock.ExpectCommit()
482+
},
483+
wantCode: http.StatusBadRequest,
484+
wantBody: "{\"code\":\"stale_read_marker\",\"error\":\"stale_read_marker: \"}\n\n",
485+
},
378486
{
379487
name: "DownloadFile_Encrypted_Permission_Denied_Unshared_File",
380488
args: args{
@@ -399,7 +507,7 @@ func TestHandlers_Download(t *testing.T) {
399507
rm.ClientPublicKey = guestClient.ClientKey
400508
rm.BlobberID = ""
401509
rm.AllocationID = alloc.ID
402-
rm.ReadSize = 64 * KB
510+
rm.ReadCounter = 1
403511
rm.OwnerID = ownerClient.ClientID
404512
rm.Signature, err = signHash(guestClient, rm.GetHash())
405513
if err != nil {
@@ -511,7 +619,7 @@ func TestHandlers_Download(t *testing.T) {
511619
rm.ClientPublicKey = guestClient.ClientKey
512620
rm.BlobberID = ""
513621
rm.AllocationID = alloc.ID
514-
rm.ReadSize = 64 * KB
622+
rm.ReadCounter = 1
515623
rm.OwnerID = ownerClient.ClientID
516624
rm.Signature, err = signHash(guestClient, rm.GetHash())
517625
if err != nil {
@@ -610,11 +718,18 @@ func TestHandlers_Download(t *testing.T) {
610718
AddRow(reEncryptionKey, guestPublicEncryptedKey),
611719
)
612720

721+
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "read_markers" WHERE`)).
722+
WithArgs(guestClient.ClientID).
723+
WillReturnRows(
724+
sqlmock.NewRows([]string{"client_id"}).
725+
AddRow(guestClient.ClientID),
726+
)
727+
613728
aa := sqlmock.AnyArg()
614729

615-
mock.ExpectQuery(`INSERT INTO "read_markers"`).
616-
WithArgs(guestClient.ClientID, guestClient.ClientKey, alloc.ID, alloc.OwnerID, aa, aa, aa, aa, aa, aa, aa, aa, aa, aa).
617-
WillReturnRows(sqlmock.NewRows([]string{}))
730+
mock.ExpectExec(`UPDATE "read_markers"`).
731+
WithArgs(guestClient.ClientKey, alloc.ID, alloc.OwnerID, aa, aa, aa, aa, aa, aa).
732+
WillReturnResult(sqlmock.NewResult(0, 0))
618733

619734
mock.ExpectCommit()
620735
},
@@ -645,7 +760,7 @@ func TestHandlers_Download(t *testing.T) {
645760
rm.ClientPublicKey = guestClient.ClientKey
646761
rm.BlobberID = ""
647762
rm.AllocationID = alloc.ID
648-
rm.ReadSize = 64 * KB
763+
rm.ReadCounter = 1
649764
rm.OwnerID = ownerClient.ClientID
650765
rm.Signature, err = signHash(guestClient, rm.GetHash())
651766
if err != nil {
@@ -749,11 +864,18 @@ func TestHandlers_Download(t *testing.T) {
749864
AddRow(reEncryptionKey, gpbk),
750865
)
751866

867+
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "read_markers" WHERE`)).
868+
WithArgs(guestClient.ClientID).
869+
WillReturnRows(
870+
sqlmock.NewRows([]string{"client_id"}).
871+
AddRow(guestClient.ClientID),
872+
)
873+
752874
aa := sqlmock.AnyArg()
753875

754-
mock.ExpectQuery(`INSERT INTO "read_markers"`).
755-
WithArgs(guestClient.ClientID, guestClient.ClientKey, alloc.ID, alloc.OwnerID, aa, aa, aa, aa, aa, aa, aa, aa, aa, aa).
756-
WillReturnRows(sqlmock.NewRows([]string{}))
876+
mock.ExpectExec(`UPDATE "read_markers"`).
877+
WithArgs(guestClient.ClientKey, alloc.ID, alloc.OwnerID, aa, aa, aa, aa, aa, aa).
878+
WillReturnResult(sqlmock.NewResult(0, 0))
757879

758880
mock.ExpectCommit()
759881
},
@@ -784,7 +906,7 @@ func TestHandlers_Download(t *testing.T) {
784906
rm.ClientPublicKey = guestClient.ClientKey
785907
rm.BlobberID = ""
786908
rm.AllocationID = alloc.ID
787-
rm.ReadSize = 64 * KB
909+
rm.ReadCounter = 1
788910
rm.OwnerID = alloc.OwnerID
789911
rm.Signature, err = signHash(guestClient, rm.GetHash())
790912
if err != nil {
@@ -888,11 +1010,18 @@ func TestHandlers_Download(t *testing.T) {
8881010
AddRow(reEncryptionKey, gpbk),
8891011
)
8901012

1013+
mock.ExpectQuery(regexp.QuoteMeta(`SELECT * FROM "read_markers" WHERE`)).
1014+
WithArgs(guestClient.ClientID).
1015+
WillReturnRows(
1016+
sqlmock.NewRows([]string{"client_id"}).
1017+
AddRow(guestClient.ClientID),
1018+
)
1019+
8911020
aa := sqlmock.AnyArg()
8921021

893-
mock.ExpectQuery(`INSERT INTO "read_markers"`).
894-
WithArgs(guestClient.ClientID, guestClient.ClientKey, alloc.ID, alloc.OwnerID, aa, aa, aa, aa, aa, aa, aa, aa, aa, aa).
895-
WillReturnRows(sqlmock.NewRows([]string{}))
1022+
mock.ExpectExec(`UPDATE "read_markers"`).
1023+
WithArgs(guestClient.ClientKey, alloc.ID, alloc.OwnerID, aa, aa, aa, aa, aa, aa).
1024+
WillReturnResult(sqlmock.NewResult(0, 0))
8961025

8971026
mock.ExpectCommit()
8981027
},
@@ -923,7 +1052,7 @@ func TestHandlers_Download(t *testing.T) {
9231052
rm.ClientPublicKey = guestClient.ClientKey
9241053
rm.BlobberID = ""
9251054
rm.AllocationID = alloc.ID
926-
rm.ReadSize = 64 * KB
1055+
rm.ReadCounter = 1
9271056
rm.OwnerID = alloc.OwnerID
9281057
rm.Signature, err = signHash(guestClient, rm.GetHash())
9291058
if err != nil {
@@ -1019,6 +1148,10 @@ func TestHandlers_Download(t *testing.T) {
10191148
tests := append(positiveTests, negativeTests...)
10201149

10211150
for _, test := range tests {
1151+
if test.name != "DownloadFile_file_return_stale_readmarker" {
1152+
fmt.Printf("\n\nSkipping Test: %s\n\n", test.name)
1153+
continue
1154+
}
10221155
t.Run(test.name, func(t *testing.T) {
10231156
mock := datastore.MockTheStore(t)
10241157
test.setupDbMock(mock)

0 commit comments

Comments
 (0)