Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [2.4.12] - (Unreleased)

### Added

- Added `passwd` argument and `setpassword` for ReadZipFS to extract password
protected date from zip file. [#360](https://github.com/PyFilesystem/pyfilesystem2/issues/360)

### Changed

- Start testing on PyPy. Due to [#342](https://github.com/PyFilesystem/pyfilesystem2/issues/342)
Expand Down
8 changes: 8 additions & 0 deletions fs/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"NoURL",
"OperationFailed",
"OperationTimeout",
"PasswordUnsupported",
"PathError",
"PermissionDenied",
"RemoteConnectionError",
Expand Down Expand Up @@ -255,6 +256,13 @@ class RemoveRootError(OperationFailed):
default_message = "root directory may not be removed"


class PasswordUnsupported(Unsupported):
"""Attempt to create a password protected zip file.
"""

default_message = "can not create password protected zip"


class ResourceError(FSError):
"""Base exception class for error associated with a specific resource.
"""
Expand Down
53 changes: 41 additions & 12 deletions fs/zipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
if typing.TYPE_CHECKING:
from typing import (
Any,
AnyStr,
BinaryIO,
Collection,
Dict,
Expand All @@ -43,13 +44,27 @@
R = typing.TypeVar("R", bound="ReadZipFS")


def _bytes(s):
# type: (Optional[AnyStr]) -> Optional[bytes]
if s is None:
return None
elif isinstance(s, six.binary_type):
return s
elif isinstance(s, six.string_types):
return s.encode()
else:
raise TypeError("expected string type or byte type, not " + type(s).__name__)


class _ZipExtFile(RawWrapper):
def __init__(self, fs, name):
# type: (ReadZipFS, Text) -> None
def __init__(self, fs, name, passwd=None):
# type: (ReadZipFS, Text, Optional[AnyStr]) -> None
self._zip = _zip = fs._zip
self._end = _zip.getinfo(name).file_size
self._pos = 0
super(_ZipExtFile, self).__init__(_zip.open(name), "r", name)
super(_ZipExtFile, self).__init__(
_zip.open(name, pwd=_bytes(passwd)), "r", name
)

def read(self, size=-1):
# type: (int) -> bytes
Expand Down Expand Up @@ -160,6 +175,8 @@ class ZipFS(WrapFS):
defined in the `zipfile` module in the stdlib).
temp_fs (str): An FS URL for the temporary filesystem used to
store data prior to zipping.
passwd (str or bytes): Password for extracting file from zip file. Only
used for read mode.

"""

Expand All @@ -171,16 +188,19 @@ def __new__( # type: ignore
compression=zipfile.ZIP_DEFLATED, # type: int
encoding="utf-8", # type: Text
temp_fs="temp://__ziptemp__", # type: Text
passwd=None, # type: Optional[AnyStr]
):
# type: (...) -> FS
# This magic returns a different class instance based on the
# value of the ``write`` parameter.
if write:
if passwd is not None:
raise errors.PasswordUnsupported()
return WriteZipFS(
file, compression=compression, encoding=encoding, temp_fs=temp_fs
)
else:
return ReadZipFS(file, encoding=encoding)
return ReadZipFS(file, encoding=encoding, passwd=passwd)

if typing.TYPE_CHECKING:

Expand All @@ -191,6 +211,7 @@ def __init__(
compression=zipfile.ZIP_DEFLATED, # type: int
encoding="utf-8", # type: Text
temp_fs="temp://__ziptemp__", # type: Text
passwd=None, # type: Optional[AnyStr]
):
# type: (...) -> None
pass
Expand Down Expand Up @@ -290,13 +311,15 @@ class ReadZipFS(FS):
}

@errors.CreateFailed.catch_all
def __init__(self, file, encoding="utf-8"):
# type: (Union[BinaryIO, Text], Text) -> None
def __init__(self, file, encoding="utf-8", passwd=None):
# type: (Union[BinaryIO, Text], Text, Optional[AnyStr]) -> None
super(ReadZipFS, self).__init__()
self._file = file
self.encoding = encoding
self._zip = zipfile.ZipFile(file, "r")
self._directory_fs = None # type: Optional[MemoryFS]
if passwd is not None:
self.setpassword(_bytes(passwd))

def __repr__(self):
# type: () -> Text
Expand Down Expand Up @@ -409,8 +432,8 @@ def makedir(
self.check()
raise errors.ResourceReadOnly(path)

def openbin(self, path, mode="r", buffering=-1, **kwargs):
# type: (Text, Text, int, **Any) -> BinaryIO
def openbin(self, path, mode="r", buffering=-1, passwd=None, **kwargs):
# type: (Text, Text, int, Optional[AnyStr], **Any) -> BinaryIO
self.check()
if "w" in mode or "+" in mode or "a" in mode:
raise errors.ResourceReadOnly(path)
Expand All @@ -421,7 +444,7 @@ def openbin(self, path, mode="r", buffering=-1, **kwargs):
raise errors.FileExpected(path)

zip_name = self._path_to_zip_name(path)
return _ZipExtFile(self, zip_name) # type: ignore
return _ZipExtFile(self, zip_name, passwd) # type: ignore

def remove(self, path):
# type: (Text) -> None
Expand All @@ -439,13 +462,13 @@ def close(self):
if hasattr(self, "_zip"):
self._zip.close()

def readbytes(self, path):
# type: (Text) -> bytes
def readbytes(self, path, passwd=None):
# type: (Text, Optional[AnyStr]) -> bytes
self.check()
if not self._directory.isfile(path):
raise errors.ResourceNotFound(path)
zip_name = self._path_to_zip_name(path)
zip_bytes = self._zip.read(zip_name)
zip_bytes = self._zip.read(zip_name, pwd=_bytes(passwd))
return zip_bytes

def geturl(self, path, purpose="download"):
Expand All @@ -456,3 +479,9 @@ def geturl(self, path, purpose="download"):
return "zip://{}!/{}".format(quoted_file, quoted_path)
else:
raise errors.NoURL(path, purpose)

def setpassword(self, passwd):
# type: (AnyStr) -> None
"""Set *passwd* as default password to extract encrypted files.
"""
self._zip.setpassword(_bytes(passwd))
63 changes: 62 additions & 1 deletion tests/test_zipfs.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- encoding: UTF-8
from __future__ import unicode_literals

import codecs
import os
import sys
import tempfile
Expand All @@ -13,13 +14,22 @@
from fs.compress import write_zip
from fs.opener import open_fs
from fs.opener.errors import NotWriteable
from fs.errors import NoURL
from fs.errors import NoURL, PasswordUnsupported
from fs.test import FSTestCases
from fs.enums import Seek

from .test_archives import ArchiveTestCases


class TestBytes(unittest.TestCase):
def test_conversion(self):
self.assertIsNone(zipfs._bytes(None))
self.assertEqual(zipfs._bytes("passwd"), b"passwd")
self.assertEqual(zipfs._bytes(b"passwd"), b"passwd")
with self.assertRaises(TypeError):
zipfs._bytes(1234)


class TestWriteReadZipFS(unittest.TestCase):
def setUp(self):
fh, self._temp_path = tempfile.mkstemp()
Expand All @@ -40,6 +50,10 @@ def test_unicode_paths(self):
with zip_fs.openbin(path) as f:
f.read()

def test_create_password(self):
with self.assertRaises(PasswordUnsupported):
zipfs.ZipFS(self._temp_path, write=True, passwd="hello")


class TestWriteZipFS(FSTestCases, unittest.TestCase):
"""
Expand Down Expand Up @@ -220,6 +234,53 @@ def test_implied(self):
os.remove(path)


class TestPasswordReadZipFS(unittest.TestCase):

ZIP_BIN = (
b"UEsDBAoACQAAAH2whk8tOwivGAAAAAwAAAADABwAZm9vVVQJAAPNX+pdzl/qXXV4CwABBPUBAAAE"
b"FAAAAJ6pj1kohibjIq4YqnEKUZ8SCJMeUkl9oVBLBwgtOwivGAAAAAwAAABQSwECHgMKAAkAAAB9"
b"sIZPLTsIrxgAAAAMAAAAAwAYAAAAAAABAAAApIEAAAAAZm9vVVQFAAPNX+pddXgLAAEE9QEAAAQU"
b"AAAAUEsFBgAAAAABAAEASQAAAGUAAAAAAA=="
)

PASSWD = "P@ssw0rd"

def setUp(self):
fh, path = tempfile.mkstemp("testzip.zip")
os.write(fh, codecs.decode(self.ZIP_BIN, "base64"))
os.close(fh)
self.path = path

def tearDown(self):
os.remove(self.path)

def test_openbin(self):
with zipfs.ReadZipFS(self.path, passwd=self.PASSWD) as zip_fs:
with zip_fs.openbin("foo") as fp:
self.assertEqual(fp.read(), b"hello world\n")

with zipfs.ReadZipFS(self.path) as zip_fs:
with zip_fs.openbin("foo", passwd=self.PASSWD) as fp:
self.assertEqual(fp.read(), b"hello world\n")

def test_readbytes(self):
with zipfs.ReadZipFS(self.path, passwd=self.PASSWD) as zip_fs:
self.assertEqual(zip_fs.readbytes("foo"), b"hello world\n")

with zipfs.ReadZipFS(self.path) as zip_fs:
self.assertEqual(
zip_fs.readbytes("foo", passwd=self.PASSWD), b"hello world\n"
)

def test_setpassword(self):
with zipfs.ReadZipFS(self.path) as zip_fs:
with self.assertRaises(RuntimeError):
zip_fs._zip.read("foo")

zip_fs.setpassword(self.PASSWD)
self.assertEqual(zip_fs._zip.read("foo"), b"hello world\n")


class TestOpener(unittest.TestCase):
def test_not_writeable(self):
with self.assertRaises(NotWriteable):
Expand Down