Skip to content

Read wheel metadata from wheel directly #7538

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 49 additions & 19 deletions src/pip/_internal/operations/install/wheel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
import warnings
from base64 import urlsafe_b64encode
from email.parser import Parser
from zipfile import ZipFile

from pip._vendor import pkg_resources
from pip._vendor.distlib.scripts import ScriptMaker
from pip._vendor.distlib.util import get_export_entry
from pip._vendor.packaging.utils import canonicalize_name
from pip._vendor.six import StringIO, ensure_str
from pip._vendor.six import PY2, StringIO, ensure_str

from pip._internal.exceptions import InstallationError, UnsupportedWheel
from pip._internal.locations import get_major_minor_version
Expand All @@ -43,6 +44,11 @@

InstalledCSVRow = Tuple[str, ...]

if PY2:
from zipfile import BadZipfile as BadZipFile
else:
from zipfile import BadZipFile


VERSION_COMPATIBLE = (1, 0)

Expand Down Expand Up @@ -286,6 +292,7 @@ def make(self, specification, options=None):
def install_unpacked_wheel(
name, # type: str
wheeldir, # type: str
wheel_zip, # type: ZipFile
scheme, # type: Scheme
req_description, # type: str
pycompile=True, # type: bool
Expand All @@ -296,6 +303,7 @@ def install_unpacked_wheel(

:param name: Name of the project to install
:param wheeldir: Base directory of the unpacked wheel
:param wheel_zip: open ZipFile for wheel being installed
:param scheme: Distutils scheme dictating the install directories
:param req_description: String used in place of the requirement, for
logging
Expand All @@ -313,16 +321,7 @@ def install_unpacked_wheel(

source = wheeldir.rstrip(os.path.sep) + os.path.sep

try:
info_dir = wheel_dist_info_dir(source, name)
metadata = wheel_metadata(source, info_dir)
version = wheel_version(metadata)
except UnsupportedWheel as e:
raise UnsupportedWheel(
"{} has an invalid wheel, {}".format(name, str(e))
)

check_compatibility(version, name)
info_dir, metadata = parse_wheel(wheel_zip, name)

if wheel_root_is_purelib(metadata):
lib_dir = scheme.purelib
Expand Down Expand Up @@ -612,26 +611,50 @@ def install_wheel(
# type: (...) -> None
with TempDirectory(
path=_temp_dir_for_testing, kind="unpacked-wheel"
) as unpacked_dir:
) as unpacked_dir, ZipFile(wheel_path, allowZip64=True) as z:
unpack_file(wheel_path, unpacked_dir.path)
install_unpacked_wheel(
name=name,
wheeldir=unpacked_dir.path,
wheel_zip=z,
scheme=scheme,
req_description=req_description,
pycompile=pycompile,
warn_script_location=warn_script_location,
)


def parse_wheel(wheel_zip, name):
# type: (ZipFile, str) -> Tuple[str, Message]
"""Extract information from the provided wheel, ensuring it meets basic
standards.

Returns the name of the .dist-info directory and the parsed WHEEL metadata.
"""
try:
info_dir = wheel_dist_info_dir(wheel_zip, name)
metadata = wheel_metadata(wheel_zip, info_dir)
version = wheel_version(metadata)
except UnsupportedWheel as e:
raise UnsupportedWheel(
"{} has an invalid wheel, {}".format(name, str(e))
)

check_compatibility(version, name)

return info_dir, metadata


def wheel_dist_info_dir(source, name):
# type: (str, str) -> str
# type: (ZipFile, str) -> str
"""Returns the name of the contained .dist-info directory.

Raises AssertionError or UnsupportedWheel if not found, >1 found, or
it doesn't match the provided name.
"""
subdirs = os.listdir(source)
# Zip file path separators must be /
subdirs = list(set(p.split("/")[0] for p in source.namelist()))

info_dirs = [s for s in subdirs if s.endswith('.dist-info')]

if not info_dirs:
Expand All @@ -655,19 +678,26 @@ def wheel_dist_info_dir(source, name):
)
)

return info_dir
# Zip file paths can be unicode or str depending on the zip entry flags,
# so normalize it.
return ensure_str(info_dir)


def wheel_metadata(source, dist_info_dir):
# type: (str, str) -> Message
# type: (ZipFile, str) -> Message
"""Return the WHEEL metadata of an extracted wheel, if possible.
Otherwise, raise UnsupportedWheel.
"""
try:
with open(os.path.join(source, dist_info_dir, "WHEEL"), "rb") as f:
wheel_text = ensure_str(f.read())
except (IOError, OSError) as e:
# Zip file path separators must be /
wheel_contents = source.read("{}/WHEEL".format(dist_info_dir))
# BadZipFile for general corruption, KeyError for missing entry,
# and RuntimeError for password-protected files
except (BadZipFile, KeyError, RuntimeError) as e:
raise UnsupportedWheel("could not read WHEEL file: {!r}".format(e))

try:
wheel_text = ensure_str(wheel_contents)
except UnicodeDecodeError as e:
raise UnsupportedWheel("error decoding WHEEL: {!r}".format(e))

Expand Down
69 changes: 53 additions & 16 deletions tests/unit/test_wheel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
import os
import textwrap
from email import message_from_string
from io import BytesIO
from zipfile import ZipFile

import pytest
from mock import patch
from pip._vendor.contextlib2 import ExitStack
from pip._vendor.packaging.requirements import Requirement

from pip._internal.exceptions import UnsupportedWheel
Expand All @@ -22,9 +25,13 @@
)
from pip._internal.utils.compat import WINDOWS
from pip._internal.utils.misc import hash_file
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.utils.unpacking import unpack_file
from tests.lib import DATA_DIR, assert_paths_equal, skip_if_python2

