Skip to content

Commit 8f0f94a

Browse files
meta: validate entry existence in doBatchUnlink transaction (#6603)
Signed-off-by: jiefenghuang <jiefeng@juicedata.io> Co-authored-by: jiefenghuang <jiefeng@juicedata.io>
1 parent ba90688 commit 8f0f94a

File tree

4 files changed

+68
-46
lines changed

4 files changed

+68
-46
lines changed

pkg/meta/base.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1676,6 +1676,7 @@ func (m *baseMeta) Rmdir(ctx Context, parent Ino, name string, skipCheckTrash ..
16761676
return st
16771677
}
16781678

1679+
// BatchUnlink delete multiple files in the same directory (case-sensitive filenames)
16791680
func (m *baseMeta) BatchUnlink(ctx Context, parent Ino, entries []*Entry, count *uint64, skipCheckTrash bool) syscall.Errno {
16801681
if len(entries) == 0 {
16811682
return 0

pkg/meta/redis.go

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1775,23 +1775,34 @@ func (m *redisMeta) doBatchUnlink(ctx Context, parent Ino, entries []*Entry, len
17751775
return syscall.EPERM
17761776
}
17771777

1778+
entryKey := m.entryKey(parent)
17781779
entryInfos = make([]*entryInfo, 0, len(entries))
17791780
now := time.Now()
1780-
1781-
if len(entries) > 0 {
1782-
for _, entry := range entries {
1783-
if entry.Attr.Typ == TypeDirectory {
1784-
continue
1785-
}
1786-
info := entryInfo{
1787-
name: string(entry.Name),
1788-
inode: entry.Inode,
1789-
typ: entry.Attr.Typ,
1790-
trash: trash,
1791-
buf: m.packEntry(entry.Attr.Typ, entry.Inode),
1792-
}
1793-
entryInfos = append(entryInfos, &info)
1781+
enames := make([]string, 0, len(entries))
1782+
for _, entry := range entries {
1783+
enames = append(enames, string(entry.Name))
1784+
}
1785+
vals, err := tx.HMGet(ctx, entryKey, enames...).Result()
1786+
if err != nil {
1787+
return err
1788+
}
1789+
for idx, entry := range entries {
1790+
val := vals[idx]
1791+
if val == nil {
1792+
continue
1793+
}
1794+
buf := []byte(val.(string))
1795+
typ, ino := m.parseEntry(buf)
1796+
if entry.Inode != ino || typ == TypeDirectory || (entry.Attr != nil && entry.Attr.Typ != typ) {
1797+
continue
17941798
}
1799+
entryInfos = append(entryInfos, &entryInfo{
1800+
name: string(entry.Name),
1801+
inode: ino,
1802+
typ: typ,
1803+
trash: trash,
1804+
buf: buf,
1805+
})
17951806
}
17961807

17971808
inodesSet := make(map[Ino]struct{}, len(entryInfos))
@@ -1906,9 +1917,6 @@ func (m *redisMeta) doBatchUnlink(ctx Context, parent Ino, entries []*Entry, len
19061917
stats := make(map[string]int64) // key -> delta
19071918

19081919
for _, info := range entryInfos {
1909-
if info.typ == TypeDirectory {
1910-
continue
1911-
}
19121920
names = append(names, info.name)
19131921
if info.attr == nil {
19141922
continue

pkg/meta/sql.go

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2638,10 +2638,6 @@ func (m *dbMeta) doBatchUnlink(ctx Context, parent Ino, entries []*Entry, length
26382638
}
26392639
delNodes := make(map[Ino]*dNode)
26402640
var totalLength, totalSpace, totalInodes int64
2641-
if userGroupQuotas != nil {
2642-
*userGroupQuotas = make([]userGroupQuotaDelta, 0, len(entries))
2643-
}
2644-
// main transaction: validate, collect metadata, update inode/link counts, and prepare DB mutations
26452641
err := m.txn(func(s *xorm.Session) error {
26462642
pn := node{Inode: parent}
26472643
ok, err := s.Get(&pn)
@@ -2662,19 +2658,30 @@ func (m *dbMeta) doBatchUnlink(ctx Context, parent Ino, entries []*Entry, length
26622658
if (pn.Flags&FlagAppend != 0) || (pn.Flags&FlagImmutable) != 0 {
26632659
return syscall.EPERM
26642660
}
2665-
entryInfos = make([]*entryInfo, 0, len(entries))
26662661
now := time.Now().UnixNano()
2662+
entryInfos = make([]*entryInfo, 0, len(entries))
2663+
names := make([][]byte, 0, len(entries))
2664+
for _, entry := range entries {
2665+
names = append(names, entry.Name)
2666+
}
2667+
var foundEdges []edge
2668+
if err := s.Where("parent=?", parent).In("name", names).Find(&foundEdges); err != nil {
2669+
return err
2670+
}
2671+
entryMap := make(map[string]*edge)
2672+
for i := range foundEdges {
2673+
entryMap[string(foundEdges[i].Name)] = &foundEdges[i]
2674+
}
26672675

2668-
// collect unique inode ids from entries (avoid operating N times on same inode for hard links)
26692676
inodes := make([]Ino, 0, len(entries))
26702677
inodeM := make(map[Ino]struct{}) // filter hardlinks
26712678
for _, entry := range entries {
2672-
e := &edge{Parent: parent, Name: entry.Name, Inode: entry.Inode}
2673-
if entry.Attr != nil {
2674-
if entry.Attr.Typ == TypeDirectory {
2675-
continue
2676-
}
2677-
e.Type = entry.Attr.Typ
2679+
e, ok := entryMap[string(entry.Name)]
2680+
if !ok {
2681+
continue
2682+
}
2683+
if e.Inode != entry.Inode || e.Type == TypeDirectory || (entry.Attr != nil && e.Type != entry.Attr.Typ) {
2684+
continue
26782685
}
26792686
entryInfos = append(entryInfos, &entryInfo{e: e, trash: trash})
26802687
if _, exists := inodeM[entry.Inode]; !exists {
@@ -2766,7 +2773,9 @@ func (m *dbMeta) doBatchUnlink(ctx Context, parent Ino, entries []*Entry, length
27662773
symlinksDel := make([]Ino, 0)
27672774
xattrsDel := make([]Ino, 0)
27682775
edgesIns := make([]interface{}, 0)
2769-
2776+
if userGroupQuotas != nil {
2777+
*userGroupQuotas = make([]userGroupQuotaDelta, 0, len(entries))
2778+
}
27702779
// walk each edge to decide whether to move to trash, decrement nlink or delete inode & xattrs
27712780
for _, info := range entryInfos {
27722781
edgesDel = append(edgesDel, edge{Parent: parent, Name: info.e.Name})

pkg/meta/tkv.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1491,7 +1491,6 @@ func (m *kvMeta) doBatchUnlink(ctx Context, parent Ino, entries []*Entry, length
14911491
batchUserGroupQuotas = make([]userGroupQuotaDelta, 0, len(batch))
14921492
}
14931493
delNodes = make(map[Ino]*dNode)
1494-
// Get parent directory attribute
14951494
pbuf := tx.get(m.inodeKey(parent))
14961495
if pbuf == nil {
14971496
return syscall.ENOENT
@@ -1510,22 +1509,27 @@ func (m *kvMeta) doBatchUnlink(ctx Context, parent Ino, entries []*Entry, length
15101509

15111510
entryInfos = make([]*entryInfo, 0, len(batch))
15121511
now := time.Now()
1513-
1514-
// Collect entry info and filter out directories
1515-
if len(batch) > 0 {
1516-
for _, entry := range batch {
1517-
if entry.Attr.Typ == TypeDirectory {
1518-
continue
1519-
}
1520-
info := entryInfo{
1521-
name: string(entry.Name),
1522-
inode: entry.Inode,
1523-
typ: entry.Attr.Typ,
1524-
trash: trash,
1525-
buf: m.packEntry(entry.Attr.Typ, entry.Inode),
1526-
}
1527-
entryInfos = append(entryInfos, &info)
1512+
keys := make([][]byte, 0, len(batch))
1513+
for _, entry := range batch {
1514+
keys = append(keys, m.entryKey(parent, string(entry.Name)))
1515+
}
1516+
vals := tx.gets(keys...)
1517+
for idx, entry := range batch {
1518+
if vals[idx] == nil {
1519+
continue
1520+
}
1521+
typ, ino := m.parseEntry(vals[idx])
1522+
if ino != entry.Inode || typ == TypeDirectory || (entry.Attr != nil && typ != entry.Attr.Typ) {
1523+
continue
1524+
}
1525+
info := entryInfo{
1526+
name: string(entry.Name),
1527+
inode: ino,
1528+
typ: typ,
1529+
trash: trash,
1530+
buf: vals[idx],
15281531
}
1532+
entryInfos = append(entryInfos, &info)
15291533
}
15301534

15311535
// Collect unique inodes

0 commit comments

Comments
 (0)