Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions py7zr/archiveinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,8 @@ def write(self, file: BinaryIO):
def is_simple(self, coder):
    """Tell whether *coder* declares exactly one input and one output stream.

    ``coder`` is a mapping that carries the ``'numinstreams'`` and
    ``'numoutstreams'`` counts; the output count is only consulted when the
    input count is already 1.
    """
    single_input = coder['numinstreams'] == 1
    return single_input and coder['numoutstreams'] == 1

def get_decompressor(self, size: int) -> SevenZipDecompressor:
if self.decompressor is not None:
def get_decompressor(self, size: int, reset: bool = False) -> SevenZipDecompressor:
if self.decompressor is not None and not reset:
return self.decompressor
else:
try:
Expand Down
2 changes: 1 addition & 1 deletion py7zr/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def calculate_key(password: bytes, cycles: int, salt: bytes, digest: str) -> byt
ba.extend(password)
for i in range(32):
ba.append(0)
key = ba[:32] # type: bytes
key = bytes(ba[:32]) # type: bytes
else:
rounds = 1 << cycles
m = hashlib.sha256()
Expand Down
29 changes: 23 additions & 6 deletions py7zr/py7zr.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def __init__(self, file: Union[BinaryIO, str, pathlib.Path], mode: str = 'r',
try:
if mode == "r":
self._real_get_contents(self.fp)
self.reset()
self._reset_worker()
elif mode in 'w':
# FIXME: check filters here
self.folder = self._create_folder(filters)
Expand Down Expand Up @@ -472,7 +472,13 @@ def _set_file_property(self, outfilename: pathlib.Path, properties: Dict[str, An
ro_mask = 0o777 ^ (stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)
outfilename.chmod(outfilename.stat().st_mode & ro_mask)

def reset(self) -> None:
def _reset_decompressor(self) -> None:
if self.header.main_streams is not None and self.header.main_streams.unpackinfo.numfolders > 0:
for i, folder in enumerate(self.header.main_streams.unpackinfo.folders):
compressed_size = self.header.main_streams.packinfo.packsizes[i]
folder.get_decompressor(compressed_size, reset=True)

def _reset_worker(self) -> None:
"""Seek to where archive data start in archive and recreate new worker."""
self.fp.seek(self.afterheader)
self.worker = Worker(self.files, self.afterheader, self.header)
Expand Down Expand Up @@ -503,7 +509,7 @@ def _test_digest_raw(self, pos: int, size: int, crc: int) -> bool:
return digest == crc

def _test_pack_digest(self) -> bool:
self.reset()
self._reset_worker()
crcs = self.header.main_streams.packinfo.crcs
if crcs is not None and len(crcs) > 0:
# check packed stream's crc
Expand All @@ -513,7 +519,7 @@ def _test_pack_digest(self) -> bool:
return True

def _test_unpack_digest(self) -> bool:
self.reset()
self._reset_worker()
for f in self.files:
self.worker.register_filelike(f.id, None)
try:
Expand Down Expand Up @@ -693,11 +699,13 @@ def extractall(self, path: Optional[Any] = None) -> None:
directories afterwards. `path' specifies a different directory
to extract to.
"""
return self.extract(path)

def extract(self, path: Optional[Any] = None, targets: Optional[List[str]] = None) -> None:
target_junction = [] # type: List[Tuple[BinaryIO, str]]
target_sym = [] # type: List[Tuple[BinaryIO, str]]
target_files = [] # type: List[Tuple[pathlib.Path, Dict[str, Any]]]
target_dirs = [] # type: List[pathlib.Path]
self.reset()
if path is not None:
if isinstance(path, str):
path = pathlib.Path(path)
Expand All @@ -714,7 +722,7 @@ def extractall(self, path: Optional[Any] = None) -> None:

multi_thread = self.header.main_streams is not None and self.header.main_streams.unpackinfo.numfolders > 1 and \
self.header.main_streams.packinfo.numstreams == self.header.main_streams.unpackinfo.numfolders
fnames = [] # type: List[str]
fnames = [] # type: List[str] # check duplicated filename in one archive?
for f in self.files:
# TODO: sanity check
# check whether f.filename with invalid characters: '../'
Expand All @@ -737,6 +745,9 @@ def extractall(self, path: Optional[Any] = None) -> None:
outfilename = path.joinpath(outname)
else:
outfilename = pathlib.Path(outname)
if targets is not None and f.filename not in targets:
self.worker.register_filelike(f.id, None)
continue
if f.is_directory:
if not outfilename.exists():
target_dirs.append(outfilename)
Expand Down Expand Up @@ -831,6 +842,12 @@ def close(self):
self._write_archive()
self._fpclose()

def reset(self) -> None:
    """Return a read-mode archive to its initial state so it can be extracted again.

    In mode ``'r'`` this calls ``_reset_worker()`` followed by
    ``_reset_decompressor()``; in any other mode it is a no-op.
    """
    if self.mode != 'r':
        return
    self._reset_worker()
    self._reset_decompressor()


# --------------------
# exported functions
Expand Down
Binary file added tests/data/bzip2.7z
Binary file not shown.
Binary file added tests/data/ppmd.7z
Binary file not shown.
47 changes: 47 additions & 0 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import lzma
import os
import re
import sys

import pytest
Expand Down Expand Up @@ -367,3 +368,49 @@ def test_py7zr_writeall_dir(tmp_path):
assert len(archive.files) == 2
for f in archive.files:
assert f.filename in ('src', os.path.join('src', 'bra.txt'))


@pytest.mark.api
def test_py7zr_extract_specified_file(tmp_path):
    """Extract only the listed targets and check that nothing else is written."""
    expected = [{'filename': 'scripts/py7zr', 'mode': 33261, 'mtime': 1552522208,  # 33261 == 0o100755
                 'digest': 'b0385e71d6a07eb692f5fb9798e9d33aaf87be7dfff936fd2473eab2a593d4fd'}
                ]
    # Own the file handle here so it is closed even when extraction fails.
    with open(os.path.join(testdata_path, 'test_1.7z'), 'rb') as target:
        archive = py7zr.SevenZipFile(target)
        try:
            archive.extract(path=tmp_path, targets=['scripts', 'scripts/py7zr'])
        finally:
            archive.close()
    assert tmp_path.joinpath('scripts').is_dir()
    assert tmp_path.joinpath('scripts/py7zr').exists()
    # Members that were not requested must not appear on disk.
    assert not tmp_path.joinpath('setup.cfg').exists()
    assert not tmp_path.joinpath('setup.py').exists()
    check_output(expected, tmp_path)


@pytest.mark.api
def test_py7zr_extract_and_getnames(tmp_path):
    """Select targets from ``getnames()`` with a regex and extract only those."""
    filter_pattern = re.compile(r'scripts.*')
    # Own the file handle here so it is closed even when extraction fails.
    with open(os.path.join(testdata_path, 'test_1.7z'), 'rb') as target:
        archive = py7zr.SevenZipFile(target)
        try:
            targets = [name for name in archive.getnames() if filter_pattern.match(name)]
            archive.extract(path=tmp_path, targets=targets)
        finally:
            archive.close()
    assert tmp_path.joinpath('scripts').is_dir()
    assert tmp_path.joinpath('scripts/py7zr').exists()
    # Names that did not match the pattern must not appear on disk.
    assert not tmp_path.joinpath('setup.cfg').exists()
    assert not tmp_path.joinpath('setup.py').exists()


@pytest.mark.api
def test_py7zr_extract_and_reset_iteration(tmp_path):
    """Extract every member one at a time, calling ``reset()`` between extractions."""
    # Own the file handle here so it is closed even when an extraction fails.
    with open(os.path.join(testdata_path, 'test_1.7z'), 'rb') as source:
        archive = py7zr.SevenZipFile(source)
        try:
            for target in archive.getnames():
                archive.extract(path=tmp_path, targets=[target])
                # The same archive object is reused for the next single-target pass.
                archive.reset()
        finally:
            archive.close()
    assert tmp_path.joinpath('scripts').is_dir()
    assert tmp_path.joinpath('scripts/py7zr').exists()
    assert tmp_path.joinpath('setup.cfg').exists()
    assert tmp_path.joinpath('setup.py').exists()
16 changes: 16 additions & 0 deletions tests/test_extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import py7zr
from py7zr import unpack_7zarchive
from py7zr.exceptions import UnsupportedCompressionMethodError
from py7zr.helpers import UTC

from . import aio7zr, decode_all
Expand Down Expand Up @@ -251,3 +252,18 @@ def test_extract_encrypted(tmp_path):
def test_extract_encrypted_2(tmp_path):
    """Extract the password-protected ``encrypted_2.7z`` fixture without error."""
    # Own the file handle and close the archive so neither leaks past the test.
    with open(os.path.join(testdata_path, 'encrypted_2.7z'), 'rb') as target:
        archive = py7zr.SevenZipFile(target, password='secret')
        try:
            archive.extractall(path=tmp_path)
        finally:
            archive.close()


@pytest.mark.files
def test_extract_bzip2(tmp_path):
    """Extract the ``bzip2.7z`` fixture without error."""
    # Own the file handle here so it is closed even when extraction fails.
    with open(os.path.join(testdata_path, 'bzip2.7z'), 'rb') as target:
        archive = py7zr.SevenZipFile(target)
        try:
            archive.extractall(path=tmp_path)
        finally:
            archive.close()


@pytest.mark.files
def test_extract_ppmd(tmp_path):
    """Expect ``UnsupportedCompressionMethodError`` when handling the ``ppmd.7z`` fixture."""
    # Open the fixture outside ``pytest.raises`` so the handle is closed once the
    # expected exception has propagated out of the inner block.
    with open(os.path.join(testdata_path, 'ppmd.7z'), 'rb') as target:
        with pytest.raises(UnsupportedCompressionMethodError):
            # NOTE(review): not visible from here whether construction or
            # extraction raises; both stay inside the expectation as before.
            archive = py7zr.SevenZipFile(target)
            archive.extractall(path=tmp_path)
            archive.close()
4 changes: 3 additions & 1 deletion tests/test_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,9 @@ def test_simple_compress_and_decompress():
@pytest.mark.unit
@pytest.mark.parametrize("password, cycle, salt, expected",
[('secret', 19, b'',
b'e\x11\xf1Pz<*\x98*\xe6\xde\xf4\xf6X\x18\xedl\xf2Be\x1a\xca\x19\xd1\\\xeb\xc6\xa6z\xe2\x89\x1d')
b'e\x11\xf1Pz<*\x98*\xe6\xde\xf4\xf6X\x18\xedl\xf2Be\x1a\xca\x19\xd1\\\xeb\xc6\xa6z\xe2\x89\x1d'),
('secret^&', 0x3f, b'i@#ri#Ildajfdk',
b'i@#ri#Ildajfdks\x00e\x00c\x00r\x00e\x00t\x00^\x00&\x00\x00\x00')
])
def test_calculate_key(password: str, cycle: int, salt: bytes, expected: bytes):
key = py7zr.helpers.calculate_key(password.encode('utf-16LE'), cycle, salt, 'sha256')
Expand Down