Skip to content

gh-106531: Refresh zipfile._path with zipp 3.18. #116835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Lib/test/test_zipfile/_path/test_complexity.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ def make_zip_path(self, depth=1, width=1) -> zipfile.Path:
@classmethod
def make_names(cls, width, letters=string.ascii_lowercase):
"""
>>> list(TestComplexity.make_names(1))
['a']
>>> list(TestComplexity.make_names(2))
['a', 'b']
>>> list(TestComplexity.make_names(30))
['aa', 'ab', ..., 'bd']
>>> list(TestComplexity.make_names(17124))
['aaa', 'aab', ..., 'zip']
"""
# determine how many products are needed to produce width
n_products = math.ceil(math.log(width, len(letters)))
n_products = max(1, math.ceil(math.log(width, len(letters))))
inputs = (letters,) * n_products
combinations = itertools.product(*inputs)
names = map(''.join, combinations)
Expand Down Expand Up @@ -80,7 +84,7 @@ def test_glob_depth(self):
max_n=100,
min_n=1,
)
assert best <= big_o.complexities.Quadratic
assert best <= big_o.complexities.Linear

@pytest.mark.flaky
def test_glob_width(self):
Expand Down
23 changes: 8 additions & 15 deletions Lib/test/test_zipfile/_path/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import sys
import unittest
import zipfile
import zipfile._path

from ._functools import compose
from ._itertools import Counter
Expand All @@ -20,16 +21,6 @@ class itertools:
Counter = Counter


def add_dirs(zf):
    """
    Write an empty entry into ``zf`` for every directory that its
    current names imply but that has no entry of its own.

    ``zf`` is expected to be a writable zip file; the same object is
    returned so the call can be composed with a fixture builder.
    """
    implied = zipfile.CompleteDirs._implied_dirs(zf.namelist())
    for dirname in implied:
        zf.writestr(dirname, b"")
    return zf


def build_alpharep_fixture():
"""
Create a zip file with this structure:
Expand Down Expand Up @@ -76,7 +67,7 @@ def build_alpharep_fixture():

alpharep_generators = [
Invoked.wrap(build_alpharep_fixture),
Invoked.wrap(compose(add_dirs, build_alpharep_fixture)),
Invoked.wrap(compose(zipfile._path.CompleteDirs.inject, build_alpharep_fixture)),
]

pass_alpharep = parameterize(['alpharep'], alpharep_generators)
Expand Down Expand Up @@ -210,11 +201,12 @@ def test_open_write(self):
with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
strm.write('text file')

def test_open_extant_directory(self):
@pass_alpharep
def test_open_extant_directory(self, alpharep):
"""
Attempting to open a directory raises IsADirectoryError.
"""
zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
zf = zipfile.Path(alpharep)
with self.assertRaises(IsADirectoryError):
zf.joinpath('b').open()

Expand All @@ -226,11 +218,12 @@ def test_open_binary_invalid_args(self, alpharep):
with self.assertRaises(ValueError):
root.joinpath('a.txt').open('rb', 'utf-8')

def test_open_missing_directory(self):
@pass_alpharep
def test_open_missing_directory(self, alpharep):
"""
Attempting to open a missing directory raises FileNotFoundError.
"""
zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
zf = zipfile.Path(alpharep)
with self.assertRaises(FileNotFoundError):
zf.joinpath('z').open()

Expand Down
65 changes: 51 additions & 14 deletions Lib/zipfile/_path/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import contextlib
import pathlib
import re
import sys

from .glob import translate
from .glob import Translator


__all__ = ['Path']
Expand Down Expand Up @@ -147,6 +148,16 @@ def make(cls, source):
source.__class__ = cls
return source

@classmethod
def inject(cls, zf: zipfile.ZipFile) -> zipfile.ZipFile:
    """
    Add explicit entries for implied directories to a writable zip file.

    Every directory that ``cls._implied_dirs`` reports for the names
    currently in ``zf`` is written as an empty entry. The same ``zf``
    object is returned.
    """
    missing_dirs = cls._implied_dirs(zf.namelist())
    for dirname in missing_dirs:
        zf.writestr(dirname, b"")
    return zf


class FastLookup(CompleteDirs):
"""
Expand All @@ -168,8 +179,10 @@ def _name_set(self):


