Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions Doc/library/pathlib.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,34 @@ Copying, renaming and deleting
Remove this directory. The directory must be empty.


.. method:: Path.rmtree(ignore_errors=False, on_error=None)

Delete this entire directory tree. The path must not refer to a symlink.

If *ignore_errors* is true, errors resulting from failed removals will be
ignored. If *ignore_errors* is false or omitted, and a function is given to
*on_error*, it will be called each time an exception is raised. If neither
*ignore_errors* nor *on_error* are supplied, exceptions are propagated to
the caller.

.. note::

On platforms that support the necessary fd-based functions a symlink
attack resistant version of :meth:`~Path.rmtree` is used by default. On
other platforms, the :func:`~Path.rmtree` implementation is susceptible
to a symlink attack: given proper timing and circumstances, attackers
can manipulate symlinks on the filesystem to delete files they wouldn't
be able to access otherwise.

If the optional argument *on_error* is specified, it should be a callable;
it will be called with one argument, an :exc:`OSError` instance. The
callable can handle the error to continue the deletion process or re-raise
it to stop. Note that the filename is available as the ``filename``
attribute of the exception object.

.. versionadded:: 3.14


Permissions and ownership
^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
2 changes: 2 additions & 0 deletions Doc/whatsnew/3.14.rst
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ pathlib
* Add :meth:`pathlib.Path.copytree`, which copies one directory tree to
another.
(Contributed by Barney Gale in :gh:`73991`.)
* Add :meth:`pathlib.Path.rmtree`, which recursively removes a directory.
(Contributed by Barney Gale in :gh:`73991`.)

symtable
--------
Expand Down
39 changes: 39 additions & 0 deletions Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,45 @@ def rmdir(self):
"""
raise UnsupportedOperation(self._unsupported_msg('rmdir()'))

def rmtree(self, ignore_errors=False, on_error=None):
"""
Recursively delete this directory tree.

If *ignore_errors* is true, exceptions raised from scanning the tree
and removing files and directories are ignored. Otherwise, if
*on_error* is set, it will be called to handle the error. If neither
*ignore_errors* nor *on_error* are set, exceptions are propagated to
the caller.
"""
if ignore_errors:
def on_error(err):
pass
elif on_error is None:
def on_error(err):
raise err
try:
if self.is_symlink() or self.is_junction():
raise OSError("Cannot call rmtree on a symbolic link")
results = self.walk(
on_error=on_error,
top_down=False,
follow_symlinks=False)
for dirpath, dirnames, filenames in results:
for name in filenames:
try:
dirpath.joinpath(name).unlink()
except OSError as err:
on_error(err)
for name in dirnames:
try:
dirpath.joinpath(name).rmdir()
except OSError as err:
on_error(err)
self.rmdir()
except OSError as err:
err.filename = str(self)
on_error(err)

def owner(self, *, follow_symlinks=True):
"""
Return the login name of the file owner.
Expand Down
19 changes: 19 additions & 0 deletions Lib/pathlib/_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,25 @@ def rmdir(self):
"""
os.rmdir(self)

def rmtree(self, ignore_errors=False, on_error=None):
"""
Recursively delete this directory tree.

If *ignore_errors* is true, exceptions raised from scanning the tree
and removing files and directories are ignored. Otherwise, if
*on_error* is set, it will be called to handle the error. If neither
*ignore_errors* nor *on_error* are set, exceptions are propagated to
the caller.
"""
if on_error:
def onexc(func, filename, err):
err.filename = filename
on_error(err)
else:
onexc = None
import shutil
shutil.rmtree(str(self), ignore_errors, onexc=onexc)

def rename(self, target):
"""
Rename this path to the target path.
Expand Down
251 changes: 251 additions & 0 deletions Lib/test/test_pathlib/test_pathlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from test.support import import_helper
from test.support import is_emscripten, is_wasi
from test.support import infinite_recursion
from test.support import swap_attr
from test.support import os_helper
from test.support.os_helper import TESTFN, FakePath
from test.test_pathlib import test_pathlib_abc
Expand All @@ -31,6 +32,10 @@
if hasattr(os, 'geteuid'):
root_in_posix = (os.geteuid() == 0)

