1 change: 1 addition & 0 deletions ci/requirements-py27-cdat+pynio.yml
@@ -16,6 +16,7 @@ dependencies:
 - pathlib2
 - pynio
 - pytest
+- mock
 - scipy
 - seaborn
 - toolz
1 change: 1 addition & 0 deletions ci/requirements-py27-min.yml
@@ -2,6 +2,7 @@ name: test_env
 dependencies:
 - python=2.7
 - pytest
+- mock
 - numpy==1.11
 - pandas==0.18.0
 - pip:
1 change: 1 addition & 0 deletions ci/requirements-py27-windows.yml
@@ -11,6 +11,7 @@ dependencies:
 - netcdf4
 - pathlib2
 - pytest
+- mock
 - numpy
 - pandas
 - scipy
1 change: 1 addition & 0 deletions ci/requirements-py34.yml
@@ -3,6 +3,7 @@ dependencies:
 - python=3.4
 - bottleneck
 - pytest
+- mock
 - pandas
 - pip:
   - coveralls
1 change: 1 addition & 0 deletions ci/requirements-py35.yml
@@ -10,6 +10,7 @@ dependencies:
 - matplotlib
 - netcdf4
 - pytest
+- mock
 - numpy
 - pandas
 - scipy
1 change: 1 addition & 0 deletions ci/requirements-py36-bottleneck-dev.yml
@@ -10,6 +10,7 @@ dependencies:
 - matplotlib
 - netcdf4
 - pytest
+- mock
 - numpy
 - pandas
 - scipy
1 change: 1 addition & 0 deletions ci/requirements-py36-condaforge-rc.yml
@@ -11,6 +11,7 @@ dependencies:
 - matplotlib
 - netcdf4
 - pytest
+- mock
 - numpy
 - pandas
 - seaborn
1 change: 1 addition & 0 deletions ci/requirements-py36-dask-dev.yml
@@ -8,6 +8,7 @@ dependencies:
 - matplotlib
 - netcdf4
 - pytest
+- mock
 - numpy
 - pandas
 - seaborn
1 change: 1 addition & 0 deletions ci/requirements-py36-netcdf4-dev.yml
@@ -10,6 +10,7 @@ dependencies:
 - h5netcdf
 - matplotlib
 - pytest
+- mock
 - numpy
 - pandas
 - scipy
1 change: 1 addition & 0 deletions ci/requirements-py36-pandas-dev.yml
@@ -11,6 +11,7 @@ dependencies:
 - matplotlib
 - netcdf4
 - pytest
+- mock
 - numpy
 - scipy
 - toolz
1 change: 1 addition & 0 deletions ci/requirements-py36-windows.yml
@@ -10,6 +10,7 @@ dependencies:
 - matplotlib
 - netcdf4
 - pytest
+- mock
 - numpy
 - pandas
 - scipy
1 change: 1 addition & 0 deletions ci/requirements-py36.yml
@@ -10,6 +10,7 @@ dependencies:
 - matplotlib
 - netcdf4
 - pytest
+- mock
 - numpy
 - pandas
 - scipy
4 changes: 4 additions & 0 deletions doc/whats-new.rst
@@ -97,6 +97,10 @@ Enhancements
   other means (:issue:`1459`).
   By `Ryan May <https://github.com/dopplershift>`_.
 
+- Support passing keyword arguments to ``load``, ``compute``, and ``persist``
+  methods. Any keyword arguments supplied to these methods are passed on to
+  the corresponding dask function (:issue:`1523`).
+  By `Joe Hamman <https://github.com/jhamman>`_.
 - Encoding attributes are now preserved when xarray objects are concatenated.
   The encoding is copied from the first object (:issue:`1297`).
   By `Joe Hamman <https://github.com/jhamman>`_ and
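
A minimal usage sketch of the behaviour this entry describes (not part of the diff; the file name is a placeholder, and which keywords are accepted depends on the dask scheduler in use):

    import xarray as xr

    # Hypothetical example: keywords given to compute()/persist() are
    # forwarded verbatim to the corresponding dask function, so any option
    # the active scheduler understands (e.g. num_workers) can be passed.
    ds = xr.open_dataset("data.nc", chunks={"time": 10})
    eager = ds.compute(num_workers=4)  # forwarded to dask.array.compute
    lazy = ds.persist()                # forwarded to dask.persist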
39 changes: 33 additions & 6 deletions xarray/core/dataarray.py
@@ -565,22 +565,31 @@ def reset_coords(self, names=None, drop=False, inplace=False):
         dataset[self.name] = self.variable
         return dataset
 
-    def load(self):
+    def load(self, **kwargs):
         """Manually trigger loading of this array's data from disk or a
         remote source into memory and return this array.
 
         Normally, it should not be necessary to call this method in user code,
         because all xarray functions should either work on deferred data or
         load data automatically. However, this method can be necessary when
         working with many file objects on disk.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Additional keyword arguments passed on to ``dask.array.compute``.
+
+        See Also
+        --------
+        dask.array.compute
         """
-        ds = self._to_temp_dataset().load()
+        ds = self._to_temp_dataset().load(**kwargs)
         new = self._from_temp_dataset(ds)
         self._variable = new._variable
         self._coords = new._coords
         return self
 
-    def compute(self):
+    def compute(self, **kwargs):
         """Manually trigger loading of this array's data from disk or a
         remote source into memory and return a new array. The original is
         left unaltered.
@@ -589,18 +598,36 @@ def compute(self):
         because all xarray functions should either work on deferred data or
         load data automatically. However, this method can be necessary when
         working with many file objects on disk.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Additional keyword arguments passed on to ``dask.array.compute``.
+
+        See Also
+        --------
+        dask.array.compute
         """
         new = self.copy(deep=False)
-        return new.load()
+        return new.load(**kwargs)
 
-    def persist(self):
+    def persist(self, **kwargs):
         """ Trigger computation in constituent dask arrays
 
         This keeps them as dask arrays but encourages them to keep data in
         memory. This is particularly useful when on a distributed machine.
         When on a single machine consider using ``.compute()`` instead.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Additional keyword arguments passed on to ``dask.persist``.
+
+        See Also
+        --------
+        dask.persist
         """
-        ds = self._to_temp_dataset().persist()
+        ds = self._to_temp_dataset().persist(**kwargs)
         return self._from_temp_dataset(ds)
 
     def copy(self, deep=True):
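
A short sketch of the load/compute split implemented above (illustrative; the array values are made up):

    import numpy as np
    import xarray as xr

    # compute() returns a new in-memory copy and leaves the original lazy,
    # while load() loads the original in place; keyword arguments to either
    # are forwarded to dask.array.compute.
    lazy = xr.DataArray(np.arange(6).reshape(2, 3)).chunk()
    eager = lazy.compute()  # `lazy` still wraps a dask array afterwards
    lazy.load()             # `lazy` itself now holds a plain numpy array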
43 changes: 35 additions & 8 deletions xarray/core/dataset.py
@@ -445,14 +445,23 @@ def sizes(self):
         """
         return self.dims
 
-    def load(self):
+    def load(self, **kwargs):
         """Manually trigger loading of this dataset's data from disk or a
         remote source into memory and return this dataset.
 
         Normally, it should not be necessary to call this method in user code,
         because all xarray functions should either work on deferred data or
         load data automatically. However, this method can be necessary when
         working with many file objects on disk.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Additional keyword arguments passed on to ``dask.array.compute``.
+
+        See Also
+        --------
+        dask.array.compute
         """
         # access .data to coerce everything to numpy or dask arrays
         lazy_data = {k: v._data for k, v in self.variables.items()
@@ -461,7 +470,7 @@ def load(self):
             import dask.array as da
 
             # evaluate all the dask arrays simultaneously
-            evaluated_data = da.compute(*lazy_data.values())
+            evaluated_data = da.compute(*lazy_data.values(), **kwargs)
 
             for k, data in zip(lazy_data, evaluated_data):
                 self.variables[k].data = data
@@ -473,7 +482,7 @@
 
         return self
 
-    def compute(self):
+    def compute(self, **kwargs):
         """Manually trigger loading of this dataset's data from disk or a
         remote source into memory and return a new dataset. The original is
         left unaltered.
@@ -482,11 +491,20 @@ def compute(self):
         because all xarray functions should either work on deferred data or
         load data automatically. However, this method can be necessary when
         working with many file objects on disk.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Additional keyword arguments passed on to ``dask.array.compute``.
+
+        See Also
+        --------
+        dask.array.compute
         """
         new = self.copy(deep=False)
-        return new.load()
+        return new.load(**kwargs)
 
-    def _persist_inplace(self):
+    def _persist_inplace(self, **kwargs):
         """ Persist all Dask arrays in memory """
         # access .data to coerce everything to numpy or dask arrays
         lazy_data = {k: v._data for k, v in self.variables.items()
@@ -495,24 +513,33 @@ def _persist_inplace(self):
             import dask
 
             # evaluate all the dask arrays simultaneously
-            evaluated_data = dask.persist(*lazy_data.values())
+            evaluated_data = dask.persist(*lazy_data.values(), **kwargs)
 
             for k, data in zip(lazy_data, evaluated_data):
                 self.variables[k].data = data
 
         return self
 
-    def persist(self):
+    def persist(self, **kwargs):
         """ Trigger computation, keeping data as dask arrays
 
         This operation can be used to trigger computation on underlying dask
         arrays, similar to ``.compute()``. However this operation keeps the
         data as dask arrays. This is particularly useful when using the
         dask.distributed scheduler and you want to load a large amount of data
         into distributed memory.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Additional keyword arguments passed on to ``dask.persist``.
+
+        See Also
+        --------
+        dask.persist
         """
         new = self.copy(deep=False)
-        return new._persist_inplace()
+        return new._persist_inplace(**kwargs)
 
     @classmethod
     def _construct_direct(cls, variables, coord_names, dims=None, attrs=None,
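
For context, a self-contained sketch of the pattern Dataset.load and _persist_inplace use (the names here are hypothetical, not xarray internals): collect every dask-backed value, evaluate all of them in one dask call so intermediate results can be shared, then write the concrete results back by key.

    import dask
    import dask.array as da

    data = {
        "a": da.ones((1000,), chunks=100),
        "b": da.zeros((1000,), chunks=100),
        "c": 42,  # non-dask values pass through untouched
    }
    lazy = {k: v for k, v in data.items() if isinstance(v, da.Array)}
    # evaluate all the lazy values at once; extra keyword arguments would be
    # forwarded here, exactly like the **kwargs in the diff above
    computed = dask.compute(*lazy.values())
    for k, arr in zip(lazy, computed):
        data[k] = arr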
28 changes: 24 additions & 4 deletions xarray/core/variable.py
@@ -307,29 +307,49 @@ def data(self, data):
     def _indexable_data(self):
         return orthogonally_indexable(self._data)
 
-    def load(self):
+    def load(self, **kwargs):
         """Manually trigger loading of this variable's data from disk or a
         remote source into memory and return this variable.
 
         Normally, it should not be necessary to call this method in user code,
         because all xarray functions should either work on deferred data or
         load data automatically.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Additional keyword arguments passed on to ``dask.array.compute``.
+
+        See Also
+        --------
+        dask.array.compute
         """
-        if not isinstance(self._data, np.ndarray):
+        if isinstance(self._data, dask_array_type):
+            self._data = np.asarray(self._data.compute(**kwargs))

Contributor:
You don't want to invoke asarray if dask returns a scalar numpy type.

Member:
To be honest, we don't define what can go in _data as carefully as we ought to. I guess there are two ways to define it:

  • anything "array like" that defines at least shape, dtype and __getitem__
  • what comes out of xarray.core.variable.as_compatible_data

Numpy scalars do actually pass through here (since they define all those attributes!)... but then would get converted into an array when calling .values anyways:

    @property
    def values(self):
        """The variable's data as a numpy.ndarray"""
        return _as_array_or_item(self._data)

So I guess I agree, but on the other hand I'm also a little nervous that a dask routine might return a non-numpy scalar, which would definitely break if we don't wrap it in asarray. The safe thing to do is to leave this as is or call as_compatible_data on it.

+        elif not isinstance(self._data, np.ndarray):

Contributor:
This clause should be removed as it causes inconsistent behaviour with numpy scalar types. I cannot think of any other use case where _data is neither a dask array nor a numpy ndarray.

Member Author:
I think this allows for on-disk type arrays. @shoyer, any thoughts on calling np.asarray here and in the line above?

Member:
Yes, we need this to support on-disk arrays that aren't backed by dask. (I'd love to get rid of this in favor of always using dask, but dask has some limitations that make this tricky.)

             self._data = np.asarray(self._data)
         return self
 
-    def compute(self):
+    def compute(self, **kwargs):
         """Manually trigger loading of this variable's data from disk or a
         remote source into memory and return a new variable. The original is
         left unaltered.
 
         Normally, it should not be necessary to call this method in user code,
         because all xarray functions should either work on deferred data or
         load data automatically.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Additional keyword arguments passed on to ``dask.array.compute``.
+
+        See Also
+        --------
+        dask.array.compute
         """
         new = self.copy(deep=False)
-        return new.load()
+        return new.load(**kwargs)
 
     @property
     def values(self):
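
A small illustration of the scalar question debated in the review thread above (the exact return type can vary across dask versions, so treat this as a sketch):

    import numpy as np
    import dask.array as da

    # A full reduction of a dask array may compute to a numpy scalar rather
    # than an ndarray; np.asarray then wraps it into a 0-d array, which is
    # why the diff keeps the asarray call around .compute().
    result = da.ones((10,), chunks=5).sum().compute()
    wrapped = np.asarray(result)
    print(type(result), wrapped.ndim)  # e.g. <class 'numpy.float64'> 0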