From 618b6ebb6d3aebe433827e3c53923e77f243e29a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torsten=20W=C3=B6rtwein?= Date: Thu, 8 Oct 2020 13:52:57 -0400 Subject: [PATCH 1/9] BUG/REF: read_csv shouldn't close user-provided file handles --- doc/source/whatsnew/v1.2.0.rst | 1 + pandas/_libs/parsers.pyx | 87 +++-------------- pandas/_typing.py | 2 +- pandas/io/common.py | 35 ++++--- pandas/io/parsers.py | 111 ++++++++++++++++------ pandas/tests/io/parser/test_common.py | 69 +++++++++++++- pandas/tests/io/parser/test_encoding.py | 4 + pandas/tests/io/parser/test_textreader.py | 4 +- 8 files changed, 192 insertions(+), 121 deletions(-) diff --git a/doc/source/whatsnew/v1.2.0.rst b/doc/source/whatsnew/v1.2.0.rst index 7111d54d65815..d321651a10646 100644 --- a/doc/source/whatsnew/v1.2.0.rst +++ b/doc/source/whatsnew/v1.2.0.rst @@ -492,6 +492,7 @@ I/O - Bug in output rendering of complex numbers showing too many trailing zeros (:issue:`36799`) - Bug in :class:`HDFStore` threw a ``TypeError`` when exporting an empty :class:`DataFrame` with ``datetime64[ns, tz]`` dtypes with a fixed HDF5 store (:issue:`20594`) - Bug in :class:`HDFStore` was dropping timezone information when exporting :class:`Series` with ``datetime64[ns, tz]`` dtypes with a fixed HDF5 store (:issue:`20594`) +- :func:`read_csv` was closing user-provided binary file handles when ``engine="c"`` and an ``encoding`` was requested (:issue:`36980`) Plotting ^^^^^^^^ diff --git a/pandas/_libs/parsers.pyx b/pandas/_libs/parsers.pyx index b87e46f9b6648..ce9b6e5d623eb 100644 --- a/pandas/_libs/parsers.pyx +++ b/pandas/_libs/parsers.pyx @@ -87,6 +87,7 @@ lzma = import_lzma() cdef: float64_t INF = np.inf float64_t NEGINF = -INF + int64_t DEFAULT_CHUNKSIZE = 256 * 1024 cdef extern from "headers/portable.h": @@ -275,14 +276,16 @@ cdef extern from "parser/io.h": size_t *bytes_read, int *status) -DEFAULT_CHUNKSIZE = 256 * 1024 - - cdef class TextReader: """ # source: StringIO or file object + ..versionchange:: 1.2.0 + removed 'compression' argument (outsourced to CParserWrapper). + removed 'memory_map' argument: CParserWrapper converts all + non-memory-mapped files to file handles. The remaining + (string) objects will be opened with 'memory_map=True'. """ cdef: @@ -299,7 +302,7 @@ cdef class TextReader: cdef public: int64_t leading_cols, table_width, skipfooter, buffer_lines - bint allow_leading_cols, mangle_dupe_cols, memory_map, low_memory + bint allow_leading_cols, mangle_dupe_cols, low_memory bint delim_whitespace object delimiter, converters object na_values @@ -307,8 +310,6 @@ cdef class TextReader: object index_col object skiprows object dtype - object encoding - object compression object usecols list dtype_cast_order set unnamed_cols @@ -321,10 +322,8 @@ cdef class TextReader: header_end=0, index_col=None, names=None, - bint memory_map=False, tokenize_chunksize=DEFAULT_CHUNKSIZE, bint delim_whitespace=False, - compression=None, converters=None, bint skipinitialspace=False, escapechar=None, @@ -364,8 +363,6 @@ cdef class TextReader: else: self.c_encoding = NULL - self.encoding = encoding - self.parser = parser_new() self.parser.chunksize = tokenize_chunksize @@ -374,9 +371,6 @@ cdef class TextReader: # For timekeeping self.clocks = [] - self.compression = compression - self.memory_map = memory_map - self.parser.usecols = (usecols is not None) self._setup_parser_source(source) @@ -562,11 +556,6 @@ cdef class TextReader: parser_del(self.parser) def close(self): - # we need to properly close an open derived - # filehandle here, e.g. 
and UTFRecoder - if self.handle is not None: - self.handle.close() - # also preemptively free all allocated memory parser_free(self.parser) if self.true_set: @@ -617,69 +606,21 @@ cdef class TextReader: self.parser.cb_io = NULL self.parser.cb_cleanup = NULL - if self.compression: - if self.compression == 'gzip': - if isinstance(source, str): - source = gzip.GzipFile(source, 'rb') - else: - source = gzip.GzipFile(fileobj=source) - elif self.compression == 'bz2': - source = bz2.BZ2File(source, 'rb') - elif self.compression == 'zip': - zip_file = zipfile.ZipFile(source) - zip_names = zip_file.namelist() - - if len(zip_names) == 1: - file_name = zip_names.pop() - source = zip_file.open(file_name) - - elif len(zip_names) == 0: - raise ValueError(f'Zero files found in compressed ' - f'zip file {source}') - else: - raise ValueError(f'Multiple files found in compressed ' - f'zip file {zip_names}') - elif self.compression == 'xz': - if isinstance(source, str): - source = get_lzma_file(lzma)(source, 'rb') - else: - source = get_lzma_file(lzma)(filename=source) - else: - raise ValueError(f'Unrecognized compression type: ' - f'{self.compression}') - - if (self.encoding and hasattr(source, "read") and - not hasattr(source, "encoding")): - source = io.TextIOWrapper( - source, self.encoding.decode('utf-8'), newline='') - - self.encoding = b'utf-8' - self.c_encoding = self.encoding - - self.handle = source - if isinstance(source, str): encoding = sys.getfilesystemencoding() or "utf-8" - usource = source source = source.encode(encoding) - - if self.memory_map: - ptr = new_mmap(source) - if ptr == NULL: - # fall back - ptr = new_file_source(source, self.parser.chunksize) - self.parser.cb_io = &buffer_file_bytes - self.parser.cb_cleanup = &del_file_source - else: - self.parser.cb_io = &buffer_mmap_bytes - self.parser.cb_cleanup = &del_mmap - else: + ptr = new_mmap(source) + if ptr == NULL: + # fall back ptr = new_file_source(source, self.parser.chunksize) self.parser.cb_io = &buffer_file_bytes self.parser.cb_cleanup = &del_file_source + else: + self.parser.cb_io = &buffer_mmap_bytes + self.parser.cb_cleanup = &del_mmap self.parser.source = ptr - elif hasattr(source, 'read'): + elif hasattr(source, "read"): # e.g., StringIO ptr = new_rd_source(source) diff --git a/pandas/_typing.py b/pandas/_typing.py index 3376559fb23ff..e2297ed2f10e4 100644 --- a/pandas/_typing.py +++ b/pandas/_typing.py @@ -153,7 +153,7 @@ @dataclass -class IOargs(Generic[ModeVar, EncodingVar]): +class IOArgs(Generic[ModeVar, EncodingVar]): """ Return value of io/common.py:get_filepath_or_buffer. diff --git a/pandas/io/common.py b/pandas/io/common.py index c147ae9fd0aa8..67af9a0087dee 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -3,7 +3,7 @@ import bz2 from collections import abc import gzip -from io import BufferedIOBase, BytesIO, RawIOBase +from io import BufferedIOBase, BytesIO, RawIOBase, TextIOWrapper import mmap import os import pathlib @@ -36,7 +36,7 @@ EncodingVar, FileOrBuffer, FilePathOrBuffer, - IOargs, + IOArgs, ModeVar, StorageOptions, ) @@ -176,7 +176,7 @@ def get_filepath_or_buffer( compression: CompressionOptions = None, mode: ModeVar = None, # type: ignore[assignment] storage_options: StorageOptions = None, -) -> IOargs[ModeVar, EncodingVar]: +) -> IOArgs[ModeVar, EncodingVar]: """ If the filepath_or_buffer is a url, translate and return the buffer. Otherwise passthrough. @@ -201,7 +201,7 @@ def get_filepath_or_buffer( ..versionchange:: 1.2.0 - Returns the dataclass IOargs. + Returns the dataclass IOArgs. 
""" filepath_or_buffer = stringify_path(filepath_or_buffer) @@ -225,6 +225,10 @@ def get_filepath_or_buffer( compression = dict(compression, method=compression_method) + # uniform encoding names + if encoding is not None: + encoding = encoding.replace("_", "-").lower() + # bz2 and xz do not write the byte order mark for utf-16 and utf-32 # print a warning when writing such files if ( @@ -258,7 +262,7 @@ def get_filepath_or_buffer( compression = {"method": "gzip"} reader = BytesIO(req.read()) req.close() - return IOargs( + return IOArgs( filepath_or_buffer=reader, encoding=encoding, compression=compression, @@ -310,7 +314,7 @@ def get_filepath_or_buffer( filepath_or_buffer, mode=fsspec_mode, **(storage_options or {}) ).open() - return IOargs( + return IOArgs( filepath_or_buffer=file_obj, encoding=encoding, compression=compression, @@ -323,7 +327,7 @@ def get_filepath_or_buffer( ) if isinstance(filepath_or_buffer, (str, bytes, mmap.mmap)): - return IOargs( + return IOArgs( filepath_or_buffer=_expand_user(filepath_or_buffer), encoding=encoding, compression=compression, @@ -335,7 +339,7 @@ def get_filepath_or_buffer( msg = f"Invalid file path or buffer object type: {type(filepath_or_buffer)}" raise ValueError(msg) - return IOargs( + return IOArgs( filepath_or_buffer=filepath_or_buffer, encoding=encoding, compression=compression, @@ -535,6 +539,10 @@ def get_handle( handles: List[Union[IO, _MMapWrapper]] = list() f = path_or_buf + # Windows does not default to utf-8. Set to utf-8 for a consistent behavior + if encoding is None: + encoding = "utf-8" + # Convert pathlib.Path/py.path.local or string path_or_buf = stringify_path(path_or_buf) is_path = isinstance(path_or_buf, str) @@ -594,18 +602,17 @@ def get_handle( if encoding and not is_binary_mode: # Encoding f = open(path_or_buf, mode, encoding=encoding, errors=errors, newline="") - elif is_text and not is_binary_mode: - # No explicit encoding - f = open(path_or_buf, mode, errors="replace", newline="") else: # Binary mode f = open(path_or_buf, mode) handles.append(f) # Convert BytesIO or file objects passed with an encoding - if is_text and (compression or isinstance(f, need_text_wrapping)): - from io import TextIOWrapper - + if is_text and ( + compression + or isinstance(f, need_text_wrapping) + or "b" in getattr(f, "mode", "") + ): g = TextIOWrapper(f, encoding=encoding, errors=errors, newline="") if not isinstance(f, (BufferedIOBase, RawIOBase)): handles.append(g) diff --git a/pandas/io/parsers.py b/pandas/io/parsers.py index 2110a2d400be8..956b96aff3bc1 100644 --- a/pandas/io/parsers.py +++ b/pandas/io/parsers.py @@ -10,7 +10,7 @@ import re import sys from textwrap import fill -from typing import Any, Dict, Iterable, List, Optional, Sequence, Set +from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple import warnings import numpy as np @@ -63,7 +63,13 @@ from pandas.core.series import Series from pandas.core.tools import datetimes as tools -from pandas.io.common import get_filepath_or_buffer, get_handle, validate_header_arg +from pandas.io.common import ( + get_compression_method, + get_filepath_or_buffer, + get_handle, + stringify_path, + validate_header_arg, +) from pandas.io.date_converters import generic_parser # BOM character (byte order mark) @@ -428,17 +434,16 @@ def _validate_names(names): def _read(filepath_or_buffer: FilePathOrBuffer, kwds): """Generic reader of line files.""" - encoding = kwds.get("encoding", None) storage_options = kwds.get("storage_options", None) - if encoding is not None: - encoding = 
re.sub("_", "-", encoding).lower()
-        kwds["encoding"] = encoding
-
-    compression = kwds.get("compression", "infer")
     ioargs = get_filepath_or_buffer(
-        filepath_or_buffer, encoding, compression, storage_options=storage_options
+        filepath_or_buffer,
+        kwds.get("encoding", None),
+        kwds.get("compression", "infer"),
+        storage_options=storage_options,
     )
     kwds["compression"] = ioargs.compression
+    kwds["encoding"] = ioargs.encoding
 
     if kwds.get("date_parser", None) is not None:
         if isinstance(kwds["parse_dates"], bool):
@@ -1403,7 +1408,19 @@ def _validate_parse_dates_presence(self, columns: List[str]) -> None:
         )
 
     def close(self):
+        if self._original_handle or isinstance(self.source, str):
+            # user-provided file handle or a string when using memory_map=True
+            pass
+        elif self._non_closeable_wrapper and not self.source.closed:
+            # user-provided file handle that is wrapped inside TextIOWrapper
+            # closing TextIOWrapper would close the file handle as well
+            self.source.detach()
+        else:
+            self.source.close()
         for f in self.handles:
+            if self.source == f:
+                # skip: if we already detached/closed the buffer
+                continue
             f.close()
 
     @property
@@ -1838,23 +1855,39 @@ def __init__(self, src, **kwds):
 
         ParserBase.__init__(self, kwds)
 
-        encoding = kwds.get("encoding")
+        if kwds.get("memory_map", False):
+            # memory-mapped files are directly handled by the TextReader.
+            # compression and file objects were never supported by it.
+            src = stringify_path(src)
 
-        # parsers.TextReader doesn't support compression dicts
-        if isinstance(kwds.get("compression"), dict):
-            kwds["compression"] = kwds["compression"]["method"]
-
-        if kwds.get("compression") is None and encoding:
-            if isinstance(src, str):
-                src = open(src, "rb")
-                self.handles.append(src)
+            if not isinstance(src, str):
+                raise ValueError(
+                    "read_csv supports only string-like objects with engine='c' "
+                    + "and memory_map=True. Please use engine='python' instead."
+                )
 
-            # Handle the file object with universal line mode enabled.
-            # We will handle the newline character ourselves later on.
-            if hasattr(src, "read") and not hasattr(src, "encoding"):
-                src = TextIOWrapper(src, encoding=encoding, newline="")
+            if get_compression_method(kwds.get("compression", None))[0] is not None:
+                raise ValueError(
+                    "read_csv does not support compression with memory_map=True. "
+                    + "Please use memory_map=False instead."
+ ) - kwds["encoding"] = "utf-8" + self.source = src + self.handles = [] + else: + self.source, self.handles = get_handle( + src, + mode="r", + encoding=kwds.get("encoding", None), + compression=kwds.get("compression", None), + is_text=True, + ) + kwds.pop("encoding", None) + kwds.pop("memory_map", None) + kwds.pop("compression", None) + self._original_handle, self._non_closeable_wrapper = _can_close( + src, self.source + ) # #2442 kwds["allow_leading_cols"] = self.index_col is not False @@ -1863,7 +1896,7 @@ def __init__(self, src, **kwds): self.usecols, self.usecols_dtype = _validate_usecols_arg(kwds["usecols"]) kwds["usecols"] = self.usecols - self._reader = parsers.TextReader(src, **kwds) + self._reader = parsers.TextReader(self.source, **kwds) self.unnamed_cols = self._reader.unnamed_cols passed_names = self.names is None @@ -1942,11 +1975,10 @@ def __init__(self, src, **kwds): self._implicit_index = self._reader.leading_cols > 0 - def close(self): - for f in self.handles: - f.close() + def close(self) -> None: + super().close() - # close additional handles opened by C parser (for compression) + # close additional handles opened by C parser (for memory_map) try: self._reader.close() except ValueError: @@ -2237,7 +2269,7 @@ def __init__(self, f, **kwds): self.comment = kwds["comment"] self._comment_lines = [] - f, handles = get_handle( + self.source, handles = get_handle( f, "r", encoding=self.encoding, @@ -2245,12 +2277,13 @@ def __init__(self, f, **kwds): memory_map=self.memory_map, ) self.handles.extend(handles) + self._original_handle, self._non_closeable_wrapper = _can_close(f, self.source) # Set self.data to something that can read lines. - if hasattr(f, "readline"): - self._make_reader(f) + if hasattr(self.source, "readline"): + self._make_reader(self.source) else: - self.data = f + self.data = self.source # Get columns in two steps: infer from data, then # infer column indices from self.usecols if it is specified. @@ -3852,3 +3885,19 @@ def _validate_skipfooter(kwds: Dict[str, Any]) -> None: raise ValueError("'skipfooter' not supported for iteration") if kwds.get("nrows"): raise ValueError("'skipfooter' not supported with 'nrows'") + + +def _can_close( + original_handle: FilePathOrBuffer, wrapped_handle: FilePathOrBuffer +) -> Tuple[bool, bool]: + """Check whether the output of get_handle can be closed.""" + # do not close user-provided file handles + is_original_handle = id(wrapped_handle) == id(original_handle) + + # TextIOWrapper closes wrapped file handles, we cannot close it if the user + # provided a file handler. Needs to be detached (and flushed when writing). 
+ non_closeable_wrapper = hasattr(original_handle, "read") and isinstance( + wrapped_handle, TextIOWrapper + ) + + return is_original_handle, non_closeable_wrapper diff --git a/pandas/tests/io/parser/test_common.py b/pandas/tests/io/parser/test_common.py index b33289213e258..8ee98b7d29df1 100644 --- a/pandas/tests/io/parser/test_common.py +++ b/pandas/tests/io/parser/test_common.py @@ -6,7 +6,7 @@ import csv from datetime import datetime from inspect import signature -from io import StringIO +from io import BytesIO, StringIO import os import platform from urllib.error import URLError @@ -2253,3 +2253,70 @@ def test_dict_keys_as_names(all_parsers): result = parser.read_csv(StringIO(data), names=keys) expected = DataFrame({"a": [1], "b": [2]}) tm.assert_frame_equal(result, expected) + + +@pytest.mark.parametrize("io_class", [StringIO, BytesIO]) +@pytest.mark.parametrize("encoding", [None, "utf-8"]) +def test_read_csv_file_handle(all_parsers, io_class, encoding): + """ + Test whether read_csv does not close user-provided file handles. + + GH 36980 + """ + parser = all_parsers + expected = DataFrame({"a": [1], "b": [2]}) + + content = "a,b\n1,2" + if io_class == BytesIO: + content = content.encode("utf-8") + handle = io_class(content) + + tm.assert_frame_equal(parser.read_csv(handle, encoding=encoding), expected) + assert not handle.closed + + +def test_memory_map_compression_error(c_parser_only): + """ + c-parsers do not support memory_map=True with compression. + + GH 36997 + """ + parser = c_parser_only + df = DataFrame({"a": [1], "b": [2]}) + msg = ( + "read_csv does not support compression with memory_map=True. " + + "Please use memory_map=False instead." + ) + + with tm.ensure_clean() as path: + df.to_csv(path, compression="gzip", index=False) + + with pytest.raises(ValueError, match=msg): + parser.read_csv(path, memory_map=True, compression="gzip") + + +def test_memory_map_file_handle_error(all_parsers): + """ + c-parsers do support only string-like files with memory_map=True. + + GH 36997 + """ + parser = all_parsers + expected = DataFrame({"a": [1], "b": [2]}) + msg = ( + "read_csv supports only string-like objects with engine='c' " + + "and memory_map=True. Please use engine='python' instead." 
+ ) + + handle = StringIO("a,b\n1,2") + + if parser.engine != "python": + # c engine should fail + with pytest.raises(ValueError, match=msg): + parser.read_csv(handle, memory_map=True) + else: + # verify that python engine supports the same call + tm.assert_frame_equal( + parser.read_csv(handle, memory_map=True), + expected, + ) diff --git a/pandas/tests/io/parser/test_encoding.py b/pandas/tests/io/parser/test_encoding.py index 876696ecdad9c..e74265da3e966 100644 --- a/pandas/tests/io/parser/test_encoding.py +++ b/pandas/tests/io/parser/test_encoding.py @@ -152,14 +152,17 @@ def test_binary_mode_file_buffers( with open(fpath, mode="r", encoding=encoding) as fa: result = parser.read_csv(fa) + assert not fa.closed tm.assert_frame_equal(expected, result) with open(fpath, mode="rb") as fb: result = parser.read_csv(fb, encoding=encoding) + assert not fb.closed tm.assert_frame_equal(expected, result) with open(fpath, mode="rb", buffering=0) as fb: result = parser.read_csv(fb, encoding=encoding) + assert not fb.closed tm.assert_frame_equal(expected, result) @@ -199,6 +202,7 @@ def test_encoding_named_temp_file(all_parsers): result = parser.read_csv(f, encoding=encoding) tm.assert_frame_equal(result, expected) + assert not f.closed @pytest.mark.parametrize( diff --git a/pandas/tests/io/parser/test_textreader.py b/pandas/tests/io/parser/test_textreader.py index 1c2518646bb29..59aecb4b393aa 100644 --- a/pandas/tests/io/parser/test_textreader.py +++ b/pandas/tests/io/parser/test_textreader.py @@ -32,12 +32,14 @@ def test_file_handle(self): reader.read() def test_string_filename(self): + # this is using memory_map=True reader = TextReader(self.csv1, header=None) reader.read() def test_file_handle_mmap(self): + # this was never using memory_map=True with open(self.csv1, "rb") as f: - reader = TextReader(f, memory_map=True, header=None) + reader = TextReader(f, header=None) reader.read() def test_StringIO(self): From 4f26aeaaedc086c17a9391de98339f5b9ec328da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torsten=20W=C3=B6rtwein?= Date: Fri, 23 Oct 2020 23:27:54 -0400 Subject: [PATCH 2/9] get_handle: typing, returns is_wrapped, use dataclass, and make sure that all created handlers are returned --- pandas/_typing.py | 20 +++- pandas/io/common.py | 80 ++++++++----- pandas/io/formats/csvs.py | 32 +++--- pandas/io/json/_json.py | 64 +++++------ pandas/io/parsers.py | 69 ++++-------- pandas/io/pickle.py | 26 ++--- pandas/io/stata.py | 125 +++++++++------------ pandas/tests/frame/methods/test_to_csv.py | 6 +- pandas/tests/io/json/test_readlines.py | 2 +- pandas/tests/io/test_compression.py | 26 ++--- pandas/tests/series/methods/test_to_csv.py | 6 +- 11 files changed, 214 insertions(+), 242 deletions(-) diff --git a/pandas/_typing.py b/pandas/_typing.py index e2297ed2f10e4..5f67cc16c3daa 100644 --- a/pandas/_typing.py +++ b/pandas/_typing.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from datetime import datetime, timedelta, tzinfo -from io import IOBase +from io import BufferedIOBase, RawIOBase, TextIOBase, TextIOWrapper from pathlib import Path from typing import ( IO, @@ -77,8 +77,13 @@ "ExtensionDtype", str, np.dtype, Type[Union[str, float, int, complex, bool, object]] ] DtypeObj = Union[np.dtype, "ExtensionDtype"] -FilePathOrBuffer = Union[str, Path, IO[AnyStr], IOBase] -FileOrBuffer = Union[str, IO[AnyStr], IOBase] +FilePathOrBuffer = Union[ + str, Path, IO[AnyStr], RawIOBase, BufferedIOBase, TextIOBase, TextIOWrapper +] +FileOrBuffer = Union[ + str, IO[AnyStr], RawIOBase, BufferedIOBase, TextIOBase, 
TextIOWrapper +] +Buffer = Union[IO[AnyStr], RawIOBase, BufferedIOBase, TextIOBase, TextIOWrapper] # FrameOrSeriesUnion means either a DataFrame or a Series. E.g. # `def func(a: FrameOrSeriesUnion) -> FrameOrSeriesUnion: ...` means that if a Series @@ -168,3 +173,12 @@ class IOArgs(Generic[ModeVar, EncodingVar]): compression: CompressionDict should_close: bool mode: Union[ModeVar, str] + + +@dataclass +class HandleArgs: + """Return value of io/common.py:get_handle""" + + handle: Buffer + created_handles: List[Buffer] + is_wrapped: bool diff --git a/pandas/io/common.py b/pandas/io/common.py index 67af9a0087dee..70dc83bf62856 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -18,7 +18,6 @@ Optional, Tuple, Type, - Union, ) from urllib.parse import ( urljoin, @@ -31,11 +30,13 @@ import zipfile from pandas._typing import ( + Buffer, CompressionDict, CompressionOptions, EncodingVar, FileOrBuffer, FilePathOrBuffer, + HandleArgs, IOArgs, ModeVar, StorageOptions, @@ -459,14 +460,14 @@ def infer_compression( def get_handle( - path_or_buf, + path_or_buf: FilePathOrBuffer, mode: str, - encoding=None, + encoding: Optional[str] = None, compression: CompressionOptions = None, memory_map: bool = False, is_text: bool = True, - errors=None, -): + errors: Optional[str] = None, +) -> HandleArgs: """ Get file handle for given path/buffer and mode. @@ -514,10 +515,15 @@ def get_handle( Returns ------- - f : file-like - A file-like object. - handles : list of file-like objects - A list of file-like object that were opened in this function. + handleArgs : + handle: The file handle to be used. + created_handles: All file handles that are created by get_handle + (iterate over it to close buffers). + is_wrapped: Whether TextIOWrapper is added (cannot close it as it closes + wrapped buffers: flush&detach it and close all remaining buffers + in created_handles). + + .. versionadded:: 1.2.0 """ need_text_wrapping: Tuple[Type["IOBase"], ...] try: @@ -536,8 +542,7 @@ def get_handle( except ImportError: pass - handles: List[Union[IO, _MMapWrapper]] = list() - f = path_or_buf + handles: List[Buffer] = list() # Windows does not default to utf-8. Set to utf-8 for a consistent behavior if encoding is None: @@ -546,6 +551,7 @@ def get_handle( # Convert pathlib.Path/py.path.local or string path_or_buf = stringify_path(path_or_buf) is_path = isinstance(path_or_buf, str) + f = path_or_buf compression, compression_args = get_compression_method(compression) if is_path: @@ -556,25 +562,29 @@ def get_handle( # GZ Compression if compression == "gzip": if is_path: + assert isinstance(path_or_buf, str) f = gzip.GzipFile(filename=path_or_buf, mode=mode, **compression_args) else: - f = gzip.GzipFile(fileobj=path_or_buf, mode=mode, **compression_args) + f = gzip.GzipFile( + fileobj=path_or_buf, # type: ignore[arg-type] + mode=mode, + **compression_args, + ) # BZ Compression elif compression == "bz2": - f = bz2.BZ2File(path_or_buf, mode=mode, **compression_args) + f = bz2.BZ2File( + path_or_buf, mode=mode, **compression_args # type: ignore[arg-type] + ) # ZIP Compression elif compression == "zip": - zf = _BytesZipFile(path_or_buf, mode, **compression_args) - # Ensure the container is closed as well. 
- handles.append(zf) - if zf.mode == "w": - f = zf - elif zf.mode == "r": - zip_names = zf.namelist() + f = _BytesZipFile(path_or_buf, mode, **compression_args) + if f.mode == "r": + handles.append(f) + zip_names = f.namelist() if len(zip_names) == 1: - f = zf.open(zip_names.pop()) + f = f.open(zip_names.pop()) elif len(zip_names) == 0: raise ValueError(f"Zero files found in ZIP file {path_or_buf}") else: @@ -592,13 +602,14 @@ def get_handle( msg = f"Unrecognized compression type: {compression}" raise ValueError(msg) + assert not isinstance(f, str) handles.append(f) elif is_path: # Check whether the filename is to be opened in binary mode. # Binary mode does not support 'encoding' and 'newline'. is_binary_mode = "b" in mode - + assert isinstance(path_or_buf, str) if encoding and not is_binary_mode: # Encoding f = open(path_or_buf, mode, encoding=encoding, errors=errors, newline="") @@ -608,23 +619,26 @@ def get_handle( handles.append(f) # Convert BytesIO or file objects passed with an encoding + is_wrapped = False if is_text and ( compression or isinstance(f, need_text_wrapping) or "b" in getattr(f, "mode", "") ): - g = TextIOWrapper(f, encoding=encoding, errors=errors, newline="") - if not isinstance(f, (BufferedIOBase, RawIOBase)): - handles.append(g) - f = g + f = TextIOWrapper( + f, encoding=encoding, errors=errors, newline="" # type: ignore[arg-type] + ) + handles.append(f) + is_wrapped = True if memory_map and hasattr(f, "fileno"): + assert not isinstance(f, str) try: - wrapped = _MMapWrapper(f) + wrapped = _MMapWrapper(f) # type: ignore[arg-type] f.close() handles.remove(f) - handles.append(wrapped) - f = wrapped + handles.append(wrapped) # type: ignore[arg-type] + f = wrapped # type: ignore[assignment] except Exception: # we catch any errors that may have occurred # because that is consistent with the lower-level @@ -632,7 +646,13 @@ def get_handle( # leave the file handler as is then pass - return f, handles + handles.reverse() # close the most recently added buffer first + assert not isinstance(f, str) + return HandleArgs( + handle=f, + created_handles=handles, + is_wrapped=is_wrapped, + ) # error: Definition of "__exit__" in base class "ZipFile" is incompatible with diff --git a/pandas/io/formats/csvs.py b/pandas/io/formats/csvs.py index 6c62d6825bc84..0a47e1dee40f8 100644 --- a/pandas/io/formats/csvs.py +++ b/pandas/io/formats/csvs.py @@ -240,7 +240,7 @@ def save(self) -> None: """ # get a handle or wrap an existing handle to take care of 1) compression and # 2) text -> byte conversion - f, handles = get_handle( + handleArgs = get_handle( self.path_or_buf, self.mode, encoding=self.encoding, @@ -251,7 +251,7 @@ def save(self) -> None: try: # Note: self.encoding is irrelevant here self.writer = csvlib.writer( - f, + handleArgs.handle, # type: ignore[arg-type] lineterminator=self.line_terminator, delimiter=self.sep, quoting=self.quoting, @@ -263,23 +263,21 @@ def save(self) -> None: self._save() finally: - if self.should_close: - f.close() - elif ( - isinstance(f, TextIOWrapper) - and not f.closed - and f != self.path_or_buf - and hasattr(self.path_or_buf, "write") - ): + if handleArgs.is_wrapped: # get_handle uses TextIOWrapper for non-binary handles. TextIOWrapper # closes the wrapped handle if it is not detached. 
- f.flush() # make sure everything is written - f.detach() # makes f unusable - del f - elif f != self.path_or_buf: - f.close() - for _fh in handles: - _fh.close() + assert isinstance(handleArgs.handle, TextIOWrapper) + handleArgs.handle.flush() + handleArgs.handle.detach() + handleArgs.created_handles.remove(handleArgs.handle) + for handle in handleArgs.created_handles: + handle.close() + if ( + self.should_close + and not self.path_or_buf.closed # type: ignore[union-attr] + ): + assert not isinstance(self.path_or_buf, str) + self.path_or_buf.close() def _save(self) -> None: if self._need_to_save_header: diff --git a/pandas/io/json/_json.py b/pandas/io/json/_json.py index 98b9a585d890e..e506357e16d98 100644 --- a/pandas/io/json/_json.py +++ b/pandas/io/json/_json.py @@ -1,10 +1,10 @@ from abc import ABC, abstractmethod from collections import abc import functools -from io import BytesIO, StringIO +from io import StringIO, TextIOWrapper from itertools import islice import os -from typing import IO, Any, Callable, List, Mapping, Optional, Tuple, Type, Union +from typing import Any, Callable, Mapping, Optional, Tuple, Type, Union import numpy as np @@ -12,6 +12,7 @@ from pandas._libs.tslibs import iNaT from pandas._typing import ( CompressionOptions, + HandleArgs, IndexLabel, JSONSerializable, StorageOptions, @@ -101,20 +102,22 @@ def to_json( if lines: s = convert_to_line_delimits(s) - if isinstance(path_or_buf, str): - fh, handles = get_handle(path_or_buf, "w", compression=compression) + if path_or_buf is not None: + handleArgs = get_handle(path_or_buf, "w", compression=compression) try: - fh.write(s) + handleArgs.handle.write(s) finally: - fh.close() - for handle in handles: - handle.close() - elif path_or_buf is None: - return s + if handleArgs.is_wrapped: + assert isinstance(handleArgs.handle, TextIOWrapper) + handleArgs.handle.flush() + handleArgs.handle.detach() + handleArgs.created_handles.remove(handleArgs.handle) + for handle in handleArgs.created_handles: + handle.close() + if should_close: + path_or_buf.close() else: - path_or_buf.write(s) - if should_close: - path_or_buf.close() + return s class Writer(ABC): @@ -629,9 +632,8 @@ def __init__( self.lines = lines self.chunksize = chunksize self.nrows_seen = 0 - self.should_close = False self.nrows = nrows - self.file_handles: List[IO] = [] + self.handleArgs: Optional[HandleArgs] = None if self.chunksize is not None: self.chunksize = validate_integer("chunksize", self.chunksize, 1) @@ -670,30 +672,25 @@ def _get_data_from_filepath(self, filepath_or_buffer): This method turns (1) into (2) to simplify the rest of the processing. It returns input types (2) and (3) unchanged. 
""" - data = filepath_or_buffer - + # if it is a string but the file does not exist, it might be a JSON string exists = False - if isinstance(data, str): + if isinstance(filepath_or_buffer, str): try: exists = os.path.exists(filepath_or_buffer) # gh-5874: if the filepath is too long will raise here except (TypeError, ValueError): pass - if exists or self.compression["method"] is not None: - data, self.file_handles = get_handle( + if exists or not isinstance(filepath_or_buffer, str): + self.handleArgs = get_handle( filepath_or_buffer, "r", encoding=self.encoding, compression=self.compression, ) - self.should_close = True - self.open_stream = data + filepath_or_buffer = self.handleArgs.handle - if isinstance(data, BytesIO): - data = data.getvalue().decode() - - return data + return filepath_or_buffer def _combine_lines(self, lines) -> str: """ @@ -757,13 +754,14 @@ def close(self): If an open stream or file was passed, we leave it open. """ - if self.should_close: - try: - self.open_stream.close() - except (OSError, AttributeError): - pass - for file_handle in self.file_handles: - file_handle.close() + if self.handleArgs is None: + return + if self.handleArgs.is_wrapped: + assert isinstance(self.handleArgs.handle, TextIOWrapper) + self.handleArgs.handle.detach() + self.handleArgs.created_handles.remove(self.handleArgs.handle) + for handle in self.handleArgs.created_handles: + handle.close() def __next__(self): if self.nrows: diff --git a/pandas/io/parsers.py b/pandas/io/parsers.py index 956b96aff3bc1..b01c6369ee0a9 100644 --- a/pandas/io/parsers.py +++ b/pandas/io/parsers.py @@ -5,12 +5,12 @@ from collections import abc, defaultdict import csv import datetime -from io import StringIO, TextIOWrapper +from io import StringIO import itertools import re import sys from textwrap import fill -from typing import Any, Dict, Iterable, List, Optional, Sequence, Set, Tuple +from typing import Any, Dict, Iterable, List, Optional, Sequence, Set import warnings import numpy as np @@ -20,7 +20,7 @@ import pandas._libs.parsers as parsers from pandas._libs.parsers import STR_NA_VALUES from pandas._libs.tslibs import parsing -from pandas._typing import FilePathOrBuffer, StorageOptions, Union +from pandas._typing import FilePathOrBuffer, HandleArgs, StorageOptions, Union from pandas.errors import ( AbstractMethodError, EmptyDataError, @@ -1355,10 +1355,6 @@ def __init__(self, kwds): self._first_chunk = True - # GH 13932 - # keep references to file handles opened by the parser itself - self.handles = [] - def _validate_parse_dates_presence(self, columns: List[str]) -> None: """ Check if parse_dates are in columns. 
@@ -1408,20 +1404,13 @@ def _validate_parse_dates_presence(self, columns: List[str]) -> None: ) def close(self): - if self._original_handle or isinstance(self.source, str): - # user-provided file handle or a string when using memory_map=True - pass - elif self._non_closeable_wrapper and not self.source.closed: - # user provided file handle that is wrap inside TextIOWrapper + if self.handleArgs.is_wrapped: + # user-provided file handle that is wrap inside TextIOWrapper # closing TextIOWrapper would close the file handle as well - self.source.detach() - else: - self.source.close() - for f in self.handles: - if self.source == f: - # skip: if we alreay detached/closed the buffer - continue - f.close() + self.handleArgs.handle.detach() + self.handleArgs.created_handles.remove(self.handleArgs.handle) + for handle in self.handleArgs.created_handles: + handle.close() @property def _has_complex_date_col(self): @@ -1872,10 +1861,13 @@ def __init__(self, src, **kwds): + "Please use memory_map=False instead." ) - self.source = src - self.handles = [] + self.handleArgs = HandleArgs( + handle=src, + created_handles=[], + is_wrapped=False, + ) else: - self.source, self.handles = get_handle( + self.handleArgs = get_handle( src, mode="r", encoding=kwds.get("encoding", None), @@ -1885,9 +1877,6 @@ def __init__(self, src, **kwds): kwds.pop("encoding", None) kwds.pop("memory_map", None) kwds.pop("compression", None) - self._original_handle, self._non_closeable_wrapper = _can_close( - src, self.source - ) # #2442 kwds["allow_leading_cols"] = self.index_col is not False @@ -1896,7 +1885,7 @@ def __init__(self, src, **kwds): self.usecols, self.usecols_dtype = _validate_usecols_arg(kwds["usecols"]) kwds["usecols"] = self.usecols - self._reader = parsers.TextReader(self.source, **kwds) + self._reader = parsers.TextReader(self.handleArgs.handle, **kwds) self.unnamed_cols = self._reader.unnamed_cols passed_names = self.names is None @@ -2269,21 +2258,19 @@ def __init__(self, f, **kwds): self.comment = kwds["comment"] self._comment_lines = [] - self.source, handles = get_handle( + self.handleArgs = get_handle( f, "r", encoding=self.encoding, compression=self.compression, memory_map=self.memory_map, ) - self.handles.extend(handles) - self._original_handle, self._non_closeable_wrapper = _can_close(f, self.source) # Set self.data to something that can read lines. - if hasattr(self.source, "readline"): - self._make_reader(self.source) + if hasattr(self.handleArgs.handle, "readline"): + self._make_reader(self.handleArgs.handle) else: - self.data = self.source + self.data = self.handleArgs.handle # Get columns in two steps: infer from data, then # infer column indices from self.usecols if it is specified. @@ -3885,19 +3872,3 @@ def _validate_skipfooter(kwds: Dict[str, Any]) -> None: raise ValueError("'skipfooter' not supported for iteration") if kwds.get("nrows"): raise ValueError("'skipfooter' not supported with 'nrows'") - - -def _can_close( - original_handle: FilePathOrBuffer, wrapped_handle: FilePathOrBuffer -) -> Tuple[bool, bool]: - """Check whether the output of get_handle can be closed.""" - # do not close user-provided file handles - is_original_handle = id(wrapped_handle) == id(original_handle) - - # TextIOWrapper closes wrapped file handles, we cannot close it if the user - # provided a file handler. Needs to be detached (and flushed when writing). 
- non_closeable_wrapper = hasattr(original_handle, "read") and isinstance( - wrapped_handle, TextIOWrapper - ) - - return is_original_handle, non_closeable_wrapper diff --git a/pandas/io/pickle.py b/pandas/io/pickle.py index 426a40a65b522..6777a82f219a4 100644 --- a/pandas/io/pickle.py +++ b/pandas/io/pickle.py @@ -92,19 +92,16 @@ def to_pickle( mode="wb", storage_options=storage_options, ) - f, fh = get_handle( + handleArgs = get_handle( ioargs.filepath_or_buffer, "wb", compression=ioargs.compression, is_text=False ) if protocol < 0: protocol = pickle.HIGHEST_PROTOCOL try: - pickle.dump(obj, f, protocol=protocol) + pickle.dump(obj, handleArgs.handle, protocol=protocol) # type: ignore[arg-type] finally: - if f != filepath_or_buffer: - # do not close user-provided file objects GH 35679 - f.close() - for _f in fh: - _f.close() + for handle in handleArgs.created_handles: + handle.close() if ioargs.should_close: assert not isinstance(ioargs.filepath_or_buffer, str) try: @@ -193,7 +190,7 @@ def read_pickle( ioargs = get_filepath_or_buffer( filepath_or_buffer, compression=compression, storage_options=storage_options ) - f, fh = get_handle( + handleArgs = get_handle( ioargs.filepath_or_buffer, "rb", compression=ioargs.compression, is_text=False ) @@ -208,21 +205,18 @@ def read_pickle( with warnings.catch_warnings(record=True): # We want to silence any warnings about, e.g. moved modules. warnings.simplefilter("ignore", Warning) - return pickle.load(f) + return pickle.load(handleArgs.handle) # type: ignore[arg-type] except excs_to_catch: # e.g. # "No module named 'pandas.core.sparse.series'" # "Can't get attribute '__nat_unpickle' on Tuple[BinaryIO, bool, CompressionOptions]: +) -> Tuple[HandleArgs, CompressionOptions]: """ Open a binary file or no-op if file-like. @@ -1958,34 +1954,22 @@ def _open_file_binary_write( docs for the set of allowed keys and values .. 
versionadded:: 1.2.0 - - Returns - ------- - file : file-like object - File object supporting write - own : bool - True if the file was created, otherwise False """ - if hasattr(fname, "write"): - # See https://github.com/python/mypy/issues/1424 for hasattr challenges - # error: Incompatible return value type (got "Tuple[Union[str, Path, - # IO[Any]], bool, None]", expected "Tuple[BinaryIO, bool, Union[str, - # Mapping[str, str], None]]") - return fname, False, None # type: ignore[return-value] - elif isinstance(fname, (str, Path)): - # Extract compression mode as given, if dict - ioargs = get_filepath_or_buffer( - fname, mode="wb", compression=compression, storage_options=storage_options - ) - f, _ = get_handle( - ioargs.filepath_or_buffer, - "wb", - compression=ioargs.compression, - is_text=False, - ) - return f, True, ioargs.compression - else: - raise TypeError("fname must be a binary file, buffer or path-like.") + ioargs = get_filepath_or_buffer( + fname, mode="wb", compression=compression, storage_options=storage_options + ) + handleArgs = get_handle( + ioargs.filepath_or_buffer, + "wb", + compression=ioargs.compression, + is_text=False, + ) + if ioargs.filepath_or_buffer != fname and not isinstance( + ioargs.filepath_or_buffer, str + ): + # add handle created by get_filepath_or_buffer + handleArgs.created_handles.append(ioargs.filepath_or_buffer) + return handleArgs, ioargs.compression def _set_endianness(endianness: str) -> str: @@ -2236,9 +2220,8 @@ def __init__( self._time_stamp = time_stamp self._data_label = data_label self._variable_labels = variable_labels - self._own_file = True self._compression = compression - self._output_file: Optional[BinaryIO] = None + self._output_file: Optional[Buffer] = None # attach nobs, nvars, data, varlist, typlist self._prepare_pandas(data) self.storage_options = storage_options @@ -2249,21 +2232,20 @@ def __init__( self._fname = stringify_path(fname) self.type_converters = {253: np.int32, 252: np.int16, 251: np.int8} self._converted_names: Dict[Label, str] = {} - self._file: Optional[BinaryIO] = None def _write(self, to_write: str) -> None: """ Helper to call encode before writing to file for Python 3 compat. """ - assert self._file is not None - self._file.write(to_write.encode(self._encoding)) + self.handleArgs.handle.write( + to_write.encode(self._encoding) # type: ignore[arg-type] + ) def _write_bytes(self, value: bytes) -> None: """ Helper to assert file is open before writing. """ - assert self._file is not None - self._file.write(value) + self.handleArgs.handle.write(value) # type: ignore[arg-type] def _prepare_categoricals(self, data: DataFrame) -> DataFrame: """ @@ -2527,12 +2509,14 @@ def _encode_strings(self) -> None: self.data[col] = encoded def write_file(self) -> None: - self._file, self._own_file, compression = _open_file_binary_write( + self.handleArgs, compression = _open_file_binary_write( self._fname, self._compression, storage_options=self.storage_options ) if compression is not None: - self._output_file = self._file - self._file = BytesIO() + # ZipFile create a file for each write call (with the same name). + # Write it first into a buffer and then write the buffer to the ZipFile. 
+ self._output_file = self.handleArgs.handle + self.handleArgs.handle = BytesIO() try: self._write_header(data_label=self._data_label, time_stamp=self._time_stamp) self._write_map() @@ -2552,10 +2536,9 @@ def write_file(self) -> None: self._write_map() except Exception as exc: self._close() - if self._own_file: + if isinstance(self._fname, (str, Path)): try: - if isinstance(self._fname, (str, Path)): - os.unlink(self._fname) + os.unlink(self._fname) except OSError: warnings.warn( f"This save was not successful but {self._fname} could not " @@ -2571,24 +2554,19 @@ def _close(self) -> None: Close the file if it was created by the writer. If a buffer or file-like object was passed in, for example a GzipFile, - then leave this file open for the caller to close. In either case, - attempt to flush the file contents to ensure they are written to disk - (if supported) + then leave this file open for the caller to close. """ - # Some file-like objects might not support flush - assert self._file is not None + # write compression if self._output_file is not None: - assert isinstance(self._file, BytesIO) - bio = self._file + assert isinstance(self.handleArgs.handle, BytesIO) + bio = self.handleArgs.handle bio.seek(0) - self._file = self._output_file - self._file.write(bio.read()) - try: - self._file.flush() - except AttributeError: - pass - if self._own_file: - self._file.close() + self.handleArgs.handle = self._output_file + self.handleArgs.handle.write(bio.read()) # type: ignore[arg-type] + bio.close() + # close any created handles + for handle in self.handleArgs.created_handles: + handle.close() def _write_map(self) -> None: """No-op, future compatibility""" @@ -3140,8 +3118,8 @@ def _tag(val: Union[str, bytes], tag: str) -> bytes: def _update_map(self, tag: str) -> None: """Update map location for tag with file position""" - assert self._file is not None - self._map[tag] = self._file.tell() + assert self.handleArgs.handle is not None + self._map[tag] = self.handleArgs.handle.tell() def _write_header( self, @@ -3208,12 +3186,11 @@ def _write_map(self) -> None: the map with 0s. The second call writes the final map locations when all blocks have been written. 
""" - assert self._file is not None if not self._map: self._map = dict( ( ("stata_data", 0), - ("map", self._file.tell()), + ("map", self.handleArgs.handle.tell()), ("variable_types", 0), ("varnames", 0), ("sortlist", 0), @@ -3229,7 +3206,7 @@ def _write_map(self) -> None: ) ) # Move to start of map - self._file.seek(self._map["map"]) + self.handleArgs.handle.seek(self._map["map"]) bio = BytesIO() for val in self._map.values(): bio.write(struct.pack(self._byteorder + "Q", val)) diff --git a/pandas/tests/frame/methods/test_to_csv.py b/pandas/tests/frame/methods/test_to_csv.py index 5bf1ce508dfc4..7ef5751d961ee 100644 --- a/pandas/tests/frame/methods/test_to_csv.py +++ b/pandas/tests/frame/methods/test_to_csv.py @@ -1034,11 +1034,11 @@ def test_to_csv_compression(self, df, encoding, compression): tm.assert_frame_equal(df, result) # test the round trip using file handle - to_csv -> read_csv - f, _handles = get_handle( + handleArgs = get_handle( filename, "w", compression=compression, encoding=encoding ) - with f: - df.to_csv(f, encoding=encoding) + with handleArgs.handle: + df.to_csv(handleArgs.handle, encoding=encoding) result = pd.read_csv( filename, compression=compression, diff --git a/pandas/tests/io/json/test_readlines.py b/pandas/tests/io/json/test_readlines.py index 933bdc462e3f8..9c64128616910 100644 --- a/pandas/tests/io/json/test_readlines.py +++ b/pandas/tests/io/json/test_readlines.py @@ -143,7 +143,7 @@ def test_readjson_chunks_closes(chunksize): ) reader.read() assert ( - reader.open_stream.closed + reader.handleArgs.handle.closed ), f"didn't close stream with chunksize = {chunksize}" diff --git a/pandas/tests/io/test_compression.py b/pandas/tests/io/test_compression.py index 31e9ad4cf4416..873f6c5ccaa52 100644 --- a/pandas/tests/io/test_compression.py +++ b/pandas/tests/io/test_compression.py @@ -47,18 +47,18 @@ def test_compression_size(obj, method, compression_only): @pytest.mark.parametrize("method", ["to_csv", "to_json"]) def test_compression_size_fh(obj, method, compression_only): with tm.ensure_clean() as path: - f, handles = icom.get_handle(path, "w", compression=compression_only) - with f: - getattr(obj, method)(f) - assert not f.closed - assert f.closed + handleArgs = icom.get_handle(path, "w", compression=compression_only) + with handleArgs.handle: + getattr(obj, method)(handleArgs.handle) + assert not handleArgs.handle.closed + assert handleArgs.handle.closed compressed_size = os.path.getsize(path) with tm.ensure_clean() as path: - f, handles = icom.get_handle(path, "w", compression=None) - with f: - getattr(obj, method)(f) - assert not f.closed - assert f.closed + handleArgs = icom.get_handle(path, "w", compression=None) + with handleArgs.handle: + getattr(obj, method)(handleArgs.handle) + assert not handleArgs.handle.closed + assert handleArgs.handle.closed uncompressed_size = os.path.getsize(path) assert uncompressed_size > compressed_size @@ -111,10 +111,10 @@ def test_compression_warning(compression_only): columns=["X", "Y", "Z"], ) with tm.ensure_clean() as path: - f, handles = icom.get_handle(path, "w", compression=compression_only) + handleArgs = icom.get_handle(path, "w", compression=compression_only) with tm.assert_produces_warning(RuntimeWarning, check_stacklevel=False): - with f: - df.to_csv(f, compression=compression_only) + with handleArgs.handle: + df.to_csv(handleArgs.handle, compression=compression_only) def test_compression_binary(compression_only): diff --git a/pandas/tests/series/methods/test_to_csv.py b/pandas/tests/series/methods/test_to_csv.py 
index a72e860340f25..d1acf16b43416 100644 --- a/pandas/tests/series/methods/test_to_csv.py +++ b/pandas/tests/series/methods/test_to_csv.py @@ -143,11 +143,11 @@ def test_to_csv_compression(self, s, encoding, compression): tm.assert_series_equal(s, result) # test the round trip using file handle - to_csv -> read_csv - f, _handles = get_handle( + handleArgs = get_handle( filename, "w", compression=compression, encoding=encoding ) - with f: - s.to_csv(f, encoding=encoding, header=True) + with handleArgs.handle: + s.to_csv(handleArgs.handle, encoding=encoding, header=True) result = pd.read_csv( filename, compression=compression, From 443a91e0d17aed2bb5dd0e9c0be170cfe6e70a12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torsten=20W=C3=B6rtwein?= Date: Mon, 26 Oct 2020 23:18:24 -0400 Subject: [PATCH 3/9] remove unused imports --- pandas/_libs/parsers.pyx | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/pandas/_libs/parsers.pyx b/pandas/_libs/parsers.pyx index ce9b6e5d623eb..6991e94e8b1fa 100644 --- a/pandas/_libs/parsers.pyx +++ b/pandas/_libs/parsers.pyx @@ -1,15 +1,10 @@ # Copyright (c) 2012, Lambda Foundry, Inc. # See LICENSE for the license -import bz2 from csv import QUOTE_MINIMAL, QUOTE_NONE, QUOTE_NONNUMERIC from errno import ENOENT -import gzip -import io -import os import sys import time import warnings -import zipfile from libc.stdlib cimport free from libc.string cimport strcasecmp, strlen, strncpy @@ -17,7 +12,7 @@ from libc.string cimport strcasecmp, strlen, strncpy import cython from cython import Py_ssize_t -from cpython.bytes cimport PyBytes_AsString, PyBytes_FromString +from cpython.bytes cimport PyBytes_AsString from cpython.exc cimport PyErr_Fetch, PyErr_Occurred from cpython.object cimport PyObject from cpython.ref cimport Py_XDECREF @@ -67,7 +62,6 @@ from pandas._libs.khash cimport ( khiter_t, ) -from pandas.compat import get_lzma_file, import_lzma from pandas.errors import DtypeWarning, EmptyDataError, ParserError, ParserWarning from pandas.core.dtypes.common import ( @@ -82,8 +76,6 @@ from pandas.core.dtypes.common import ( ) from pandas.core.dtypes.concat import union_categoricals -lzma = import_lzma() - cdef: float64_t INF = np.inf float64_t NEGINF = -INF From 6a1051350c5aee30bec214a1f16a9fcfccae37c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torsten=20W=C3=B6rtwein?= Date: Sat, 31 Oct 2020 17:39:33 -0400 Subject: [PATCH 4/9] added IOHandleArgs.close --- pandas/_typing.py | 39 +++++++++++++++----- pandas/core/frame.py | 3 +- pandas/io/common.py | 30 ++++++---------- pandas/io/formats/csvs.py | 21 +++-------- pandas/io/json/_json.py | 32 ++++++----------- pandas/io/parsers.py | 24 +++++-------- pandas/io/pickle.py | 20 +++++------ pandas/io/stata.py | 41 +++++++++++----------- pandas/tests/frame/methods/test_to_csv.py | 6 ++-- pandas/tests/io/json/test_readlines.py | 2 +- pandas/tests/io/test_compression.py | 26 +++++++------- pandas/tests/series/methods/test_to_csv.py | 6 ++-- 12 files changed, 117 insertions(+), 133 deletions(-) diff --git a/pandas/_typing.py b/pandas/_typing.py index 5f67cc16c3daa..a47308a057b70 100644 --- a/pandas/_typing.py +++ b/pandas/_typing.py @@ -1,6 +1,7 @@ from dataclasses import dataclass from datetime import datetime, timedelta, tzinfo from io import BufferedIOBase, RawIOBase, TextIOBase, TextIOWrapper +from mmap import mmap from pathlib import Path from typing import ( IO, @@ -77,13 +78,6 @@ "ExtensionDtype", str, np.dtype, Type[Union[str, float, int, complex, bool, object]] ] DtypeObj = Union[np.dtype, 
"ExtensionDtype"] -FilePathOrBuffer = Union[ - str, Path, IO[AnyStr], RawIOBase, BufferedIOBase, TextIOBase, TextIOWrapper -] -FileOrBuffer = Union[ - str, IO[AnyStr], RawIOBase, BufferedIOBase, TextIOBase, TextIOWrapper -] -Buffer = Union[IO[AnyStr], RawIOBase, BufferedIOBase, TextIOBase, TextIOWrapper] # FrameOrSeriesUnion means either a DataFrame or a Series. E.g. # `def func(a: FrameOrSeriesUnion) -> FrameOrSeriesUnion: ...` means that if a Series @@ -138,6 +132,10 @@ "Resampler", ] +# filenames and file-like-objects +Buffer = Union[IO[AnyStr], RawIOBase, BufferedIOBase, TextIOBase, TextIOWrapper, mmap] +FileOrBuffer = Union[str, Buffer[T]] +FilePathOrBuffer = Union[Path, FileOrBuffer[T]] # for arbitrary kwargs passed during reading/writing files StorageOptions = Optional[Dict[str, Any]] @@ -176,9 +174,32 @@ class IOArgs(Generic[ModeVar, EncodingVar]): @dataclass -class HandleArgs: - """Return value of io/common.py:get_handle""" +class IOHandleArgs: + """ + Return value of io/common.py:get_handle + + handle: The file handle to be used. + created_handles: All file handles that are created by get_handle + is_wrapped: Whether a TextIOWrapper needs to be detached. + """ handle: Buffer created_handles: List[Buffer] is_wrapped: bool + + def close(self) -> None: + """ + Close all created buffers. + + Note: If a TextIOWrapper was inserted, it is flushed and detached to + avoid closing the potentially user-created buffer. + """ + if self.is_wrapped: + assert isinstance(self.handle, TextIOWrapper) + self.handle.flush() + self.handle.detach() + self.created_handles.remove(self.handle) + for handle in self.created_handles: + handle.close() + self.created_handles = [] + self.is_wrapped = False diff --git a/pandas/core/frame.py b/pandas/core/frame.py index 24b89085ac121..e5224aa221d48 100644 --- a/pandas/core/frame.py +++ b/pandas/core/frame.py @@ -15,6 +15,7 @@ import datetime from io import StringIO import itertools +import mmap from textwrap import dedent from typing import ( IO, @@ -2286,7 +2287,7 @@ def to_markdown( if buf is None: return result ioargs = get_filepath_or_buffer(buf, mode=mode, storage_options=storage_options) - assert not isinstance(ioargs.filepath_or_buffer, str) + assert not isinstance(ioargs.filepath_or_buffer, (str, mmap.mmap)) ioargs.filepath_or_buffer.writelines(result) if ioargs.should_close: ioargs.filepath_or_buffer.close() diff --git a/pandas/io/common.py b/pandas/io/common.py index 70dc83bf62856..5c3d646b3f37f 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -18,6 +18,7 @@ Optional, Tuple, Type, + cast, ) from urllib.parse import ( urljoin, @@ -36,8 +37,8 @@ EncodingVar, FileOrBuffer, FilePathOrBuffer, - HandleArgs, IOArgs, + IOHandleArgs, ModeVar, StorageOptions, ) @@ -467,7 +468,7 @@ def get_handle( memory_map: bool = False, is_text: bool = True, errors: Optional[str] = None, -) -> HandleArgs: +) -> IOHandleArgs: """ Get file handle for given path/buffer and mode. @@ -511,19 +512,9 @@ def get_handle( See the errors argument for :func:`open` for a full list of options. - .. versionadded:: 1.1.0 + .. versionchanged:: 1.2.0 - Returns - ------- - handleArgs : - handle: The file handle to be used. - created_handles: All file handles that are created by get_handle - (iterate over it to close buffers). - is_wrapped: Whether TextIOWrapper is added (cannot close it as it closes - wrapped buffers: flush&detach it and close all remaining buffers - in created_handles). - - .. 
versionadded:: 1.2.0 + Returns the dataclass IOHandleArgs """ need_text_wrapping: Tuple[Type["IOBase"], ...] try: @@ -629,16 +620,17 @@ def get_handle( f, encoding=encoding, errors=errors, newline="" # type: ignore[arg-type] ) handles.append(f) - is_wrapped = True + # do not mark as wrapped when the user provided a string + is_wrapped = not is_path if memory_map and hasattr(f, "fileno"): assert not isinstance(f, str) try: - wrapped = _MMapWrapper(f) # type: ignore[arg-type] + wrapped = cast(mmap.mmap, _MMapWrapper(f)) # type: ignore[arg-type] f.close() handles.remove(f) - handles.append(wrapped) # type: ignore[arg-type] - f = wrapped # type: ignore[assignment] + handles.append(wrapped) + f = wrapped except Exception: # we catch any errors that may have occurred # because that is consistent with the lower-level @@ -648,7 +640,7 @@ def get_handle( handles.reverse() # close the most recently added buffer first assert not isinstance(f, str) - return HandleArgs( + return IOHandleArgs( handle=f, created_handles=handles, is_wrapped=is_wrapped, diff --git a/pandas/io/formats/csvs.py b/pandas/io/formats/csvs.py index 0a47e1dee40f8..bdd21411c711f 100644 --- a/pandas/io/formats/csvs.py +++ b/pandas/io/formats/csvs.py @@ -3,7 +3,7 @@ """ import csv as csvlib -from io import StringIO, TextIOWrapper +from io import StringIO import os from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Sequence, Union @@ -240,7 +240,7 @@ def save(self) -> None: """ # get a handle or wrap an existing handle to take care of 1) compression and # 2) text -> byte conversion - handleArgs = get_handle( + handle_args = get_handle( self.path_or_buf, self.mode, encoding=self.encoding, @@ -251,7 +251,7 @@ def save(self) -> None: try: # Note: self.encoding is irrelevant here self.writer = csvlib.writer( - handleArgs.handle, # type: ignore[arg-type] + handle_args.handle, # type: ignore[arg-type] lineterminator=self.line_terminator, delimiter=self.sep, quoting=self.quoting, @@ -263,19 +263,8 @@ def save(self) -> None: self._save() finally: - if handleArgs.is_wrapped: - # get_handle uses TextIOWrapper for non-binary handles. TextIOWrapper - # closes the wrapped handle if it is not detached. 
- assert isinstance(handleArgs.handle, TextIOWrapper) - handleArgs.handle.flush() - handleArgs.handle.detach() - handleArgs.created_handles.remove(handleArgs.handle) - for handle in handleArgs.created_handles: - handle.close() - if ( - self.should_close - and not self.path_or_buf.closed # type: ignore[union-attr] - ): + handle_args.close() + if self.should_close: assert not isinstance(self.path_or_buf, str) self.path_or_buf.close() diff --git a/pandas/io/json/_json.py b/pandas/io/json/_json.py index e506357e16d98..085250d94ce7c 100644 --- a/pandas/io/json/_json.py +++ b/pandas/io/json/_json.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod from collections import abc import functools -from io import StringIO, TextIOWrapper +from io import StringIO from itertools import islice import os from typing import Any, Callable, Mapping, Optional, Tuple, Type, Union @@ -12,8 +12,8 @@ from pandas._libs.tslibs import iNaT from pandas._typing import ( CompressionOptions, - HandleArgs, IndexLabel, + IOHandleArgs, JSONSerializable, StorageOptions, ) @@ -103,17 +103,11 @@ def to_json( s = convert_to_line_delimits(s) if path_or_buf is not None: - handleArgs = get_handle(path_or_buf, "w", compression=compression) + handle_args = get_handle(path_or_buf, "w", compression=compression) try: - handleArgs.handle.write(s) + handle_args.handle.write(s) finally: - if handleArgs.is_wrapped: - assert isinstance(handleArgs.handle, TextIOWrapper) - handleArgs.handle.flush() - handleArgs.handle.detach() - handleArgs.created_handles.remove(handleArgs.handle) - for handle in handleArgs.created_handles: - handle.close() + handle_args.close() if should_close: path_or_buf.close() else: @@ -633,7 +627,7 @@ def __init__( self.chunksize = chunksize self.nrows_seen = 0 self.nrows = nrows - self.handleArgs: Optional[HandleArgs] = None + self.handle_args: Optional[IOHandleArgs] = None if self.chunksize is not None: self.chunksize = validate_integer("chunksize", self.chunksize, 1) @@ -682,13 +676,13 @@ def _get_data_from_filepath(self, filepath_or_buffer): pass if exists or not isinstance(filepath_or_buffer, str): - self.handleArgs = get_handle( + self.handle_args = get_handle( filepath_or_buffer, "r", encoding=self.encoding, compression=self.compression, ) - filepath_or_buffer = self.handleArgs.handle + filepath_or_buffer = self.handle_args.handle return filepath_or_buffer @@ -754,14 +748,8 @@ def close(self): If an open stream or file was passed, we leave it open. 
""" - if self.handleArgs is None: - return - if self.handleArgs.is_wrapped: - assert isinstance(self.handleArgs.handle, TextIOWrapper) - self.handleArgs.handle.detach() - self.handleArgs.created_handles.remove(self.handleArgs.handle) - for handle in self.handleArgs.created_handles: - handle.close() + if self.handle_args is not None: + self.handle_args.close() def __next__(self): if self.nrows: diff --git a/pandas/io/parsers.py b/pandas/io/parsers.py index b01c6369ee0a9..0660d52133c8a 100644 --- a/pandas/io/parsers.py +++ b/pandas/io/parsers.py @@ -20,7 +20,7 @@ import pandas._libs.parsers as parsers from pandas._libs.parsers import STR_NA_VALUES from pandas._libs.tslibs import parsing -from pandas._typing import FilePathOrBuffer, HandleArgs, StorageOptions, Union +from pandas._typing import FilePathOrBuffer, IOHandleArgs, StorageOptions, Union from pandas.errors import ( AbstractMethodError, EmptyDataError, @@ -1404,13 +1404,7 @@ def _validate_parse_dates_presence(self, columns: List[str]) -> None: ) def close(self): - if self.handleArgs.is_wrapped: - # user-provided file handle that is wrap inside TextIOWrapper - # closing TextIOWrapper would close the file handle as well - self.handleArgs.handle.detach() - self.handleArgs.created_handles.remove(self.handleArgs.handle) - for handle in self.handleArgs.created_handles: - handle.close() + self.handle_args.close() @property def _has_complex_date_col(self): @@ -1861,13 +1855,13 @@ def __init__(self, src, **kwds): + "Please use memory_map=False instead." ) - self.handleArgs = HandleArgs( + self.handle_args = IOHandleArgs( handle=src, created_handles=[], is_wrapped=False, ) else: - self.handleArgs = get_handle( + self.handle_args = get_handle( src, mode="r", encoding=kwds.get("encoding", None), @@ -1885,7 +1879,7 @@ def __init__(self, src, **kwds): self.usecols, self.usecols_dtype = _validate_usecols_arg(kwds["usecols"]) kwds["usecols"] = self.usecols - self._reader = parsers.TextReader(self.handleArgs.handle, **kwds) + self._reader = parsers.TextReader(self.handle_args.handle, **kwds) self.unnamed_cols = self._reader.unnamed_cols passed_names = self.names is None @@ -2258,7 +2252,7 @@ def __init__(self, f, **kwds): self.comment = kwds["comment"] self._comment_lines = [] - self.handleArgs = get_handle( + self.handle_args = get_handle( f, "r", encoding=self.encoding, @@ -2267,10 +2261,10 @@ def __init__(self, f, **kwds): ) # Set self.data to something that can read lines. - if hasattr(self.handleArgs.handle, "readline"): - self._make_reader(self.handleArgs.handle) + if hasattr(self.handle_args.handle, "readline"): + self._make_reader(self.handle_args.handle) else: - self.data = self.handleArgs.handle + self.data = self.handle_args.handle # Get columns in two steps: infer from data, then # infer column indices from self.usecols if it is specified. 
diff --git a/pandas/io/pickle.py b/pandas/io/pickle.py index 6777a82f219a4..37d175591f014 100644 --- a/pandas/io/pickle.py +++ b/pandas/io/pickle.py @@ -92,16 +92,17 @@ def to_pickle( mode="wb", storage_options=storage_options, ) - handleArgs = get_handle( + handle_args = get_handle( ioargs.filepath_or_buffer, "wb", compression=ioargs.compression, is_text=False ) if protocol < 0: protocol = pickle.HIGHEST_PROTOCOL try: - pickle.dump(obj, handleArgs.handle, protocol=protocol) # type: ignore[arg-type] + pickle.dump( + obj, handle_args.handle, protocol=protocol # type: ignore[arg-type] + ) finally: - for handle in handleArgs.created_handles: - handle.close() + handle_args.close() if ioargs.should_close: assert not isinstance(ioargs.filepath_or_buffer, str) try: @@ -190,7 +191,7 @@ def read_pickle( ioargs = get_filepath_or_buffer( filepath_or_buffer, compression=compression, storage_options=storage_options ) - handleArgs = get_handle( + handle_args = get_handle( ioargs.filepath_or_buffer, "rb", compression=ioargs.compression, is_text=False ) @@ -205,18 +206,17 @@ def read_pickle( with warnings.catch_warnings(record=True): # We want to silence any warnings about, e.g. moved modules. warnings.simplefilter("ignore", Warning) - return pickle.load(handleArgs.handle) # type: ignore[arg-type] + return pickle.load(handle_args.handle) # type: ignore[arg-type] except excs_to_catch: # e.g. # "No module named 'pandas.core.sparse.series'" # "Can't get attribute '__nat_unpickle' on Tuple[HandleArgs, CompressionOptions]: +) -> Tuple[IOHandleArgs, CompressionOptions]: """ Open a binary file or no-op if file-like. @@ -1958,7 +1958,7 @@ def _open_file_binary_write( ioargs = get_filepath_or_buffer( fname, mode="wb", compression=compression, storage_options=storage_options ) - handleArgs = get_handle( + handle_args = get_handle( ioargs.filepath_or_buffer, "wb", compression=ioargs.compression, @@ -1968,8 +1968,8 @@ def _open_file_binary_write( ioargs.filepath_or_buffer, str ): # add handle created by get_filepath_or_buffer - handleArgs.created_handles.append(ioargs.filepath_or_buffer) - return handleArgs, ioargs.compression + handle_args.created_handles.append(ioargs.filepath_or_buffer) + return handle_args, ioargs.compression def _set_endianness(endianness: str) -> str: @@ -2237,7 +2237,7 @@ def _write(self, to_write: str) -> None: """ Helper to call encode before writing to file for Python 3 compat. """ - self.handleArgs.handle.write( + self.handle_args.handle.write( to_write.encode(self._encoding) # type: ignore[arg-type] ) @@ -2245,7 +2245,7 @@ def _write_bytes(self, value: bytes) -> None: """ Helper to assert file is open before writing. """ - self.handleArgs.handle.write(value) # type: ignore[arg-type] + self.handle_args.handle.write(value) # type: ignore[arg-type] def _prepare_categoricals(self, data: DataFrame) -> DataFrame: """ @@ -2509,14 +2509,14 @@ def _encode_strings(self) -> None: self.data[col] = encoded def write_file(self) -> None: - self.handleArgs, compression = _open_file_binary_write( + self.handle_args, compression = _open_file_binary_write( self._fname, self._compression, storage_options=self.storage_options ) if compression is not None: - # ZipFile create a file for each write call (with the same name). + # ZipFile creates a file (with the same name) for each write call. # Write it first into a buffer and then write the buffer to the ZipFile. 
- self._output_file = self.handleArgs.handle - self.handleArgs.handle = BytesIO() + self._output_file = self.handle_args.handle + self.handle_args.handle = BytesIO() try: self._write_header(data_label=self._data_label, time_stamp=self._time_stamp) self._write_map() @@ -2558,15 +2558,14 @@ def _close(self) -> None: """ # write compression if self._output_file is not None: - assert isinstance(self.handleArgs.handle, BytesIO) - bio = self.handleArgs.handle + assert isinstance(self.handle_args.handle, BytesIO) + bio = self.handle_args.handle bio.seek(0) - self.handleArgs.handle = self._output_file - self.handleArgs.handle.write(bio.read()) # type: ignore[arg-type] + self.handle_args.handle = self._output_file + self.handle_args.handle.write(bio.read()) # type: ignore[arg-type] bio.close() # close any created handles - for handle in self.handleArgs.created_handles: - handle.close() + self.handle_args.close() def _write_map(self) -> None: """No-op, future compatibility""" @@ -3118,8 +3117,8 @@ def _tag(val: Union[str, bytes], tag: str) -> bytes: def _update_map(self, tag: str) -> None: """Update map location for tag with file position""" - assert self.handleArgs.handle is not None - self._map[tag] = self.handleArgs.handle.tell() + assert self.handle_args.handle is not None + self._map[tag] = self.handle_args.handle.tell() def _write_header( self, @@ -3190,7 +3189,7 @@ def _write_map(self) -> None: self._map = dict( ( ("stata_data", 0), - ("map", self.handleArgs.handle.tell()), + ("map", self.handle_args.handle.tell()), ("variable_types", 0), ("varnames", 0), ("sortlist", 0), @@ -3206,7 +3205,7 @@ def _write_map(self) -> None: ) ) # Move to start of map - self.handleArgs.handle.seek(self._map["map"]) + self.handle_args.handle.seek(self._map["map"]) bio = BytesIO() for val in self._map.values(): bio.write(struct.pack(self._byteorder + "Q", val)) diff --git a/pandas/tests/frame/methods/test_to_csv.py b/pandas/tests/frame/methods/test_to_csv.py index 7ef5751d961ee..4270768578ee9 100644 --- a/pandas/tests/frame/methods/test_to_csv.py +++ b/pandas/tests/frame/methods/test_to_csv.py @@ -1034,11 +1034,11 @@ def test_to_csv_compression(self, df, encoding, compression): tm.assert_frame_equal(df, result) # test the round trip using file handle - to_csv -> read_csv - handleArgs = get_handle( + handle_args = get_handle( filename, "w", compression=compression, encoding=encoding ) - with handleArgs.handle: - df.to_csv(handleArgs.handle, encoding=encoding) + df.to_csv(handle_args.handle, encoding=encoding) + handle_args.close() result = pd.read_csv( filename, compression=compression, diff --git a/pandas/tests/io/json/test_readlines.py b/pandas/tests/io/json/test_readlines.py index 9c64128616910..53e8de097d277 100644 --- a/pandas/tests/io/json/test_readlines.py +++ b/pandas/tests/io/json/test_readlines.py @@ -143,7 +143,7 @@ def test_readjson_chunks_closes(chunksize): ) reader.read() assert ( - reader.handleArgs.handle.closed + reader.handle_args.handle.closed ), f"didn't close stream with chunksize = {chunksize}" diff --git a/pandas/tests/io/test_compression.py b/pandas/tests/io/test_compression.py index 873f6c5ccaa52..4d75d4bb01348 100644 --- a/pandas/tests/io/test_compression.py +++ b/pandas/tests/io/test_compression.py @@ -47,18 +47,18 @@ def test_compression_size(obj, method, compression_only): @pytest.mark.parametrize("method", ["to_csv", "to_json"]) def test_compression_size_fh(obj, method, compression_only): with tm.ensure_clean() as path: - handleArgs = icom.get_handle(path, "w", 
compression=compression_only) - with handleArgs.handle: - getattr(obj, method)(handleArgs.handle) - assert not handleArgs.handle.closed - assert handleArgs.handle.closed + handle_args = icom.get_handle(path, "w", compression=compression_only) + getattr(obj, method)(handle_args.handle) + assert not handle_args.handle.closed + handle_args.close() + assert handle_args.handle.closed compressed_size = os.path.getsize(path) with tm.ensure_clean() as path: - handleArgs = icom.get_handle(path, "w", compression=None) - with handleArgs.handle: - getattr(obj, method)(handleArgs.handle) - assert not handleArgs.handle.closed - assert handleArgs.handle.closed + handle_args = icom.get_handle(path, "w", compression=None) + getattr(obj, method)(handle_args.handle) + assert not handle_args.handle.closed + handle_args.close() + assert handle_args.handle.closed uncompressed_size = os.path.getsize(path) assert uncompressed_size > compressed_size @@ -111,10 +111,10 @@ def test_compression_warning(compression_only): columns=["X", "Y", "Z"], ) with tm.ensure_clean() as path: - handleArgs = icom.get_handle(path, "w", compression=compression_only) + handle_args = icom.get_handle(path, "w", compression=compression_only) with tm.assert_produces_warning(RuntimeWarning, check_stacklevel=False): - with handleArgs.handle: - df.to_csv(handleArgs.handle, compression=compression_only) + df.to_csv(handle_args.handle, compression=compression_only) + handle_args.close() def test_compression_binary(compression_only): diff --git a/pandas/tests/series/methods/test_to_csv.py b/pandas/tests/series/methods/test_to_csv.py index d1acf16b43416..2e2266d88f167 100644 --- a/pandas/tests/series/methods/test_to_csv.py +++ b/pandas/tests/series/methods/test_to_csv.py @@ -143,11 +143,11 @@ def test_to_csv_compression(self, s, encoding, compression): tm.assert_series_equal(s, result) # test the round trip using file handle - to_csv -> read_csv - handleArgs = get_handle( + handle_args = get_handle( filename, "w", compression=compression, encoding=encoding ) - with handleArgs.handle: - s.to_csv(handleArgs.handle, encoding=encoding, header=True) + s.to_csv(handle_args.handle, encoding=encoding, header=True) + handle_args.close() result = pd.read_csv( filename, compression=compression, From 60fc0a872563ac005cef8535682b7d0db06bf382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torsten=20W=C3=B6rtwein?= Date: Sun, 1 Nov 2020 13:35:32 -0500 Subject: [PATCH 5/9] added IOArgs.close --- pandas/_typing.py | 9 +++++++++ pandas/io/excel/_base.py | 12 +++++++++--- pandas/io/feather_format.py | 9 ++------- pandas/io/formats/csvs.py | 30 ++++++++---------------------- pandas/io/formats/format.py | 13 ++++++++++--- pandas/io/json/_json.py | 32 ++++++++++++-------------------- pandas/io/orc.py | 1 + pandas/io/parsers.py | 8 +------- pandas/io/pickle.py | 14 ++------------ pandas/io/sas/sas7bdat.py | 21 +++++++-------------- pandas/io/sas/sas_xport.py | 19 ++++++++----------- pandas/io/sas/sasreader.py | 3 +-- pandas/io/stata.py | 28 +++++++++++++--------------- 13 files changed, 83 insertions(+), 116 deletions(-) diff --git a/pandas/_typing.py b/pandas/_typing.py index a47308a057b70..571ec3820a002 100644 --- a/pandas/_typing.py +++ b/pandas/_typing.py @@ -172,6 +172,15 @@ class IOArgs(Generic[ModeVar, EncodingVar]): should_close: bool mode: Union[ModeVar, str] + def close(self) -> None: + """ + Close the buffer if it was created by get_filepath_or_buffer. 
+ """ + if self.should_close: + assert not isinstance(self.filepath_or_buffer, str) + self.filepath_or_buffer.close() + self.should_close = False + @dataclass class IOHandleArgs: diff --git a/pandas/io/excel/_base.py b/pandas/io/excel/_base.py index 3461652f4ea24..3fcde0d366874 100644 --- a/pandas/io/excel/_base.py +++ b/pandas/io/excel/_base.py @@ -350,12 +350,17 @@ def read_excel( class BaseExcelReader(metaclass=abc.ABCMeta): def __init__(self, filepath_or_buffer, storage_options: StorageOptions = None): # If filepath_or_buffer is a url, load the data into a BytesIO + self.handles = [] if is_url(filepath_or_buffer): filepath_or_buffer = BytesIO(urlopen(filepath_or_buffer).read()) + self.handles.append(filepath_or_buffer) elif not isinstance(filepath_or_buffer, (ExcelFile, self._workbook_class)): - filepath_or_buffer = get_filepath_or_buffer( + ioargs = get_filepath_or_buffer( filepath_or_buffer, storage_options=storage_options - ).filepath_or_buffer + ) + filepath_or_buffer = ioargs.filepath_or_buffer + if ioargs.should_close: + self.handles.append(filepath_or_buffer) if isinstance(filepath_or_buffer, self._workbook_class): self.book = filepath_or_buffer @@ -382,7 +387,8 @@ def load_workbook(self, filepath_or_buffer): pass def close(self): - pass + for handle in self.handles: + handle.close() @property @abc.abstractmethod diff --git a/pandas/io/feather_format.py b/pandas/io/feather_format.py index 9a42b8289ab47..198acd5862d45 100644 --- a/pandas/io/feather_format.py +++ b/pandas/io/feather_format.py @@ -81,9 +81,7 @@ def to_feather( feather.write_feather(df, ioargs.filepath_or_buffer, **kwargs) - if ioargs.should_close: - assert not isinstance(ioargs.filepath_or_buffer, str) - ioargs.filepath_or_buffer.close() + ioargs.close() def read_feather( @@ -137,9 +135,6 @@ def read_feather( ioargs.filepath_or_buffer, columns=columns, use_threads=bool(use_threads) ) - # s3fs only validates the credentials when the file is closed. 
- if ioargs.should_close: - assert not isinstance(ioargs.filepath_or_buffer, str) - ioargs.filepath_or_buffer.close() + ioargs.close() return df diff --git a/pandas/io/formats/csvs.py b/pandas/io/formats/csvs.py index bdd21411c711f..3b559061f8b69 100644 --- a/pandas/io/formats/csvs.py +++ b/pandas/io/formats/csvs.py @@ -3,7 +3,6 @@ """ import csv as csvlib -from io import StringIO import os from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Sequence, Union @@ -39,7 +38,7 @@ class CSVFormatter: def __init__( self, formatter: "DataFrameFormatter", - path_or_buf: Optional[FilePathOrBuffer[str]] = None, + path_or_buf: FilePathOrBuffer[str] = "", sep: str = ",", cols: Optional[Sequence[Label]] = None, index_label: Optional[IndexLabel] = None, @@ -60,25 +59,14 @@ def __init__( self.obj = self.fmt.frame - self.encoding = encoding or "utf-8" - - if path_or_buf is None: - path_or_buf = StringIO() - - ioargs = get_filepath_or_buffer( + self.ioargs = get_filepath_or_buffer( path_or_buf, - encoding=self.encoding, + encoding=encoding, compression=compression, mode=mode, storage_options=storage_options, ) - self.compression = ioargs.compression.pop("method") - self.compression_args = ioargs.compression - self.path_or_buf = ioargs.filepath_or_buffer - self.should_close = ioargs.should_close - self.mode = ioargs.mode - self.sep = sep self.index_label = self._initialize_index_label(index_label) self.errors = errors @@ -241,11 +229,11 @@ def save(self) -> None: # get a handle or wrap an existing handle to take care of 1) compression and # 2) text -> byte conversion handle_args = get_handle( - self.path_or_buf, - self.mode, - encoding=self.encoding, + self.ioargs.filepath_or_buffer, + self.ioargs.mode, + encoding=self.ioargs.encoding, errors=self.errors, - compression=dict(self.compression_args, method=self.compression), + compression=self.ioargs.compression, ) try: @@ -264,9 +252,7 @@ def save(self) -> None: finally: handle_args.close() - if self.should_close: - assert not isinstance(self.path_or_buf, str) - self.path_or_buf.close() + self.ioargs.close() def _save(self) -> None: if self._need_to_save_header: diff --git a/pandas/io/formats/format.py b/pandas/io/formats/format.py index 3c759f477899b..85da90bc901c2 100644 --- a/pandas/io/formats/format.py +++ b/pandas/io/formats/format.py @@ -1046,6 +1046,11 @@ def to_csv( """ from pandas.io.formats.csvs import CSVFormatter + created_buffer = False + if path_or_buf is None: + path_or_buf = StringIO() + created_buffer = True + csv_formatter = CSVFormatter( path_or_buf=path_or_buf, line_terminator=line_terminator, @@ -1067,9 +1072,11 @@ def to_csv( ) csv_formatter.save() - if path_or_buf is None: - assert isinstance(csv_formatter.path_or_buf, StringIO) - return csv_formatter.path_or_buf.getvalue() + if created_buffer: + assert isinstance(path_or_buf, StringIO) + content = path_or_buf.getvalue() + path_or_buf.close() + return content return None diff --git a/pandas/io/json/_json.py b/pandas/io/json/_json.py index 085250d94ce7c..84c67baf782c3 100644 --- a/pandas/io/json/_json.py +++ b/pandas/io/json/_json.py @@ -60,17 +60,6 @@ def to_json( "'index=False' is only valid when 'orient' is 'split' or 'table'" ) - if path_or_buf is not None: - ioargs = get_filepath_or_buffer( - path_or_buf, - compression=compression, - mode="wt", - storage_options=storage_options, - ) - path_or_buf = ioargs.filepath_or_buffer - should_close = ioargs.should_close - compression = ioargs.compression - if lines and orient != "records": raise ValueError("'lines' keyword 
only valid when 'orient' is records") @@ -103,13 +92,20 @@ def to_json( s = convert_to_line_delimits(s) if path_or_buf is not None: - handle_args = get_handle(path_or_buf, "w", compression=compression) + ioargs = get_filepath_or_buffer( + path_or_buf, + compression=compression, + mode="wt", + storage_options=storage_options, + ) + handle_args = get_handle( + ioargs.filepath_or_buffer, "w", compression=ioargs.compression + ) try: handle_args.handle.write(s) finally: handle_args.close() - if should_close: - path_or_buf.close() + ioargs.close() else: return s @@ -542,12 +538,10 @@ def read_json( dtype = True if convert_axes is None and orient != "table": convert_axes = True - if encoding is None: - encoding = "utf-8" ioargs = get_filepath_or_buffer( path_or_buf, - encoding=encoding, + encoding=encoding or "utf-8", compression=compression, storage_options=storage_options, ) @@ -574,9 +568,7 @@ def read_json( return json_reader result = json_reader.read() - if ioargs.should_close: - assert not isinstance(ioargs.filepath_or_buffer, str) - ioargs.filepath_or_buffer.close() + ioargs.close() return result diff --git a/pandas/io/orc.py b/pandas/io/orc.py index 829ff6408d86d..5a734f0878a0c 100644 --- a/pandas/io/orc.py +++ b/pandas/io/orc.py @@ -53,4 +53,5 @@ def read_orc( ioargs = get_filepath_or_buffer(path) orc_file = pyarrow.orc.ORCFile(ioargs.filepath_or_buffer) result = orc_file.read(columns=columns, **kwargs).to_pandas() + ioargs.close() return result diff --git a/pandas/io/parsers.py b/pandas/io/parsers.py index 0660d52133c8a..19724772dda05 100644 --- a/pandas/io/parsers.py +++ b/pandas/io/parsers.py @@ -467,13 +467,7 @@ def _read(filepath_or_buffer: FilePathOrBuffer, kwds): data = parser.read(nrows) finally: parser.close() - - if ioargs.should_close: - assert not isinstance(ioargs.filepath_or_buffer, str) - try: - ioargs.filepath_or_buffer.close() - except ValueError: - pass + ioargs.close() return data diff --git a/pandas/io/pickle.py b/pandas/io/pickle.py index 37d175591f014..a18611277ba23 100644 --- a/pandas/io/pickle.py +++ b/pandas/io/pickle.py @@ -103,12 +103,7 @@ def to_pickle( ) finally: handle_args.close() - if ioargs.should_close: - assert not isinstance(ioargs.filepath_or_buffer, str) - try: - ioargs.filepath_or_buffer.close() - except ValueError: - pass + ioargs.close() def read_pickle( @@ -217,9 +212,4 @@ def read_pickle( return pc.load(handle_args.handle, encoding="latin-1") finally: handle_args.close() - if ioargs.should_close: - assert not isinstance(ioargs.filepath_or_buffer, str) - try: - ioargs.filepath_or_buffer.close() - except ValueError: - pass + ioargs.close() diff --git a/pandas/io/sas/sas7bdat.py b/pandas/io/sas/sas7bdat.py index 989036917b265..e9b74199cbc42 100644 --- a/pandas/io/sas/sas7bdat.py +++ b/pandas/io/sas/sas7bdat.py @@ -16,7 +16,7 @@ from collections import abc from datetime import datetime, timedelta import struct -from typing import IO, Any, Union +from typing import IO, Any, Union, cast import numpy as np @@ -131,8 +131,6 @@ class SAS7BDATReader(ReaderBase, abc.Iterator): bytes. 
""" - _path_or_buf: IO[Any] - def __init__( self, path_or_buf, @@ -170,14 +168,12 @@ def __init__( self._current_row_on_page_index = 0 self._current_row_in_file_index = 0 - path_or_buf = get_filepath_or_buffer(path_or_buf).filepath_or_buffer - if isinstance(path_or_buf, str): - buf = open(path_or_buf, "rb") - self.handle = buf - else: - buf = path_or_buf + self.ioargs = get_filepath_or_buffer(path_or_buf) + if isinstance(self.ioargs.filepath_or_buffer, str): + self.ioargs.filepath_or_buffer = open(path_or_buf, "rb") + self.ioargs.should_close = True - self._path_or_buf: IO[Any] = buf + self._path_or_buf = cast(IO[Any], self.ioargs.filepath_or_buffer) try: self._get_properties() @@ -202,10 +198,7 @@ def column_types(self): return np.asarray(self._column_types, dtype=np.dtype("S1")) def close(self): - try: - self.handle.close() - except AttributeError: - pass + self.ioargs.close() def _get_properties(self): diff --git a/pandas/io/sas/sas_xport.py b/pandas/io/sas/sas_xport.py index 2a48abe9fbd63..4303cef2df60d 100644 --- a/pandas/io/sas/sas_xport.py +++ b/pandas/io/sas/sas_xport.py @@ -10,6 +10,7 @@ from collections import abc from datetime import datetime import struct +from typing import IO, cast import warnings import numpy as np @@ -252,17 +253,13 @@ def __init__( self._index = index self._chunksize = chunksize - if isinstance(filepath_or_buffer, str): - filepath_or_buffer = get_filepath_or_buffer( - filepath_or_buffer, encoding=encoding - ).filepath_or_buffer + self.ioargs = get_filepath_or_buffer(filepath_or_buffer, encoding=encoding) - if isinstance(filepath_or_buffer, (str, bytes)): - self.filepath_or_buffer = open(filepath_or_buffer, "rb") - else: - # Since xport files include non-text byte sequences, xport files - # should already be opened in binary mode in Python 3. 
- self.filepath_or_buffer = filepath_or_buffer + if isinstance(self.ioargs.filepath_or_buffer, str): + self.ioargs.filepath_or_buffer = open(self.ioargs.filepath_or_buffer, "rb") + self.ioargs.should_close = True + + self.filepath_or_buffer = cast(IO[bytes], self.ioargs.filepath_or_buffer) try: self._read_header() @@ -271,7 +268,7 @@ def __init__( raise def close(self): - self.filepath_or_buffer.close() + self.ioargs.close() def _get_row(self): return self.filepath_or_buffer.read(80).decode() diff --git a/pandas/io/sas/sasreader.py b/pandas/io/sas/sasreader.py index caf53b5be971a..446e2daaa1f9c 100644 --- a/pandas/io/sas/sasreader.py +++ b/pandas/io/sas/sasreader.py @@ -139,5 +139,4 @@ def read_sas( try: return reader.read() finally: - if ioargs.should_close: - reader.close() + ioargs.close() diff --git a/pandas/io/stata.py b/pandas/io/stata.py index 01f0a1462aaca..ef31dfec559ea 100644 --- a/pandas/io/stata.py +++ b/pandas/io/stata.py @@ -1058,19 +1058,20 @@ def __init__( self._lines_read = 0 self._native_byteorder = _set_endianness(sys.byteorder) - path_or_buf = stringify_path(path_or_buf) - if isinstance(path_or_buf, str): - path_or_buf = get_filepath_or_buffer( - path_or_buf, storage_options=storage_options - ).filepath_or_buffer - - if isinstance(path_or_buf, (str, bytes)): - self.path_or_buf = open(path_or_buf, "rb") + self.ioargs = get_filepath_or_buffer( + path_or_buf, storage_options=storage_options + ) + + if isinstance(self.ioargs.filepath_or_buffer, (str, bytes)): + self.ioargs.filepath_or_buffer = open(self.ioargs.filepath_or_buffer, "rb") + self.ioargs.should_close = True elif hasattr(path_or_buf, "read"): # Copy to BytesIO, and ensure no encoding - pb: Any = path_or_buf - contents = pb.read() - self.path_or_buf = BytesIO(contents) + contents = self.ioargs.filepath_or_buffer.read() + self.ioargs.close() + self.ioargs.filepath_or_buffer = BytesIO(contents) # type: ignore[arg-type] + self.ioargs.should_close = True + self.path_or_buf = cast(BytesIO, self.ioargs.filepath_or_buffer) self._read_header() self._setup_dtype() @@ -1085,10 +1086,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: def close(self) -> None: """ close the handle if its open """ - try: - self.path_or_buf.close() - except OSError: - pass + self.ioargs.close() def _set_encoding(self) -> None: """ From e65c4d9418aaa01fc349558c8661c95231a76bb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torsten=20W=C3=B6rtwein?= Date: Mon, 2 Nov 2020 01:03:11 -0500 Subject: [PATCH 6/9] mostly comments --- pandas/_typing.py | 21 ++++++---- pandas/io/common.py | 8 ++-- pandas/io/excel/_base.py | 45 +++++++++++++--------- pandas/io/formats/csvs.py | 11 +++--- pandas/io/json/_json.py | 20 +++++----- pandas/io/parsers.py | 22 +++++------ pandas/io/pickle.py | 18 ++++----- pandas/io/stata.py | 38 +++++++++--------- pandas/tests/frame/methods/test_to_csv.py | 7 ++-- pandas/tests/io/json/test_readlines.py | 2 +- pandas/tests/io/test_compression.py | 26 ++++++------- pandas/tests/series/methods/test_to_csv.py | 6 +-- 12 files changed, 117 insertions(+), 107 deletions(-) diff --git a/pandas/_typing.py b/pandas/_typing.py index 571ec3820a002..a5d2c32807ce8 100644 --- a/pandas/_typing.py +++ b/pandas/_typing.py @@ -1,4 +1,4 @@ -from dataclasses import dataclass +import dataclasses from datetime import datetime, timedelta, tzinfo from io import BufferedIOBase, RawIOBase, TextIOBase, TextIOWrapper from mmap import mmap @@ -155,11 +155,13 @@ FloatFormatType = Union[str, Callable, "EngFormatter"] -@dataclass +@dataclasses.dataclass 
class IOArgs(Generic[ModeVar, EncodingVar]): """ Return value of io/common.py:get_filepath_or_buffer. + This is used to easily close created fsspec objects. + Note (copy&past from io/parsers): filepath_or_buffer can be Union[FilePathOrBuffer, s3fs.S3File, gcsfs.GCSFile] though mypy handling of conditional imports is difficult. @@ -168,9 +170,9 @@ class IOArgs(Generic[ModeVar, EncodingVar]): filepath_or_buffer: FileOrBuffer encoding: EncodingVar - compression: CompressionDict - should_close: bool mode: Union[ModeVar, str] + compression: CompressionDict + should_close: bool = False def close(self) -> None: """ @@ -182,19 +184,22 @@ def close(self) -> None: self.should_close = False -@dataclass -class IOHandleArgs: +@dataclasses.dataclass +class IOHandles: """ Return value of io/common.py:get_handle + This is used to easily close created buffers and to handle corner cases when + TextIOWrapper is inserted. + handle: The file handle to be used. created_handles: All file handles that are created by get_handle is_wrapped: Whether a TextIOWrapper needs to be detached. """ handle: Buffer - created_handles: List[Buffer] - is_wrapped: bool + created_handles: List[Buffer] = dataclasses.field(default_factory=list) + is_wrapped: bool = False def close(self) -> None: """ diff --git a/pandas/io/common.py b/pandas/io/common.py index 5c3d646b3f37f..1b5cdae733710 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -38,7 +38,7 @@ FileOrBuffer, FilePathOrBuffer, IOArgs, - IOHandleArgs, + IOHandles, ModeVar, StorageOptions, ) @@ -468,7 +468,7 @@ def get_handle( memory_map: bool = False, is_text: bool = True, errors: Optional[str] = None, -) -> IOHandleArgs: +) -> IOHandles: """ Get file handle for given path/buffer and mode. @@ -514,7 +514,7 @@ def get_handle( .. versionchanged:: 1.2.0 - Returns the dataclass IOHandleArgs + Returns the dataclass IOHandles """ need_text_wrapping: Tuple[Type["IOBase"], ...] 
try: @@ -640,7 +640,7 @@ def get_handle( handles.reverse() # close the most recently added buffer first assert not isinstance(f, str) - return IOHandleArgs( + return IOHandles( handle=f, created_handles=handles, is_wrapped=is_wrapped, diff --git a/pandas/io/excel/_base.py b/pandas/io/excel/_base.py index 3fcde0d366874..90a0f3c624a54 100644 --- a/pandas/io/excel/_base.py +++ b/pandas/io/excel/_base.py @@ -8,7 +8,7 @@ from pandas._config import config from pandas._libs.parsers import STR_NA_VALUES -from pandas._typing import StorageOptions +from pandas._typing import IOArgs, StorageOptions from pandas.errors import EmptyDataError from pandas.util._decorators import Appender, deprecate_nonkeyword_arguments @@ -349,29 +349,37 @@ def read_excel( class BaseExcelReader(metaclass=abc.ABCMeta): def __init__(self, filepath_or_buffer, storage_options: StorageOptions = None): + self.ioargs = IOArgs( + filepath_or_buffer=filepath_or_buffer, + encoding=None, + mode=None, + compression={"method": None}, + ) # If filepath_or_buffer is a url, load the data into a BytesIO - self.handles = [] if is_url(filepath_or_buffer): - filepath_or_buffer = BytesIO(urlopen(filepath_or_buffer).read()) - self.handles.append(filepath_or_buffer) + self.ioargs = IOArgs( + filepath_or_buffer=BytesIO(urlopen(filepath_or_buffer).read()), + should_close=True, + encoding=None, + mode=None, + compression={"method": None}, + ) elif not isinstance(filepath_or_buffer, (ExcelFile, self._workbook_class)): - ioargs = get_filepath_or_buffer( + self.ioargs = get_filepath_or_buffer( filepath_or_buffer, storage_options=storage_options ) - filepath_or_buffer = ioargs.filepath_or_buffer - if ioargs.should_close: - self.handles.append(filepath_or_buffer) - if isinstance(filepath_or_buffer, self._workbook_class): - self.book = filepath_or_buffer - elif hasattr(filepath_or_buffer, "read"): + if isinstance(self.ioargs.filepath_or_buffer, self._workbook_class): + self.book = self.ioargs.filepath_or_buffer + elif hasattr(self.ioargs.filepath_or_buffer, "read"): # N.B. xlrd.Book has a read attribute too - filepath_or_buffer.seek(0) - self.book = self.load_workbook(filepath_or_buffer) - elif isinstance(filepath_or_buffer, str): - self.book = self.load_workbook(filepath_or_buffer) - elif isinstance(filepath_or_buffer, bytes): - self.book = self.load_workbook(BytesIO(filepath_or_buffer)) + assert not isinstance(self.ioargs.filepath_or_buffer, str) + self.ioargs.filepath_or_buffer.seek(0) + self.book = self.load_workbook(self.ioargs.filepath_or_buffer) + elif isinstance(self.ioargs.filepath_or_buffer, str): + self.book = self.load_workbook(self.ioargs.filepath_or_buffer) + elif isinstance(self.ioargs.filepath_or_buffer, bytes): + self.book = self.load_workbook(BytesIO(self.ioargs.filepath_or_buffer)) else: raise ValueError( "Must explicitly set engine if not passing in buffer or path for io." @@ -387,8 +395,7 @@ def load_workbook(self, filepath_or_buffer): pass def close(self): - for handle in self.handles: - handle.close() + self.ioargs.close() @property @abc.abstractmethod diff --git a/pandas/io/formats/csvs.py b/pandas/io/formats/csvs.py index 3b559061f8b69..20226dbb3c9d4 100644 --- a/pandas/io/formats/csvs.py +++ b/pandas/io/formats/csvs.py @@ -226,9 +226,8 @@ def save(self) -> None: """ Create the writer & save. 
""" - # get a handle or wrap an existing handle to take care of 1) compression and - # 2) text -> byte conversion - handle_args = get_handle( + # apply compression and byte/text conversion + handles = get_handle( self.ioargs.filepath_or_buffer, self.ioargs.mode, encoding=self.ioargs.encoding, @@ -239,7 +238,7 @@ def save(self) -> None: try: # Note: self.encoding is irrelevant here self.writer = csvlib.writer( - handle_args.handle, # type: ignore[arg-type] + handles.handle, # type: ignore[arg-type] lineterminator=self.line_terminator, delimiter=self.sep, quoting=self.quoting, @@ -251,7 +250,9 @@ def save(self) -> None: self._save() finally: - handle_args.close() + # close compression and byte/text wrapper + handles.close() + # close any fsspec-like objects self.ioargs.close() def _save(self) -> None: diff --git a/pandas/io/json/_json.py b/pandas/io/json/_json.py index 84c67baf782c3..39749797502ff 100644 --- a/pandas/io/json/_json.py +++ b/pandas/io/json/_json.py @@ -13,7 +13,7 @@ from pandas._typing import ( CompressionOptions, IndexLabel, - IOHandleArgs, + IOHandles, JSONSerializable, StorageOptions, ) @@ -92,19 +92,21 @@ def to_json( s = convert_to_line_delimits(s) if path_or_buf is not None: + # open fsspec URLs ioargs = get_filepath_or_buffer( path_or_buf, compression=compression, mode="wt", storage_options=storage_options, ) - handle_args = get_handle( + # apply compression and byte/text conversion + handles = get_handle( ioargs.filepath_or_buffer, "w", compression=ioargs.compression ) try: - handle_args.handle.write(s) + handles.handle.write(s) finally: - handle_args.close() + handles.close() ioargs.close() else: return s @@ -619,7 +621,7 @@ def __init__( self.chunksize = chunksize self.nrows_seen = 0 self.nrows = nrows - self.handle_args: Optional[IOHandleArgs] = None + self.handles: Optional[IOHandles] = None if self.chunksize is not None: self.chunksize = validate_integer("chunksize", self.chunksize, 1) @@ -668,13 +670,13 @@ def _get_data_from_filepath(self, filepath_or_buffer): pass if exists or not isinstance(filepath_or_buffer, str): - self.handle_args = get_handle( + self.handles = get_handle( filepath_or_buffer, "r", encoding=self.encoding, compression=self.compression, ) - filepath_or_buffer = self.handle_args.handle + filepath_or_buffer = self.handles.handle return filepath_or_buffer @@ -740,8 +742,8 @@ def close(self): If an open stream or file was passed, we leave it open. """ - if self.handle_args is not None: - self.handle_args.close() + if self.handles is not None: + self.handles.close() def __next__(self): if self.nrows: diff --git a/pandas/io/parsers.py b/pandas/io/parsers.py index 19724772dda05..3fe23a16f1645 100644 --- a/pandas/io/parsers.py +++ b/pandas/io/parsers.py @@ -20,7 +20,7 @@ import pandas._libs.parsers as parsers from pandas._libs.parsers import STR_NA_VALUES from pandas._libs.tslibs import parsing -from pandas._typing import FilePathOrBuffer, IOHandleArgs, StorageOptions, Union +from pandas._typing import FilePathOrBuffer, IOHandles, StorageOptions, Union from pandas.errors import ( AbstractMethodError, EmptyDataError, @@ -1398,7 +1398,7 @@ def _validate_parse_dates_presence(self, columns: List[str]) -> None: ) def close(self): - self.handle_args.close() + self.handles.close() @property def _has_complex_date_col(self): @@ -1849,13 +1849,9 @@ def __init__(self, src, **kwds): + "Please use memory_map=False instead." 
) - self.handle_args = IOHandleArgs( - handle=src, - created_handles=[], - is_wrapped=False, - ) + self.handles = IOHandles(handle=src) else: - self.handle_args = get_handle( + self.handles = get_handle( src, mode="r", encoding=kwds.get("encoding", None), @@ -1873,7 +1869,7 @@ def __init__(self, src, **kwds): self.usecols, self.usecols_dtype = _validate_usecols_arg(kwds["usecols"]) kwds["usecols"] = self.usecols - self._reader = parsers.TextReader(self.handle_args.handle, **kwds) + self._reader = parsers.TextReader(self.handles.handle, **kwds) self.unnamed_cols = self._reader.unnamed_cols passed_names = self.names is None @@ -2246,7 +2242,7 @@ def __init__(self, f, **kwds): self.comment = kwds["comment"] self._comment_lines = [] - self.handle_args = get_handle( + self.handles = get_handle( f, "r", encoding=self.encoding, @@ -2255,10 +2251,10 @@ def __init__(self, f, **kwds): ) # Set self.data to something that can read lines. - if hasattr(self.handle_args.handle, "readline"): - self._make_reader(self.handle_args.handle) + if hasattr(self.handles.handle, "readline"): + self._make_reader(self.handles.handle) else: - self.data = self.handle_args.handle + self.data = self.handles.handle # Get columns in two steps: infer from data, then # infer column indices from self.usecols if it is specified. diff --git a/pandas/io/pickle.py b/pandas/io/pickle.py index a18611277ba23..2c41b97ae8e29 100644 --- a/pandas/io/pickle.py +++ b/pandas/io/pickle.py @@ -92,17 +92,15 @@ def to_pickle( mode="wb", storage_options=storage_options, ) - handle_args = get_handle( + handles = get_handle( ioargs.filepath_or_buffer, "wb", compression=ioargs.compression, is_text=False ) if protocol < 0: protocol = pickle.HIGHEST_PROTOCOL try: - pickle.dump( - obj, handle_args.handle, protocol=protocol # type: ignore[arg-type] - ) + pickle.dump(obj, handles.handle, protocol=protocol) # type: ignore[arg-type] finally: - handle_args.close() + handles.close() ioargs.close() @@ -186,7 +184,7 @@ def read_pickle( ioargs = get_filepath_or_buffer( filepath_or_buffer, compression=compression, storage_options=storage_options ) - handle_args = get_handle( + handles = get_handle( ioargs.filepath_or_buffer, "rb", compression=ioargs.compression, is_text=False ) @@ -201,15 +199,15 @@ def read_pickle( with warnings.catch_warnings(record=True): # We want to silence any warnings about, e.g. moved modules. warnings.simplefilter("ignore", Warning) - return pickle.load(handle_args.handle) # type: ignore[arg-type] + return pickle.load(handles.handle) # type: ignore[arg-type] except excs_to_catch: # e.g. # "No module named 'pandas.core.sparse.series'" # "Can't get attribute '__nat_unpickle' on Tuple[IOHandleArgs, CompressionOptions]: +) -> Tuple[IOHandles, CompressionOptions]: """ Open a binary file or no-op if file-like. 
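The pickle.py hunks above settle on a layered cleanup: the wrappers created by get_handle (compression, byte/text conversion) are closed first, then the object returned by get_filepath_or_buffer (e.g. an fsspec file). A hedged usage sketch of that ordering; the output filename and the write_payload helper are made up for illustration:

    from pandas.io.common import get_filepath_or_buffer, get_handle

    def write_payload(handle) -> None:  # hypothetical stand-in for pickle.dump etc.
        handle.write(b"payload")

    ioargs = get_filepath_or_buffer("out.bin.gz", compression="gzip", mode="wb")
    handles = get_handle(
        ioargs.filepath_or_buffer, "wb", compression=ioargs.compression, is_text=False
    )
    try:
        write_payload(handles.handle)
    finally:
        handles.close()  # close compression and byte/text wrappers first
        ioargs.close()   # then close any fsspec-like object
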
@@ -1956,7 +1956,7 @@ def _open_file_binary_write( ioargs = get_filepath_or_buffer( fname, mode="wb", compression=compression, storage_options=storage_options ) - handle_args = get_handle( + handles = get_handle( ioargs.filepath_or_buffer, "wb", compression=ioargs.compression, @@ -1966,8 +1966,8 @@ def _open_file_binary_write( ioargs.filepath_or_buffer, str ): # add handle created by get_filepath_or_buffer - handle_args.created_handles.append(ioargs.filepath_or_buffer) - return handle_args, ioargs.compression + handles.created_handles.append(ioargs.filepath_or_buffer) + return handles, ioargs.compression def _set_endianness(endianness: str) -> str: @@ -2235,7 +2235,7 @@ def _write(self, to_write: str) -> None: """ Helper to call encode before writing to file for Python 3 compat. """ - self.handle_args.handle.write( + self.handles.handle.write( to_write.encode(self._encoding) # type: ignore[arg-type] ) @@ -2243,7 +2243,7 @@ def _write_bytes(self, value: bytes) -> None: """ Helper to assert file is open before writing. """ - self.handle_args.handle.write(value) # type: ignore[arg-type] + self.handles.handle.write(value) # type: ignore[arg-type] def _prepare_categoricals(self, data: DataFrame) -> DataFrame: """ @@ -2507,14 +2507,14 @@ def _encode_strings(self) -> None: self.data[col] = encoded def write_file(self) -> None: - self.handle_args, compression = _open_file_binary_write( + self.handles, compression = _open_file_binary_write( self._fname, self._compression, storage_options=self.storage_options ) if compression is not None: # ZipFile creates a file (with the same name) for each write call. # Write it first into a buffer and then write the buffer to the ZipFile. - self._output_file = self.handle_args.handle - self.handle_args.handle = BytesIO() + self._output_file = self.handles.handle + self.handles.handle = BytesIO() try: self._write_header(data_label=self._data_label, time_stamp=self._time_stamp) self._write_map() @@ -2556,14 +2556,14 @@ def _close(self) -> None: """ # write compression if self._output_file is not None: - assert isinstance(self.handle_args.handle, BytesIO) - bio = self.handle_args.handle + assert isinstance(self.handles.handle, BytesIO) + bio = self.handles.handle bio.seek(0) - self.handle_args.handle = self._output_file - self.handle_args.handle.write(bio.read()) # type: ignore[arg-type] + self.handles.handle = self._output_file + self.handles.handle.write(bio.read()) # type: ignore[arg-type] bio.close() # close any created handles - self.handle_args.close() + self.handles.close() def _write_map(self) -> None: """No-op, future compatibility""" @@ -3115,8 +3115,8 @@ def _tag(val: Union[str, bytes], tag: str) -> bytes: def _update_map(self, tag: str) -> None: """Update map location for tag with file position""" - assert self.handle_args.handle is not None - self._map[tag] = self.handle_args.handle.tell() + assert self.handles.handle is not None + self._map[tag] = self.handles.handle.tell() def _write_header( self, @@ -3187,7 +3187,7 @@ def _write_map(self) -> None: self._map = dict( ( ("stata_data", 0), - ("map", self.handle_args.handle.tell()), + ("map", self.handles.handle.tell()), ("variable_types", 0), ("varnames", 0), ("sortlist", 0), @@ -3203,7 +3203,7 @@ def _write_map(self) -> None: ) ) # Move to start of map - self.handle_args.handle.seek(self._map["map"]) + self.handles.handle.seek(self._map["map"]) bio = BytesIO() for val in self._map.values(): bio.write(struct.pack(self._byteorder + "Q", val)) diff --git a/pandas/tests/frame/methods/test_to_csv.py 
b/pandas/tests/frame/methods/test_to_csv.py index 4270768578ee9..3103f6e1ba0b1 100644 --- a/pandas/tests/frame/methods/test_to_csv.py +++ b/pandas/tests/frame/methods/test_to_csv.py @@ -1034,11 +1034,12 @@ def test_to_csv_compression(self, df, encoding, compression): tm.assert_frame_equal(df, result) # test the round trip using file handle - to_csv -> read_csv - handle_args = get_handle( + handles = get_handle( filename, "w", compression=compression, encoding=encoding ) - df.to_csv(handle_args.handle, encoding=encoding) - handle_args.close() + df.to_csv(handles.handle, encoding=encoding) + assert not handles.handle.closed + handles.close() result = pd.read_csv( filename, compression=compression, diff --git a/pandas/tests/io/json/test_readlines.py b/pandas/tests/io/json/test_readlines.py index 53e8de097d277..2e68d3306c7d1 100644 --- a/pandas/tests/io/json/test_readlines.py +++ b/pandas/tests/io/json/test_readlines.py @@ -143,7 +143,7 @@ def test_readjson_chunks_closes(chunksize): ) reader.read() assert ( - reader.handle_args.handle.closed + reader.handles.handle.closed ), f"didn't close stream with chunksize = {chunksize}" diff --git a/pandas/tests/io/test_compression.py b/pandas/tests/io/test_compression.py index 4d75d4bb01348..8d7d5d85cbb48 100644 --- a/pandas/tests/io/test_compression.py +++ b/pandas/tests/io/test_compression.py @@ -47,18 +47,18 @@ def test_compression_size(obj, method, compression_only): @pytest.mark.parametrize("method", ["to_csv", "to_json"]) def test_compression_size_fh(obj, method, compression_only): with tm.ensure_clean() as path: - handle_args = icom.get_handle(path, "w", compression=compression_only) - getattr(obj, method)(handle_args.handle) - assert not handle_args.handle.closed - handle_args.close() - assert handle_args.handle.closed + handles = icom.get_handle(path, "w", compression=compression_only) + getattr(obj, method)(handles.handle) + assert not handles.handle.closed + handles.close() + assert handles.handle.closed compressed_size = os.path.getsize(path) with tm.ensure_clean() as path: - handle_args = icom.get_handle(path, "w", compression=None) - getattr(obj, method)(handle_args.handle) - assert not handle_args.handle.closed - handle_args.close() - assert handle_args.handle.closed + handles = icom.get_handle(path, "w", compression=None) + getattr(obj, method)(handles.handle) + assert not handles.handle.closed + handles.close() + assert handles.handle.closed uncompressed_size = os.path.getsize(path) assert uncompressed_size > compressed_size @@ -111,10 +111,10 @@ def test_compression_warning(compression_only): columns=["X", "Y", "Z"], ) with tm.ensure_clean() as path: - handle_args = icom.get_handle(path, "w", compression=compression_only) + handles = icom.get_handle(path, "w", compression=compression_only) with tm.assert_produces_warning(RuntimeWarning, check_stacklevel=False): - df.to_csv(handle_args.handle, compression=compression_only) - handle_args.close() + df.to_csv(handles.handle, compression=compression_only) + handles.close() def test_compression_binary(compression_only): diff --git a/pandas/tests/series/methods/test_to_csv.py b/pandas/tests/series/methods/test_to_csv.py index 2e2266d88f167..714173158f4d6 100644 --- a/pandas/tests/series/methods/test_to_csv.py +++ b/pandas/tests/series/methods/test_to_csv.py @@ -143,11 +143,11 @@ def test_to_csv_compression(self, s, encoding, compression): tm.assert_series_equal(s, result) # test the round trip using file handle - to_csv -> read_csv - handle_args = get_handle( + handles = get_handle( filename, 
"w", compression=compression, encoding=encoding ) - s.to_csv(handle_args.handle, encoding=encoding, header=True) - handle_args.close() + s.to_csv(handles.handle, encoding=encoding, header=True) + handles.close() result = pd.read_csv( filename, compression=compression, From 13782212e18f950c39bb462d4788b3760a92db3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torsten=20W=C3=B6rtwein?= Date: Tue, 3 Nov 2020 11:37:43 -0500 Subject: [PATCH 7/9] move memory_map from TextReader to CParserWrapper --- pandas/_libs/parsers.pyx | 47 +++++------------------ pandas/io/parsers.py | 30 ++++++--------- pandas/tests/io/parser/test_common.py | 26 +++++-------- pandas/tests/io/parser/test_textreader.py | 5 --- 4 files changed, 31 insertions(+), 77 deletions(-) diff --git a/pandas/_libs/parsers.pyx b/pandas/_libs/parsers.pyx index 6991e94e8b1fa..4b7a47c5f93c2 100644 --- a/pandas/_libs/parsers.pyx +++ b/pandas/_libs/parsers.pyx @@ -274,10 +274,9 @@ cdef class TextReader: # source: StringIO or file object ..versionchange:: 1.2.0 - removed 'compression' argument (outsourced to CParserWrapper). - removed 'memory_map' argument: CParserWrapper converts all - non-memory-mapped files to file handles. The remaining - (string) objects will be opened with 'memory_map=True'. + removed 'compression', 'memory_map', and 'encoding' argument. + These arguments are outsourced to CParserWrapper. + 'source' has to be a file handle. """ cdef: @@ -323,7 +322,6 @@ cdef class TextReader: quotechar=b'"', quoting=0, lineterminator=None, - encoding=None, comment=None, decimal=b'.', thousands=None, @@ -347,13 +345,7 @@ cdef class TextReader: bint skip_blank_lines=True): # set encoding for native Python and C library - if encoding is not None: - if not isinstance(encoding, bytes): - encoding = encoding.encode('utf-8') - encoding = encoding.lower() - self.c_encoding = encoding - else: - self.c_encoding = NULL + self.c_encoding = NULL self.parser = parser_new() self.parser.chunksize = tokenize_chunksize @@ -595,34 +587,15 @@ cdef class TextReader: cdef: void *ptr - self.parser.cb_io = NULL - self.parser.cb_cleanup = NULL - - if isinstance(source, str): - encoding = sys.getfilesystemencoding() or "utf-8" - source = source.encode(encoding) - ptr = new_mmap(source) - if ptr == NULL: - # fall back - ptr = new_file_source(source, self.parser.chunksize) - self.parser.cb_io = &buffer_file_bytes - self.parser.cb_cleanup = &del_file_source - else: - self.parser.cb_io = &buffer_mmap_bytes - self.parser.cb_cleanup = &del_mmap - self.parser.source = ptr - - elif hasattr(source, "read"): - # e.g., StringIO - - ptr = new_rd_source(source) - self.parser.source = ptr - self.parser.cb_io = &buffer_rd_bytes - self.parser.cb_cleanup = &del_rd_source - else: + if not hasattr(source, "read"): raise IOError(f'Expected file path name or file-like object, ' f'got {type(source)} type') + ptr = new_rd_source(source) + self.parser.source = ptr + self.parser.cb_io = &buffer_rd_bytes + self.parser.cb_cleanup = &del_rd_source + cdef _get_header(self): # header is now a list of lists, so field_count should use header[0] diff --git a/pandas/io/parsers.py b/pandas/io/parsers.py index 3fe23a16f1645..3b4756dcacb7b 100644 --- a/pandas/io/parsers.py +++ b/pandas/io/parsers.py @@ -20,7 +20,7 @@ import pandas._libs.parsers as parsers from pandas._libs.parsers import STR_NA_VALUES from pandas._libs.tslibs import parsing -from pandas._typing import FilePathOrBuffer, IOHandles, StorageOptions, Union +from pandas._typing import FilePathOrBuffer, StorageOptions, Union from pandas.errors 
import ( AbstractMethodError, EmptyDataError, @@ -1834,33 +1834,27 @@ def __init__(self, src, **kwds): if kwds.get("memory_map", False): # memory-mapped files are directly handled by the TextReader. - # compression and file-object were never supported by it. src = stringify_path(src) - if not isinstance(src, str): - raise ValueError( - "read_csv supports only string-like objects with engine='c' " - + "and memory_map=True. Please use engine='python' instead." - ) - if get_compression_method(kwds.get("compression", None))[0] is not None: raise ValueError( "read_csv does not support compression with memory_map=True. " + "Please use memory_map=False instead." ) - self.handles = IOHandles(handle=src) - else: - self.handles = get_handle( - src, - mode="r", - encoding=kwds.get("encoding", None), - compression=kwds.get("compression", None), - is_text=True, - ) - kwds.pop("encoding", None) + self.handles = get_handle( + src, + mode="r", + encoding=kwds.get("encoding", None), + compression=kwds.get("compression", None), + memory_map=kwds.get("memory_map", False), + is_text=True, + ) + kwds.pop("encoding", None) kwds.pop("memory_map", None) kwds.pop("compression", None) + if kwds.get("memory_map", False) and hasattr(self.handles.handle, "mmap"): + self.handles.handle = self.handles.handle.mmap # #2442 kwds["allow_leading_cols"] = self.index_col is not False diff --git a/pandas/tests/io/parser/test_common.py b/pandas/tests/io/parser/test_common.py index 8ee98b7d29df1..e61a5fce99c69 100644 --- a/pandas/tests/io/parser/test_common.py +++ b/pandas/tests/io/parser/test_common.py @@ -2295,28 +2295,20 @@ def test_memory_map_compression_error(c_parser_only): parser.read_csv(path, memory_map=True, compression="gzip") -def test_memory_map_file_handle_error(all_parsers): +def test_memory_map_file_handle(all_parsers): """ - c-parsers do support only string-like files with memory_map=True. + Support some buffers with memory_map=True. GH 36997 """ parser = all_parsers expected = DataFrame({"a": [1], "b": [2]}) - msg = ( - "read_csv supports only string-like objects with engine='c' " - + "and memory_map=True. Please use engine='python' instead." 
- ) - handle = StringIO("a,b\n1,2") + handle = StringIO() + expected.to_csv(handle, index=False) + handle.seek(0) - if parser.engine != "python": - # c engine should fail - with pytest.raises(ValueError, match=msg): - parser.read_csv(handle, memory_map=True) - else: - # verify that python engine supports the same call - tm.assert_frame_equal( - parser.read_csv(handle, memory_map=True), - expected, - ) + tm.assert_frame_equal( + parser.read_csv(handle, memory_map=True), + expected, + ) diff --git a/pandas/tests/io/parser/test_textreader.py b/pandas/tests/io/parser/test_textreader.py index 59aecb4b393aa..413b78a52ad38 100644 --- a/pandas/tests/io/parser/test_textreader.py +++ b/pandas/tests/io/parser/test_textreader.py @@ -31,11 +31,6 @@ def test_file_handle(self): reader = TextReader(f) reader.read() - def test_string_filename(self): - # this is using memory_map=True - reader = TextReader(self.csv1, header=None) - reader.read() - def test_file_handle_mmap(self): # this was never using memory_map=True with open(self.csv1, "rb") as f: From 74c68725c7e3238c5281e337ba14e61a5cc5ffe2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Torsten=20W=C3=B6rtwein?= Date: Tue, 3 Nov 2020 18:54:02 -0500 Subject: [PATCH 8/9] moved IOArgs and IOHandles --- pandas/_typing.py | 64 ------------------------------------- pandas/io/common.py | 69 ++++++++++++++++++++++++++++++++++++++-- pandas/io/excel/_base.py | 3 +- pandas/io/json/_json.py | 8 +++-- pandas/io/stata.py | 8 +++-- 5 files changed, 81 insertions(+), 71 deletions(-) diff --git a/pandas/_typing.py b/pandas/_typing.py index a5d2c32807ce8..c50cf14197145 100644 --- a/pandas/_typing.py +++ b/pandas/_typing.py @@ -153,67 +153,3 @@ # type of float formatter in DataFrameFormatter FloatFormatType = Union[str, Callable, "EngFormatter"] - - -@dataclasses.dataclass -class IOArgs(Generic[ModeVar, EncodingVar]): - """ - Return value of io/common.py:get_filepath_or_buffer. - - This is used to easily close created fsspec objects. - - Note (copy&past from io/parsers): - filepath_or_buffer can be Union[FilePathOrBuffer, s3fs.S3File, gcsfs.GCSFile] - though mypy handling of conditional imports is difficult. - See https://github.com/python/mypy/issues/1297 - """ - - filepath_or_buffer: FileOrBuffer - encoding: EncodingVar - mode: Union[ModeVar, str] - compression: CompressionDict - should_close: bool = False - - def close(self) -> None: - """ - Close the buffer if it was created by get_filepath_or_buffer. - """ - if self.should_close: - assert not isinstance(self.filepath_or_buffer, str) - self.filepath_or_buffer.close() - self.should_close = False - - -@dataclasses.dataclass -class IOHandles: - """ - Return value of io/common.py:get_handle - - This is used to easily close created buffers and to handle corner cases when - TextIOWrapper is inserted. - - handle: The file handle to be used. - created_handles: All file handles that are created by get_handle - is_wrapped: Whether a TextIOWrapper needs to be detached. - """ - - handle: Buffer - created_handles: List[Buffer] = dataclasses.field(default_factory=list) - is_wrapped: bool = False - - def close(self) -> None: - """ - Close all created buffers. - - Note: If a TextIOWrapper was inserted, it is flushed and detached to - avoid closing the potentially user-created buffer. 
- """ - if self.is_wrapped: - assert isinstance(self.handle, TextIOWrapper) - self.handle.flush() - self.handle.detach() - self.created_handles.remove(self.handle) - for handle in self.created_handles: - handle.close() - self.created_handles = [] - self.is_wrapped = False diff --git a/pandas/io/common.py b/pandas/io/common.py index 1b5cdae733710..5098ac25a3827 100644 --- a/pandas/io/common.py +++ b/pandas/io/common.py @@ -2,6 +2,7 @@ import bz2 from collections import abc +import dataclasses import gzip from io import BufferedIOBase, BytesIO, RawIOBase, TextIOWrapper import mmap @@ -13,11 +14,13 @@ Any, AnyStr, Dict, + Generic, List, Mapping, Optional, Tuple, Type, + Union, cast, ) from urllib.parse import ( @@ -37,8 +40,6 @@ EncodingVar, FileOrBuffer, FilePathOrBuffer, - IOArgs, - IOHandles, ModeVar, StorageOptions, ) @@ -58,6 +59,70 @@ from io import IOBase +@dataclasses.dataclass +class IOArgs(Generic[ModeVar, EncodingVar]): + """ + Return value of io/common.py:get_filepath_or_buffer. + + This is used to easily close created fsspec objects. + + Note (copy&past from io/parsers): + filepath_or_buffer can be Union[FilePathOrBuffer, s3fs.S3File, gcsfs.GCSFile] + though mypy handling of conditional imports is difficult. + See https://github.com/python/mypy/issues/1297 + """ + + filepath_or_buffer: FileOrBuffer + encoding: EncodingVar + mode: Union[ModeVar, str] + compression: CompressionDict + should_close: bool = False + + def close(self) -> None: + """ + Close the buffer if it was created by get_filepath_or_buffer. + """ + if self.should_close: + assert not isinstance(self.filepath_or_buffer, str) + self.filepath_or_buffer.close() + self.should_close = False + + +@dataclasses.dataclass +class IOHandles: + """ + Return value of io/common.py:get_handle + + This is used to easily close created buffers and to handle corner cases when + TextIOWrapper is inserted. + + handle: The file handle to be used. + created_handles: All file handles that are created by get_handle + is_wrapped: Whether a TextIOWrapper needs to be detached. + """ + + handle: Buffer + created_handles: List[Buffer] = dataclasses.field(default_factory=list) + is_wrapped: bool = False + + def close(self) -> None: + """ + Close all created buffers. + + Note: If a TextIOWrapper was inserted, it is flushed and detached to + avoid closing the potentially user-created buffer. + """ + if self.is_wrapped: + assert isinstance(self.handle, TextIOWrapper) + self.handle.flush() + self.handle.detach() + self.created_handles.remove(self.handle) + for handle in self.created_handles: + handle.close() + self.created_handles = [] + self.is_wrapped = False + + def is_url(url) -> bool: """ Check to see if a URL has a valid protocol. 
diff --git a/pandas/io/excel/_base.py b/pandas/io/excel/_base.py
index 90a0f3c624a54..03c61c3ed8376 100644
--- a/pandas/io/excel/_base.py
+++ b/pandas/io/excel/_base.py
@@ -8,7 +8,7 @@
 from pandas._config import config
 
 from pandas._libs.parsers import STR_NA_VALUES
-from pandas._typing import IOArgs, StorageOptions
+from pandas._typing import StorageOptions
 from pandas.errors import EmptyDataError
 from pandas.util._decorators import Appender, deprecate_nonkeyword_arguments
 
@@ -17,6 +17,7 @@
 from pandas.core.frame import DataFrame
 
 from pandas.io.common import (
+    IOArgs,
     get_filepath_or_buffer,
     is_url,
     stringify_path,
diff --git a/pandas/io/json/_json.py b/pandas/io/json/_json.py
index 39749797502ff..4cc7cf1594846 100644
--- a/pandas/io/json/_json.py
+++ b/pandas/io/json/_json.py
@@ -13,7 +13,6 @@
 from pandas._typing import (
     CompressionOptions,
     IndexLabel,
-    IOHandles,
     JSONSerializable,
     StorageOptions,
 )
@@ -27,7 +26,12 @@
 from pandas.core.generic import NDFrame
 from pandas.core.reshape.concat import concat
 
-from pandas.io.common import get_compression_method, get_filepath_or_buffer, get_handle
+from pandas.io.common import (
+    IOHandles,
+    get_compression_method,
+    get_filepath_or_buffer,
+    get_handle,
+)
 from pandas.io.json._normalize import convert_to_line_delimits
 from pandas.io.json._table_schema import build_table_schema, parse_table_schema
 from pandas.io.parsers import validate_integer
diff --git a/pandas/io/stata.py b/pandas/io/stata.py
index e042ea1c32fca..7c7997f128086 100644
--- a/pandas/io/stata.py
+++ b/pandas/io/stata.py
@@ -28,7 +28,6 @@
     Buffer,
     CompressionOptions,
     FilePathOrBuffer,
-    IOHandles,
     Label,
     StorageOptions,
 )
@@ -54,7 +53,12 @@
 from pandas.core.indexes.base import Index
 from pandas.core.series import Series
 
-from pandas.io.common import get_filepath_or_buffer, get_handle, stringify_path
+from pandas.io.common import (
+    IOHandles,
+    get_filepath_or_buffer,
+    get_handle,
+    stringify_path,
+)
 
 _version_error = (
     "Version of given Stata file is {version}. pandas supports importing "
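
With IOArgs and IOHandles defined next to get_filepath_or_buffer and
get_handle, the bookkeeping they perform is easier to follow: every wrapper
that get_handle layers onto the source is recorded in created_handles and
closed on teardown, while the original handle is left alone. A rough,
self-contained sketch of that bookkeeping (the gzip layering and the
`created` list are illustrative assumptions, not the actual get_handle code):

    import gzip
    import io

    user_handle = io.BytesIO()  # a binary handle supplied by the caller

    # Layers get_handle might create: compression around the caller's
    # buffer, then a text wrapper on top of the compression stream.
    gz = gzip.GzipFile(fileobj=user_handle, mode="wb")
    text = io.TextIOWrapper(gz, encoding="utf-8")
    created = [gz, text]  # plays the role of IOHandles.created_handles

    text.write("a,b\n1,2\n")

    # Tear down only what was created here; the text wrapper is detached
    # first so closing it cannot cascade into the layers below.
    text.flush()
    text.detach()
    created.remove(text)
    for handle in created:
        handle.close()

    assert not user_handle.closed  # the caller's buffer survives
    assert user_handle.getvalue()[:2] == b"\x1f\x8b"  # gzip magic bytes
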
From 4dc58a6e311d43740027fd253ef96533eb726a9d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Torsten=20W=C3=B6rtwein?=
Date: Tue, 3 Nov 2020 19:43:09 -0500
Subject: [PATCH 9/9] more comments

---
 pandas/_typing.py           |  2 --
 pandas/core/frame.py        |  3 +--
 pandas/io/common.py         | 12 +++++++++---
 pandas/io/formats/format.py |  7 +++----
 pandas/io/json/_json.py     |  2 ++
 pandas/io/parsers.py        |  4 +++-
 pandas/io/pickle.py         |  4 ++++
 7 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/pandas/_typing.py b/pandas/_typing.py
index c50cf14197145..3e89cf24632e2 100644
--- a/pandas/_typing.py
+++ b/pandas/_typing.py
@@ -1,4 +1,3 @@
-import dataclasses
 from datetime import datetime, timedelta, tzinfo
 from io import BufferedIOBase, RawIOBase, TextIOBase, TextIOWrapper
 from mmap import mmap
@@ -11,7 +10,6 @@
     Callable,
     Collection,
     Dict,
-    Generic,
     Hashable,
     List,
     Mapping,
diff --git a/pandas/core/frame.py b/pandas/core/frame.py
index e5224aa221d48..a3130ec27713d 100644
--- a/pandas/core/frame.py
+++ b/pandas/core/frame.py
@@ -2289,8 +2289,7 @@ def to_markdown(
         ioargs = get_filepath_or_buffer(buf, mode=mode, storage_options=storage_options)
         assert not isinstance(ioargs.filepath_or_buffer, (str, mmap.mmap))
         ioargs.filepath_or_buffer.writelines(result)
-        if ioargs.should_close:
-            ioargs.filepath_or_buffer.close()
+        ioargs.close()
         return None
 
     @deprecate_kwarg(old_arg_name="fname", new_arg_name="path")
diff --git a/pandas/io/common.py b/pandas/io/common.py
index 5098ac25a3827..90a79e54015c4 100644
--- a/pandas/io/common.py
+++ b/pandas/io/common.py
@@ -84,7 +84,10 @@ def close(self) -> None:
         """
         if self.should_close:
             assert not isinstance(self.filepath_or_buffer, str)
-            self.filepath_or_buffer.close()
+            try:
+                self.filepath_or_buffer.close()
+            except (OSError, ValueError):
+                pass
         self.should_close = False
 
 
@@ -117,8 +120,11 @@ def close(self) -> None:
             self.handle.flush()
             self.handle.detach()
             self.created_handles.remove(self.handle)
-        for handle in self.created_handles:
-            handle.close()
+        try:
+            for handle in self.created_handles:
+                handle.close()
+        except (OSError, ValueError):
+            pass
         self.created_handles = []
         self.is_wrapped = False
 
diff --git a/pandas/io/formats/format.py b/pandas/io/formats/format.py
index 85da90bc901c2..43e76d0aef490 100644
--- a/pandas/io/formats/format.py
+++ b/pandas/io/formats/format.py
@@ -1046,13 +1046,12 @@ def to_csv(
         """
         from pandas.io.formats.csvs import CSVFormatter
 
-        created_buffer = False
-        if path_or_buf is None:
+        created_buffer = path_or_buf is None
+        if created_buffer:
             path_or_buf = StringIO()
-            created_buffer = True
 
         csv_formatter = CSVFormatter(
-            path_or_buf=path_or_buf,
+            path_or_buf=path_or_buf,  # type: ignore[arg-type]
             line_terminator=line_terminator,
             sep=sep,
             encoding=encoding,
diff --git a/pandas/io/json/_json.py b/pandas/io/json/_json.py
index 4cc7cf1594846..bfb57f415db3b 100644
--- a/pandas/io/json/_json.py
+++ b/pandas/io/json/_json.py
@@ -110,7 +110,9 @@ def to_json(
         try:
             handles.handle.write(s)
         finally:
+            # close compression and byte/text wrapper
             handles.close()
+            # close any fsspec-like objects
             ioargs.close()
     else:
         return s
diff --git a/pandas/io/parsers.py b/pandas/io/parsers.py
index 3b4756dcacb7b..3b72869188344 100644
--- a/pandas/io/parsers.py
+++ b/pandas/io/parsers.py
@@ -466,7 +466,9 @@ def _read(filepath_or_buffer: FilePathOrBuffer, kwds):
     try:
         data = parser.read(nrows)
     finally:
+        # close compression and byte/text wrapper
         parser.close()
+        # close any fsspec-like objects
         ioargs.close()
 
     return data
@@ -1945,7 +1947,7 @@ def __init__(self, src, **kwds):
 
     def close(self) -> None:
         super().close()
 
-        # close additional handles opened by C parser (for memory_map)
+        # close additional handles opened by C parser
         try:
             self._reader.close()
         except ValueError:
diff --git a/pandas/io/pickle.py b/pandas/io/pickle.py
index 2c41b97ae8e29..6fa044b4651a5 100644
--- a/pandas/io/pickle.py
+++ b/pandas/io/pickle.py
@@ -100,7 +100,9 @@ def to_pickle(
     try:
         pickle.dump(obj, handles.handle, protocol=protocol)  # type: ignore[arg-type]
     finally:
+        # close compression and byte/text wrapper
         handles.close()
+        # close any fsspec-like objects
         ioargs.close()
 
 
@@ -209,5 +211,7 @@ def read_pickle(
         # e.g. can occur for files written in py27; see GH#28645 and GH#31988
         return pc.load(handles.handle, encoding="latin-1")
     finally:
+        # close compression and byte/text wrapper
        handles.close()
+        # close any fsspec-like objects
         ioargs.close()
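
Taken together, the comments added in this last patch spell out a two-layer
cleanup contract: IOHandles.close() releases the compression and byte/text
wrappers that get_handle created, IOArgs.close() releases any fsspec-like
object that get_filepath_or_buffer opened, and neither layer may touch a
caller-provided handle. A condensed sketch of that contract with simplified
stand-ins (Args and Handles are hypothetical names, not the pandas classes):

    import io
    import pickle
    from dataclasses import dataclass, field
    from typing import Any, List

    @dataclass
    class Handles:  # simplified stand-in for IOHandles
        handle: Any
        created_handles: List[Any] = field(default_factory=list)

        def close(self) -> None:
            # close compression and byte/text wrappers created internally
            for h in self.created_handles:
                h.close()
            self.created_handles = []

    @dataclass
    class Args:  # simplified stand-in for IOArgs
        filepath_or_buffer: Any
        should_close: bool = False

        def close(self) -> None:
            # close fsspec-like objects opened internally, never user handles
            if self.should_close:
                self.filepath_or_buffer.close()
            self.should_close = False

    # A caller-provided handle: neither cleanup layer may close it.
    user_handle = io.BytesIO()
    ioargs = Args(user_handle)      # should_close stays False
    handles = Handles(user_handle)  # nothing was created on the caller's behalf

    try:
        pickle.dump({"a": 1}, handles.handle)
    finally:
        handles.close()  # close compression and byte/text wrapper
        ioargs.close()   # close any fsspec-like objects

    assert not user_handle.closed  # the user's handle survives (GH 36980)
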