Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
353 changes: 351 additions & 2 deletions pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -5059,8 +5059,357 @@ func (m *redisMeta) doCloneEntry(ctx Context, srcIno Ino, parent Ino, name strin
}

func (m *redisMeta) doBatchClone(ctx Context, srcParent Ino, dstParent Ino, entries []*Entry, cmode uint8, cumask uint16, result *batchCloneResult) syscall.Errno {
// TODO: Implement batch clone for Redis backend
return syscall.ENOTSUP
type cloneInfo struct {
entry *Entry
srcIno Ino
dstIno Ino
srcAttr Attr
dstAttr Attr
srcXattr map[string]string
}
type chunkData struct {
vals []string
slices []*slice
}
type sourceData struct {
attr Attr
xattr map[string]string
sym string
chunks []chunkData
}

if len(entries) == 0 {
return 0
}
if result != nil {
*result = batchCloneResult{}
}

accumulateFallbackResult := func(attr Attr) {
if result == nil {
return
}
uid, gid := attr.Uid, attr.Gid
if cmode&CLONE_MODE_PRESERVE_ATTR == 0 {
uid = ctx.Uid()
gid = ctx.Gid()
}
space := align4K(attr.Length)
result.length += int64(attr.Length)
result.space += space
result.inodes++
result.userGroupQuotas = append(result.userGroupQuotas, userGroupQuotaDelta{
Uid: uid,
Gid: gid,
Space: space,
Inodes: 1,
})
}

fallbackEntries := func(remaining []*Entry) syscall.Errno {
for _, e := range remaining {
dstIno, err := m.nextInode()
if err != nil {
return errno(err)
}
var srcAttr Attr
st := m.doCloneEntry(ctx, e.Inode, dstParent, string(e.Name), dstIno, &srcAttr, cmode, cumask, false)
if st == syscall.ENOENT {
continue
}
if st != 0 {
return st
}
accumulateFallbackResult(srcAttr)
}
return 0
}

const batchSize = 1000
for start := 0; start < len(entries); start += batchSize {
end := start + batchSize
if end > len(entries) {
end = len(entries)
}
batch := entries[start:end]

infos := make([]*cloneInfo, 0, len(batch))
nameSet := make(map[string]struct{}, len(batch))
srcSet := make(map[Ino]struct{}, len(batch))
srcList := make([]Ino, 0, len(batch))
for _, e := range batch {
name := string(e.Name)
if _, ok := nameSet[name]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shouldn’t happen, we can log a warning and continue.

return syscall.EEXIST
}
nameSet[name] = struct{}{}
dstIno, err := m.nextInode()
if err != nil {
return errno(err)
}
info := &cloneInfo{
entry: e,
srcIno: e.Inode,
dstIno: dstIno,
}
infos = append(infos, info)
if _, ok := srcSet[e.Inode]; !ok {
srcSet[e.Inode] = struct{}{}
srcList = append(srcList, e.Inode)
}
}

watchKeys := make([]string, 0, len(srcList)*2+2)
watchKeys = append(watchKeys, m.inodeKey(dstParent), m.entryKey(dstParent))
for _, ino := range srcList {
watchKeys = append(watchKeys, m.inodeKey(ino), m.xattrKey(ino))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are too many files being watched (inode keys), and continuously retrying (default 50 times) in batches may not be acceptable. We should fail fast (just try once?) and then fall back.

}

var batchResult batchCloneResult
err := m.txn(ctx, func(tx *redis.Tx) error {
now := time.Now()
var pattr Attr
pval, err := tx.Get(ctx, m.inodeKey(dstParent)).Bytes()
if err == redis.Nil {
return syscall.ENOENT
}
if err != nil {
return err
}
m.parseAttr(pval, &pattr)
if pattr.Typ != TypeDirectory {
return syscall.ENOTDIR
}
if (pattr.Flags & FlagImmutable) != 0 {
return syscall.EPERM
}
if st := m.Access(ctx, dstParent, MODE_MASK_W|MODE_MASK_X, &pattr); st != 0 {
return st
}

existsPipe := tx.Pipeline()
existsCmds := make([]*redis.BoolCmd, 0, len(infos))
for _, info := range infos {
existsCmds = append(existsCmds, existsPipe.HExists(ctx, m.entryKey(dstParent), string(info.entry.Name)))
}
if _, err := existsPipe.Exec(ctx); err != nil {
return err
}
for _, cmd := range existsCmds {
exist, err := cmd.Result()
if err != nil {
return err
}
if exist {
return syscall.EEXIST
}
}

srcKeys := make([]string, 0, len(srcList))
for _, ino := range srcList {
srcKeys = append(srcKeys, m.inodeKey(ino))
}
srcVals, err := tx.MGet(ctx, srcKeys...).Result()
if err != nil {
return err
}
srcData := make(map[Ino]*sourceData, len(srcList))
for i, v := range srcVals {
if v == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for deleted source entry, just skip them?

// Deleted source entry: fallback to per-entry clone path.
return syscall.ENOTSUP
}
var a Attr
m.parseAttr([]byte(v.(string)), &a)
if a.Typ == TypeDirectory {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should not happen

return syscall.ENOTSUP
}
if st := m.Access(ctx, srcList[i], MODE_MASK_R, &a); st != 0 {
return st
}
srcData[srcList[i]] = &sourceData{attr: a}
}

readPipe := tx.Pipeline()
xcmds := make(map[Ino]*redis.MapStringStringCmd, len(srcList))
scmds := make(map[Ino]*redis.StringCmd)
type chunkCmd struct {
srcIno Ino
indx uint32
cmd *redis.StringSliceCmd
}
var ccmds []chunkCmd
for _, ino := range srcList {
xcmds[ino] = readPipe.HGetAll(ctx, m.xattrKey(ino))
a := srcData[ino].attr
switch a.Typ {
case TypeFile:
if a.Length != 0 {
chunkNum := int(a.Length/ChunkSize) + 1
srcData[ino].chunks = make([]chunkData, chunkNum)
for i := 0; i < chunkNum; i++ {
cmd := readPipe.LRange(ctx, m.chunkKey(ino, uint32(i)), 0, -1)
ccmds = append(ccmds, chunkCmd{srcIno: ino, indx: uint32(i), cmd: cmd})
}
}
case TypeSymlink:
scmds[ino] = readPipe.Get(ctx, m.symKey(ino))
default:
return syscall.ENOTSUP
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

donot fallback here, better to use another error type for fallback

}
}
if _, err := readPipe.Exec(ctx); err != nil && err != redis.Nil {
return err
}

for ino, cmd := range xcmds {
val, err := cmd.Result()
if err != nil {
return err
}
srcData[ino].xattr = val
}
for _, c := range ccmds {
val, err := c.cmd.Result()
if err != nil {
return err
}
if len(val) == 0 {
continue
}
ss := readSlices(val)
if ss == nil {
return syscall.EIO
}
srcData[c.srcIno].chunks[c.indx] = chunkData{vals: val, slices: ss}
}
for ino, cmd := range scmds {
sym, err := cmd.Result()
if err == redis.Nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skip

return syscall.ENOTSUP
}
if err != nil {
return err
}
srcData[ino].sym = sym
}

batchResult = batchCloneResult{
userGroupQuotas: make([]userGroupQuotaDelta, 0, len(infos)),
}
refDelta := make(map[string]int64)
for _, info := range infos {
sd, ok := srcData[info.srcIno]
if !ok {
return syscall.ENOTSUP
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skip

}
info.srcAttr = sd.attr
Copy link
Contributor

@jiefenghuang jiefenghuang Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

info.srcAttr is not used.
some fields in other auxiliary structs are also unused and can be removed.

info.dstAttr = sd.attr
info.dstAttr.Parent = dstParent
if cmode&CLONE_MODE_PRESERVE_ATTR == 0 {
info.dstAttr.Uid = ctx.Uid()
info.dstAttr.Gid = ctx.Gid()
info.dstAttr.Mode &= ^cumask
info.dstAttr.Atime = now.Unix()
info.dstAttr.Mtime = now.Unix()
info.dstAttr.Ctime = now.Unix()
info.dstAttr.Atimensec = uint32(now.Nanosecond())
info.dstAttr.Mtimensec = uint32(now.Nanosecond())
info.dstAttr.Ctimensec = uint32(now.Nanosecond())
}
if info.dstAttr.Typ == TypeFile && info.dstAttr.Nlink > 1 {
info.dstAttr.Nlink = 1
}
info.srcXattr = sd.xattr

batchResult.length += int64(sd.attr.Length)
entrySpace := align4K(sd.attr.Length)
batchResult.space += entrySpace
batchResult.inodes++
batchResult.userGroupQuotas = append(batchResult.userGroupQuotas, userGroupQuotaDelta{
Uid: info.dstAttr.Uid,
Gid: info.dstAttr.Gid,
Space: entrySpace,
Inodes: 1,
})

if info.dstAttr.Typ == TypeFile {
for _, chunk := range sd.chunks {
if len(chunk.slices) == 0 {
continue
}
for _, s := range chunk.slices {
if s.id > 0 {
refDelta[m.sliceKey(s.id, s.size)]++
}
}
}
}
}

_, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error {
for _, info := range infos {
sd := srcData[info.srcIno]
p.Set(ctx, m.inodeKey(info.dstIno), m.marshal(&info.dstAttr), 0)
p.HSet(ctx, m.entryKey(dstParent), string(info.entry.Name), m.packEntry(info.dstAttr.Typ, info.dstIno))
if len(info.srcXattr) > 0 {
p.HMSet(ctx, m.xattrKey(info.dstIno), info.srcXattr)
}
switch info.dstAttr.Typ {
case TypeFile:
for i, chunk := range sd.chunks {
if len(chunk.vals) == 0 {
continue
}
p.RPush(ctx, m.chunkKey(info.dstIno, uint32(i)), chunk.vals)
}
case TypeSymlink:
p.Set(ctx, m.symKey(info.dstIno), sd.sym, 0)
default:
return syscall.ENOTSUP
}
}
if cmode&CLONE_MODE_PRESERVE_ATTR == 0 {
Copy link
Contributor

@jiefenghuang jiefenghuang Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not update parent time. cc @zhijian-pro

pattr.Mtime = now.Unix()
pattr.Mtimensec = uint32(now.Nanosecond())
pattr.Ctime = now.Unix()
pattr.Ctimensec = uint32(now.Nanosecond())
p.Set(ctx, m.inodeKey(dstParent), m.marshal(&pattr), 0)
}
if batchResult.space != 0 {
p.IncrBy(ctx, m.usedSpaceKey(), batchResult.space)
}
if batchResult.inodes != 0 {
p.IncrBy(ctx, m.totalInodesKey(), batchResult.inodes)
}
for field, delta := range refDelta {
if delta != 0 {
p.HIncrBy(ctx, m.sliceRefs(), field, delta)
}
}
return nil
})
return err
}, watchKeys...)
if err != nil {
// Fallback to per-entry clone for transient/deleted-source cases.
if errno(err) == syscall.ENOTSUP {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change syscall.ENOTSUP to another error type, also for cloneEntry in base.go.

cc @zhijian-pro

if start > 0 {
return fallbackEntries(entries[start:])
}
return syscall.ENOTSUP
}
return errno(err)
}

if result != nil {
result.length += batchResult.length
result.space += batchResult.space
result.inodes += batchResult.inodes
result.userGroupQuotas = append(result.userGroupQuotas, batchResult.userGroupQuotas...)
}
}
return 0
}

func (m *redisMeta) doCleanupDetachedNode(ctx Context, ino Ino) syscall.Errno {
Expand Down
Loading
Loading