def _extract_text_encoding(encoding=None, *args, **kwargs):
# stacklevel=3 so that the caller of the caller see any warning.
return io.text_encoding(encoding, 3), args, kwargs
# compute stack level so that the caller of the caller sees any warning.
is_pypy = sys.implementation.name == 'pypy'
stack_level = 3 + is_pypy
return io.text_encoding(encoding, stack_level), args, kwargs


class Path:
Expand All @@ -194,13 +207,13 @@ class Path:

Path accepts the zipfile object itself or a filename

>>> root = Path(zf)
>>> path = Path(zf)

From there, several path operations are available.

Directory iteration (including the zip file itself):

>>> a, b = root.iterdir()
>>> a, b = path.iterdir()
>>> a
Path('mem/abcde.zip', 'a.txt')
>>> b
Expand Down Expand Up @@ -238,16 +251,38 @@ class Path:
'mem/abcde.zip/b/c.txt'

At the root, ``name``, ``filename``, and ``parent``
resolve to the zipfile. Note these attributes are not
valid and will raise a ``ValueError`` if the zipfile
has no filename.
resolve to the zipfile.

>>> root.name
>>> str(path)
'mem/abcde.zip/'
>>> path.name
'abcde.zip'
>>> str(root.filename).replace(os.sep, posixpath.sep)
'mem/abcde.zip'
>>> str(root.parent)
>>> path.filename == pathlib.Path('mem/abcde.zip')
True
>>> str(path.parent)
'mem'

If the zipfile has no filename, such attributes are not
valid and accessing them will raise an Exception.

>>> zf.filename = None
>>> path.name
Traceback (most recent call last):
...
TypeError: ...

>>> path.filename
Traceback (most recent call last):
...
TypeError: ...

>>> path.parent
Traceback (most recent call last):
...
TypeError: ...

# workaround python/cpython#106763
>>> pass
"""

__repr = "{self.__class__.__name__}({self.root.filename!r}, {self.at!r})"
Expand Down Expand Up @@ -364,8 +399,10 @@ def glob(self, pattern):
raise ValueError(f"Unacceptable pattern: {pattern!r}")

prefix = re.escape(self.at)
matches = re.compile(prefix + translate(pattern)).fullmatch
return map(self._next, filter(matches, self.root.namelist()))
tr = Translator(seps='/')
matches = re.compile(prefix + tr.translate(pattern)).fullmatch
names = (data.filename for data in self.root.filelist)
return map(self._next, filter(matches, names))

def rglob(self, pattern):
    """
    Glob for ``pattern`` at any depth by delegating to ``glob`` with
    a ``**/`` prefix.
    """
    recursive_pattern = f'**/{pattern}'
    return self.glob(recursive_pattern)
Expand Down
112 changes: 89 additions & 23 deletions Lib/zipfile/_path/glob.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,97 @@
import os
import re


def translate(pattern):
r"""
Given a glob pattern, produce a regex that matches it.
_default_seps = os.sep + str(os.altsep) * bool(os.altsep)

>>> translate('*.txt')
'[^/]*\\.txt'
>>> translate('a?txt')
'a.txt'
>>> translate('**/*')
'.*/[^/]*'

class Translator:
"""
>>> Translator('xyz')
Traceback (most recent call last):
...
AssertionError: Invalid separators

>>> Translator('')
Traceback (most recent call last):
...
AssertionError: Invalid separators
"""
return ''.join(map(replace, separate(pattern)))

seps: str

def __init__(self, seps: str = _default_seps):
    """
    Remember which separator characters this translator honors.

    ``seps`` must be non-empty and consist only of characters found
    in the module-level default separators; otherwise the assertion
    fails with "Invalid separators".
    """
    allowed = set(_default_seps)
    assert seps and set(seps) <= allowed, "Invalid separators"
    self.seps = seps

def translate(self, pattern):
    """
    Convert a glob pattern into the regex that matches it.

    The pattern is first translated by ``translate_core`` and the
    result is then wrapped by ``extend``.
    """
    core_regex = self.translate_core(pattern)
    return self.extend(core_regex)