rmtree_use_fd_functions = (
{os.open, os.stat, os.unlink, os.rmdir} <= os.supports_dir_fd and
os.listdir in os.supports_fd and os.stat in os.supports_follow_symlinks)

#
# Tests for the pure classes.
#
Expand Down Expand Up @@ -777,6 +782,252 @@ def test_group_no_follow_symlinks(self):
self.assertEqual(expected_gid, gid_2)
self.assertEqual(expected_name, link.group(follow_symlinks=False))

def test_rmtree_uses_safe_fd_version_if_available(self):
if rmtree_use_fd_functions:
d = self.cls(self.base, 'a')
d.mkdir()
try:
real_open = os.open

class Called(Exception):
pass

def _raiser(*args, **kwargs):
raise Called

os.open = _raiser
self.assertRaises(Called, d.rmtree)
finally:
os.open = real_open

@unittest.skipIf(sys.platform[:6] == 'cygwin',
"This test can't be run on Cygwin (issue #1071513).")
@os_helper.skip_if_dac_override
@os_helper.skip_unless_working_chmod
def test_rmtree_unwritable(self):
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
child_file_path = tmp / 'a'
child_dir_path = tmp / 'b'
child_file_path.write_text("")
child_dir_path.mkdir()
old_dir_mode = tmp.stat().st_mode
old_child_file_mode = child_file_path.stat().st_mode
old_child_dir_mode = child_dir_path.stat().st_mode
# Make unwritable.
new_mode = stat.S_IREAD | stat.S_IEXEC
try:
child_file_path.chmod(new_mode)
child_dir_path.chmod(new_mode)
tmp.chmod(new_mode)

errors = []
tmp.rmtree(on_error=errors.append)
# Test whether onerror has actually been called.
print(errors)
self.assertEqual(len(errors), 3)
finally:
tmp.chmod(old_dir_mode)
child_file_path.chmod(old_child_file_mode)
child_dir_path.chmod(old_child_dir_mode)

@needs_windows
def test_rmtree_inner_junction(self):
import _winapi
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
dir1 = tmp / 'dir1'
dir2 = dir1 / 'dir2'
dir3 = tmp / 'dir3'
for d in dir1, dir2, dir3:
d.mkdir()
file1 = tmp / 'file1'
file1.write_text('foo')
link1 = dir1 / 'link1'
_winapi.CreateJunction(str(dir2), str(link1))
link2 = dir1 / 'link2'
_winapi.CreateJunction(str(dir3), str(link2))
link3 = dir1 / 'link3'
_winapi.CreateJunction(str(file1), str(link3))
# make sure junctions are removed but not followed
dir1.rmtree()
self.assertFalse(dir1.exists())
self.assertTrue(dir3.exists())
self.assertTrue(file1.exists())

@needs_windows
def test_rmtree_outer_junction(self):
import _winapi
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
try:
src = tmp / 'cheese'
dst = tmp / 'shop'
src.mkdir()
spam = src / 'spam'
spam.write_text('')
_winapi.CreateJunction(str(src), str(dst))
self.assertRaises(OSError, dst.rmtree)
dst.rmtree(ignore_errors=True)
finally:
tmp.rmtree(ignore_errors=True)

@needs_windows
def test_rmtree_outer_junction_on_error(self):
import _winapi
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
dir_ = tmp / 'dir'
dir_.mkdir()
link = tmp / 'link'
_winapi.CreateJunction(str(dir_), str(link))
try:
self.assertRaises(OSError, link.rmtree)
self.assertTrue(dir_.exists())
self.assertTrue(link.exists(follow_symlinks=False))
errors = []

def on_error(error):
errors.append(error)

