Determine if caching/lock is necessary in bgen reader #315

Closed · 1 task done
eric-czech opened this issue Oct 8, 2020 · 4 comments

eric-czech (Collaborator) commented Oct 8, 2020

Hey @horta, I would love to get your thoughts on this line of code from https://github.com/pystatgen/sgkit/pull/314:

@cached(cache, lock=lock)  # type: ignore[misc]
def _read_metafile_partition(path: Path, partition: int) -> pd.DataFrame:

Did you find that dask was loading the same partition multiple times? Also, is the lock necessary because of any kind of thread-safety issue in the metafile reader code?

TODO:

  • Remove cachetools dependency if this isn't necessary
eric-czech mentioned this issue Oct 8, 2020
horta (Collaborator) commented Oct 8, 2020

Hey @eric-czech, Dask was indeed calling for the same partition multiple times. To read genotypes A and B we need their addresses, which live in some partition (block) of the metafile. If they happen to be in the same partition, Dask is unaware of that and will call that function twice. Avoiding that is especially useful for users reading genotypes sequentially: two contiguous genotypes are very likely to reside in the same partition, and we want to avoid the second disk access. The way I thought of doing that is via that cache.

The lock was set to avoid parallel access to the cache, which Dask does a lot (see limix/bgen-reader-py#13); the caching library itself is not thread-safe: https://cachetools.readthedocs.io/en/stable/

It is also important to clear the cache if any change to the metafile is detected: limix/bgen-reader-py#25
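
For reference, a minimal sketch of the caching pattern described above, assuming cachetools' LRUCache together with a threading.Lock; the cache size and function body here are illustrative, not the actual bgen-reader code:

import threading
from pathlib import Path

import pandas as pd
from cachetools import LRUCache, cached

cache = LRUCache(maxsize=32)   # illustrative size
lock = threading.Lock()        # cachetools caches are not thread-safe on their own

@cached(cache, lock=lock)
def _read_metafile_partition(path: Path, partition: int) -> pd.DataFrame:
    # Illustrative body: the real reader parses one metafile block from disk here.
    print("reading partition", partition, "of", path)
    return pd.DataFrame({"partition": [partition]})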

eric-czech (Collaborator, Author) commented:

Ok thanks @horta. I don't think I quite follow this part, though:

If they happen to be in the same partition, Dask is unaware of that and will call that function twice. Avoiding that is especially useful for users reading genotypes sequentially: two contiguous genotypes are very likely to reside in the same partition, and we want to avoid the second disk access.

How do you ask dask to fetch individual rows through the dataframe API without an index? I'm unable to reproduce the behavior of the same partition being loaded multiple times, except for the first partition, which I believe dask loads initially to either get or validate dtypes. For example, I'm looking at:

import dask.dataframe as dd
from dask import delayed
import pandas as pd

def get_df(part):
    print('Fetching part', part)
    return pd.DataFrame({'x': list(range(3*part, 3*part+3)), 'y': [part]*3})
tasks = [
    delayed(get_df)(i)
    for i in range(4)
]
df = dd.from_delayed(tasks)
df.compute(scheduler='single-threaded')

Fetching part 0  # Repeated once
Fetching part 0
Fetching part 1
Fetching part 2
Fetching part 3
    x  y
0   0  0
1   1  0
2   2  0
0   3  1
1   4  1
2   5  1
0   6  2
1   7  2
2   8  2
0   9  3
1  10  3
2  11  3

I could try to access individual rows sequentially but:

df.iloc[:3].compute()
NotImplementedError: 'DataFrame.iloc' only supports selecting columns

With an index, that works with .loc instead, though I still don't see the same partition loaded multiple times.
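
For reference, a minimal illustration of that .loc access with toy data (not taken from this thread); from_pandas gives the frame known index divisions, so row ranges can be selected by label:

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({'x': range(12), 'y': [i // 3 for i in range(12)]})
ddf = dd.from_pandas(pdf, npartitions=4)   # index divisions are known here
print(ddf.loc[0:2].compute())              # rows with index labels 0..2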

Is it possible that a lot of the repetitive partition loads you saw came from accessing multiple columns separately?

Let me know if I'm missing something; otherwise, I went with removing the caching in https://github.com/pystatgen/sgkit/pull/314, assuming that no partitions other than the first are loaded multiple times with a non-distributed scheduler, and ultimately relying on dd.DataFrame.persist to cache the partitions for sequential access to different columns (which we have to do to convert to individual arrays for Xarray).
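
A minimal sketch of that persist-based approach, reusing the toy get_df example from above (persist and to_dask_array are standard dask APIs; everything else here is illustrative):

import dask.dataframe as dd
from dask import delayed
import pandas as pd

def get_df(part):
    print('Fetching part', part)
    return pd.DataFrame({'x': list(range(3 * part, 3 * part + 3)), 'y': [part] * 3})

df = dd.from_delayed([delayed(get_df)(i) for i in range(4)])
df = df.persist()   # each partition is computed once and kept in memory
x = df['x'].to_dask_array(lengths=True)   # later per-column conversions reuse the
y = df['y'].to_dask_array(lengths=True)   # persisted partitions instead of re-reading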

horta (Collaborator) commented Oct 9, 2020

I think I understand the difference.

You do not need this cache (nor the lock) because you are not calling _read_metafile_partition within another lazy Dask object (https://github.com/eric-czech/sgkit/blob/582caf042c9b17f400e9fbd9c251d1f98dfe26a3/sgkit/io/bgen/bgen_reader.py#L287) whose call depends on the genotype index.

I, on the other hand, am doing exactly that: https://github.com/limix/bgen-reader-py/blob/85db392cd2ec3aadb8193995ffe21b5af52fbca0/bgen_reader/_genotype.py#L43

Your code seems safe in that way.
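
To make that distinction concrete, a hypothetical sketch of the two call patterns (read_partition is a toy stand-in for _read_metafile_partition; none of this is the actual code from either repository):

import pandas as pd
from dask import delayed
import dask.dataframe as dd

def read_partition(partition):
    # Stand-in for _read_metafile_partition: one disk read per call.
    print('reading partition', partition)
    return pd.DataFrame({'vaddr': range(3 * partition, 3 * partition + 3)})

# Pattern 1 (bgen-reader-py style): the partition read happens inside another lazy
# task keyed by genotype index, so genotypes sharing a partition re-read it unless
# read_partition is cached.
def read_genotype(genotype_index, partition_size=3):
    meta = read_partition(genotype_index // partition_size)
    return meta['vaddr'].iloc[genotype_index % partition_size]

genotypes = [delayed(read_genotype)(i) for i in range(6)]
print([g.compute() for g in genotypes])   # 'reading partition' prints six times

# Pattern 2 (sgkit style): each partition is read by exactly one delayed task and
# assembled into a dask DataFrame, so no partition is read twice (aside from dask's
# initial metadata inference on the first partition).
df = dd.from_delayed([delayed(read_partition)(p) for p in range(2)])
print(df.compute(scheduler='single-threaded'))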

eric-czech (Collaborator, Author) commented:

Ahh I see, ty. Ok then we're all set on this one.
