Skip to content

xr.DataArray.from_dask_dataframe feature #4659

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 30 commits into
base: main
Choose a base branch
from

Conversation

AyrtonB
Copy link
Contributor

@AyrtonB AyrtonB commented Dec 7, 2020

This feature allows users to convert Dask DataFrames (of a single type) into a DataArray that uses a Dask array. This solves a gap in the Python ecosystem around saving Dask DataFrames to Zarr which is currently not possible without loading the full dataset into memory.

This feature specifically handles the case where a DataFrame is of a single type, a xr.Dataset.from_dask_dataframe could be developed in future to handle the multi-type case.

@AyrtonB
Copy link
Contributor Author

AyrtonB commented Dec 7, 2020

During testing I'm currently encountering the issue: ModuleNotFoundError: No module named 'dask'

How should testing of dask DataArrays be approached?

@keewis
Copy link
Collaborator

keewis commented Dec 7, 2020

you can just decorate tests that require dask with requires_dask and they will be skipped automatically if dask is not installed

Edit: actually, you seem to import dask in some modules, which is not what we want. We usually use either the dask_compat module or pycompat.dask_array_type to work around that.

@AyrtonB
Copy link
Contributor Author

AyrtonB commented Dec 7, 2020

Thanks, yes I need to load the library for type-hinting and type checks.

When you say dask_compat is that the same as dask_array_compat? How would I use them instead of Dask, could I use say from dask_compat.dataframe.core import DataFrame as ddf instead of from dask.dataframe.core import DataFrame as ddf?

@keewis
Copy link
Collaborator

keewis commented Dec 7, 2020

sorry, it is indeed called dask_array_compat. Looking closer, you probably won't be able to use that. Instead, I'd advise to do a local import (for an example see Dataset.to_dask_dataframe). For the change in variable.py I would use the same pattern as for pycompat.dask_array_type, so if dask.dataframe is not available dask_dataframe_type should be ().

@pep8speaks
Copy link

pep8speaks commented Dec 7, 2020

Hello @AyrtonB! Thanks for updating this PR. We checked the lines you've touched for PEP 8 issues, and found:

There are currently no PEP 8 issues detected in this Pull Request. Cheers! 🍻

Comment last updated at 2021-04-12 21:29:29 UTC

@AyrtonB
Copy link
Contributor Author

AyrtonB commented Dec 7, 2020

That makes sense, thanks @keewis

@AyrtonB
Copy link
Contributor Author

AyrtonB commented Dec 7, 2020

I've added dask_dataframe_type = (dask.dataframe.core.DataFrame,) to pycompat but now see: ImportError: cannot import name 'dask_dataframe_type' despite it being in there

@keewis
Copy link
Collaborator

keewis commented Dec 7, 2020

there's a few things to fix in pycompat for this to work: first of all, import dask.dataframe before accessing dask.dataframe.core.DataFrame. We should also move the assignment to dask_dataframe_type to its own try / except block since it's possible to have dask.array but not dask.dataframe installed. And the reason for the ImportError you got is that we need a value for dask_dataframe_type if there was a ImportError. I'm thinking of something like this:

try:
    import dask.dataframe

    dask_dataframe_type = (dask.dataframe.core.DataFrame,)
except ImportError:
    dask_dataframe_type = ()

Copy link
Collaborator

@keewis keewis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for the late reply, @AyrtonB.

This is looks great so far, but I think we should try to copy the pandas conversion functions and move from_dask_dataframe to Dataset (and add DataArray.from_dask_series). That way, we can get rid of the dtype check (see below).

We also need tests in test_dask for this. I think you can look at the tests of the pandas import functions for inspiration, but we should probably make sure we don't accidentally compute (using raise_if_dask_computes).

