Skip to content

Commit 2acdd9d

Browse files
committed
feat: Instant rename
1 parent 5bf51d1 commit 2acdd9d

File tree

15 files changed

+456
-47
lines changed

15 files changed

+456
-47
lines changed

core/aws_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func (s *AwsTest) SetUpSuite(t *C) {
4242
func (s *AwsTest) TestRegionDetection(t *C) {
4343
s.s3.bucket = "goofys-eu-west-1.kahing.xyz"
4444

45-
if TigrisDetected(s.s3.flags) {
45+
if TigrisDetectedForTests(s.s3.flags) {
4646
t.Skip("Not relevant for Tigris detected")
4747
}
4848

@@ -55,7 +55,7 @@ func (s *AwsTest) TestRegionDetection(t *C) {
5555
func (s *AwsTest) TestBucket404(t *C) {
5656
s.s3.bucket = RandStringBytesMaskImprSrc(63)
5757

58-
if TigrisDetected(s.s3.flags) {
58+
if TigrisDetectedForTests(s.s3.flags) {
5959
t.Skip("Not relevant for Tigris detected")
6060
}
6161

core/backend.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ import (
2828
type Capabilities struct {
2929
MaxMultipartSize uint64
3030
// indicates that the blob store has native support for directories
31-
DirBlob bool
32-
Name string
31+
DirBlob bool
32+
Name string
33+
IsTigris bool
3334
}
3435

3536
type HeadBlobInput struct {

core/backend_s3.go

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,10 @@ func (s *S3Backend) detectBucketLocationByHEAD() (err error, isAws bool) {
332332
isAws = true
333333
}
334334

335+
if server != nil && strings.Contains(server[0], "Tigris") {
336+
s.cap.IsTigris = true
337+
}
338+
335339
switch resp.StatusCode {
336340
case 200:
337341
// note that this only happen if the bucket is in us-east-1
@@ -408,7 +412,7 @@ func (s *S3Backend) Init(key string) error {
408412
}
409413

410414
if !s.config.RegionSet {
411-
_, _ = s.detectBucketLocationByHEAD()
415+
_, isAws = s.detectBucketLocationByHEAD()
412416
// if err == nil {
413417
// we detected a region header, this is probably AWS S3,
414418
// or we can use anonymous access, or both
@@ -809,7 +813,46 @@ func (s *S3Backend) DeleteBlobs(param *DeleteBlobsInput) (*DeleteBlobsOutput, er
809813
}
810814

811815
func (s *S3Backend) RenameBlob(param *RenameBlobInput) (*RenameBlobOutput, error) {
812-
return nil, syscall.ENOTSUP
816+
from := s.bucket + "/" + param.Source
817+
818+
params := &s3.CopyObjectInput{
819+
Bucket: &s.bucket,
820+
CopySource: aws.String(pathEscape(from)),
821+
Key: &param.Destination,
822+
MetadataDirective: aws.String(s3.MetadataDirectiveCopy),
823+
}
824+
825+
S3Debug(s3Log, params, "RenameObject")
826+
827+
if s.config.UseSSE {
828+
params.ServerSideEncryption = &s.sseType
829+
if s.config.UseKMS && s.config.KMSKeyID != "" {
830+
params.SSEKMSKeyId = &s.config.KMSKeyID
831+
}
832+
} else if s.config.SseC != "" {
833+
params.SSECustomerAlgorithm = PString("AES256")
834+
params.SSECustomerKey = &s.config.SseC
835+
params.SSECustomerKeyMD5 = &s.config.SseCDigest
836+
params.CopySourceSSECustomerAlgorithm = PString("AES256")
837+
params.CopySourceSSECustomerKey = &s.config.SseC
838+
params.CopySourceSSECustomerKeyMD5 = &s.config.SseCDigest
839+
}
840+
841+
if s.config.ACL != "" {
842+
params.ACL = &s.config.ACL
843+
}
844+
845+
req, _ := s.CopyObjectRequest(params)
846+
847+
withHeader(req, "X-Tigris-Rename", "true")
848+
849+
err := req.Send()
850+
if err != nil {
851+
s3Log.Warn().Interface("params", params).Err(err).Msg("RenameObject failed")
852+
return nil, err
853+
}
854+
855+
return &RenameBlobOutput{s.getRequestId(req)}, nil
813856
}
814857

815858
func (s *S3Backend) mpuCopyPart(from string, to string, mpuId string, bytes string, part int64, srcEtag *string) (*string, error) {

core/cfg/config.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,9 @@ type FlagStorage struct {
131131

132132
TigrisPrefetch bool
133133
TigrisListContent bool
134+
TigrisRename bool
135+
136+
IsTigris bool
134137
}
135138

136139
func (flags *FlagStorage) GetMimeType(fileName string) (retMime *string) {
@@ -163,9 +166,9 @@ func (flags *FlagStorage) Cleanup() {
163166
}
164167
}
165168

166-
func (flags *FlagStorage) IsTigris() bool {
167-
return strings.Contains(flags.Endpoint, "tigris.dev") ||
168-
strings.Contains(flags.Endpoint, "storage.dev")
169+
func (flags *FlagStorage) IsTigrisEndpoint() {
170+
flags.IsTigris = strings.Contains(flags.Endpoint, ".tigris.dev") ||
171+
strings.Contains(flags.Endpoint, ".storage.dev")
169172
}
170173

171174
var defaultHTTPTransport = http.Transport{

core/cfg/flags.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -147,16 +147,6 @@ MISC OPTIONS:
147147
Usage: "Drop root group and change to this group ID (defaults to --gid).",
148148
},
149149

150-
cli.BoolFlag{
151-
Name: "tigris-prefetch",
152-
Usage: "Enable Tigris prefetch on list (default: off)",
153-
},
154-
155-
cli.BoolFlag{
156-
Name: "tigris-list-content",
157-
Usage: "Include inlined objects content in list (default: on)",
158-
},
159-
160150
cli.BoolFlag{
161151
Name: "refresh-dirs",
162152
Usage: "Automatically refresh open directories using notifications under Windows",
@@ -660,6 +650,21 @@ MISC OPTIONS:
660650
Value: 512,
661651
Usage: "Simultaneously opened cache file descriptor limit",
662652
},
653+
654+
cli.BoolFlag{
655+
Name: "tigris-prefetch",
656+
Usage: "Enable Tigris prefetch on list (default: off)",
657+
},
658+
659+
cli.BoolFlag{
660+
Name: "tigris-list-content",
661+
Usage: "Include inlined objects content in list (default: on)",
662+
},
663+
664+
cli.BoolFlag{
665+
Name: "no-instant-rename",
666+
Usage: "Disable Tigris 'instant' rename (default: off)",
667+
},
663668
}
664669

665670
if runtime.GOOS == "windows" {
@@ -956,6 +961,7 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {
956961
ClusterGrpcReflection: c.Bool("grpc-reflection"),
957962

958963
TigrisPrefetch: c.Bool("tigris-prefetch"),
964+
TigrisRename: !c.Bool("no-tigris-rename"),
959965
TigrisListContent: c.Bool("tigris-list-content"),
960966
}
961967

@@ -1001,8 +1007,10 @@ func PopulateFlags(c *cli.Context) (ret *FlagStorage) {
10011007
panic("Unknown --iam-flavor: " + config.IAMFlavor)
10021008
}
10031009

1010+
flags.IsTigrisEndpoint()
1011+
10041012
// special enabled for the Tigris by default
1005-
if flags.IsTigris() {
1013+
if flags.IsTigris {
10061014
flags.EnableSpecials = !c.IsSet("no-specials")
10071015
}
10081016

@@ -1140,5 +1148,6 @@ func DefaultFlags() *FlagStorage {
11401148
},
11411149
TigrisPrefetch: false,
11421150
TigrisListContent: true,
1151+
TigrisRename: true,
11431152
}
11441153
}

core/cfg/logger.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,12 @@ func InitLoggers(flags *FlagStorage) error {
4242
Format: flags.LogFormat,
4343
}
4444

45-
if (lib.IsTTY(os.Stdout) || lib.IsTTY(os.Stderr)) && log.DefaultLogConfig.Format == "" && lf == "stderr" {
45+
if (lib.IsTTY(os.Stdout) || lib.IsTTY(os.Stderr)) && log.DefaultLogConfig.Format == "" && (lf == "stderr" || lf == "syslog") {
4646
log.DefaultLogConfig.Format = "console"
4747
}
4848

4949
log.DefaultLogConfig.Color = true
50-
if flags.NoLogColor {
50+
if flags.NoLogColor || lf == "syslog" {
5151
log.DefaultLogConfig.Color = false
5252
}
5353

core/file.go

Lines changed: 146 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -768,14 +768,145 @@ func (inode *Inode) sendUpload(priority int) bool {
768768
return false
769769
}
770770

771-
func (inode *Inode) sendRename() {
772-
cloud, key := inode.cloud()
773-
if inode.isDir() {
774-
key += "/"
771+
func (inode *Inode) finishErrorSendRenameSpecial(err error, from string, key string, oldParent *Inode, oldName string) {
772+
mappedErr := mapAwsError(err)
773+
if mappedErr == syscall.ENOENT || mappedErr == syscall.ERANGE {
774+
s3Log.Warnf("Conflict detected (inode %v): failed to copy %v to %v: %v. File is removed remotely, dropping cache", inode.Id, from, key, err)
775+
inode.mu.Lock()
776+
newParent := inode.Parent
777+
oldParent := inode.oldParent
778+
oldName := inode.oldName
779+
inode.oldParent = nil
780+
inode.oldName = ""
781+
inode.renamingTo = false
782+
inode.resetCache()
783+
inode.mu.Unlock()
784+
newParent.removeChild(inode)
785+
if oldParent != nil {
786+
oldParent.mu.Lock()
787+
if _, ok := oldParent.dir.DeletedChildren[oldName]; ok {
788+
delete(oldParent.dir.DeletedChildren, oldName)
789+
oldParent.addModified(-1)
790+
}
791+
oldParent.mu.Unlock()
792+
}
793+
} else {
794+
fuseLog.Warnf("Failed to copy %v to %v (rename): %v", from, key, err)
795+
inode.mu.Lock()
796+
inode.recordFlushError(err)
797+
if inode.Parent == oldParent && inode.Name == oldName {
798+
// Someone renamed the inode back to the original name
799+
// ...while we failed to copy it :)
800+
inode.oldParent = nil
801+
inode.oldName = ""
802+
inode.renamingTo = false
803+
inode.Parent.addModified(-1)
804+
if (inode.CacheState == ST_MODIFIED || inode.CacheState == ST_CREATED) &&
805+
!inode.isStillDirty() {
806+
inode.SetCacheState(ST_CACHED)
807+
inode.SetAttrTime(time.Now())
808+
}
809+
}
810+
inode.mu.Unlock()
811+
}
812+
}
813+
814+
func (inode *Inode) finishSuccessSendRenameSpecial(from string, key string, oldParent *Inode, oldName string, newParent *Inode, newName string) {
815+
fuseLog.Debugf("Renamed %v to %v (rename)", from, key)
816+
inode.mu.Lock()
817+
818+
// Now we know that the object is accessible by the new name
819+
if inode.Parent == newParent && inode.Name == newName {
820+
// Just clear the old path
821+
inode.oldParent = nil
822+
inode.oldName = ""
823+
} else if inode.Parent == oldParent && inode.Name == oldName {
824+
// Someone renamed the inode back to the original name(!)
825+
inode.oldParent = nil
826+
inode.oldName = ""
827+
// Delete the new key instead of the old one (?)
828+
} else {
829+
// Someone renamed the inode again(!)
830+
inode.oldParent = newParent
831+
inode.oldName = newName
775832
}
833+
if (inode.CacheState == ST_MODIFIED || inode.CacheState == ST_CREATED) &&
834+
!inode.isStillDirty() {
835+
inode.SetCacheState(ST_CACHED)
836+
inode.SetAttrTime(time.Now())
837+
}
838+
inode.renamingTo = false
839+
inode.mu.Unlock()
840+
841+
oldParent.mu.Lock()
842+
delete(oldParent.dir.DeletedChildren, oldName)
843+
oldParent.mu.Unlock()
844+
// And track ModifiedChildren because rename is special - it takes two parents
845+
oldParent.addModified(-1)
846+
}
847+
848+
func (inode *Inode) finishRenameFlush() {
849+
inode.mu.Lock()
850+
inode.IsFlushing -= inode.fs.flags.MaxParallelParts
851+
atomic.AddInt64(&inode.fs.activeFlushers, -1)
852+
inode.fs.WakeupFlusher()
853+
inode.mu.Unlock()
854+
}
855+
856+
func (inode *Inode) startRenameFlush() {
776857
inode.IsFlushing += inode.fs.flags.MaxParallelParts
777858
atomic.AddInt64(&inode.fs.stats.flushes, 1)
778859
atomic.AddInt64(&inode.fs.activeFlushers, 1)
860+
}
861+
862+
func (inode *Inode) sendRenameSpecial() {
863+
if inode.isDir() && inode.fs.flags.NoDirObject {
864+
return
865+
}
866+
867+
cloud, key := inode.cloud()
868+
_, from := inode.oldParent.cloud()
869+
870+
from = appendChildName(from, inode.oldName)
871+
inode.renamingTo = true
872+
oldParent := inode.oldParent
873+
oldName := inode.oldName
874+
newParent := inode.Parent
875+
newName := inode.Name
876+
if inode.isDir() {
877+
from += "/"
878+
key += "/"
879+
}
880+
881+
inode.startRenameFlush()
882+
883+
go func() {
884+
inode.fs.addInflightChange(key)
885+
_, err := cloud.RenameBlob(&RenameBlobInput{
886+
Source: from,
887+
Destination: key,
888+
})
889+
inode.fs.completeInflightChange(key)
890+
if err != nil {
891+
mappedErr := mapAwsError(err)
892+
if mappedErr != syscall.ENOENT || !inode.isDir() {
893+
inode.finishErrorSendRenameSpecial(err, from, key, oldParent, oldName)
894+
} else {
895+
inode.finishSuccessSendRenameSpecial(from, key, oldParent, oldName, newParent, newName)
896+
}
897+
} else {
898+
inode.finishSuccessSendRenameSpecial(from, key, oldParent, oldName, newParent, newName)
899+
}
900+
inode.finishRenameFlush()
901+
}()
902+
}
903+
904+
func (inode *Inode) sendRenameCopy() {
905+
cloud, key := inode.cloud()
906+
if inode.isDir() {
907+
key += "/"
908+
}
909+
inode.startRenameFlush()
779910
_, from := inode.oldParent.cloud()
780911
from = appendChildName(from, inode.oldName)
781912
oldParent := inode.oldParent
@@ -914,14 +1045,20 @@ func (inode *Inode) sendRename() {
9141045
}
9151046
}
9161047
}
917-
inode.mu.Lock()
918-
inode.IsFlushing -= inode.fs.flags.MaxParallelParts
919-
atomic.AddInt64(&inode.fs.activeFlushers, -1)
920-
inode.fs.WakeupFlusher()
921-
inode.mu.Unlock()
1048+
inode.finishRenameFlush()
9221049
}()
9231050
}
9241051

1052+
func (inode *Inode) sendRename() {
1053+
flags := inode.fs.flags
1054+
cloud := *inode.fs.cloud.Load()
1055+
if cloud.Capabilities().IsTigris && flags.TigrisRename {
1056+
inode.sendRenameSpecial()
1057+
return
1058+
}
1059+
inode.sendRenameCopy()
1060+
}
1061+
9251062
func (inode *Inode) sendUpdateMeta() {
9261063
// Update metadata by COPYing into the same object
9271064
// It results in the optimized implementation in S3

0 commit comments

Comments
 (0)