def extend(self, pattern):
    r"""
    Wrap a translated regex with the pattern-wide adjustments.

    The regex is placed inside '(?s:...)', a non-capturing group
    with the DOTALL flag, so that '.' also matches newlines (which
    are valid in names on Unix).

    '\Z' is appended so the expression implies a full match even
    when only ``match`` is used.
    """
    return r'(?s:{})\Z'.format(pattern)

def translate_core(self, pattern):
    r"""
    Given a glob pattern, produce a regex that matches it.

    Raises ValueError (from ``restrict_rglob``) when ``**`` is mixed
    with other characters inside a path segment.

    Escaped backslashes are stripped from the results below so the
    examples read the same whichever separators the platform
    contributes to the default ``seps``.

    >>> t = Translator()
    >>> t.translate_core('*.txt').replace('\\\\', '')
    '[^/]*\\.txt'
    >>> t.translate_core('a?txt').replace('\\\\', '')
    'a[^/]txt'
    >>> t.translate_core('**/*').replace('\\\\', '')
    '.*/[^/][^/]*'
    """
    self.restrict_rglob(pattern)
    return ''.join(map(self.replace, separate(self.star_not_empty(pattern))))

def replace(self, match):
    """
    Perform the replacements for a match from :func:`separate`.

    A bracketed character set (the ``set`` group) is passed through
    untouched. Any other text is regex-escaped and its wildcards are
    rewritten: ``**`` matches anything, while ``*`` and ``?`` match,
    respectively, any run of and exactly one character that is not
    one of ``self.seps``.
    """
    char_set = match.group('set')
    if char_set:
        return char_set
    # Both single-segment wildcards must stop at every configured
    # separator, not only at '/'.
    not_sep = rf'[^{re.escape(self.seps)}]'
    return (
        re.escape(match.group(0))
        .replace('\\*\\*', r'.*')
        .replace('\\*', not_sep + '*')
        .replace('\\?', not_sep)
    )

def restrict_rglob(self, pattern):
    """
    Reject patterns in which ** shares a path segment with other text.

    The pattern is split on runs of the configured separators and
    each resulting segment is checked.

    >>> Translator().translate('**foo')
    Traceback (most recent call last):
    ...
    ValueError: ** must appear alone in a path segment
    """
    splitter = re.compile(rf'[{re.escape(self.seps)}]+')
    for segment in splitter.split(pattern):
        if segment != '**' and '**' in segment:
            raise ValueError("** must appear alone in a path segment")

def star_not_empty(self, pattern):
    """
    Rewrite every segment that is exactly ``*`` as ``?*`` so that it
    cannot match an empty segment; all other segments are unchanged.
    """
    # A segment is a maximal run of characters that are not separators.
    segment_re = rf'[^{re.escape(self.seps)}]+'
    return re.sub(
        segment_re,
        lambda found: '?*' if found.group(0) == '*' else found.group(0),
        pattern,
    )


def separate(pattern):
Expand All @@ -25,16 +104,3 @@ def separate(pattern):
['a', '[?]', 'txt']
"""
return re.finditer(r'([^\[]+)|(?P<set>[\[].*?[\]])|([\[][^\]]*$)', pattern)


def replace(match):
    """
    Turn one match produced by :func:`separate` into regex text.

    A bracketed set (the ``set`` group) is returned unchanged; any
    other text is regex-escaped and its glob wildcards are swapped
    for their regex equivalents.
    """
    bracketed = match.group('set')
    if bracketed:
        return bracketed
    translated = re.escape(match.group(0))
    # Order matters: '**' has to be handled before the single '*'.
    for glob_token, regex in (
        ('\\*\\*', r'.*'),
        ('\\*', r'[^/]*'),
        ('\\?', r'.'),
    ):
        translated = translated.replace(glob_token, regex)
    return translated
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Refreshed zipfile._path from `zipp 3.18
<https://zipp.readthedocs.io/en/latest/history.html#v3-18-0>`_, providing
better compatibility for PyPy, better glob performance for deeply nested
zipfiles, and providing internal access to ``CompleteDirs.inject`` for use
in other tests (like importlib.resources).