link.rmtree(on_error=on_error)
self.assertEqual(len(errors), 1)
self.assertIsInstance(errors[0], OSError)
self.assertEqual(errors[0].filename, str(link))
finally:
os.unlink(str(link))

@unittest.skipUnless(rmtree_use_fd_functions, "requires safe rmtree")
def test_rmtree_fails_on_close(self):
# Test that the error handler is called for failed os.close() and that
# os.close() is only called once for a file descriptor.
tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
dir1 = tmp / 'dir1'
dir1.mkdir()
dir2 = dir1 / 'dir2'
dir2.mkdir()

def close(fd):
orig_close(fd)
nonlocal close_count
close_count += 1
raise OSError

close_count = 0
with swap_attr(os, 'close', close) as orig_close:
with self.assertRaises(OSError):
dir1.rmtree()
self.assertTrue(dir2.is_dir())
self.assertEqual(close_count, 2)

close_count = 0
errors = []

with swap_attr(os, 'close', close) as orig_close:
dir1.rmtree(on_error=errors.append)
print(errors)
self.assertEqual(len(errors), 2)
self.assertEqual(errors[0].filename, str(dir2))
self.assertEqual(errors[1].filename, str(dir1))
self.assertEqual(close_count, 2)

@unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
@unittest.skipIf(sys.platform == "vxworks",
"fifo requires special path on VxWorks")
def test_rmtree_on_named_pipe(self):
p = self.cls(self.base, 'pipe')
os.mkfifo(p)
try:
with self.assertRaises(NotADirectoryError):
p.rmtree()
self.assertTrue(p.exists())
finally:
p.unlink()

p = self.cls(self.base, 'dir')
p.mkdir()
os.mkfifo(p / 'mypipe')
p.rmtree()
self.assertFalse(p.exists())

@unittest.skipIf(sys.platform[:6] == 'cygwin',
"This test can't be run on Cygwin (issue #1071513).")
@os_helper.skip_if_dac_override
@os_helper.skip_unless_working_chmod
def test_rmtree_deleted_race_condition(self):
# bpo-37260
#
# Test that a file or a directory deleted after it is enumerated
# by scandir() but before unlink() or rmdr() is called doesn't
# generate any errors.
def on_error(exc):
assert exc.filename
if not isinstance(exc, PermissionError):
raise
# Make the parent and the children writeable.
for p, mode in zip(paths, old_modes):
p.chmod(mode)
# Remove other dirs except one.
keep = next(p for p in dirs if str(p) != exc.filename)
for p in dirs:
if p != keep:
p.rmdir()
# Remove other files except one.
keep = next(p for p in files if str(p) != exc.filename)
for p in files:
if p != keep:
p.unlink()

tmp = self.cls(self.base, 'rmtree')
tmp.mkdir()
paths = [tmp] + [tmp / f'child{i}' for i in range(6)]
dirs = paths[1::2]
files = paths[2::2]
for path in dirs:
path.mkdir()
for path in files:
path.write_text('')

old_modes = [path.stat().st_mode for path in paths]

# Make the parent and the children non-writeable.
new_mode = stat.S_IREAD | stat.S_IEXEC
for path in reversed(paths):
path.chmod(new_mode)

try:
tmp.rmtree(on_error=on_error)
except:
# Test failed, so cleanup artifacts.
for path, mode in zip(paths, old_modes):
try:
path.chmod(mode)
except OSError:
pass
tmp.rmtree()
raise

def test_rmtree_does_not_choke_on_failing_lstat(self):
try:
orig_lstat = os.lstat
tmp = self.cls(self.base, 'rmtree')

def raiser(fn, *args, **kwargs):
if fn != str(tmp):
raise OSError()
else:
return orig_lstat(fn)

os.lstat = raiser

tmp.mkdir()
foo = tmp / 'foo'
foo.write_text('')
tmp.rmtree()
finally:
os.lstat = orig_lstat

@os_helper.skip_unless_hardlink
def test_hardlink_to(self):
P = self.cls(self.base)
Expand Down
Loading