WIP: DataFrame support #84


Closed
wants to merge 12 commits into from

Conversation


@jreback jreback commented Oct 15, 2016

In [1]: import numpy as np
   ...: import pandas as pd
   ...: import pandas.util.testing as tm
   ...: import zarr
   ...: 
   ...: pd.options.display.max_rows=10
   ...: np.random.seed(1234)
   ...: N = 10000000
   ...: ngroups = 1000
   ...: strings = tm.rands_array(10, 1000)
   ...: 
   ...: df = pd.DataFrame({'A': np.arange(N),
   ...:                    'B': np.random.randint(0, ngroups, size=N),
   ...:                    'C': np.random.randn(N),
   ...:                    'D': pd.date_range('20130101', periods=ngroups).take(np.random.randint(0, ngroups, size=N)),
   ...:                    #'E': pd.Series(strings.take(np.random.randint(0, ngroups, size=N))).astype('category')})
   ...:                    'F': strings.take(np.random.randint(0, ngroups, size=N)),
   ...:                    })
   ...: g = zarr.group('foo')
   ...: 

In [2]: g['df'] = df

In [3]: g['df'][:]
Out[3]: 
               A    B         C          D           F
0              0  324  1.087268 2014-10-03  eiC7V8prQC
1              1  918 -0.094611 2013-08-25  VX7APS0R2U
2              2  800 -0.192653 2014-05-02  fSdKpkHLDw
3              3  335  1.888183 2014-02-12  6PQyrA8nl9
4              4  227 -0.741727 2013-01-24  HNHYSzGuSi
...          ...  ...       ...        ...         ...
9999995  9999995  103  0.213191 2014-05-03  yzJ7uNjpxy
9999996  9999996  823  0.548665 2013-11-24  VpqZ5qMMhx
9999997  9999997  166  1.382537 2014-10-13  62mxP9rpzu
9999998  9999998  606  0.368681 2014-08-15  8SSr6sQ9Rh
9999999  9999999  873  0.183906 2015-08-23  hrWnbHyEtv

[10000000 rows x 5 columns]

In [4]: %timeit g['df'][:]
1 loop, best of 3: 13.7 s per loop
In [5]: g['df']
Out[5]: 
Frame(/df, (10000000, 5), chunks=(19532, 1), )
 store: DirectoryStore

 Array(/df/data/A, int64, order=C)
  nbytes: 76.3M; nbytes_stored: 1.2M; ratio: 65.8; initialized: 512/512
  compressor: Blosc(cname=u'lz4', clevel=5, shuffle=1)
 Array(/df/data/B, int64, order=C)
  nbytes: 76.3M; nbytes_stored: 17.3M; ratio: 4.4; initialized: 512/512
  compressor: Blosc(cname=u'lz4', clevel=5, shuffle=1)
 Array(/df/data/C, float64, order=C)
  nbytes: 76.3M; nbytes_stored: 72.5M; ratio: 1.1; initialized: 512/512
  compressor: Blosc(cname=u'lz4', clevel=5, shuffle=1)
 Array(/df/data/D, int64, order=C)
  nbytes: 76.3M; nbytes_stored: 51.9M; ratio: 1.5; initialized: 512/512
  compressor: Blosc(cname=u'lz4', clevel=5, shuffle=1)
 Array(/df/data/F, object, order=C)
  nbytes: 76.3M; nbytes_stored: 40.1M; ratio: 1.9; initialized: 512/512
  filters: PickleCodec()
  compressor: Blosc(cname=u'lz4', clevel=5, shuffle=1)

Very preliminary WIP for DataFrame support directly in zarr. This adds a new meta type called Frame, alongside Array and Group. It has top-level metadata (nrows, columns, dtypes) and stores the data in individual Arrays. These can be individually compressed via different compressors, with a default used if none is specified. (I would propose the default be what @mrocklin mentioned in castra, to start.)

Uses @alimanfoo's pickle filter directly. Chunking is done (but no selection yet). Currently just supports set/getitem for columnar access.

I would propose adding:

g['df'].loc[row_indexer, columns]

for setting / getting (mainly for chunking support). This preserves the pandas semantics of [] for columnar access, while .loc handles both rows and columns. This deviates a bit from zarr semantics, though.
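
To make the proposed semantics concrete, usage might look something like the sketch below (the .loc selections are hypothetical; only columnar set/getitem exists in this PR):

g['df'] = df                              # store a whole DataFrame
s = g['df']['C']                          # pandas-style [] for a single column
sub = g['df'].loc[1000:2000, ['A', 'C']]  # proposed: row/column selection via .loc
g['df'].loc[1000:2000, 'C'] = 0.0         # proposed: setting a row/column slab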

minimal testing ATM, should be able to support all numpy dtypes that pandas supports; categorical soon. I had to add the notion of a storage dtype and a semantic one (eg. M8[ns] is actually stored as i8, so the actual store shows that above). This is an implementation detail but I don't think a big deal. Need this for example, for category support (where we actually have a sub-group with codes/categories).
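
As a rough illustration of the storage-vs-semantic dtype idea (not the PR's actual code; the 'logical_dtype' attribute key here is made up), datetimes can be stored as i8 and re-interpreted on read:

import numpy as np
import zarr

g = zarr.group('example')
dates = np.array(['2013-01-01', '2013-01-02', '2013-01-03'], dtype='M8[ns]')

# physical storage as i8, with the semantic dtype remembered in attrs
z = g.create_dataset('D', data=dates.view('i8'))
z.attrs['logical_dtype'] = 'M8[ns]'

# on read, view the raw i8 data back as datetime64[ns]
restored = z[:].view(z.attrs['logical_dtype'])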

I was trying to isolate pandas so that it would not be a requirement unless you actually tried to save a DataFrame. Currently it must be there to import zarr, but I think that could be relaxed.

I extracted some things from Array and created a Base; more could be done here.

Finally, I think it might be worth adding a filter which does arrow encoding for object arrays (in reality just a pair of cython functions could do it without making feather a dep, e.g. https://github.com/wesm/feather/blob/master/python/feather/interop.h) and then saving as values, offsets (like how we do for categories). Might be interesting / better performing than pickle. IDK.
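
For reference, a pure-numpy sketch of the values/offsets idea (Arrow-style variable-length layout); this is an illustration only, not the proposed cython filter, and the function names are made up:

import numpy as np

def encode_strings(arr):
    # one flat byte buffer plus a monotonically increasing offsets array
    data = [s.encode('utf-8') for s in arr]
    offsets = np.zeros(len(data) + 1, dtype='i8')
    offsets[1:] = np.cumsum([len(b) for b in data])
    values = np.frombuffer(b''.join(data), dtype='u1')
    return values, offsets

def decode_strings(values, offsets):
    buf = values.tobytes()
    return np.array([buf[offsets[i]:offsets[i + 1]].decode('utf-8')
                     for i in range(len(offsets) - 1)], dtype=object)

values, offsets = encode_strings(np.array(['foo', 'barbaz', ''], dtype=object))
roundtrip = decode_strings(values, offsets)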

xref dask/dask#1599

@jreback
Author

jreback commented Oct 15, 2016

cc @alimanfoo
cc @mrocklin
cc @wesm
cc @shoyer

@mrocklin
Contributor

Cool to see work in this direction. Two questions:

Why the need for a separate Frame type? I wonder if a dataframe could just be a Group that is laid out in a certain way.

Is this in scope for Zarr? I wonder if something like this should be external. It would be nice to see Zarr stay minimal and somewhat general.

@jreback
Author

jreback commented Oct 15, 2016

Frame is essentially a Group that is laid out in a certain way, but there are a couple of issues that make it special:

  • you want to hide the actual storage format (e.g. say I want to store dates as i8), but re-serialize to M8[ns]. This could in theory actually be handled by Array
  • I may want to specify how to chunk overall, and potentially filters, on a DataFrame basis and have it consistent across all columns.

Sure, this could be a separate package, zframe or whatever, though it re-uses lots of the zarr machinery (which does multiple things, e.g. storage backends, hierarchies, and arrays).

@jreback
Author

jreback commented Oct 15, 2016

Updated the top; forgot the object column :<

@mrocklin
Contributor

Perhaps things like logical vs physical dtype mappings could be stored in the group's metadata?

Your point about filtering across all columns does serve as pretty good motivation for a separate class. Some special API on top of groups would definitely be desired. Perhaps the Frame class could encapsulate a Zarr Group rather than re-implement its logic?

I think in my mind Pandas is mired in the unfortunate mess that is the real world while Zarr is still fairly clean. I'm somewhat in favor of building messy things on top of clean things, rather than within them. However the corollary here is that clean things need to make sure that people can extend them externally with ease. Perhaps there are lessons to be learned from this work?
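
A minimal sketch of the encapsulation suggestion above — a Frame that wraps a plain zarr Group, keeping column order and logical dtypes in the group's attributes (class and attribute names here are made up, and object/categorical columns are ignored):

import numpy as np
import pandas as pd
import zarr

class Frame(object):
    """Toy sketch: a DataFrame laid out as a plain zarr Group."""

    def __init__(self, group):
        self.group = group

    @classmethod
    def from_pandas(cls, group, df):
        # frame-level metadata lives in the group's attributes
        group.attrs['columns'] = list(df.columns)
        group.attrs['dtypes'] = {c: str(df[c].dtype) for c in df.columns}
        for c in df.columns:
            values = df[c].values
            if values.dtype.kind == 'M':      # physical storage of datetimes as i8
                values = values.view('i8')
            group.create_dataset(c, data=values)
        return cls(group)

    def __getitem__(self, column):
        values = self.group[column][:]
        dtype = self.group.attrs['dtypes'][column]
        if dtype.startswith('datetime64'):
            values = values.view(dtype)
        return pd.Series(values, name=column)

g = zarr.group()   # in-memory store
frame = Frame.from_pandas(g, pd.DataFrame({'A': np.arange(5),
                                           'D': pd.date_range('20130101', periods=5)}))
frame['D']   # -> Series of datetime64[ns]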

fix normalized array path on frame storage
add pandas as dep (for now)
@mrocklin
Contributor

Any thoughts from others on this topic?

@shoyer
Contributor

shoyer commented Oct 16, 2016

Just a note -- it looks like you don't make use of date/time types directly, but I realized that these are ambiguously described in the zarr spec (#85).

I'm also not sure this makes sense for zarr proper. Adding a query language to zarr (.loc) feels like a significant increase in scope, as does a specific serialization strategy for pandas's logical dtypes.

@alimanfoo
Member

@jreback indeed very cool to see work in this direction. Here’s a few thoughts, very open to discussion on any of these points.

I have a strong desire to keep zarr lean and mean. Partly that’s because I am cautious about taking on any more code commitments, but also because I want to make sure zarr fills a well-defined niche and complements other libraries as much as possible. In particular, the original motivation for zarr was to provide a storage partner to dask.array that made the most of the parallel computing capabilities of dask. That helped to give a clear scope, i.e., it was clear what problem I was trying to solve, which was how to design a chunked compressed N-dimensional array store that enabled writing in parallel to different regions of an array. There were also some simplifying assumptions I could make which really helped to reduce complexity. Much of the original inspiration for zarr came from the carray class in bcolz, but because I was targeting a dask.array usage context, I could do away with some things, e.g. carray has several optimisations for iterating over items and for appending to an array one item at-a-time, which I did not need to include in zarr.

I say this because the interesting question that @mrocklin has now raised is, what would an ideal storage partner for dask.dataframe look like? I know a lot less about dask.dataframe and about out-of-core processing of tabular data in general, and so I have much less of a feel for what the problem we’re trying to solve is. I think what I would like to achieve in the short term is some set of minimal functionalities or enhancements or example code, either within zarr or pandas or dask or some sandbox package, that would allow @mrocklin to do some experimentation with dask.dataframe and zarr, and get some insight on where there could be significant benefits and synergy. I.e., where the niche is; what are the key problems not solved or at least not solved well by existing storage libraries? I am hoping @mrocklin and others can advise on what further minimal work would be required to enable this experimentation. I’d like to proceed slowly because as I say I don’t understand the use cases and algorithms very well, so I don’t have a good sense of where there could be a lot of bang for buck, and anyway I’m sure some experiments are required and there may be some surprises.

Having said all that, I am open to the idea that zarr might be extended to include some kind of Frame or Table class. I’d be happy to explore this, but I think there are a number of questions that need careful discussion.

One question is, what would be the public API for a zarr Frame class? I.e., what are the public methods, including features supported via getitem and setitem? Clearly the natural thing to do is stick as close as possible to pandas, but where does zarr stop and dask.dataframe take over? To give an analogy, zarr.Array only supports indexing with slices; fancy indexing is left for dask.array or some other higher-level library. So what are the minimal set of features for Frame that would fill an important niche?

Another question is, what type(s) of parallel processing is zarr.Frame anticipated to be used for, and how would zarr.Frame implement synchronization/locking within the public API to provide maximum support? To give another analogy, zarr.Array was designed for use as the data destination for chunked algorithms, and so allows writing of contiguous regions via setitem, with write locking done at the chunk level so different regions of the array can be written to in parallel. What are the major parallel use cases and synchronisation issues for a Frame class?

A third question is, technically, how to implement a Frame class, and what (if any) changes to the storage spec are required? In this PR you created a new class sibling to Array and Group. An alternative suggested by @mrocklin would be to sub-class or wrap the Group class. I tend to favour sub-classing Group as it could imply little or no changes to the storage spec, layering the functionality on the existing Group and metadata infrastructure. But there may be good architectural and/or performance/scalability reasons for doing otherwise that I am not aware of, so I don’t want to advocate strongly for any particular direction at this time and would rather delay making any commitment until knowing more.

A final question is, what would zarr.Frame do that bcolz.ctable doesn’t do? Are they attempting to fill the same niche, or not? If not, what are the differences in target use cases? If so, then why is it better to implement in Zarr rather than work from the ctable class? Again I don’t want to stifle any discussion, but I think I’d like to understand more about what problem(s) we are trying to provide a better solution for, and then much of this would become clear. In any case it would be good to include @FrancescAlted in the conversation.

Sorry for the long-winded response, again very cool to see this work, and very interested to hear thoughts.

@jreback
Author

jreback commented Oct 19, 2016

@alimanfoo thanks for your response. So here's some philosophy and rationale that hopefully answers some questions (and brings up some more!). This is probably TL;DR, but DO read it!

I have recently been using dask to work with fairly large remotely stored data. This is a mix of dtypes, including some large text fields (with varying distribution of length within the field), numerics (mostly floats) and dates. This is hosted on a local S3 cluster (so network access is quite cheap, compared to a typical S3 on aws installation). The data is coming to me initially in .txt.gz (csv gzipped).

To put some numbers on this. I have about 2000 files, which when expanded are 1.5B rows, say 20 columns (several TB's in total). However at any one time I only really load up a couple of columns into distributed memory (currently using about 1.5TB memory across a few machines with 150 cores ATM).

This works pretty well now. IOW I can load it up, do some analysis, then .restart() the cluster and do some more. However this got me thinking about some storage options in order to improve the experience.

In an ideal scenario, I would perform a pre-processing step where I read in the data, clean (including dtype conversions), maybe re-partition, and re-store to s3 in a binary format.

Some conversions are pretty cheap to do after loading up from storage, so that kind of post-processing is not a big deal, IOW things like pd.to_datetime across partitions. Ideally I would also like to .categorize across the entire set (per-column of course) and re-store, but this can be fairly expensive to do.
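
For concreteness, a rough dask sketch of this pre-processing step (paths and column names are invented):

import dask.dataframe as dd

# gzip is not splittable, so blocksize=None keeps one partition per file
ddf = dd.read_csv('s3://bucket/raw/part-*.txt.gz',
                  compression='gzip', blocksize=None)

# cheap post-load transforms, e.g. dtype conversions
ddf['date'] = dd.to_datetime(ddf['date'])

# the expensive whole-dataset step: consistent per-column categories
ddf = ddf.categorize(columns=['big_text_field'])

# re-partition before re-storing in a binary, column-accessible format
ddf = ddf.repartition(npartitions=200)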

Bottom line: any 'real' post-load transform requires a storage format that can:

  • store dtypes (metadata)
  • be s3 friendly
  • be column accessible
  • deal with object dtype nicely
  • be dataframe friendly (i.e., require little work to support the various dtypes)

So I did a mini-test of some storage formats. Here are my thoughts (in no particular order), obviously repeating some known things.

  • gzipped csv
    • pros: familiar, s3 friendly (using s3fs)
    • cons: no per-column access, no metadata storage, slowish (but usually pretty adequate); gzip is also detrimental to the s3 friendliness.
  • HDF5 with various compression options (lzma, blosc)
    • pros: (keeps meta data), some per-column access, dataframe friendly
    • cons: not friendly to object dtypes as it has fixed string storage (compression does help with this, though)
  • gzipped feather (I also tried lzma but that was strictly worse)
    • pros: per-column access (though you still must unzip the entire file, so this may be a con), efficient storing of objects (via automatic categorization & layout behind the scenes), dataframe friendly
    • cons: not compressed or chunked internally (not that much of a con in my case as I have a natural partition already)
  • castra
    • pros: compressed, chunked, dataframe friendly
    • cons: not supported any more :<; IIRC categoricals are handled, but not efficiently for object dtypes
  • bcolz
    • pros: per-column access, chunked, compressed
    • cons: defaults blow up dataframe object columns to full storage (negating the compression). This can be fixed by categorizing, but it requires a bit of user effort; also it is not easily possible to store the categorical WITH the original frame (IOW, the bcolz groups don't allow an easy way to mix in OTHER objects w/o them being attrs). The biggest con is that this generates lots of files, so it is not too friendly to s3.
  • zarr
    • pros: per-column access, chunked, compressed
    • cons: not dataframe friendly, issues with object dtypes

So every format has various tradeoffs. Note that I am not being critical of any author here, rather the reverse. I am looking at a use case which I think is actually pretty common, but not fully addressed.

So feather is quite friendly but doesn't have the per-column access (and so must unzip the entire file), while bcolz would work but gets a little awkward once you need to move it to an external file system like s3.

So I liked the base that zarr provides; it seems building a zframe format on top is the right thing to do. It has two really nice properties: an HDF5-like interface, which is familiar, and a ZipStore, which makes it really s3 friendly.
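
For example, something along these lines (a sketch; the paths and data are made up):

import numpy as np
import zarr

# write per-column arrays into a group backed by a single zip file
store = zarr.ZipStore('df.zip', mode='w')
root = zarr.group(store=store)
root.create_dataset('A', data=np.arange(1000000))
root.create_dataset('C', data=np.random.randn(1000000))
store.close()

# the single zip file can then be pushed to s3 as one object, e.g. via s3fs:
# import s3fs
# s3fs.S3FileSystem().put('df.zip', 'bucket/df.zip')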

I am indifferent as to where to put this:

  • inside zarr: natural for users, though it does make zarr more complex
  • zframe: seems straightforward and could provide a nice interface

@alimanfoo
Member

Thanks @jreback, very interesting. FWIW I think working on this in a separate package, at least for now, would be the best option; it would at least give you more freedom to experiment. My only suggestion would be to try and implement Frame by sub-classing or wrapping zarr.Group if possible and layering on top of the existing Zarr API and storage spec, i.e., using the custom attributes on Array and Group objects etc. If there are good reasons to deviate from this then of course fine.


@jreback
Author

jreback commented Nov 2, 2016

going to close this for now.

parquet support is now fast & first class: https://github.com/martindurant/fastparquet/ (very recent), thanks @martindurant

I know @wesm is integrating this into pyarrow as well.

So while in theory this is a good idea (probably as a separate package, maybe zframe), it would require substantial investment of time / effort in benchmarking.

I do think that this is a nice alternative format.

@jreback jreback closed this Nov 2, 2016
@alimanfoo
Member

Thanks @jreback, happy to revisit in future if there is interest.
