Skip to content

Commit f102d25

Browse files
d4l3k and facebook-github-bot
authored and committed
torchdata/datapipes/iter/load/fsspec: added fsspec datapipes (#116)
Summary: This adds fsspec datapipes that are equivalent to the existing iopath ones. The tests are largely equivalent and test the `file://` and `memory://` filesystems to ensure compatibility. Closes #114 Pull Request resolved: #116 Test Plan: pytest tests/test_fsspec.py Reviewed By: ejguan Differential Revision: D32933676 Pulled By: d4l3k fbshipit-source-id: 30a0b9da385f5d9328d07105234a92d7259ae05d
1 parent 6309113 commit f102d25

File tree

5 files changed

+316
-1
lines changed

5 files changed

+316
-1
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ jobs:
4444
pip3 install -r requirements.txt
4545
pip3 install --pre torch -f https://download.pytorch.org/whl/nightly/cpu/torch_nightly.html
4646
- name: Install test requirements
47-
run: pip3 install expecttest iopath==0.1.9 numpy pytest rarfile
47+
run: pip3 install expecttest fsspec iopath==0.1.9 numpy pytest rarfile
4848
- name: Build TorchData
4949
run: python setup.py develop
5050
- name: Run DataPipes tests with pytest

mypy.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,6 @@ ignore_missing_imports = True
3131

3232
[mypy-rarfile.*]
3333
ignore_missing_imports = True
34+
35+
[mypy-fsspec.*]
36+
ignore_missing_imports = True

test/test_fsspec.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
# Copyright (c) Facebook, Inc. and its affiliates.
2+
import expecttest
3+
import os
4+
import unittest
5+
import warnings
6+
7+
from torchdata.datapipes.iter import (
8+
FileLister,
9+
IterableWrapper,
10+
FSSpecFileLister,
11+
FSSpecFileOpener,
12+
FSSpecSaver,
13+
)
14+
15+
from _utils._common_utils_for_test import (
16+
create_temp_dir,
17+
create_temp_files,
18+
reset_after_n_next_calls,
19+
)
20+
21+
try:
22+
import fsspec
23+
24+
HAS_FSSPEC = True
25+
except ImportError:
26+
HAS_FSSPEC = False
27+
skipIfNoFSSpec = unittest.skipIf(not HAS_FSSPEC, "no fsspec")
28+
29+
30+
class TestDataPipeFSSpec(expecttest.TestCase):
    """Tests for the fsspec-backed lister, opener and saver DataPipes.

    The tests exercise both the ``file://`` filesystem (through temporary
    directories created in ``setUp``) and the ``memory://`` filesystem.
    """

    def setUp(self):
        # A root temporary directory plus a nested one, each populated with files.
        self.temp_dir = create_temp_dir()
        self.temp_files = create_temp_files(self.temp_dir)
        self.temp_sub_dir = create_temp_dir(self.temp_dir.name)
        self.temp_sub_files = create_temp_files(self.temp_sub_dir, 4, False)

    def tearDown(self):
        # Best-effort cleanup: a failure here is reported as a warning, not a test error.
        try:
            self.temp_sub_dir.cleanup()
            self.temp_dir.cleanup()
        except Exception as e:
            warnings.warn(
                f"TestDataPipeFSSpec was not able to cleanup temp dir due to {e}"
            )

    def _write_text_files(self):
        """Write three ``*.text`` files containing ``b"DATA"`` into the root temp dir."""

        def filepath_fn(name: str) -> str:
            return os.path.join(self.temp_dir.name, os.path.basename(name))

        name_to_data = {"1.text": b"DATA", "2.text": b"DATA", "3.text": b"DATA"}
        source_dp = IterableWrapper(sorted(name_to_data.items()))
        saver_dp = source_dp.save_to_disk(filepath_fn=filepath_fn, mode="wb")
        # Exhaust the pipe so that every file is actually written.
        list(saver_dp)

    @skipIfNoFSSpec
    def test_fsspec_file_lister_iterdatapipe(self):
        datapipe = FSSpecFileLister(root="file://" + self.temp_sub_dir.name)

        # check all file paths within sub_folder are listed
        for path in datapipe:
            self.assertIn(
                path.split("://")[1],
                {
                    fsspec.implementations.local.make_path_posix(file)
                    for file in self.temp_sub_files
                },
            )

    @skipIfNoFSSpec
    def test_fsspec_file_loader_iterdatapipe(self):
        datapipe1 = FSSpecFileLister(root="file://" + self.temp_sub_dir.name)
        datapipe2 = FSSpecFileOpener(datapipe1)

        # check contents of file match
        for _, f in datapipe2:
            self.assertEqual(f.read(), "0123456789abcdef")

        # Reset Test: Ensure the resulting streams are still readable after the DataPipe is reset/exhausted
        self._write_text_files()
        lister_dp = FileLister(self.temp_dir.name, "*.text")
        fsspec_file_loader_dp = FSSpecFileOpener(lister_dp, mode="rb")

        n_elements_before_reset = 2
        res_before_reset, res_after_reset = reset_after_n_next_calls(
            fsspec_file_loader_dp, n_elements_before_reset
        )
        self.assertEqual(2, len(res_before_reset))
        self.assertEqual(3, len(res_after_reset))
        for _name, stream in res_before_reset:
            self.assertEqual(b"DATA", stream.read())
        for _name, stream in res_after_reset:
            self.assertEqual(b"DATA", stream.read())

    @skipIfNoFSSpec
    def test_fsspec_saver_iterdatapipe(self):
        def filepath_fn(name: str) -> str:
            return "file://" + os.path.join(self.temp_dir.name, os.path.basename(name))

        # Functional Test: Saving some data
        name_to_data = {"1.txt": b"DATA1", "2.txt": b"DATA2", "3.txt": b"DATA3"}
        source_dp = IterableWrapper(sorted(name_to_data.items()))
        saver_dp = source_dp.save_by_fsspec(filepath_fn=filepath_fn, mode="wb")
        res_file_paths = list(saver_dp)
        expected_paths = [filepath_fn(name) for name in name_to_data.keys()]
        self.assertEqual(expected_paths, res_file_paths)
        for name in name_to_data.keys():
            # Strip the "file://" prefix to read the file back with the builtin open().
            p = filepath_fn(name).split("://")[1]
            with open(p, "r") as f:
                self.assertEqual(name_to_data[name], f.read().encode())

        # Reset Test:
        saver_dp = FSSpecSaver(source_dp, filepath_fn=filepath_fn, mode="wb")
        n_elements_before_reset = 2
        res_before_reset, res_after_reset = reset_after_n_next_calls(
            saver_dp, n_elements_before_reset
        )
        self.assertEqual([filepath_fn("1.txt"), filepath_fn("2.txt")], res_before_reset)
        self.assertEqual(expected_paths, res_after_reset)
        for name in name_to_data.keys():
            p = filepath_fn(name).split("://")[1]
            with open(p, "r") as f:
                self.assertEqual(name_to_data[name], f.read().encode())

        # __len__ Test: returns the length of source DataPipe
        self.assertEqual(3, len(saver_dp))

    @skipIfNoFSSpec
    def test_fsspec_memory_list(self):
        fs = fsspec.filesystem("memory")
        fs.mkdir("foo")
        fs.touch("foo/bar1")
        fs.touch("foo/bar2")

        # Listing a directory yields one url per contained file.
        datapipe = FSSpecFileLister(root="memory://foo")
        self.assertEqual(set(datapipe), {"memory:///foo/bar1", "memory:///foo/bar2"})

        # Listing a single file yields the root url unchanged.
        datapipe = FSSpecFileLister(root="memory://foo/bar1")
        self.assertEqual(set(datapipe), {"memory://foo/bar1"})

    @skipIfNoFSSpec
    def test_fsspec_memory_load(self):
        fs = fsspec.filesystem("memory")
        with fs.open("file", "w") as f:
            f.write("hello")
        with fs.open("file2", "w") as f:
            f.write("hello2")

        files = ["memory://file", "memory://file2"]
        datapipe = FSSpecFileOpener(files)
        self.assertEqual([f.read() for _, f in datapipe], ["hello", "hello2"])

    @skipIfNoFSSpec
    def test_fsspec_memory_save(self):
        def filepath_fn(name: str) -> str:
            return "memory://" + name

        name_to_data = {"1.txt": b"DATA1", "2.txt": b"DATA2"}
        source_dp = IterableWrapper(sorted(name_to_data.items()))
        saver_dp = FSSpecSaver(source_dp, filepath_fn=filepath_fn, mode="wb")

        self.assertEqual(set(saver_dp), {"memory://1.txt", "memory://2.txt"})
162+
163+
164+
if __name__ == "__main__":
165+
unittest.main()

torchdata/datapipes/iter/__init__.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@
3131
IoPathFileLoaderIterDataPipe as IoPathFileLoader,
3232
IoPathSaverIterDataPipe as IoPathSaver,
3333
)
34+
from torchdata.datapipes.iter.load.fsspec import (
35+
FSSpecFileListerIterDataPipe as FSSpecFileLister,
36+
FSSpecFileOpenerIterDataPipe as FSSpecFileOpener,
37+
FSSpecSaverIterDataPipe as FSSpecSaver,
38+
)
3439
from torchdata.datapipes.iter.transform.bucketbatcher import BucketBatcherIterDataPipe as BucketBatcher
3540
from torchdata.datapipes.iter.util.cacheholder import (
3641
EndOnDiskCacheHolderIterDataPipe as EndOnDiskCacheHolder,
@@ -80,6 +85,9 @@
8085
"EndOnDiskCacheHolder",
8186
"Enumerator",
8287
"Extractor",
88+
"FSSpecFileLister",
89+
"FSSpecFileOpener",
90+
"FSSpecSaver",
8391
"FileLister",
8492
"FileLoader",
8593
"Filter",
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
# Copyright (c) Facebook, Inc. and its affiliates.
2+
import os
3+
import posixpath
4+
5+
from typing import Any, Callable, Iterator, List, Optional, Tuple, Union
6+
7+
from torch.utils.data.datapipes.utils.common import match_masks
8+
9+
from torchdata.datapipes import functional_datapipe
10+
from torchdata.datapipes.iter import IterDataPipe
11+
from torchdata.datapipes.utils import StreamWrapper
12+
13+
try:
14+
import fsspec
15+
16+
except ImportError:
17+
fsspec = None
18+
19+
U = Union[bytes, bytearray, str]
20+
21+
22+
def _assert_fsspec() -> None:
    """Raise if the optional `fsspec` dependency could not be imported.

    Raises:
        ModuleNotFoundError: if the module-level ``fsspec`` name is ``None``,
            i.e. the package is not installed.
    """
    if fsspec is None:
        # Adjacent string literals are concatenated verbatim, so each piece
        # carries its own separating space.
        raise ModuleNotFoundError(
            "Package `fsspec` is required to be installed to use this datapipe. "
            "Please use `pip install fsspec` or `conda install -c conda-forge fsspec` "
            "to install the package"
        )
29+
30+
31+
class FSSpecFileListerIterDataPipe(IterDataPipe[str]):
    r""":class:`FSSpecFileListerIterDataPipe`.

    Iterable DataPipe to list the contents of the directory at the provided `root` pathname or url,
    and yields the full pathname or url for each file within the directory.

    If `root` points at a single file, `root` itself is yielded unchanged.

    Args:
        root: The root fsspec path directory to list files from
        masks: Unix style filter string or string list for filtering file name(s)

    Raises:
        ModuleNotFoundError: if `fsspec` is not installed
    """

    def __init__(
        self,
        root: str,
        masks: Union[str, List[str]] = "",
    ) -> None:
        _assert_fsspec()

        self.root: str = root
        self.masks = masks

    def __iter__(self) -> Iterator[str]:
        fs, path = fsspec.core.url_to_fs(self.root)

        # `fs.protocol` is either a single string or a sequence of aliases
        # depending on the filesystem; normalize to a tuple so that the
        # comparisons and the string concatenation below work for both.
        if isinstance(fs.protocol, str):
            protocols: Tuple[str, ...] = (fs.protocol,)
        else:
            protocols = tuple(fs.protocol)

        # The protocol alias used by `root` (if any) is reused when rebuilding urls.
        root_protocol: Optional[str] = next(
            (protocol for protocol in protocols if self.root.startswith(protocol)),
            None,
        )
        is_local = "file" in protocols or root_protocol is None

        if fs.isfile(path):
            yield self.root
            return

        # detail=False makes `ls` return plain path strings rather than info dicts.
        for file_name in fs.ls(path, detail=False):
            if not match_masks(file_name, self.masks):
                continue

            # ensure the file name has the full fsspec protocol path
            if file_name.startswith(protocols):
                yield file_name
                continue

            if is_local:
                abs_path = os.path.join(path, file_name)
            else:
                abs_path = posixpath.join(path, file_name)

            if root_protocol is not None:
                yield root_protocol + "://" + abs_path
            else:
                yield abs_path
76+
77+
@functional_datapipe("open_file_by_fsspec")
class FSSpecFileOpenerIterDataPipe(IterDataPipe[Tuple[str, StreamWrapper]]):
    r""":class:`FSSpecFileOpenerIterDataPipe`.

    Iterable DataPipe that opens each fsspec path produced by the source datapipe
    and yields a ``(path, stream)`` tuple, where the stream is the opened file
    wrapped in a ``StreamWrapper``.

    Args:
        source_datapipe: Iterable DataPipe that provides the pathnames or urls
        mode: An optional string that specifies the mode in which the file is opened ('r' by default)

    Raises:
        ModuleNotFoundError: if `fsspec` is not installed
    """

    def __init__(self, source_datapipe: IterDataPipe[str], mode: str = "r") -> None:
        _assert_fsspec()

        self.source_datapipe: IterDataPipe[str] = source_datapipe
        self.mode: str = mode

    def __iter__(self) -> Iterator[Tuple[str, StreamWrapper]]:
        for uri in self.source_datapipe:
            # Resolve the url into its filesystem object and the path within it.
            filesystem, resolved_path = fsspec.core.url_to_fs(uri)
            yield uri, StreamWrapper(filesystem.open(resolved_path, self.mode))

    def __len__(self) -> int:
        # One opened stream is produced per source element.
        return len(self.source_datapipe)
103+
104+
105+
@functional_datapipe("save_by_fsspec")
class FSSpecSaverIterDataPipe(IterDataPipe[str]):
    r"""
    Iterable DataPipe that consumes ``(metadata, data)`` tuples, writes each ``data``
    to the fsspec path derived from its metadata, and yields that path.

    Args:
        source_datapipe: Iterable DataPipe with tuples of metadata and data
        mode: Mode in which the file will be opened for writing the data ("w" by default)
        filepath_fn: Function that takes in metadata and returns the target path of the new file;
            when omitted, the metadata itself is used as the target path

    Raises:
        ModuleNotFoundError: if `fsspec` is not installed
    """

    def __init__(
        self,
        source_datapipe: IterDataPipe[Tuple[Any, U]],
        mode: str = "w",
        filepath_fn: Optional[Callable] = None,
    ):
        _assert_fsspec()

        self.source_datapipe: IterDataPipe[Tuple[Any, U]] = source_datapipe
        self.mode: str = mode
        self.filepath_fn: Optional[Callable] = filepath_fn

    def __iter__(self) -> Iterator[str]:
        for metadata, payload in self.source_datapipe:
            if self.filepath_fn is None:
                target = metadata
            else:
                target = self.filepath_fn(metadata)

            # Resolve the url into its filesystem object and the path within it.
            filesystem, resolved_path = fsspec.core.url_to_fs(target)
            with filesystem.open(resolved_path, self.mode) as sink:
                sink.write(payload)

            # The path is yielded only after the file has been written and closed.
            yield target

    def __len__(self) -> int:
        # One path is produced per source element.
        return len(self.source_datapipe)

0 commit comments

Comments
 (0)