Refactor UploadHandler and add ResumeUpload #166
Changes from all commits
----

@@ -0,0 +1,2 @@
+blobber
+/config
----

@@ -20,6 +20,9 @@ const (
 	RENAME_OPERATION = "rename"
 	COPY_OPERATION = "copy"
 	UPDATE_ATTRS_OPERATION = "update_attrs"
+
+	// RESUME_OPERATION is the upload operation for INIT/APPEND/FINALIZE
+	RESUME_OPERATION = "resume"
 )

 const (
@@ -31,6 +34,7 @@ const (

 var OperationNotApplicable = common.NewError("operation_not_valid", "Not an applicable operation")

+// AllocationChangeProcessor represents a transaction of a file operation. It is persisted in postgres and can be rebuilt for the next http request (e.g. CommitHandler).
 type AllocationChangeProcessor interface {
 	CommitToFileStore(ctx context.Context) error
 	DeleteTempFile() error

Review comment: 155 characters is quite long, maybe you could split the line.
@@ -67,6 +71,9 @@ func (AllocationChange) TableName() string {
 	return "allocation_changes"
 }

+// GetAllocationChanges reloads the connection's changes in an allocation from postgres:
+// 1. sets the connection's status to NewConnection if connection_id is not found in postgres
+// 2. marks it as NewConnection if connection_id is marked as DeleteConnection
 func GetAllocationChanges(ctx context.Context, connectionID string, allocationID string, clientID string) (*AllocationChangeCollector, error) {
 	cc := &AllocationChangeCollector{}
 	db := datastore.GetStore().GetTransaction(ctx)
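Usage sketch (not part of the diff): inside a commit-style handler, the collector can be reloaded and its processors rebuilt. Only GetAllocationChanges and ComputeProperties come from this file; the surrounding variable names and error handling are illustrative.

	// Illustrative only: reload pending changes on a later http request.
	cc, err := GetAllocationChanges(ctx, connectionID, allocationID, clientID)
	if err != nil {
		return err
	}
	cc.ComputeProperties() // rebuild each AllocationChangeProcessor from stored JSON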
@@ -104,14 +111,13 @@ func (cc *AllocationChangeCollector) Save(ctx context.Context) error {
 	db := datastore.GetStore().GetTransaction(ctx)
 	if cc.Status == NewConnection {
 		cc.Status = InProgressConnection
-		err := db.Create(cc).Error
-		return err
-	} else {
-		err := db.Save(cc).Error
-		return err
+		return db.Create(cc).Error
 	}
+
+	return db.Session(&gorm.Session{FullSaveAssociations: true}).Updates(cc).Error
 }

+// ComputeProperties unmarshals all ChangeProcessors from postgres
 func (cc *AllocationChangeCollector) ComputeProperties() {
 	cc.AllocationChanges = make([]AllocationChangeProcessor, 0, len(cc.Changes))
 	for _, change := range cc.Changes {
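The rewritten Save now distinguishes insert from update: a NewConnection is inserted with Create, while an existing collector is updated with FullSaveAssociations so gorm also writes the associated rows (here, cc.Changes), not just the parent's columns. A self-contained sketch of that session option, using hypothetical Parent/Child models:

	package example

	import "gorm.io/gorm"

	// Parent and Child are hypothetical models, for illustration only.
	type Parent struct {
		ID       uint
		Children []Child
	}

	type Child struct {
		ID       uint
		ParentID uint
		Data     string
	}

	// SaveDeep mirrors the session option used above: a plain Updates writes only
	// Parent's own columns, while FullSaveAssociations also upserts p.Children.
	func SaveDeep(db *gorm.DB, p *Parent) error {
		return db.Session(&gorm.Session{FullSaveAssociations: true}).Updates(p).Error
	}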
@@ -129,6 +135,8 @@ func (cc *AllocationChangeCollector) ComputeProperties() {
 			acp = new(CopyFileChange)
 		case UPDATE_ATTRS_OPERATION:
 			acp = new(AttributesChange)
+		case RESUME_OPERATION:
+			acp = new(ResumeFileChange)
 		}

 		if acp == nil {
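Since this switch hands back an AllocationChangeProcessor, the new ResumeFileChange must implement that interface. A small check (not part of the PR) that would catch a missing method at compile time:

	// Compile-time assertion: *ResumeFileChange satisfies AllocationChangeProcessor.
	var _ AllocationChangeProcessor = (*ResumeFileChange)(nil)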
----

@@ -3,6 +3,7 @@ package allocation
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"path/filepath"
 	"strings"
@@ -14,33 +15,39 @@ import (
 )

 type NewFileChange struct {
-	ConnectionID        string               `json:"connection_id" validation:"required"`
-	AllocationID        string               `json:"allocation_id"`
-	Filename            string               `json:"filename" validation:"required"`
-	ThumbnailFilename   string               `json:"thumbnail_filename"`
-	Path                string               `json:"filepath" validation:"required"`
-	Size                int64                `json:"size"`
-	Hash                string               `json:"content_hash,omitempty"`
-	ThumbnailSize       int64                `json:"thumbnail_size"`
-	ThumbnailHash       string               `json:"thumbnail_content_hash,omitempty"`
-	MerkleRoot          string               `json:"merkle_root,omitempty"`
-	ActualHash          string               `json:"actual_hash,omitempty" validation:"required"`
-	ActualSize          int64                `json:"actual_size,omitempty" validation:"required"`
-	ActualThumbnailSize int64                `json:"actual_thumb_size"`
-	ActualThumbnailHash string               `json:"actual_thumb_hash"`
-	MimeType            string               `json:"mimetype,omitempty"`
-	EncryptedKey        string               `json:"encrypted_key,omitempty"`
-	CustomMeta          string               `json:"custom_meta,omitempty"`
-	Attributes          reference.Attributes `json:"attributes,omitempty"`
+	// IsResumable indicates the request is a resumable upload
+	IsResumable bool `json:"is_resumable,omitempty"`
+	// UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
+	UploadLength int64 `json:"upload_length,omitempty"`
+	// UploadOffset indicates a byte offset within a resource. The value MUST be a non-negative integer.
+	UploadOffset int64 `json:"upload_offset,omitempty"`
+	// IsFinal indicates the request carries the final chunk
+	IsFinal bool `json:"is_final,omitempty"`
+	//client side: unmarshal them from 'updateMeta'/'uploadMeta'
+	ConnectionID string `json:"connection_id" validation:"required"`
+	//client side:
+	Filename string `json:"filename" validation:"required"`
+	//client side:
+	Path string `json:"filepath" validation:"required"`
+	//client side:
+	ActualHash string `json:"actual_hash,omitempty" validation:"required"`
+	//client side:
+	ActualSize int64 `json:"actual_size,omitempty" validation:"required"`
+	//client side:
+	ActualThumbnailSize int64 `json:"actual_thumb_size"`
+	//client side:
+	ActualThumbnailHash string `json:"actual_thumb_hash"`
+	//client side:
+	MimeType string `json:"mimetype,omitempty"`
+	//client side:
+	Attributes reference.Attributes `json:"attributes,omitempty"`
+	//client side:
+	MerkleRoot string `json:"merkle_root,omitempty"`
+
+	//server side: update them by ChangeProcessor
+	AllocationID string `json:"allocation_id"`
+	//client side:
+	Hash string `json:"content_hash,omitempty"`
+	Size int64 `json:"size"`
+	//server side:
+	ThumbnailHash string `json:"thumbnail_content_hash,omitempty"`
+	ThumbnailSize int64 `json:"thumbnail_size"`
+	ThumbnailFilename string `json:"thumbnail_filename"`
+
+	EncryptedKey string `json:"encrypted_key,omitempty"`
+	CustomMeta string `json:"custom_meta,omitempty"`
 }

 func (nf *NewFileChange) ProcessChange(ctx context.Context,

Review comment: This style is confusing to read. I suggest grouping the members under one //client side: comment.
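To make the new resumable fields concrete, here is a hedged sketch of an 'uploadMeta' payload for an intermediate chunk. Only the field names and json tags come from the struct above; every concrete value is invented for illustration.

	// Illustrative only: metadata for an APPEND-style chunk of a 10 MiB upload.
	meta := NewFileChange{
		ConnectionID: "4e9c...",        // hypothetical connection id
		Filename:     "video.mp4",
		Path:         "/videos/video.mp4",
		ActualHash:   "ed79...",        // hypothetical hash of the whole file
		ActualSize:   10 * 1024 * 1024,
		IsResumable:  true,
		UploadLength: 10 * 1024 * 1024, // size of the entire upload in bytes
		UploadOffset: 64 * 1024,        // byte offset where this chunk appends
		IsFinal:      false,            // more chunks follow
	}
	payload, _ := json.Marshal(&meta) // the form-field body the blobber unmarshals
	_ = payload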
@@ -101,6 +108,7 @@ func (nf *NewFileChange) ProcessChange(ctx context.Context,
 	newFile.LookupHash = reference.GetReferenceLookup(dirRef.AllocationID, nf.Path)
 	newFile.Size = nf.Size
 	newFile.MimeType = nf.MimeType
+	fmt.Println(allocationRoot)
 	newFile.WriteMarker = allocationRoot
 	newFile.ThumbnailHash = nf.ThumbnailHash
 	newFile.ThumbnailSize = nf.ThumbnailSize
----

@@ -0,0 +1,166 @@
package allocation

import (
	"context"
	"encoding/json"
	"fmt"
	"path/filepath"
	"strings"

	"0chain.net/blobbercore/filestore"
	"0chain.net/blobbercore/reference"
	"0chain.net/blobbercore/stats"
	"0chain.net/blobbercore/util"

	"0chain.net/core/common"
	gosdk "github.com/0chain/gosdk/core/util"
)
// ResumeFileChange is the file change processor for continuous upload in INIT/APPEND/FINALIZE
type ResumeFileChange struct {
	NewFileChange

	ShardHash    string                   `json:"shard_hash,omitempty"`
	MerkleHasher gosdk.StreamMerkleHasher `json:"hasher,omitempty"`        // streaming merkle hasher that saves the current state of the tree
	IsFinal      bool                     `json:"is_final,omitempty"`      // whether the current chunk is the last one
	ChunkIndex   int                      `json:"chunk_index,omitempty"`   // sequence number of the current chunk; chunks MUST be uploaded one by one because of the streaming merkle hash
	UploadOffset int64                    `json:"upload_offset,omitempty"` // the next position at which a new incoming chunk should be appended
}
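The ChunkIndex comment above is the key constraint: a streaming hasher folds each new chunk into saved state, so chunk i+1 cannot be hashed before chunk i. The sketch below is not the gosdk StreamMerkleHasher API (its methods are not shown in this diff); it is a simplified stand-in using plain sha256 chaining to illustrate the order dependence.

	package example

	import "crypto/sha256"

	// ChainHasher is a simplified stand-in (not the gosdk type) that keeps
	// only a running digest, so chunks must arrive strictly in order.
	type ChainHasher struct {
		state [sha256.Size]byte
		next  int // index the next chunk must carry
	}

	// Push folds chunk number idx into the saved state; out-of-order chunks are
	// rejected because earlier states cannot be recomputed from the digest alone.
	func (h *ChainHasher) Push(idx int, chunk []byte) bool {
		if idx != h.next {
			return false
		}
		h.state = sha256.Sum256(append(h.state[:], chunk...))
		h.next++
		return true
	}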
// ProcessChange updates references and creates a new FileRef
func (nf *ResumeFileChange) ProcessChange(ctx context.Context,
	change *AllocationChange, allocationRoot string) (*reference.Ref, error) {

	path, _ := filepath.Split(nf.Path)
	path = filepath.Clean(path)
	tSubDirs := reference.GetSubDirsFromPath(path)

	rootRef, err := reference.GetReferencePath(ctx, nf.AllocationID, nf.Path)
	if err != nil {
		return nil, err
	}

	dirRef := rootRef
	treelevel := 0
	for {
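		// Walk nf.Path one directory level at a time: descend into an existing
		// child whose name matches, otherwise create the missing directory ref.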
		found := false
		for _, child := range dirRef.Children {
			if child.Type == reference.DIRECTORY && treelevel < len(tSubDirs) {
				if child.Name == tSubDirs[treelevel] {
					dirRef = child
					found = true
					break
				}
			}
		}
		if found {
			treelevel++
			continue
		}
		if len(tSubDirs) > treelevel {
			newRef := reference.NewDirectoryRef()
			newRef.AllocationID = dirRef.AllocationID
			newRef.Path = "/" + strings.Join(tSubDirs[:treelevel+1], "/")
			newRef.ParentPath = "/" + strings.Join(tSubDirs[:treelevel], "/")
			newRef.Name = tSubDirs[treelevel]
			newRef.LookupHash = reference.GetReferenceLookup(dirRef.AllocationID, newRef.Path)
			dirRef.AddChild(newRef)
			dirRef = newRef
			treelevel++
			continue
		} else {
			break
		}
	}

	var newFile = reference.NewFileRef()
	newFile.ActualFileHash = nf.ActualHash
	newFile.ActualFileSize = nf.ActualSize
	newFile.AllocationID = dirRef.AllocationID
	newFile.ContentHash = nf.Hash
	newFile.CustomMeta = nf.CustomMeta
	newFile.MerkleRoot = nf.MerkleRoot
	newFile.Name = nf.Filename
	newFile.ParentPath = dirRef.Path
	newFile.Path = nf.Path
	newFile.LookupHash = reference.GetReferenceLookup(dirRef.AllocationID, nf.Path)
	newFile.Size = nf.Size
	newFile.MimeType = nf.MimeType
	fmt.Println(allocationRoot)
	newFile.WriteMarker = allocationRoot
	newFile.ThumbnailHash = nf.ThumbnailHash
	newFile.ThumbnailSize = nf.ThumbnailSize
	newFile.ActualThumbnailHash = nf.ActualThumbnailHash
	newFile.ActualThumbnailSize = nf.ActualThumbnailSize
	newFile.EncryptedKey = nf.EncryptedKey

	if err = newFile.SetAttributes(&nf.Attributes); err != nil {
		return nil, common.NewErrorf("process_new_file_change",
			"setting file attributes: %v", err)
	}

	dirRef.AddChild(newFile)
	if _, err := rootRef.CalculateHash(ctx, true); err != nil {
		return nil, err
	}
	stats.NewFileCreated(ctx, newFile.ID)
	return rootRef, nil
}
// Marshal serializes the change so it can be persisted to postgres
func (nf *ResumeFileChange) Marshal() (string, error) {
	ret, err := json.Marshal(nf)
	if err != nil {
		return "", err
	}
	return string(ret), nil
}

// Unmarshal reloads and unmarshals the change from allocation_changes.input in postgres
func (nf *ResumeFileChange) Unmarshal(input string) error {
	if err := json.Unmarshal([]byte(input), nf); err != nil {
		return err
	}

	return util.UnmarshalValidation(nf)
}
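A hedged round-trip sketch of these two methods: the string from Marshal is what lands in allocation_changes.input, and Unmarshal (driven by ComputeProperties) rebuilds the processor on the next request. Values are invented; the required client-side fields are set only so that UnmarshalValidation passes.

	// Illustrative only: persist a change, then rebuild it as ComputeProperties would.
	change := &ResumeFileChange{}
	change.ConnectionID = "4e9c..." // hypothetical values for the required fields
	change.Filename = "video.mp4"
	change.Path = "/videos/video.mp4"
	change.ActualHash = "ed79..."
	change.ActualSize = 10 * 1024 * 1024
	change.ChunkIndex = 3

	input, err := change.Marshal() // JSON stored in allocation_changes.input
	if err != nil {
		return err
	}

	restored := &ResumeFileChange{}
	if err := restored.Unmarshal(input); err != nil { // unmarshals and validates
		return err
	}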
// DeleteTempFile deletes temp files from the allocation's temp dir
func (nf *ResumeFileChange) DeleteTempFile() error {
	fileInputData := &filestore.FileInputData{}
	fileInputData.Name = nf.Filename
	fileInputData.Path = nf.Path
	fileInputData.Hash = nf.Hash
	err := filestore.GetFileStore().DeleteTempFile(nf.AllocationID, fileInputData, nf.ConnectionID)
	if nf.ThumbnailSize > 0 {
		thumbInputData := &filestore.FileInputData{}
		thumbInputData.Name = nf.ThumbnailFilename
		thumbInputData.Path = nf.Path
		thumbInputData.Hash = nf.ThumbnailHash
		// still attempt the thumbnail delete, but don't let its result
		// overwrite an error from the file delete above
		if thumbErr := filestore.GetFileStore().DeleteTempFile(nf.AllocationID, thumbInputData, nf.ConnectionID); err == nil {
			err = thumbErr
		}
	}
	return err
}
// CommitToFileStore moves files from the temp dir to the object dir
func (nf *ResumeFileChange) CommitToFileStore(ctx context.Context) error {
	fileInputData := &filestore.FileInputData{}
	fileInputData.Name = nf.Filename
	fileInputData.Path = nf.Path
	fileInputData.Hash = nf.Hash
	_, err := filestore.GetFileStore().CommitWrite(nf.AllocationID, fileInputData, nf.ConnectionID)
	if err != nil {
		return common.NewError("file_store_error", "Error committing to file store. "+err.Error())
	}
	if nf.ThumbnailSize > 0 {
		fileInputData := &filestore.FileInputData{}
		fileInputData.Name = nf.ThumbnailFilename
		fileInputData.Path = nf.Path
		fileInputData.Hash = nf.ThumbnailHash
		_, err := filestore.GetFileStore().CommitWrite(nf.AllocationID, fileInputData, nf.ConnectionID)
		if err != nil {
			return common.NewError("file_store_error", "Error committing thumbnail to file store. "+err.Error())
		}
	}
	return nil
}
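For context, a hedged sketch of where these methods get called: once ComputeProperties has rebuilt the processors, a commit path can walk them and move each temp file into the object store. The loop shape is illustrative; AllocationChanges and CommitToFileStore are the names defined in this PR.

	// Illustrative only: commit every pending change on the connection.
	for _, acp := range cc.AllocationChanges {
		if err := acp.CommitToFileStore(ctx); err != nil {
			return err
		}
	}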
Review comment: Please remove. Any scripts for building blobber should not use go source directories for executables; please put them somewhere else. If it's just your own private build for debugging, you should keep it private and not push it to the main repository.