Skip to content

Commit 0aae54d

Browse files
authored
[python] Ingest from S3 performance (#3872)
* h5ad read from s3 performance * add simplicity * increase cache size * more macro data refinement * cleanup * darn clang-format * remove dead code * py39 lint * fix pre py 3.12 typeguard error * log python packages installed * pr fb * PR fb * refactor for clarity
1 parent ee8aab4 commit 0aae54d

File tree

5 files changed

+405
-9
lines changed

5 files changed

+405
-9
lines changed

.github/workflows/python-ci-single.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ jobs:
118118
- name: Show XCode version
119119
run: clang --version
120120

121+
- name: Log pip dependencies
122+
run: pip list
123+
121124
- name: Run libtiledbsoma unit tests
122125
run: ctest --output-on-failure --test-dir build/libtiledbsoma -C Release --verbose
123126
env:
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
from __future__ import annotations
2+
3+
import ctypes
4+
import io
5+
import sys
6+
import threading
7+
from collections import OrderedDict
8+
from typing import Any, Union, cast
9+
10+
import attrs
11+
import pyarrow as pa
12+
from typing_extensions import Buffer, TypeAlias
13+
14+
# typeguard and cython memoryview incompat in older versions
15+
if sys.version_info >= (3, 12):
16+
WritableBuffer: TypeAlias = Union[memoryview, Buffer]
17+
else:
18+
WritableBuffer: TypeAlias = Any
19+
20+
21+
@attrs.define
class CacheStats:
    """Access counters kept by ``CachingReader`` for a single cache block."""

    # Index of the block within the file, in units of the reader's cache block size.
    block_idx: int
    # Incremented once for every request that touches the block; this includes
    # the request that first loaded it.
    hit: int = attrs.field(default=0)
    # Incremented each time the block had to be read from the underlying file.
    miss: int = attrs.field(default=0)
26+
27+
28+
class CachingReader:
    """Buffering reader which maintains an LRU cache of previously read data, wrapping
    and presenting a file-like interface. Typical use is similar to ``io.BufferedReader``:

        f = CachingReader(open("a file", mode="rb"), memory_budget=64*1024**2, cache_block_size=1024**2)

    The LRU cache is user-defined by:
    * memory_budget - total cache size in bytes
    * cache_block_size - the unit of cached data. Also the read size

    Instances present as a file-like object, and all public methods conform to the normal Python
    ``io`` package semantics. Most, but not all methods required for io.RawIOBase (read-only) are
    implemented.

    Class assumes that the underlying file is not changing concurrent with read.

    Primary use case is wrapping a VFS FileBuffer object, to improve performance of read
    operations by doing fewer, larger reads.

    The implementation stores cached data as PyArrow arrays, primarily to benefit from the
    easy, minimal-copy concat and slice operations upon underlying buffers.
    """

    def __init__(
        self,
        file: Any,  # file-like object. Unfortunately, Python lacks a good typing signature for this concept.
        *,
        memory_budget: int = 64 * 1024**2,
        cache_block_size: int = 1024**2,
    ):
        """Wrap ``file`` and prepare an empty block cache.

        Args:
            file: readable, seekable file-like object providing ``readinto``.
            memory_budget: upper bound, in bytes, on cached data.
            cache_block_size: size in bytes of each cached block, and of each
                read issued to ``file``.

        Raises:
            io.UnsupportedOperation: if ``file`` is not readable.
            ValueError: if ``cache_block_size`` is not positive, or
                ``memory_budget`` is smaller than ``cache_block_size``.
        """
        if not file.readable():
            raise io.UnsupportedOperation("File must be readable")
        if cache_block_size <= 0:
            # Guards the integer divisions below against zero/negative block sizes.
            raise ValueError("The cache_block_size parameter must be greater than zero")
        if memory_budget < cache_block_size:
            raise ValueError(
                "The memory_budget parameter must be greater than or equal to the cache_block_size"
            )

        self._file = file
        # Seeking to the end is how the total length is discovered; rewind afterwards.
        self._file_length = file.seek(0, io.SEEK_END)
        file.seek(0)
        self._pos = 0

        self._cache_block_size = cache_block_size
        self._max_cache_blocks = max(1, memory_budget // cache_block_size)
        _n_blocks = (self._file_length + cache_block_size - 1) // cache_block_size

        self._cache_lock = threading.Lock()
        self._cache: OrderedDict[int, pa.UInt8Array] = OrderedDict()
        self._cache_stats: list[CacheStats] = [
            CacheStats(block_idx) for block_idx in range(_n_blocks)
        ]

    def _read_block(self, block_idx: int) -> pa.UInt8Array:
        """Read block ``block_idx`` from the underlying file into a new uint8 array.

        The final block of the file may be shorter than the cache block size.
        """
        nbytes = min(
            self._cache_block_size,
            self._file_length - block_idx * self._cache_block_size,
        )
        assert nbytes > 0
        buffer = pa.allocate_buffer(nbytes)
        ctypes.memset(buffer.address, 0, len(buffer))  # better safe than sorry
        self._file.seek(block_idx * self._cache_block_size)
        bytes_read = self._file.readinto(memoryview(buffer))
        assert nbytes == bytes_read == len(buffer)
        a = pa.UInt8Array.from_buffers(pa.uint8(), len(buffer), [None, buffer])
        return a

    def _load_cache(self, start: int, end: int) -> list[pa.UInt8Array]:
        """Return, in file order, the cached blocks covering bytes ``[start, end)``.

        Blocks not yet cached are read from the underlying file first. ``end`` is
        clipped to the file length; an empty list is returned when no block
        overlaps the requested range.
        """
        end = min(end, self._file_length)
        start_block = start // self._cache_block_size
        end_block = (end + self._cache_block_size - 1) // self._cache_block_size
        with self._cache_lock:
            missing_blocks = [
                i for i in range(start_block, end_block) if i not in self._cache
            ]
            for block_idx in missing_blocks:
                self._cache_stats[block_idx].miss += 1
                self._cache[block_idx] = self._read_block(block_idx)

            # Collect references before eviction: a request larger than the
            # cache budget must still return every block it asked for.
            requested_blocks = [
                self._cache[block_idx] for block_idx in range(start_block, end_block)
            ]

            self._mark_and_sweep_blocks(start_block, end_block)

        return requested_blocks

    def _mark_and_sweep_blocks(self, start: int, stop: int) -> None:
        """Mark blocks ``[start, stop)`` most-recently-used, then evict the
        least-recently-used blocks until the cache is within its block budget.

        Mutates the cache without taking the lock itself; ``_load_cache`` calls
        it while holding ``_cache_lock``.
        """
        for block_idx in range(start, stop):
            self._cache.move_to_end(block_idx)
            self._cache_stats[block_idx].hit += 1

        for _ in range(max(0, len(self._cache) - self._max_cache_blocks)):
            self._cache.popitem(last=False)

    def _reset_cache(self) -> None:
        """Drop every cached block."""
        with self._cache_lock:
            self._cache.clear()

    def read(self, size: int = -1) -> bytes:
        """Read up to size bytes from the object and return them. As a convenience, if size
        is unspecified or -1, all bytes until EOF are returned. Fewer than size bytes may be
        returned if fewer than size bytes remain before EOF.

        If 0 bytes are returned, and size was not 0, this indicates end of file."""
        if size is None:
            size = -1  # type: ignore[unreachable]

        # Clamp the request to what is left in the file. At or past EOF this
        # makes the request empty, so we never try to build a chunked array
        # from zero blocks (PyArrow cannot infer a type from an empty list).
        remaining = max(0, self._file_length - self._pos)
        if size < 0 or size > remaining:
            size = remaining
        if size == 0:
            return b""

        blocks = self._load_cache(self._pos, self._pos + size)

        # Offsets are relative to the start of the first loaded block.
        start = self._pos % self._cache_block_size
        end = start + size
        arr = pa.chunked_array(blocks)[start:end].combine_chunks()  # NB: copy
        assert arr.offset == 0
        b = arr.buffers()[1].to_pybytes()  # NB: copy
        self._pos += len(b)
        return cast(bytes, b)

    def readinto(self, buf: WritableBuffer) -> int | None:
        """Read bytes into a pre-allocated, writable bytes-like object b,
        and return the number of bytes read. Returns 0 at end of file."""
        if not isinstance(buf, memoryview):
            buf = memoryview(buf)
        if buf.nbytes == 0:
            return 0
        buf = buf.cast("B")

        # Never request more than remains in the file; see ``read`` for why an
        # empty request must short-circuit before touching the cache.
        size = min(buf.nbytes, max(0, self._file_length - self._pos))
        if size == 0:
            return 0

        blocks = self._load_cache(self._pos, self._pos + size)
        start = self._pos % self._cache_block_size
        end = start + size
        carr = pa.chunked_array(blocks)[start:end]

        # Copy each chunk straight into the caller's buffer, skipping the
        # intermediate ``bytes`` object that ``read`` creates.
        dst = ctypes.addressof(ctypes.c_char.from_buffer(buf))
        dstidx = 0
        for c in carr.chunks:
            src = c.buffers()[1].address + c.offset
            count = len(c)
            ctypes.memmove(dst + dstidx, src, count)
            dstidx += count

        assert dstidx == len(carr)
        self._pos += dstidx
        return dstidx

    def close(self) -> None:
        """Close the file handle."""
        self._reset_cache()
        self._file.close()

    def tell(self) -> int:
        """Return integer indicating the file object's current position in the file.
        See ``io.RawIOBase.tell``
        """
        return self._pos

    def seek(self, offset: int, whence: int = 0) -> int:
        """
        Change the stream position to the given byte offset, interpreted relative to the position
        indicated by whence, and return the new absolute position. Values for whence are:

        os.SEEK_SET or 0 - start of the stream (the default); offset should be zero or positive
        os.SEEK_CUR or 1 - current stream position; offset may be negative
        os.SEEK_END or 2 - end of the stream; offset is usually negative

        Raises ``ValueError`` for an unknown whence and ``OSError`` if the resulting
        position would be negative. Only the logical position is updated; the
        underlying file is not touched until the next read.
        """
        if whence == io.SEEK_SET:
            new_pos = 0 + offset
        elif whence == io.SEEK_CUR:
            new_pos = self._pos + offset
        elif whence == io.SEEK_END:
            new_pos = self._file_length + offset
        else:
            raise ValueError(f"Invalid whence value ({whence})")

        if new_pos < 0:
            raise OSError("seek() returned invalid position")

        self._pos = new_pos
        return new_pos

    @property
    def closed(self) -> bool:
        """Return True if the stream is closed."""
        return bool(self._file.closed)

    def readable(self) -> bool:
        """Return True if the stream can be read from."""
        return bool(self._file.readable())

    def seekable(self) -> bool:
        """Return True if the file can be seeked."""
        return bool(self._file.seekable())

apis/python/src/tiledbsoma/io/_util.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from .._exception import SOMAError
2323
from .._types import Path
2424
from ..options import SOMATileDBContext
25+
from ._caching_reader import CachingReader
2526

2627
_pa_type_to_str_fmt = {
2728
pa.string(): "U",
@@ -51,7 +52,11 @@ def read_h5ad(
5152
"""
5253
ctx = ctx or SOMATileDBContext()
5354
vfs = clib.SOMAVFS(ctx.native_context)
54-
input_handle = clib.SOMAVFSFilebuf(vfs).open(str(input_path))
55+
input_handle = CachingReader(
56+
clib.SOMAVFSFilebuf(vfs).open(str(input_path)),
57+
memory_budget=64 * 1024**2,
58+
cache_block_size=8 * 1024**2,
59+
)
5560
try:
5661
with _hack_patch_anndata():
5762
anndata = ad.read_h5ad(_FSPathWrapper(input_handle, input_path), mode)

apis/python/src/tiledbsoma/soma_vfs.cc

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,25 @@ class SOMAVFSFilebuf : public tiledb::impl::VFSFilebuf {
3434
private:
3535
std::streamsize offset_ = 0;
3636
SOMAVFS vfs_;
37+
// Mode passed to the most recent open() call. Value-initialized so that
// readable()/writable() report false, instead of reading an indeterminate
// value, when they are called before open().
std::ios::openmode openmode_{};
3738

3839
public:
3940
SOMAVFSFilebuf(const VFS& vfs)
4041
: tiledb::impl::VFSFilebuf(vfs)
4142
, vfs_(vfs){};
4243

44+
/// Open `uri` with the requested mode and remember that mode so that
/// readable()/writable() can report it.
///
/// A failed open is reported through TPY_ERROR_LOC (defined outside this
/// view; presumably raises -- confirm). Otherwise returns `this`.
SOMAVFSFilebuf* open(const std::string& uri, std::ios::openmode openmode) {
    openmode_ = openmode;

    const bool opened = tiledb::impl::VFSFilebuf::open(uri, openmode) !=
                        nullptr;
    if (!opened) {
        // No std::format in C++17, and fmt::format is overkill
        // here
        std::stringstream message;
        message << "URI " << uri << " is not a valid URI";
        TPY_ERROR_LOC(message.str());
    }

    return this;
}
55+
4356
std::streamsize seek(std::streamsize offset, uint64_t whence) {
4457
if (whence == 0) {
4558
offset_ = seekoff(offset, std::ios::beg, std::ios::in);
@@ -70,9 +83,44 @@ class SOMAVFSFilebuf : public tiledb::impl::VFSFilebuf {
7083
return py::bytes(buffer);
7184
}
7285

86+
/// Fill a writable, 1-dimensional, contiguous Python buffer with bytes read
/// from the current position, advancing `offset_` by the amount read.
///
/// @param buffer Object exposing the Python buffer protocol.
/// @return Number of bytes actually read (0 for an empty buffer).
/// @throws std::runtime_error if the buffer is not 1-dimensional, is
///         read-only, or is not contiguous.
std::streamsize readinto(py::buffer buffer) {
    py::buffer_info info = buffer.request();
    if (info.ndim != 1)
        throw std::runtime_error("Expected a 1-dimensional byte array");
    if (info.readonly)
        throw std::runtime_error("Cannot write to a read-only buffer");

    // buffer_info::size is the number of items, not bytes; scale by the
    // item size so buffers of wider element types are filled completely.
    const auto nbytes = static_cast<std::streamsize>(
        info.size * info.itemsize);
    if (nbytes <= 0)
        return 0;

    // The raw copy below treats the destination as one contiguous run.
    if (info.strides[0] != info.itemsize)
        throw std::runtime_error("Expected a contiguous buffer");

    std::streamsize bytes_read = 0;
    {
        // Drop the GIL only for the duration of the (potentially slow) read;
        // it is re-acquired when `release` goes out of scope.
        py::gil_scoped_release release;
        bytes_read = xsgetn(static_cast<char*>(info.ptr), nbytes);
    }
    offset_ += bytes_read;

    return bytes_read;
}
104+
73105
std::streamsize tell() {
74106
return offset_;
75107
}
108+
109+
bool readable() {
110+
return (openmode_ & std::ios::in) != 0;
111+
}
112+
113+
bool writable() {
114+
return (openmode_ & std::ios::out) != 0;
115+
}
116+
117+
bool closed() {
118+
return !is_open();
119+
}
120+
121+
/// Always reports the buffer as seekable.
bool seekable() {
    constexpr bool kSeekable = true;
    return kSeekable;
}
76124
};
77125

78126
void load_soma_vfs(py::module& m) {
@@ -88,18 +136,17 @@ void load_soma_vfs(py::module& m) {
88136
.def(
89137
"open",
90138
[](SOMAVFSFilebuf& buf, const std::string& uri) {
91-
auto fb = buf.open(uri, std::ios::in);
92-
if (fb == nullptr) {
93-
// No std::format in C++17, and fmt::format is overkill here
94-
std::stringstream ss;
95-
ss << "URI " << uri << " is not a valid URI";
96-
TPY_ERROR_LOC(ss.str());
97-
}
98-
return fb;
139+
return buf.open(uri, std::ios::in); // hardwired to read-only
99140
},
100141
py::call_guard<py::gil_scoped_release>())
101142
.def("read", &SOMAVFSFilebuf::read, "size"_a = -1)
143+
.def("readinto", &SOMAVFSFilebuf::readinto, "buffer"_a)
144+
.def("flush", [](SOMAVFSFilebuf& buf) {})
102145
.def("tell", &SOMAVFSFilebuf::tell)
146+
.def("readable", &SOMAVFSFilebuf::readable)
147+
.def("writable", &SOMAVFSFilebuf::writable)
148+
.def_property_readonly("closed", &SOMAVFSFilebuf::closed)
149+
.def("seekable", &SOMAVFSFilebuf::seekable)
103150
.def(
104151
"seek",
105152
&SOMAVFSFilebuf::seek,

0 commit comments

Comments
 (0)