Skip to content

Make OpenFile filelike #1024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 76 additions & 28 deletions fsspec/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from .compression import compr
from .registry import filesystem, get_filesystem_class
from .utils import (
IOWrapper,
_unstrip_protocol,
build_name_function,
infer_compression,
Expand All @@ -29,7 +28,7 @@
logger = logging.getLogger("fsspec")


class OpenFile(object):
class OpenFile(io.IOBase):
"""
File-like object to be used in a context

Expand All @@ -56,6 +55,10 @@ class OpenFile(object):
How to handle encoding errors if opened in text mode.
newline: None or str
Passed to TextIOWrapper in text mode, how to handle line endings.
autoopen: bool
If True, calls open() immediately. Mostly used by pickle
pos: int
If given and autoopen is True, seek to this location immediately
"""

def __init__(
Expand All @@ -67,6 +70,8 @@ def __init__(
encoding=None,
errors=None,
newline=None,
autoopen=False,
pos=0,
):
self.fs = fs
self.path = path
Expand All @@ -76,8 +81,17 @@ def __init__(
self.errors = errors
self.newline = newline
self.fobjects = []
if autoopen:
self.open()
self.seek(pos)

def __reduce__(self):
if (
("w" in self.mode or "a" in self.mode or "+" in self.mode)
and not self.closed
and self.tell()
):
raise ValueError("cannot pickle write-mode file after a write")
return (
OpenFile,
(
Expand All @@ -88,6 +102,8 @@ def __reduce__(self):
self.encoding,
self.errors,
self.newline,
not self.closed,
not self.closed and self.tell(),
),
)

Expand All @@ -113,19 +129,27 @@ def __enter__(self):
)
self.fobjects.append(f)

return self.fobjects[-1]
return self

def __exit__(self, *args):
self.close()

def __del__(self):
if hasattr(self, "fobjects"):
self.fobjects.clear() # may cause cleanup of objects and close files
self.close()

@property
def full_name(self):
return _unstrip_protocol(self.path, self.fs)

@property
def f(self):
if self.fobjects:
return self.fobjects[-1]
raise ValueError(
"I/O operation on closed file. Please call " "open() or use a with context"
)

def open(self):
"""Materialise this as a real open file without context

Expand All @@ -134,26 +158,58 @@ def open(self):
objects, so they can close even if the parent OpenFile object has already
been deleted; but a with-context is better style.
"""
out = self.__enter__()
closer = out.close
fobjects = self.fobjects.copy()[:-1]
mode = self.mode
return self.__enter__()

def close():
# this func has no reference to self, so it still works after the OpenFile is deleted
closer() # original close bound method of the final file-like
_close(fobjects, mode) # call close on other dependent file-likes
def close(self):
"""Close all encapsulated file objects"""
for f in reversed(self.fobjects):
if "r" not in self.mode and not f.closed:
f.flush()
f.close()
self.fobjects.clear()

try:
out.close = close
except AttributeError:
out = IOWrapper(out, lambda: _close(fobjects, mode))
def readable(self) -> bool:
    """Return True when the open mode permits reading.

    Reading is allowed for "r" modes and for any update ("+") mode.
    Plain write/append modes ("w", "a", "x" without "+") are write-only,
    which matches the built-in ``open`` and the mode test used when
    pickling this object.
    """
    return "r" in self.mode or "+" in self.mode

return out
def writable(self) -> bool:
    """Return True when the open mode permits writing.

    Any of "w", "a", "x" or the update flag "+" makes the file writable,
    so "r+" counts as writable while plain "r" does not.  This is the
    same set of flags the pickling code treats as write modes.
    """
    return any(flag in self.mode for flag in "wax+")

def close(self):
"""Close all encapsulated file objects"""
_close(self.fobjects, self.mode)
def read(self, *args, **kwargs):
return self.f.read(*args, **kwargs)

def write(self, *args, **kwargs):
return self.f.write(*args, **kwargs)

def tell(self, *args, **kwargs):
return self.f.tell(*args, **kwargs)

def seek(self, *args, **kwargs):
return self.f.seek(*args, **kwargs)

def seekable(self, *args, **kwargs):
return self.f.seekable(*args, **kwargs)

def readline(self, *args, **kwargs):
return self.f.readline(*args, **kwargs)

def readlines(self, *args, **kwargs):
return self.f.readlines(*args, **kwargs)

def flush(self) -> None:
self.f.flush()

@property
def closed(self):
return len(self.fobjects) == 0 or self.f.closed

def fileno(self):
return self.f.fileno()

def __iter__(self):
return self.f.__iter__()

def __getattr__(self, item):
return getattr(self.f, item)


class OpenFiles(list):
Expand Down Expand Up @@ -214,14 +270,6 @@ def __repr__(self):
return "<List of %s OpenFile instances>" % len(self)


def _close(fobjects, mode):
    """Flush (for non-read modes) and close every file object, newest first.

    Parameters
    ----------
    fobjects: list
        Stack of file-like objects; the last element is closed first.
        The list is always emptied in place, even when closing fails.
    mode: str
        The open mode; objects are flushed before closing unless it
        contains "r".

    Raises
    ------
    Exception
        The first error raised by any ``flush()``/``close()`` call, re-raised
        only after every object has been given the chance to close.
    """
    flush_first = "r" not in mode
    first_error = None
    try:
        for f in reversed(fobjects):
            try:
                try:
                    if flush_first and not f.closed:
                        f.flush()
                finally:
                    # close even if the flush failed, so the handle is released
                    f.close()
            except Exception as exc:
                # keep going so one bad layer does not leak the others
                if first_error is None:
                    first_error = exc
    finally:
        fobjects.clear()
    if first_error is not None:
        raise first_error


def open_files(
urlpath,
mode="rb",
Expand Down
2 changes: 1 addition & 1 deletion fsspec/implementations/tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(

self._fo_ref = fo
weakref.finalize(self, fo.close)
self.fo = fo.__enter__() # the whole instance is a context
self.fo = fo # the whole instance is a context
self.tar = tarfile.TarFile(fileobj=self.fo)
self.dir_cache = None

Expand Down
31 changes: 27 additions & 4 deletions fsspec/implementations/tests/test_cached.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,39 @@ def test_pop():


def test_write_pickle_context():
tmp = str(tempfile.mkdtemp())
fn = tmp + "afile"
url = "simplecache::file://" + fn
with fsspec.open(url, "wb") as f:
pickle.loads(pickle.dumps(f)) # ok before write
f.write(b"hello ")
with pytest.raises(ValueError):
pickle.dumps(f)

f2 = pickle.loads(pickle.dumps(f))
assert f2.closed

assert open(fn, "rb").read() == b"hello "


def test_write_pickle_nocontext():
tmp = str(tempfile.mkdtemp())
fn = tmp + "afile"
url = "simplecache::file://" + fn
f = fsspec.open(url, "wb").open()
f2 = pickle.loads(pickle.dumps(f))

f.write(b"hello ")
f.flush()
with pickle.loads(pickle.dumps(f)) as f2:
f2.write(b"world")

assert open(fn, "rb").read() == b"hello world"
with pytest.raises(ValueError):
pickle.loads(pickle.dumps(f))
f2.write(b"world")
f2.close()
f.close()

assert (
open(fn, "rb").read() == b"hello "
) # content is first write, since f flushed/closed last


def test_blocksize(ftp_writable):
Expand Down
7 changes: 7 additions & 0 deletions fsspec/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,13 @@ def test_openfile_pickle_newline():
assert test.newline == restored.newline


def test_pickle_after_open_open():
    """A pickled open file must unpickle as open, independent of the original."""
    test = fsspec.open(__file__, mode="rt").open()
    try:
        test2 = pickle.loads(pickle.dumps(test))
    finally:
        test.close()
    try:
        # closing the original must not have closed the restored copy
        assert not test2.closed
    finally:
        # release the restored handle so the test does not leak it
        test2.close()


def test_mismatch():
with pytest.raises(ValueError, match="protocol"):
open_files(["s3://test/path.csv", "/other/path.csv"])
Expand Down
79 changes: 0 additions & 79 deletions fsspec/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from contextlib import contextmanager
from functools import partial
from hashlib import md5
from types import TracebackType
from typing import IO, AnyStr, Callable, Iterable, Iterator, List, Optional, Type
from urllib.parse import urlsplit

DEFAULT_BLOCK_SIZE = 5 * 2**20
Expand Down Expand Up @@ -544,83 +542,6 @@ def merge_offset_ranges(paths, starts, ends, max_gap=0, max_block=None, sort=Tru
return paths, starts, ends


class IOWrapper(IO):
    """Wrapper for a file-like object that can be used in situations where we might
    want to, e.g., monkey-patch the close method but can't.
    (cf https://github.com/fsspec/filesystem_spec/issues/725)

    Every I/O call is delegated to the wrapped object ``fp``.  ``closer`` is
    an extra cleanup callback that runs at most once, after ``fp`` has been
    closed through ``close()`` or by leaving a ``with`` block.

    Parameters
    ----------
    fp: file-like
        Object that all reads, writes and seeks are forwarded to.
    closer: callable
        Called with no arguments once ``fp`` has been closed.
    """

    def __init__(self, fp: IO, closer: Callable[[], None]):
        self.fp = fp
        self.closer = closer
        self._closer_called = False

    def _run_closer(self) -> None:
        # Run the extra cleanup callback only on the first close.
        if not self._closer_called:
            self._closer_called = True
            self.closer()

    # typing.IO declares ``closed``, ``mode`` and ``name`` as properties on the
    # class, so __getattr__ is never consulted for them; forward explicitly.
    @property
    def closed(self) -> bool:
        return self.fp.closed

    @property
    def mode(self) -> str:
        return self.fp.mode

    @property
    def name(self) -> str:
        return self.fp.name

    def close(self) -> None:
        """Close the wrapped object, then run the ``closer`` callback."""
        try:
            self.fp.close()
        finally:
            self._run_closer()

    def fileno(self) -> int:
        return self.fp.fileno()

    def flush(self) -> None:
        self.fp.flush()

    def isatty(self) -> bool:
        return self.fp.isatty()

    def read(self, n: int = -1) -> AnyStr:
        # -1 means "read to end of file", the io-module default
        return self.fp.read(n)

    def readable(self) -> bool:
        return self.fp.readable()

    def readline(self, limit: int = -1) -> AnyStr:
        return self.fp.readline(limit)

    def readlines(self, hint: int = -1) -> List[AnyStr]:
        return self.fp.readlines(hint)

    def seek(self, offset: int, whence: int = 0) -> int:
        # whence=0 is SEEK_SET (absolute position), the io-module default
        return self.fp.seek(offset, whence)

    def seekable(self) -> bool:
        return self.fp.seekable()

    def tell(self) -> int:
        return self.fp.tell()

    def truncate(self, size: Optional[int] = None) -> int:
        # None means "truncate at the current position"
        return self.fp.truncate(size)

    def writable(self) -> bool:
        return self.fp.writable()

    def write(self, s: AnyStr) -> int:
        return self.fp.write(s)

    def writelines(self, lines: Iterable[AnyStr]) -> None:
        self.fp.writelines(lines)

    def __next__(self) -> AnyStr:
        return next(self.fp)

    def __iter__(self) -> Iterator[AnyStr]:
        return iter(self.fp)

    def __enter__(self) -> IO[AnyStr]:
        return self.fp.__enter__()

    def __exit__(
        self,
        t: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        try:
            return self.fp.__exit__(t, value, traceback)
        finally:
            # leaving the context closes fp, so the dependents must close too
            self._run_closer()

    # forward anything else too
    def __getattr__(self, name):
        return getattr(self.fp, name)


def file_size(filelike):
"""Find length of any open read-mode file-like"""
pos = filelike.tell()
Expand Down