Skip to content

Commit 1e852ac

Browse files
authored
Make OpenFile filelike (#1024)
* Make OpenFile filelike * putative fixes
1 parent 3f8c8e0 commit 1e852ac

File tree

5 files changed

+111
-112
lines changed

5 files changed

+111
-112
lines changed

fsspec/core.py

Lines changed: 76 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
from .compression import compr
1919
from .registry import filesystem, get_filesystem_class
2020
from .utils import (
21-
IOWrapper,
2221
_unstrip_protocol,
2322
build_name_function,
2423
infer_compression,
@@ -29,7 +28,7 @@
2928
logger = logging.getLogger("fsspec")
3029

3130

32-
class OpenFile(object):
31+
class OpenFile(io.IOBase):
3332
"""
3433
File-like object to be used in a context
3534
@@ -56,6 +55,10 @@ class OpenFile(object):
5655
How to handle encoding errors if opened in text mode.
5756
newline: None or str
5857
Passed to TextIOWrapper in text mode, how to handle line endings.
58+
autoopen: bool
59+
If True, calls open() immediately. Mostly used by pickle
60+
pos: int
61+
If given and autoopen is True, seek to this location immediately
5962
"""
6063

6164
def __init__(
@@ -67,6 +70,8 @@ def __init__(
6770
encoding=None,
6871
errors=None,
6972
newline=None,
73+
autoopen=False,
74+
pos=0,
7075
):
7176
self.fs = fs
7277
self.path = path
@@ -76,8 +81,17 @@ def __init__(
7681
self.errors = errors
7782
self.newline = newline
7883
self.fobjects = []
84+
if autoopen:
85+
self.open()
86+
self.seek(pos)
7987

8088
def __reduce__(self):
89+
if (
90+
("w" in self.mode or "a" in self.mode or "+" in self.mode)
91+
and not self.closed
92+
and self.tell()
93+
):
94+
raise ValueError("cannot pickle write-mode file after a write")
8195
return (
8296
OpenFile,
8397
(
@@ -88,6 +102,8 @@ def __reduce__(self):
88102
self.encoding,
89103
self.errors,
90104
self.newline,
105+
not self.closed,
106+
not self.closed and self.tell(),
91107
),
92108
)
93109

@@ -113,19 +129,27 @@ def __enter__(self):
113129
)
114130
self.fobjects.append(f)
115131

116-
return self.fobjects[-1]
132+
return self
117133

118134
def __exit__(self, *args):
119135
self.close()
120136

121137
def __del__(self):
122138
if hasattr(self, "fobjects"):
123-
self.fobjects.clear() # may cause cleanup of objects and close files
139+
self.close()
124140

125141
@property
126142
def full_name(self):
127143
return _unstrip_protocol(self.path, self.fs)
128144

145+
@property
146+
def f(self):
147+
if self.fobjects:
148+
return self.fobjects[-1]
149+
raise ValueError(
150+
"I/O operation on closed file. Please call " "open() or use a with context"
151+
)
152+
129153
def open(self):
130154
"""Materialise this as a real open file without context
131155
@@ -134,26 +158,58 @@ def open(self):
134158
objects, so they can close even if the parent OpenFile object has already
135159
been deleted; but a with-context is better style.
136160
"""
137-
out = self.__enter__()
138-
closer = out.close
139-
fobjects = self.fobjects.copy()[:-1]
140-
mode = self.mode
161+
return self.__enter__()
141162

142-
def close():
143-
# this func has no reference to
144-
closer() # original close bound method of the final file-like
145-
_close(fobjects, mode) # call close on other dependent file-likes
163+
def close(self):
164+
"""Close all encapsulated file objects"""
165+
for f in reversed(self.fobjects):
166+
if "r" not in self.mode and not f.closed:
167+
f.flush()
168+
f.close()
169+
self.fobjects.clear()
146170

147-
try:
148-
out.close = close
149-
except AttributeError:
150-
out = IOWrapper(out, lambda: _close(fobjects, mode))
171+
def readable(self) -> bool:
172+
return "r" in self.mode or "a" in self.mode
151173

152-
return out
174+
def writable(self) -> bool:
175+
return "r" not in self.mode
153176

154-
def close(self):
155-
"""Close all encapsulated file objects"""
156-
_close(self.fobjects, self.mode)
177+
def read(self, *args, **kwargs):
178+
return self.f.read(*args, **kwargs)
179+
180+
def write(self, *args, **kwargs):
181+
return self.f.write(*args, **kwargs)
182+
183+
def tell(self, *args, **kwargs):
184+
return self.f.tell(*args, **kwargs)
185+
186+
def seek(self, *args, **kwargs):
187+
return self.f.seek(*args, **kwargs)
188+
189+
def seekable(self, *args, **kwargs):
190+
return self.f.seekable(*args, **kwargs)
191+
192+
def readline(self, *args, **kwargs):
193+
return self.f.readline(*args, **kwargs)
194+
195+
def readlines(self, *args, **kwargs):
196+
return self.f.readlines(*args, **kwargs)
197+
198+
def flush(self) -> None:
199+
self.f.flush()
200+
201+
@property
202+
def closed(self):
203+
return len(self.fobjects) == 0 or self.f.closed
204+
205+
def fileno(self):
206+
return self.f.fileno()
207+
208+
def __iter__(self):
209+
return self.f.__iter__()
210+
211+
def __getattr__(self, item):
212+
return getattr(self.f, item)
157213

158214

159215
class OpenFiles(list):
@@ -214,14 +270,6 @@ def __repr__(self):
214270
return "<List of %s OpenFile instances>" % len(self)
215271

216272

217-
def _close(fobjects, mode):
218-
for f in reversed(fobjects):
219-
if "r" not in mode and not f.closed:
220-
f.flush()
221-
f.close()
222-
fobjects.clear()
223-
224-
225273
def open_files(
226274
urlpath,
227275
mode="rb",

fsspec/implementations/tar.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def __init__(
8383

8484
self._fo_ref = fo
8585
weakref.finalize(self, fo.close)
86-
self.fo = fo.__enter__() # the whole instance is a context
86+
self.fo = fo # the whole instance is a context
8787
self.tar = tarfile.TarFile(fileobj=self.fo)
8888
self.dir_cache = None
8989

fsspec/implementations/tests/test_cached.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,16 +188,39 @@ def test_pop():
188188

189189

190190
def test_write_pickle_context():
191+
tmp = str(tempfile.mkdtemp())
192+
fn = tmp + "afile"
193+
url = "simplecache::file://" + fn
194+
with fsspec.open(url, "wb") as f:
195+
pickle.loads(pickle.dumps(f)) # ok before write
196+
f.write(b"hello ")
197+
with pytest.raises(ValueError):
198+
pickle.dumps(f)
199+
200+
f2 = pickle.loads(pickle.dumps(f))
201+
assert f2.closed
202+
203+
assert open(fn, "rb").read() == b"hello "
204+
205+
206+
def test_write_pickle_nocontext():
191207
tmp = str(tempfile.mkdtemp())
192208
fn = tmp + "afile"
193209
url = "simplecache::file://" + fn
194210
f = fsspec.open(url, "wb").open()
211+
f2 = pickle.loads(pickle.dumps(f))
212+
195213
f.write(b"hello ")
196-
f.flush()
197-
with pickle.loads(pickle.dumps(f)) as f2:
198-
f2.write(b"world")
199214

200-
assert open(fn, "rb").read() == b"hello world"
215+
with pytest.raises(ValueError):
216+
pickle.loads(pickle.dumps(f))
217+
f2.write(b"world")
218+
f2.close()
219+
f.close()
220+
221+
assert (
222+
open(fn, "rb").read() == b"hello "
223+
) # content is first write, since f flushed/closed last
201224

202225

203226
def test_blocksize(ftp_writable):

fsspec/tests/test_core.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,13 @@ def test_openfile_pickle_newline():
183183
assert test.newline == restored.newline
184184

185185

186+
def test_pickle_after_open_open():
187+
test = fsspec.open(__file__, mode="rt").open()
188+
test2 = pickle.loads(pickle.dumps(test))
189+
test.close()
190+
assert not test2.closed
191+
192+
186193
def test_mismatch():
187194
with pytest.raises(ValueError, match="protocol"):
188195
open_files(["s3://test/path.csv", "/other/path.csv"])

fsspec/utils.py

Lines changed: 0 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
from contextlib import contextmanager
88
from functools import partial
99
from hashlib import md5
10-
from types import TracebackType
11-
from typing import IO, AnyStr, Callable, Iterable, Iterator, List, Optional, Type
1210
from urllib.parse import urlsplit
1311

1412
DEFAULT_BLOCK_SIZE = 5 * 2**20
@@ -544,83 +542,6 @@ def merge_offset_ranges(paths, starts, ends, max_gap=0, max_block=None, sort=Tru
544542
return paths, starts, ends
545543

546544

547-
class IOWrapper(IO):
548-
"""Wrapper for a file-like object that can be used in situations where we might
549-
want to, e.g., monkey-patch the close method but can't.
550-
(cf https://github.com/fsspec/filesystem_spec/issues/725)
551-
"""
552-
553-
def __init__(self, fp: IO, closer: Callable[[], None]):
554-
self.fp = fp
555-
self.closer = closer
556-
557-
def close(self) -> None:
558-
self.fp.close()
559-
560-
def fileno(self) -> int:
561-
return self.fp.fileno()
562-
563-
def flush(self) -> None:
564-
self.fp.flush()
565-
566-
def isatty(self) -> bool:
567-
return self.fp.isatty()
568-
569-
def read(self, n: int = ...) -> AnyStr:
570-
return self.fp.read(n)
571-
572-
def readable(self) -> bool:
573-
return self.fp.readable()
574-
575-
def readline(self, limit: int = ...) -> AnyStr:
576-
return self.fp.readline(limit)
577-
578-
def readlines(self, hint: int = ...) -> List[AnyStr]:
579-
return self.fp.readlines(hint)
580-
581-
def seek(self, offset: int, whence: int = ...) -> int:
582-
return self.fp.seek(offset, whence)
583-
584-
def seekable(self) -> bool:
585-
return self.fp.seekable()
586-
587-
def tell(self) -> int:
588-
return self.fp.tell()
589-
590-
def truncate(self, size: Optional[int] = ...) -> int:
591-
return self.fp.truncate(size)
592-
593-
def writable(self) -> bool:
594-
return self.fp.writable()
595-
596-
def write(self, s: AnyStr) -> int:
597-
return self.fp.write(s)
598-
599-
def writelines(self, lines: Iterable[AnyStr]) -> None:
600-
self.fp.writelines(lines)
601-
602-
def __next__(self) -> AnyStr:
603-
return next(self.fp)
604-
605-
def __iter__(self) -> Iterator[AnyStr]:
606-
return iter(self.fp)
607-
608-
def __enter__(self) -> IO[AnyStr]:
609-
return self.fp.__enter__()
610-
611-
def __exit__(
612-
self,
613-
t: Optional[Type[BaseException]],
614-
value: Optional[BaseException],
615-
traceback: Optional[TracebackType],
616-
) -> Optional[bool]:
617-
return self.fp.__exit__(t, value, traceback)
618-
619-
# forward anything else too
620-
def __getattr__(self, name):
621-
return getattr(self.fp, name)
622-
623-
624545
def file_size(filelike):
625546
"""Find length of any open read-mode file-like"""
626547
pos = filelike.tell()

0 commit comments

Comments
 (0)