Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/hats/io/file_io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
read_parquet_file_to_pandas,
read_parquet_metadata,
remove_directory,
unnest_headers_for_pandas,
write_dataframe_to_csv,
write_dataframe_to_parquet,
write_fits_image,
Expand Down
60 changes: 31 additions & 29 deletions src/hats/io/file_io/file_io.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import tempfile
import urllib.request
from collections.abc import Generator
from io import BytesIO
from pathlib import Path

import nested_pandas as npd
Expand All @@ -10,7 +12,6 @@
import pyarrow.dataset as pds
import pyarrow.parquet as pq
import upath.implementations.http
import yaml
from cdshealpix.skymap.skymap import Skymap
from pyarrow.dataset import Dataset
from upath import UPath
Expand Down Expand Up @@ -148,6 +149,22 @@ def write_dataframe_to_parquet(dataframe: pd.DataFrame, file_pointer):
dataframe.to_parquet(file_pointer.path, filesystem=file_pointer.fs)


def _parquet_precache_all_bytes(file_pointer):  # pragma: no cover
    """Return the ``parquet_precache_all_bytes`` cache option for an HTTP path.

    Only HTTP-backed paths can opt in to pre-caching; every other path type
    (local, s3, ...) yields False. The option value itself is returned as-is
    from the filesystem's ``cache_options`` mapping.
    """
    if not isinstance(file_pointer, upath.implementations.http.HTTPPath):
        return False
    options = file_pointer.fs.cache_options or {}
    return options.get("parquet_precache_all_bytes", False)


def _precache_all_bytes(reading_method, file_pointer, **kwargs): # pragma: no cover
req_info = urllib.request.Request(file_pointer.path)
with urllib.request.urlopen(req_info) as req:
reader = BytesIO(req.read())
return reading_method(reader, **kwargs)


def read_parquet_metadata(file_pointer: str | Path | UPath, **kwargs) -> pq.FileMetaData:
"""Read FileMetaData from footer of a single Parquet file.

Expand All @@ -158,8 +175,10 @@ def read_parquet_metadata(file_pointer: str | Path | UPath, **kwargs) -> pq.File
file_pointer = get_upath(file_pointer)
if file_pointer is None or not file_pointer.exists():
raise FileNotFoundError("Parquet file does not exist")
parquet_file = pq.read_metadata(file_pointer.path, filesystem=file_pointer.fs, **kwargs)
return parquet_file
if _parquet_precache_all_bytes(file_pointer): # pragma: no cover
return _precache_all_bytes(pq.read_metadata, file_pointer, **kwargs)

return pq.read_metadata(file_pointer.path, filesystem=file_pointer.fs, **kwargs)


def read_parquet_file(file_pointer: str | Path | UPath, **kwargs) -> pq.ParquetFile:
Expand All @@ -172,6 +191,10 @@ def read_parquet_file(file_pointer: str | Path | UPath, **kwargs) -> pq.ParquetF
file_pointer = get_upath(file_pointer)
if file_pointer is None or not file_pointer.exists():
raise FileNotFoundError("Parquet file does not exist")

if _parquet_precache_all_bytes(file_pointer): # pragma: no cover
return _precache_all_bytes(pq.ParquetFile, file_pointer, **kwargs)

return pq.ParquetFile(file_pointer.path, filesystem=file_pointer.fs, **kwargs)


Expand Down Expand Up @@ -262,18 +285,6 @@ def write_fits_image(histogram: np.ndarray, map_file_pointer: str | Path | UPath
_map_file.write(_tmp_file.read())


def read_yaml(file_handle: str | Path | UPath):
    """Load and parse a YAML file from the filesystem.

    Args:
        file_handle: location of yaml file

    Returns:
        The parsed YAML content (dict, list, or scalar).
    """
    path = get_upath(file_handle)
    with path.open("r", encoding="utf-8") as yaml_file:
        return yaml.safe_load(yaml_file)


def delete_file(file_handle: str | Path | UPath):
"""Deletes file from filesystem.

Expand All @@ -284,19 +295,6 @@ def delete_file(file_handle: str | Path | UPath):
file_handle.unlink()


def unnest_headers_for_pandas(storage_options: dict | None) -> dict | None:
    """Handle storage options for pandas read/write methods.
    This is needed because fsspec http storage options are nested under the "headers" key,
    see https://github.com/astronomy-commons/hipscat/issues/295
    """
    if storage_options is None or "headers" not in storage_options:
        return storage_options
    # Build a new dict (never mutate the caller's) with the nested header
    # entries promoted to the top level, after the remaining options.
    flattened = {key: value for key, value in storage_options.items() if key != "headers"}
    flattened.update(storage_options["headers"])
    return flattened


def read_parquet_file_to_pandas(file_pointer: str | Path | UPath, **kwargs) -> npd.NestedFrame:
"""Reads parquet file(s) to a pandas DataFrame

Expand All @@ -310,14 +308,18 @@ def read_parquet_file_to_pandas(file_pointer: str | Path | UPath, **kwargs) -> n
file_pointer = get_upath(file_pointer)
# If we are trying to read a directory over http, we need to send the explicit list of files instead.
# We don't want to get the list unnecessarily because it can be expensive.
if isinstance(file_pointer, upath.implementations.http.HTTPPath) and file_pointer.is_dir():
if (
isinstance(file_pointer, upath.implementations.http.HTTPPath) and file_pointer.is_dir()
): # pragma: no cover
file_pointers = [f for f in file_pointer.iterdir() if f.is_file()]
return npd.read_parquet(
file_pointers,
filesystem=file_pointer.fs,
partitioning=None, # Avoid the ArrowTypeError described in #367
**kwargs,
)
if _parquet_precache_all_bytes(file_pointer): # pragma: no cover
return _precache_all_bytes(pq.ParquetFile, file_pointer, partitioning=None, **kwargs)
return npd.read_parquet(
file_pointer.path,
filesystem=file_pointer.fs,
Expand Down
25 changes: 0 additions & 25 deletions tests/hats/io/file_io/test_file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
read_parquet_dataset,
read_parquet_file_to_pandas,
remove_directory,
unnest_headers_for_pandas,
write_dataframe_to_csv,
write_fits_image,
write_string_to_file,
Expand Down Expand Up @@ -160,27 +159,3 @@ def test_write_point_map_roundtrip(small_sky_order1_dir, tmp_path):
write_fits_image(expected_counts_skymap, output_map_pointer)
counts_skymap = read_fits_image(output_map_pointer)
np.testing.assert_array_equal(counts_skymap, expected_counts_skymap)


def test_unnest_headers_for_pandas():
    """Headers nested under fsspec's "headers" key are flattened to the top level."""
    cases = [
        (
            {"headers": {"Authorization": "Bearer my_token"}},
            {"Authorization": "Bearer my_token"},
        ),
        (
            {"key1": "value1", "headers": {"Authorization": "Bearer my_token", "Content": "X"}},
            {"key1": "value1", "Authorization": "Bearer my_token", "Content": "X"},
        ),
        (
            {"key1": "value1", "key2": None},
            {"key1": "value1", "key2": None},
        ),
    ]
    for storage_options, expected in cases:
        assert unnest_headers_for_pandas(storage_options) == expected

    assert None is unnest_headers_for_pandas(None)
Loading