Skip to content

Commit 1f23cf5

Browse files
authored
feat: Add Async File Support (#34)
1 parent aea1d3b commit 1f23cf5

File tree

3 files changed

+212
-1
lines changed

3 files changed

+212
-1
lines changed

opendalfs/file.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import logging
22

3+
from fsspec.asyn import AbstractAsyncStreamedFile
34
from fsspec.spec import AbstractBufferedFile
5+
from opendal import AsyncFile as OpendalAsyncFile
46
from opendal import File as OpendalFile
57

68
logger = logging.getLogger("opendalfs")
@@ -148,3 +150,133 @@ def close(self):
148150
self._opendal_writer.close()
149151
finally:
150152
self._opendal_writer = None
153+
154+
155+
class OpendalAsyncBufferedFile(AbstractAsyncStreamedFile):
156+
"""Async buffered file implementation for OpenDAL."""
157+
158+
_opendal_writer: OpendalAsyncFile | None
159+
_append_via_write: bool
160+
_initiated: bool
161+
_exclusive_create: bool
162+
163+
def __init__(
164+
self,
165+
fs,
166+
path,
167+
mode="rb",
168+
block_size="default",
169+
autocommit=True,
170+
cache_type="readahead",
171+
cache_options=None,
172+
size=None,
173+
**kwargs,
174+
):
175+
self._exclusive_create = mode == "xb"
176+
normalized_mode = "wb" if self._exclusive_create else mode
177+
super().__init__(
178+
fs,
179+
path,
180+
mode=normalized_mode,
181+
block_size=block_size,
182+
autocommit=autocommit,
183+
cache_type=cache_type,
184+
cache_options=cache_options,
185+
size=size,
186+
**kwargs,
187+
)
188+
189+
self._opendal_writer = None
190+
self._append_via_write = False
191+
self._initiated = False
192+
193+
async def _fetch_range(self, start: int, end: int):
194+
if start >= end:
195+
return b""
196+
197+
reader = await self.fs.async_fs.open(self.path, "rb")
198+
try:
199+
await reader.seek(start)
200+
return await reader.read(end - start)
201+
finally:
202+
await reader.close()
203+
204+
async def _upload_chunk(self, final: bool = False):
205+
if not self._initiated:
206+
raise RuntimeError("Upload has not been initiated")
207+
208+
self.buffer.seek(0)
209+
chunk = self.buffer.read()
210+
211+
if not chunk:
212+
if not final:
213+
return False
214+
if self.mode == "ab" and self._append_via_write:
215+
if not await self.fs.async_fs.exists(self.path):
216+
await self.fs.async_fs.write(self.path, b"")
217+
return None
218+
await self._commit_upload()
219+
return None
220+
221+
if self.mode == "ab" and self._append_via_write:
222+
await self.fs.async_fs.write(self.path, chunk, append=True)
223+
return None
224+
225+
if self._opendal_writer is None:
226+
self._opendal_writer = await self.fs.async_fs.open(self.path, "wb")
227+
228+
await self._opendal_writer.write(chunk)
229+
230+
if final:
231+
await self._commit_upload()
232+
return None
233+
234+
async def _initiate_upload(self) -> None:
235+
if self._initiated:
236+
return
237+
238+
if self._exclusive_create and await self.fs.async_fs.exists(self.path):
239+
raise FileExistsError(self.path)
240+
241+
if self.mode == "ab":
242+
cap = self.fs.async_fs.capability()
243+
if getattr(cap, "write_can_append", False):
244+
self._append_via_write = True
245+
self.offset = self.loc
246+
else:
247+
try:
248+
existing = await self.fs.async_fs.read(self.path)
249+
except FileNotFoundError:
250+
existing = b""
251+
if existing:
252+
self._opendal_writer = await self.fs.async_fs.open(self.path, "wb")
253+
await self._opendal_writer.write(existing)
254+
self.offset = len(existing)
255+
256+
self._initiated = True
257+
258+
async def _commit_upload(self) -> None:
259+
if self.mode == "ab" and self._append_via_write:
260+
return
261+
262+
if self._opendal_writer is None:
263+
await self.fs.async_fs.write(self.path, b"")
264+
return
265+
266+
try:
267+
await self._opendal_writer.close()
268+
finally:
269+
self._opendal_writer = None
270+
271+
async def close(self):
272+
if self.closed:
273+
return
274+
275+
try:
276+
await super().close()
277+
finally:
278+
if self._opendal_writer is not None:
279+
try:
280+
await self._opendal_writer.close()
281+
finally:
282+
self._opendal_writer = None

opendalfs/fs.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from fsspec.asyn import AsyncFileSystem
33
import logging
44
from opendal import AsyncOperator, Operator
5-
from .file import OpendalBufferedFile
5+
from .file import OpendalAsyncBufferedFile, OpendalBufferedFile
66

77
logger = logging.getLogger("opendalfs")
88

@@ -147,6 +147,26 @@ def _open(
147147
**kwargs,
148148
)
149149

150+
async def open_async(self, path, mode="rb", **kwargs):
151+
if "b" not in mode or kwargs.get("compression"):
152+
raise ValueError
153+
154+
size = None
155+
if mode == "rb":
156+
info = await self.async_fs.stat(path)
157+
size = info.content_length
158+
159+
file = OpendalAsyncBufferedFile(self, path, mode, size=size, **kwargs)
160+
161+
if mode == "ab":
162+
try:
163+
info = await self.async_fs.stat(path)
164+
file.loc = info.content_length
165+
except FileNotFoundError:
166+
file.loc = 0
167+
168+
return file
169+
150170
async def _modified(self, path: str):
151171
"""Get modified time (async version)"""
152172
info = await self.async_fs.stat(path)

tests/core/test_file.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,62 @@ def test_open_exclusive_create(any_fs):
3131
with pytest.raises(FileExistsError):
3232
with any_fs.open("exists.txt", "xb") as f:
3333
f.write(b"y")
34+
35+
36+
@pytest.mark.asyncio
37+
async def test_open_async_read_seek():
38+
from opendalfs import OpendalFileSystem
39+
40+
fs = OpendalFileSystem(scheme="memory", asynchronous=True, skip_instance_cache=True)
41+
data = b"0123456789"
42+
await fs._pipe_file("readseek.txt", data)
43+
44+
async with await fs.open_async("readseek.txt", "rb") as f:
45+
assert await f.read(3) == b"012"
46+
assert f.tell() == 3
47+
48+
f.seek(5)
49+
assert await f.read(2) == b"56"
50+
51+
f.seek(-3, 2)
52+
assert await f.read() == b"789"
53+
54+
55+
@pytest.mark.asyncio
56+
async def test_open_async_write_chunked():
57+
from opendalfs import OpendalFileSystem
58+
59+
fs = OpendalFileSystem(scheme="memory", asynchronous=True, skip_instance_cache=True)
60+
61+
async with await fs.open_async("chunked.txt", "wb", block_size=3) as f:
62+
await f.write(b"abc")
63+
await f.write(b"def")
64+
await f.write(b"gh")
65+
66+
assert await fs._cat_file("chunked.txt") == b"abcdefgh"
67+
68+
69+
@pytest.mark.asyncio
70+
async def test_open_async_exclusive_create():
71+
from opendalfs import OpendalFileSystem
72+
73+
fs = OpendalFileSystem(scheme="memory", asynchronous=True, skip_instance_cache=True)
74+
await fs._pipe_file("exists.txt", b"x")
75+
76+
with pytest.raises(FileExistsError):
77+
async with await fs.open_async("exists.txt", "xb") as f:
78+
await f.write(b"y")
79+
80+
81+
@pytest.mark.asyncio
82+
async def test_open_async_append_emulated():
83+
from opendalfs import OpendalFileSystem
84+
85+
fs = OpendalFileSystem(scheme="memory", asynchronous=True, skip_instance_cache=True)
86+
await fs._pipe_file("append.txt", b"hello")
87+
88+
async with await fs.open_async("append.txt", "ab", block_size=2) as f:
89+
assert f.tell() == 5
90+
await f.write(b"world")
91+
92+
assert await fs._cat_file("append.txt") == b"helloworld"

0 commit comments

Comments
 (0)