xarray.DataArray.from_series
pandas.DataFrame.to_xarray
"""
assert len(set(ddf.dtypes)) == 1, "Each variable can include only one data-type"
Copy link
Collaborator

@keewis keewis Jan 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be better to raise a ValueError (or whatever is appropriate) instead of using assert, which will be skipped if python was asked to run as optimized using -O (I don't know which optimization level, though).

If we move this function to Dataset we can get rid of this check.

@@ -846,6 +859,52 @@ def _dask_finalize(results, func, args, name):
coords = ds._variables
return DataArray(variable, coords, name=name, fastpath=True)

@classmethod
def from_dask_dataframe(cls, ddf, index_name: str = "", columns_name: str = ""):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe set the default to None? Also, although using str is recommended, dimension / coordinate / variable names can be any hashable.

Just for reference, from_dataframe does not support these options (they could be useful, though)

Comment on lines 401 to 412
def compute_delayed_tuple_elements(tuple_):
tuple_ = tuple(
[
elem.compute() if hasattr(elem, "compute") else elem
for elem in tuple_
]
)

return tuple_

shape = compute_delayed_tuple_elements(data.shape)
coords = compute_delayed_tuple_elements(coords)
Copy link
Collaborator

@keewis keewis Jan 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this breaks a lot of the tests (for example, DataArray([1]) fails). The idea here is to add support for unknown chunk sizes, right? Maybe we should leave that to a different PR, or move the calls to compute to from_dask_*? Any thoughts, @shoyer, @dcherian?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes we shouldn't do this, it'll compute dask arrays too!

I don't understand why it's necessary either since lengths=True when we call to_dask_array: https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.to_dask_array

Copy link
Collaborator

@keewis keewis Feb 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to compute the shape of the dataframe in a single call without combining them? Combining is not ideal because the columns might have different dtypes, and computing separately means that raise_if_dask_computes detects more than 1 compute.

@martindurant
Copy link
Contributor

Ping, can I please ask what the current status is here?

@AyrtonB
Copy link
Contributor Author

AyrtonB commented Mar 12, 2021

From what I can gather there are more serious back-end considerations needed before this can be progressed.

Personally, I've been monkey-patching this code in which has solved my particular use-case, hopefully it's helpful for yours.

import xarray as xr
import pandas as pd
import numpy as np

import dask.dataframe as dd
from dask.distributed import Client

import numcodecs
from types import ModuleType
from datetime import timedelta

from dask.dataframe.core import DataFrame as ddf
from numbers import Number
from typing import Any, Union, Sequence, Tuple, Mapping, Hashable, Dict, Optional, Set

from xarray.core import dtypes, groupby, rolling, resample, weighted, utils#
from xarray.core.accessor_dt import CombinedDatetimelikeAccessor
from xarray.core.variable import Variable, IndexVariable
from xarray.core.merge import PANDAS_TYPES
from xarray.core.variable import NON_NUMPY_SUPPORTED_ARRAY_TYPES, IS_NEP18_ACTIVE, _maybe_wrap_data, _possibly_convert_objects
from xarray.core.dataarray import _check_data_shape, _infer_coords_and_dims, _extract_indexes_from_coords
from xarray.core.common import ImplementsDatasetReduce, DataWithCoords

def as_compatible_data(data, fastpath=False):
    """Prepare and wrap data to put in a Variable.
    - If data does not have the necessary attributes, convert it to ndarray.
    - If data has dtype=datetime64, ensure that it has ns precision. If it's a
      pandas.Timestamp, convert it to datetime64.
    - If data is already a pandas or xarray object (other than an Index), just
      use the values.
    Finally, wrap it up with an adapter if necessary.
    """
    if fastpath and getattr(data, "ndim", 0) > 0:
        # can't use fastpath (yet) for scalars
        return _maybe_wrap_data(data)

    # *** Start of monkey-patch changes ***
    if isinstance(data, (ddf,)):
        return data.to_dask_array(lengths=True)
    # *** End of monkey-patch changes ***

    if isinstance(data, Variable):
        return data.data

    if isinstance(data, NON_NUMPY_SUPPORTED_ARRAY_TYPES):
        return _maybe_wrap_data(data)
    if isinstance(data, tuple):
        data = utils.to_0d_object_array(data)
    if isinstance(data, pd.Timestamp):
        # TODO: convert, handle datetime objects, too
        data = np.datetime64(data.value, "ns")
    if isinstance(data, timedelta):
        data = np.timedelta64(getattr(data, "value", data), "ns")
    # we don't want nested self-described arrays
    data = getattr(data, "values", data)
    if isinstance(data, np.ma.MaskedArray):
        mask = np.ma.getmaskarray(data)
        if mask.any():
            dtype, fill_value = dtypes.maybe_promote(data.dtype)
            data = np.asarray(data, dtype=dtype)
            data[mask] = fill_value
        else:
            data = np.asarray(data)
    if not isinstance(data, np.ndarray):
        if hasattr(data, "__array_function__"):
            if IS_NEP18_ACTIVE:
                return data
            else:
                raise TypeError(
                    "Got an NumPy-like array type providing the "
                    "__array_function__ protocol but NEP18 is not enabled. "
                    "Check that numpy >= v1.16 and that the environment "
                    'variable "NUMPY_EXPERIMENTAL_ARRAY_FUNCTION" is set to '
                    '"1"'
                )
    # validate whether the data is valid data types.
    data = np.asarray(data)
    if isinstance(data, np.ndarray):
        if data.dtype.kind == "O":
            data = _possibly_convert_objects(data)
        elif data.dtype.kind == "M":
            data = _possibly_convert_objects(data)
        elif data.dtype.kind == "m":
            data = _possibly_convert_objects(data)

    return _maybe_wrap_data(data)

xr.core.variable.as_compatible_data = as_compatible_data

class DataArray(xr.core.dataarray.DataArray):

    _cache: Dict[str, Any]
    _coords: Dict[Any, Variable]
    _indexes: Optional[Dict[Hashable, pd.Index]]
    _name: Optional[Hashable]
    _variable: Variable

    __slots__ = (
        "_cache",
        "_coords",
        "_file_obj",
        "_indexes",
        "_name",
        "_variable"
    )

    _groupby_cls = groupby.DataArrayGroupBy
    _rolling_cls = rolling.DataArrayRolling
    _coarsen_cls = rolling.DataArrayCoarsen
    _resample_cls = resample.DataArrayResample
    _weighted_cls = weighted.DataArrayWeighted

    dt = utils.UncachedAccessor(CombinedDatetimelikeAccessor)

    def __init__(
        self,
        data: Any = dtypes.NA,
        coords: Union[Sequence[Tuple], Mapping[Hashable, Any], None] = None,
        dims: Union[Hashable, Sequence[Hashable], None] = None,
        name: Hashable = None,
        attrs: Mapping = None,
        # internal parameters
        indexes: Dict[Hashable, pd.Index] = None,
        fastpath: bool = False,
    ):
        if fastpath:
            variable = data
            assert dims is None
            assert attrs is None
        else:
            # try to fill in arguments from data if they weren't supplied
            if coords is None:

                if isinstance(data, DataArray):
                    coords = data.coords
                elif isinstance(data, pd.Series):
                    coords = [data.index]
                elif isinstance(data, pd.DataFrame):
                    coords = [data.index, data.columns]
                elif isinstance(data, (pd.Index, IndexVariable)):
                    coords = [data]
                elif isinstance(data, pdcompat.Panel):
                    coords = [data.items, data.major_axis, data.minor_axis]

            if dims is None:
                dims = getattr(data, "dims", getattr(coords, "dims", None))
            if name is None:
                name = getattr(data, "name", None)
            if attrs is None and not isinstance(data, PANDAS_TYPES):
                attrs = getattr(data, "attrs", None)

            # *** Start of monkey-patch changes ***
            def compute_delayed_tuple_elements(tuple_):
                tuple_ = tuple(
                    [
                        elem.compute() if hasattr(elem, "compute") else elem
                        for elem in tuple_
                    ]
                )

                return tuple_

            shape = compute_delayed_tuple_elements(data.shape)
            coords = compute_delayed_tuple_elements(coords)

            data = _check_data_shape(data, coords, dims)
            data = as_compatible_data(data)
            coords, dims = _infer_coords_and_dims(shape, coords, dims)
            # *** End of monkey-patch changes ***

            variable = Variable(dims, data, attrs, fastpath=True)
            indexes = dict(
                _extract_indexes_from_coords(coords)
            )  # needed for to_dataset

        # These fully describe a DataArray
        self._variable = variable
        assert isinstance(coords, dict)
        self._coords = coords
        self._name = name

        # TODO(shoyer): document this argument, once it becomes part of the
        # public interface.
        self._indexes = indexes

        self._file_obj = None

    @classmethod
    def from_dask_dataframe(cls, ddf, index_name: str = "", columns_name: str = ""):
        """Convert a pandas.DataFrame into an xarray.DataArray
        This method will produce a DataArray from a Dask DataFrame.
        Dimensions are loaded into memory but the data itself remains
        a Dask Array. The dataframe you pass can contain only one data-type.
        Parameters
        ----------
        ddf: DataFrame
            Dask DataFrame from which to copy data and indices.
        index_name: str
            Name of the dimension that will be created from the index
        columns_name: str
            Name of the dimension that will be created from the columns
        Returns
        -------
        New DataArray.
        See also
        --------
        xarray.DataSet.from_dataframe
        xarray.DataArray.from_series
        pandas.DataFrame.to_xarray
        """
        assert len(set(ddf.dtypes)) == 1, "Each variable can include only one data-type"

        def extract_dim_name(df, dim="index"):
            if getattr(ddf, dim).name is None:
                getattr(ddf, dim).name = dim

            dim_name = getattr(ddf, dim).name

            return dim_name

        if index_name == "":
            index_name = extract_dim_name(ddf, "index")
        if columns_name == "":
            columns_name = extract_dim_name(ddf, "columns")

        dims = dict.fromkeys([index_name, columns_name], df.shape)
        da = cls(ddf, coords=[ddf.index, ddf.columns], dims=dims)

        return da

xr.core.dataarray.DataArray = DataArray
xr.DataArray = DataArray

def _maybe_chunk(
    name, var, chunks=None, token=None, lock=None, name_prefix="xarray-", overwrite_encoded_chunks=False,
):
    from dask.base import tokenize

    if chunks is not None:
        chunks = {dim: chunks[dim] for dim in var.dims if dim in chunks}
    if var.ndim:
        # when rechunking by different amounts, make sure dask names change
        # by provinding chunks as an input to tokenize.
        # subtle bugs result otherwise. see GH3350
        token2 = tokenize(name, token if token else var._data, chunks)
        name2 = f"{name_prefix}{name}-{token2}"
        var = var.chunk(chunks, name=name2, lock=lock)

        if overwrite_encoded_chunks and var.chunks is not None:
            var.encoding["chunks"] = tuple(x[0] for x in var.chunks)
        return var
    else:
        return var

class Dataset(xr.Dataset):
    """A multi-dimensional, in memory, array database.

    A dataset resembles an in-memory representation of a NetCDF file,
    and consists of variables, coordinates and attributes which
    together form a self describing dataset.

    Dataset implements the mapping interface with keys given by variable
    names and values given by DataArray objects for each variable name.

    One dimensional variables with name equal to their dimension are
    index coordinates used for label based indexing.

    To load data from a file or file-like object, use the `open_dataset`
    function.

    Parameters
    ----------
    data_vars : dict-like, optional
        A mapping from variable names to :py:class:`~xarray.DataArray`
        objects, :py:class:`~xarray.Variable` objects or to tuples of
        the form ``(dims, data[, attrs])`` which can be used as
        arguments to create a new ``Variable``. Each dimension must
        have the same length in all variables in which it appears.

        The following notations are accepted:

        - mapping {var name: DataArray}
        - mapping {var name: Variable}
        - mapping {var name: (dimension name, array-like)}
        - mapping {var name: (tuple of dimension names, array-like)}
        - mapping {dimension name: array-like}
          (it will be automatically moved to coords, see below)

        Each dimension must have the same length in all variables in
        which it appears.
    coords : dict-like, optional
        Another mapping in similar form as the `data_vars` argument,
        except the each item is saved on the dataset as a "coordinate".
        These variables have an associated meaning: they describe
        constant/fixed/independent quantities, unlike the
        varying/measured/dependent quantities that belong in
        `variables`. Coordinates values may be given by 1-dimensional
        arrays or scalars, in which case `dims` do not need to be
        supplied: 1D arrays will be assumed to give index values along
        the dimension with the same name.

        The following notations are accepted:

        - mapping {coord name: DataArray}
        - mapping {coord name: Variable}
        - mapping {coord name: (dimension name, array-like)}
        - mapping {coord name: (tuple of dimension names, array-like)}
        - mapping {dimension name: array-like}
          (the dimension name is implicitly set to be the same as the
          coord name)

        The last notation implies that the coord name is the same as
        the dimension name.

    attrs : dict-like, optional
        Global attributes to save on this dataset.

    Examples
    --------
    Create data:

    >>> np.random.seed(0)
    >>> temperature = 15 + 8 * np.random.randn(2, 2, 3)
    >>> precipitation = 10 * np.random.rand(2, 2, 3)
    >>> lon = [[-99.83, -99.32], [-99.79, -99.23]]
    >>> lat = [[42.25, 42.21], [42.63, 42.59]]
    >>> time = pd.date_range("2014-09-06", periods=3)
    >>> reference_time = pd.Timestamp("2014-09-05")

    Initialize a dataset with multiple dimensions:

    >>> ds = xr.Dataset(
    ...     data_vars=dict(
    ...         temperature=(["x", "y", "time"], temperature),
    ...         precipitation=(["x", "y", "time"], precipitation),
    ...     ),
    ...     coords=dict(
    ...         lon=(["x", "y"], lon),
    ...         lat=(["x", "y"], lat),
    ...         time=time,
    ...         reference_time=reference_time,
    ...     ),
    ...     attrs=dict(description="Weather related data."),
    ... )
    >>> ds
    <xarray.Dataset>
    Dimensions:         (time: 3, x: 2, y: 2)
    Coordinates:
        lon             (x, y) float64 -99.83 -99.32 -99.79 -99.23
        lat             (x, y) float64 42.25 42.21 42.63 42.59
      * time            (time) datetime64[ns] 2014-09-06 2014-09-07 2014-09-08
        reference_time  datetime64[ns] 2014-09-05
    Dimensions without coordinates: x, y
    Data variables:
        temperature     (x, y, time) float64 29.11 18.2 22.83 ... 18.28 16.15 26.63
        precipitation   (x, y, time) float64 5.68 9.256 0.7104 ... 7.992 4.615 7.805
    Attributes:
        description:  Weather related data.

    Find out where the coldest temperature was and what values the
    other variables had:

    >>> ds.isel(ds.temperature.argmin(...))
    <xarray.Dataset>
    Dimensions:         ()
    Coordinates:
        lon             float64 -99.32
        lat             float64 42.21
        time            datetime64[ns] 2014-09-08
        reference_time  datetime64[ns] 2014-09-05
    Data variables:
        temperature     float64 7.182
        precipitation   float64 8.326
    Attributes:
        description:  Weather related data.
    """

    __slots__ = ['foo']

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def chunk(
        self,
        chunks: Union[None, Number, str, Mapping[Hashable, Union[None, Number, str, Tuple[Number, ...]]],] = None,
        name_prefix: str = "xarray-",
        token: str = None,
        lock: bool = False,
    ) -> "Dataset":
        """Coerce all arrays in this dataset into dask arrays with the given
        chunks.

        Non-dask arrays in this dataset will be converted to dask arrays. Dask
        arrays will be rechunked to the given chunk sizes.

        If neither chunks is not provided for one or more dimensions, chunk
        sizes along that dimension will not be updated; non-dask arrays will be
        converted into dask arrays with a single block.

        Parameters
        ----------
        chunks : int, 'auto' or mapping, optional
            Chunk sizes along each dimension, e.g., ``5`` or
            ``{"x": 5, "y": 5}``.
        name_prefix : str, optional
            Prefix for the name of any new dask arrays.
        token : str, optional
            Token uniquely identifying this dataset.
        lock : optional
            Passed on to :py:func:`dask.array.from_array`, if the array is not
            already as dask array.

        Returns
        -------
        chunked : xarray.Dataset
        """

        if isinstance(chunks, (Number, str)):
            chunks = dict.fromkeys(self.dims, chunks)

        if isinstance(chunks, (tuple, list)):
            chunks = dict(zip(self.dims, chunks))

        if chunks is not None:
            bad_dims = chunks.keys() - self.dims.keys()
            if bad_dims:
                raise ValueError("some chunks keys are not dimensions on this " "object: %s" % bad_dims)

        variables = {k: _maybe_chunk(k, v, chunks, token, lock, name_prefix) for k, v in self.variables.items()}
        return self._replace(variables)

xr.core.dataarray.Dataset = Dataset
xr.Dataset = Dataset

@keewis
Copy link
Collaborator

keewis commented Mar 14, 2021

I don't think there is a lot left to decide: we want to keep the conversion logic in from_dask_dataframe and maybe helper functions, and I think we should mirror the pandas integration as close as possible (which means we need a Dataset.from_dask_dataframe and a DataArray.from_dask_series class method).

The only thing I think is left to figure out is how to best compute the chunk sizes with as few computations of dask (as defined by raise_if_dask_computes) as possible.

cc @dcherian

@keewis
Copy link
Collaborator

keewis commented Mar 31, 2021

@pydata/xarray, any opinion on the API design?

@keewis keewis force-pushed the xarray.DataArray.from_dask_dataframe-feature branch from c1f4fba to e545a19 Compare April 12, 2021 19:53
@keewis
Copy link
Collaborator

keewis commented Apr 12, 2021

@AyrtonB, I took the liberty of pushing the changes I had in mind to your branch, using a adapted version of your docstring. The only thing that should be missing is to figure out if it's possible to reduce the number of computes to 2 instead of n_columns + 1.

@keewis
Copy link
Collaborator

keewis commented Apr 12, 2021

this should be ready for review

@shoyer
Copy link
Member

shoyer commented Apr 21, 2021

My main concern is really just if anybody will find this function useful in its current state, with all of the serious performance limitations. I expect conversion from dask data frames to xarray will be much more useful when we support out of core indexing, or can unstuck multiple columns into multidimensional arrays.

@sxwebster
Copy link

sxwebster commented Jan 16, 2023

I'm quite supportive of this effort as it would make raster calculation operations a whole lot more straight forward, not to mention doing things like joins of the dataframe which don't necessarily need to exist with the xarray object if selected columns are pushed back to rioxarray as bands. .

@jsignell
Copy link
Contributor

My understanding is that indexes have come a long way since this PR was last touched. Maybe now is the right time to rewrite this in a way that is more performant?

@dcherian
Copy link
Contributor

indexes have come a long way since this PR was last touched.

We still don't have a lazy / out-of-core index unfortunately.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feature request xarray.Dataset.from_dask_dataframe Ability to Pass Dask Arrays as data in DataArray Creation
8 participants