Skip to content

Commit c8e0516

Browse files
authored
gateway: add one more hierarchy to upload tmp/multiupload dir to reduce txn conflicts (#5549)
Signed-off-by: Changxin Miao <miaochangxin@step.ai>
1 parent 95f37c0 commit c8e0516

File tree

1 file changed

+77
-37
lines changed

1 file changed

+77
-37
lines changed

pkg/gateway/gateway.go

Lines changed: 77 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ import (
4848
)
4949

5050
const (
51-
sep = "/"
52-
metaBucket = ".sys"
51+
sep = "/"
52+
metaBucket = ".sys"
53+
subDirPrefix = 3 // 16^3=4096 slots
5354
)
5455

5556
var mctx meta.Context
@@ -186,10 +187,14 @@ func (n *jfsObjects) tpath(p ...string) string {
186187
}
187188

188189
func (n *jfsObjects) upath(bucket, uploadID string) string {
189-
return n.tpath(bucket, "uploads", uploadID)
190+
return n.tpath(bucket, "uploads", uploadID[:subDirPrefix], uploadID)
190191
}
191192

192193
func (n *jfsObjects) ppath(bucket, uploadID, part string) string {
194+
return n.tpath(bucket, "uploads", uploadID[:subDirPrefix], uploadID, part)
195+
}
196+
197+
func (n *jfsObjects) ppathFlat(bucket, uploadID, part string) string { // compatible with tmp files uploaded by old versions(<1.2)
193198
return n.tpath(bucket, "uploads", uploadID, part)
194199
}
195200

@@ -539,7 +544,8 @@ func (n *jfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBu
539544
if minio.IsStringEqual(src, dst) {
540545
return n.GetObjectInfo(ctx, srcBucket, srcObject, minio.ObjectOptions{})
541546
}
542-
tmp := n.tpath(dstBucket, "tmp", minio.MustGetUUID())
547+
uuid := minio.MustGetUUID()
548+
tmp := n.tpath(dstBucket, "tmp", uuid[:subDirPrefix], uuid)
543549
f, eno := n.fs.Create(mctx, tmp, 0666, n.gConf.Umask)
544550
if eno == syscall.ENOENT {
545551
_ = n.mkdirAll(ctx, path.Dir(tmp))
@@ -730,7 +736,8 @@ func (n *jfsObjects) mkdirAll(ctx context.Context, p string) error {
730736
}
731737

732738
func (n *jfsObjects) putObject(ctx context.Context, bucket, object string, r *minio.PutObjReader, opts minio.ObjectOptions, applyObjTaggingFunc func(tmpName string)) (err error) {
733-
tmpname := n.tpath(bucket, "tmp", minio.MustGetUUID())
739+
uuid := minio.MustGetUUID()
740+
tmpname := n.tpath(bucket, "tmp", uuid[:subDirPrefix], uuid)
734741
f, eno := n.fs.Create(mctx, tmpname, 0666, n.gConf.Umask)
735742
if eno == syscall.ENOENT {
736743
_ = n.mkdirAll(ctx, path.Dir(tmpname))
@@ -889,7 +896,7 @@ func (n *jfsObjects) ListMultipartUploads(ctx context.Context, bucket string, pr
889896
return // no found
890897
}
891898
defer f.Close(mctx)
892-
entries, eno := f.ReaddirPlus(mctx, 0)
899+
parents, eno := f.ReaddirPlus(mctx, 0)
893900
if eno != 0 {
894901
err = jfsToObjectErr(ctx, eno, bucket)
895902
return
@@ -900,22 +907,38 @@ func (n *jfsObjects) ListMultipartUploads(ctx context.Context, bucket string, pr
900907
lmi.MaxUploads = maxUploads
901908
lmi.Delimiter = delimiter
902909
commPrefixSet := make(map[string]struct{})
903-
for _, e := range entries {
904-
uploadID := string(e.Name)
905-
// todo: parallel
906-
object_, eno := n.fs.GetXattr(mctx, n.upath(bucket, uploadID), uploadKeyName)
910+
for _, p := range parents {
911+
f, eno := n.fs.Open(mctx, n.tpath(bucket, "uploads", string(p.Name)), 0)
907912
if eno != 0 {
908-
logger.Warnf("get object xattr error %s: %s, ignore this item", n.upath(bucket, uploadID), eno)
909-
continue
913+
return
914+
}
915+
defer f.Close(mctx)
916+
entries, eno := f.ReaddirPlus(mctx, 0)
917+
if eno != 0 {
918+
err = jfsToObjectErr(ctx, eno, bucket)
919+
return
910920
}
911-
object := string(object_)
912-
if strings.HasPrefix(object, prefix) {
913-
if keyMarker != "" && object+uploadID > keyMarker+uploadIDMarker || keyMarker == "" {
914-
lmi.Uploads = append(lmi.Uploads, minio.MultipartInfo{
915-
Object: object,
916-
UploadID: uploadID,
917-
Initiated: time.Unix(e.Attr.Atime, int64(e.Attr.Atimensec)),
918-
})
921+
922+
for _, e := range entries {
923+
if len(e.Name) != 36 {
924+
continue // not an uuid
925+
}
926+
uploadID := string(e.Name)
927+
// todo: parallel
928+
object_, eno := n.fs.GetXattr(mctx, n.upath(bucket, uploadID), uploadKeyName)
929+
if eno != 0 {
930+
logger.Warnf("get object xattr error %s: %s, ignore this item", n.upath(bucket, uploadID), eno)
931+
continue
932+
}
933+
object := string(object_)
934+
if strings.HasPrefix(object, prefix) {
935+
if keyMarker != "" && object+uploadID > keyMarker+uploadIDMarker || keyMarker == "" {
936+
lmi.Uploads = append(lmi.Uploads, minio.MultipartInfo{
937+
Object: object,
938+
UploadID: uploadID,
939+
Initiated: time.Unix(e.Attr.Atime, int64(e.Attr.Atimensec)),
940+
})
941+
}
919942
}
920943
}
921944
}
@@ -1080,6 +1103,10 @@ func (n *jfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object
10801103
for _, part := range parts {
10811104
p := n.ppath(bucket, uploadID, strconv.Itoa(part.PartNumber))
10821105
copied, eno := n.fs.CopyFileRange(mctx, p, 0, tmp, total, 5<<30)
1106+
if eno == syscall.ENOENT { // try lookup from old path
1107+
p = n.ppathFlat(bucket, uploadID, strconv.Itoa(part.PartNumber))
1108+
copied, eno = n.fs.CopyFileRange(mctx, p, 0, tmp, total, 5<<30)
1109+
}
10831110
if eno != 0 {
10841111
err = jfsToObjectErr(ctx, eno, bucket, object, uploadID)
10851112
logger.Errorf("merge parts: %s", err)
@@ -1157,7 +1184,7 @@ func (n *jfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, u
11571184
}
11581185

11591186
func (n *jfsObjects) cleanup() {
1160-
for t := range time.Tick(24 * time.Hour) {
1187+
for range time.Tick(24 * time.Hour) {
11611188
// default bucket tmp dirs
11621189
tmpDirs := []string{".sys/tmp/", ".sys/uploads/"}
11631190
if n.gConf.MultiBucket {
@@ -1172,27 +1199,40 @@ func (n *jfsObjects) cleanup() {
11721199
}
11731200
}
11741201
for _, dir := range tmpDirs {
1175-
f, errno := n.fs.Open(mctx, dir, 0)
1176-
if errno != 0 {
1202+
n.cleanupDir(dir)
1203+
}
1204+
}
1205+
}
1206+
1207+
func (n *jfsObjects) cleanupDir(dir string) bool {
1208+
f, errno := n.fs.Open(mctx, dir, 0)
1209+
if errno != 0 {
1210+
return false
1211+
}
1212+
defer f.Close(mctx)
1213+
entries, _ := f.ReaddirPlus(mctx, 0)
1214+
now := time.Now()
1215+
deleted := 0
1216+
for _, entry := range entries {
1217+
dirPath := n.path(dir, string(entry.Name))
1218+
if entry.Attr.Typ == meta.TypeDirectory && len(entry.Name) == subDirPrefix {
1219+
if !n.cleanupDir(strings.TrimPrefix(dirPath, "/")) {
11771220
continue
11781221
}
1179-
entries, _ := f.ReaddirPlus(mctx, 0)
1180-
for _, entry := range entries {
1181-
if _, err := uuid.Parse(string(entry.Name)); err != nil {
1182-
continue
1183-
}
1184-
if t.Sub(time.Unix(entry.Attr.Mtime, 0)) > 7*24*time.Hour {
1185-
p := n.path(dir, string(entry.Name))
1186-
if errno := n.fs.Rmr(mctx, p); errno != 0 {
1187-
logger.Errorf("failed to delete expired temporary files path: %s,", p)
1188-
} else {
1189-
logger.Infof("delete expired temporary files path: %s, mtime: %s", p, time.Unix(entry.Attr.Mtime, 0).Format(time.RFC3339))
1190-
}
1191-
}
1222+
} else if _, err := uuid.Parse(string(entry.Name)); err != nil {
1223+
logger.Warnf("unexpected file path: %s", dirPath)
1224+
continue
1225+
}
1226+
if now.Sub(time.Unix(entry.Attr.Mtime, 0)) > 7*24*time.Hour {
1227+
if errno = n.fs.Rmr(mctx, dirPath); errno != 0 {
1228+
logger.Errorf("failed to delete expired temporary files path: %s, err: %s", dirPath, errno)
1229+
} else {
1230+
deleted += 1
1231+
logger.Infof("delete expired temporary files path: %s, mtime: %s", dirPath, time.Unix(entry.Attr.Mtime, 0).Format(time.RFC3339))
11921232
}
1193-
_ = f.Close(mctx)
11941233
}
11951234
}
1235+
return deleted == len(entries)
11961236
}
11971237

11981238
type jfsFLock struct {

0 commit comments

Comments
 (0)