if MYPY_CHECK_RUNNING:
from tests.lib.path import Path


def call_get_legacy_build_wheel_path(caplog, names):
wheel_path = get_legacy_build_wheel_path(
Expand Down Expand Up @@ -190,30 +197,60 @@ def test_get_csv_rows_for_installed__long_lines(tmpdir, caplog):
assert messages == expected


def test_wheel_dist_info_dir_found(tmpdir):
@pytest.fixture
def zip_dir():
def make_zip(path):
# type: (Path) -> ZipFile
buf = BytesIO()
with ZipFile(buf, "w", allowZip64=True) as z:
for dirpath, dirnames, filenames in os.walk(path):
for filename in filenames:
file_path = os.path.join(path, dirpath, filename)
# Zip files must always have / as path separator
archive_path = os.path.relpath(file_path, path).replace(
os.pathsep, "/"
)
z.write(file_path, archive_path)

return stack.enter_context(ZipFile(buf, "r", allowZip64=True))

stack = ExitStack()
with stack:
yield make_zip


def test_wheel_dist_info_dir_found(tmpdir, zip_dir):
expected = "simple-0.1.dist-info"
tmpdir.joinpath(expected).mkdir()
assert wheel.wheel_dist_info_dir(str(tmpdir), "simple") == expected
dist_info_dir = tmpdir / expected
dist_info_dir.mkdir()
dist_info_dir.joinpath("WHEEL").touch()
assert wheel.wheel_dist_info_dir(zip_dir(tmpdir), "simple") == expected


def test_wheel_dist_info_dir_multiple(tmpdir):
tmpdir.joinpath("simple-0.1.dist-info").mkdir()
tmpdir.joinpath("unrelated-0.1.dist-info").mkdir()
def test_wheel_dist_info_dir_multiple(tmpdir, zip_dir):
dist_info_dir_1 = tmpdir / "simple-0.1.dist-info"
dist_info_dir_1.mkdir()
dist_info_dir_1.joinpath("WHEEL").touch()
dist_info_dir_2 = tmpdir / "unrelated-0.1.dist-info"
dist_info_dir_2.mkdir()
dist_info_dir_2.joinpath("WHEEL").touch()
with pytest.raises(UnsupportedWheel) as e:
wheel.wheel_dist_info_dir(str(tmpdir), "simple")
wheel.wheel_dist_info_dir(zip_dir(tmpdir), "simple")
assert "multiple .dist-info directories found" in str(e.value)


def test_wheel_dist_info_dir_none(tmpdir):
def test_wheel_dist_info_dir_none(tmpdir, zip_dir):
with pytest.raises(UnsupportedWheel) as e:
wheel.wheel_dist_info_dir(str(tmpdir), "simple")
wheel.wheel_dist_info_dir(zip_dir(tmpdir), "simple")
assert "directory not found" in str(e.value)


def test_wheel_dist_info_dir_wrong_name(tmpdir):
tmpdir.joinpath("unrelated-0.1.dist-info").mkdir()
def test_wheel_dist_info_dir_wrong_name(tmpdir, zip_dir):
dist_info_dir = tmpdir / "unrelated-0.1.dist-info"
dist_info_dir.mkdir()
dist_info_dir.joinpath("WHEEL").touch()
with pytest.raises(UnsupportedWheel) as e:
wheel.wheel_dist_info_dir(str(tmpdir), "simple")
wheel.wheel_dist_info_dir(zip_dir(tmpdir), "simple")
assert "does not start with 'simple'" in str(e.value)


Expand All @@ -223,25 +260,25 @@ def test_wheel_version_ok(tmpdir, data):
) == (1, 9)


def test_wheel_metadata_fails_missing_wheel(tmpdir):
def test_wheel_metadata_fails_missing_wheel(tmpdir, zip_dir):
dist_info_dir = tmpdir / "simple-0.1.0.dist-info"
dist_info_dir.mkdir()
dist_info_dir.joinpath("METADATA").touch()

with pytest.raises(UnsupportedWheel) as e:
wheel.wheel_metadata(str(tmpdir), dist_info_dir.name)
wheel.wheel_metadata(zip_dir(tmpdir), dist_info_dir.name)
assert "could not read WHEEL file" in str(e.value)


@skip_if_python2
def test_wheel_metadata_fails_on_bad_encoding(tmpdir):
def test_wheel_metadata_fails_on_bad_encoding(tmpdir, zip_dir):
dist_info_dir = tmpdir / "simple-0.1.0.dist-info"
dist_info_dir.mkdir()
dist_info_dir.joinpath("METADATA").touch()
dist_info_dir.joinpath("WHEEL").write_bytes(b"\xff")

with pytest.raises(UnsupportedWheel) as e:
wheel.wheel_metadata(str(tmpdir), dist_info_dir.name)
wheel.wheel_metadata(zip_dir(tmpdir), dist_info_dir.name)
assert "error decoding WHEEL" in str(e.value)


Expand Down