Support chunks in HashChecker #452


Open
pmeier opened this issue May 24, 2022 · 0 comments

pmeier commented May 24, 2022

Imagine I have a really large file that I want to read in chunks to avoid memory overflow:

import io
from torchdata.datapipes.iter import IterableWrapper, StreamReader

dp = IterableWrapper([("really_large_file.txt", io.BytesIO(b"foo\nbar\nbaz\n"))])
dp = StreamReader(dp, chunk=4)

for data in dp:
    print(data)

This prints:

('really_large_file.txt', b'foo\n')
('really_large_file.txt', b'bar\n')
('really_large_file.txt', b'baz\n')

Now, it might be useful to also check the hash of the file. Naively, one could simply attach a

dp = HashChecker(
    dp,
    hash_dict={"really_large_file.txt": "268a5059001855fef30b4f95f82044ed"},
    hash_type="md5",
)

to the datapipe. Unfortunately, this leads to a checksum error. This happens because if the input is a bytes object, it is taken as the sole item for the hash computation:

if isinstance(data, (str, bytes, bytearray)):
    if isinstance(data, str):
        data = data.decode()
    hash_func.update(data)
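
For illustration (this snippet is not from the issue, it only uses hashlib), each chunk ends up being hashed on its own, so no per-chunk digest can ever equal the whole-file digest:

import hashlib

# Every chunk gets its own hash computation, so none of the per-chunk digests
# can match the digest of the complete file.
whole_file = hashlib.md5(b"foo\nbar\nbaz\n").hexdigest()
per_chunk = [hashlib.md5(chunk).hexdigest() for chunk in (b"foo\n", b"bar\n", b"baz\n")]
print(whole_file)
print(per_chunk)  # three different digests, none of them equal to whole_file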

In contrast, if the input is a stream, it is iterated and consumed in full for the computation:

# Not all streams have `read(bytes)` method.
# `__iter__` method is chosen because it is a common interface for IOBase.
for d in data:
    hash_func.update(d)
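
For comparison, here is a minimal sketch (not from the issue, plain io and hashlib) showing that feeding the pieces of a stream into a single hash object incrementally reproduces the whole-file digest:

import hashlib
import io

# Incrementally updating one hash object over the pieces of a stream yields
# the same digest as hashing the whole file at once.
stream = io.BytesIO(b"foo\nbar\nbaz\n")
hash_func = hashlib.md5()
for d in stream:  # IOBase.__iter__ yields line-sized pieces split on b"\n"
    hash_func.update(d)
assert hash_func.hexdigest() == hashlib.md5(b"foo\nbar\nbaz\n").hexdigest()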

Thus, placing the HashChecker before the StreamReader gives the desired behavior here:

import io
from torchdata.datapipes.iter import IterableWrapper, HashChecker, StreamReader

dp = IterableWrapper([("really_large_file.txt", io.BytesIO(b"foo\nbar\nbaz\n"))])
dp = HashChecker(
    dp,
    hash_dict={"really_large_file.txt": "268a5059001855fef30b4f95f82044ed"},
    hash_type="md5",
)
dp = StreamReader(dp, chunk=4)

for data in dp:
    print(data)

However, this has several downsides:

  1. If the stream is not seekable, e.g. an HTTP response, there is nothing left for the StreamReader to read after the HashChecker is finished.
  2. We can't control how the stream is iterated. As the code comment implies, __iter__ is chosen since it is a common interface for all streams. However, the chunks it yields are split on b"\n". Thus, when iterating over an arbitrary binary stream we might read the whole file at once, which defeats the chunked behavior we want (see the sketch after this list).
  3. We read from the stream twice, since the data read by the HashChecker is not cached anywhere and the StreamReader has to read it all over again.
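
To make point 2 concrete, here is a small sketch (plain io.BytesIO, no torchdata involved): iterating a binary stream that contains no b"\n" bytes returns everything in one piece:

import io

# A 1 MiB "file" without any newline bytes: iterating the stream falls back to
# readline(), which returns the entire contents as a single item.
stream = io.BytesIO(b"\x00" * (1 << 20))
pieces = list(stream)
print(len(pieces), len(pieces[0]))  # 1 1048576 -> chunking is effectively lost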

Since the hash_func can be updated incrementally, would it be possible to introduce a cache keyed by the file name in case we encounter bytes? Something along the lines of:

dp = iter(self.source_datapipe)
for file_name, data in dp:
    hash_func = ...

    if isinstance(data, (str, bytes, bytearray)):
        if isinstance(data, str):
            data = data.decode()
        hash_func.update(data)

        # keep pulling items as long as they belong to the same file
        for file_name_, data_ in dp:
            if file_name_ != file_name:
                break

            if isinstance(data_, str):
                data_ = data_.decode()
            hash_func.update(data_)
    else:
        ...
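
As a rough, self-contained sketch of the same idea (hypothetical, not an API proposal, and assuming that chunks of the same file arrive back to back): consecutive items could be grouped by file name and fed into a single hash object before the digest is compared:

import hashlib
from itertools import groupby

# Group consecutive (file_name, chunk) pairs by file name and hash them with
# one hash object per file.
items = [
    ("really_large_file.txt", b"foo\n"),
    ("really_large_file.txt", b"bar\n"),
    ("really_large_file.txt", b"baz\n"),
]
for file_name, group in groupby(items, key=lambda item: item[0]):
    hash_func = hashlib.md5()
    for _, chunk in group:
        hash_func.update(chunk)
    print(file_name, hash_func.hexdigest())  # digest of the reassembled file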