pass dask compute/persist args through from load/compute/persist #1543


Merged: jhamman merged 10 commits into pydata:master from jhamman:feature/dask_compute_args on Sep 5, 2017

Conversation

@jhamman (Member) commented Aug 31, 2017

  • Closes #1523 (Pass arguments to dask.compute())
  • Tests added / passed
  • Passes git diff upstream/master | flake8 --diff
  • Fully documented, including whats-new.rst for all changes and api.rst for new API

cc @crusaderky - putting this up in case it's useful to you.

I'm frankly not sure if my tests are doing what they need to, but everything seems to be playing nicely.
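For context, here is a sketch of the usage this PR enables (the dataset is hypothetical; the get=/num_workers= keywords mirror the tests below and were valid dask keywords at the time):

import dask.multiprocessing
import xarray as xr

# hypothetical example: chunk so the variables are backed by dask arrays
ds = xr.Dataset({'a': ('x', [1, 2, 3])}).chunk()

# extra keyword arguments now pass straight through to dask.compute()
# and dask.persist()
computed = ds.compute(get=dask.multiprocessing.get, num_workers=4)
persisted = ds.persist(get=dask.multiprocessing.get)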

-        if not isinstance(self._data, np.ndarray):
+        if isinstance(self._data, dask_array_type):
+            self._data = np.asarray(self._data.compute(**kwargs))
+        elif not isinstance(self._data, np.ndarray):
@crusaderky (Contributor):

This clause should be removed as it causes inconsistent behaviour with numpy scalar types. I cannot think of any other use case where data is neither a dask ARRAY nor a numpy ndarray?

@jhamman (Member, Author):

I think this allows for on-disk type arrays. @shoyer, any thoughts on calling np.asarray here and in the line above?

@shoyer (Member):

Yes, we need this to support on-disk arrays that aren't backed by dask. (I'd love to get rid of this in favor of always using dask, but dask has some limitations that make this tricky.)
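To illustrate what that branch handles, here is a toy stand-in for an on-disk array (a hypothetical class, not xarray's actual backend wrapper): anything exposing shape, dtype and __getitem__ can sit in _data, and np.asarray materializes it.

import numpy as np

class LazyOnDiskArray(object):
    """Toy lazy wrapper; xarray's real backend arrays are more involved."""
    def __init__(self, values):
        self._values = values
        self.shape = values.shape
        self.dtype = values.dtype

    def __getitem__(self, key):
        return self._values[key]

    def __array__(self, dtype=None):
        # np.asarray(self) ends up here, pulling the data into memory
        return np.asarray(self._values, dtype=dtype)

lazy = LazyOnDiskArray(np.arange(4))
in_memory = np.asarray(lazy)  # roughly what the elif branch above does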

"""
if not isinstance(self._data, np.ndarray):
if isinstance(self._data, dask_array_type):
self._data = np.asarray(self._data.compute(**kwargs))
@crusaderky (Contributor):

You don't want to invoke asarray if dask returns a scalar numpy type

@shoyer (Member):

To be honest, we don't define what can go in _data as carefully as we ought to. I guess there are two ways to define it:

  • anything "array like" that defines at least shape, dtype and __getitem__
  • what comes out of xarray.core.variable.as_compatible_data

Numpy scalars do actually pass through here (since they define all those attributes!)... but then would get converted into an array when calling .values anyway:

    @property
    def values(self):
        """The variable's data as a numpy.ndarray"""
        return _as_array_or_item(self._data)

So I guess I agree, but on the other hand I'm also a little nervous that a dask routine might return a non-numpy scalar, which would definitely break if we don't wrap it in asarray. The safe thing to do is to leave this as is or call as_compatible_data on it.
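A small illustration of the scalar concern (variable names are hypothetical; behaviour is as of 2017-era dask): computing a 0-d dask array yields a numpy scalar, and np.asarray turns it into a 0-d ndarray.

import numpy as np
import dask.array as da

total = da.from_array(np.arange(3), chunks=2).sum()  # 0-d dask array
result = total.compute()      # numpy scalar, e.g. np.int64(3)
wrapped = np.asarray(result)  # 0-d ndarray: array(3)
assert isinstance(wrapped, np.ndarray) and wrapped.ndim == 0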

b4 = b.persist(get=dask.multiprocessing.get, num_workers=4)
assert b4._in_memory
assert_equal(b4, expected)

@crusaderky (Contributor):

Redundant with the test below?

@jhamman (Member, Author):

Fixed; these should have been Variables.

@@ -403,6 +450,7 @@ def kernel():
     kernel_call_count += 1
     return np.ones(1)


 def build_dask_array():
     global kernel_call_count
     kernel_call_count = 0
@crusaderky (Contributor):

Missing tests for Variable
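For reference, a sketch of how the truncated helper above plausibly continues (the task-graph details are assumptions, not the PR's exact code): it builds a one-chunk dask array whose single task calls kernel(), so kernel_call_count records how many times the graph is actually evaluated.

import numpy as np
import dask.array as da

kernel_call_count = 0

def kernel():
    global kernel_call_count
    kernel_call_count += 1
    return np.ones(1)

def build_dask_array(name='data'):  # hypothetical signature
    global kernel_call_count
    kernel_call_count = 0
    # a 1-d array with a single chunk of length 1; evaluating the graph
    # invokes kernel() exactly once
    return da.Array(dask={(name, 0): (kernel,)}, name=name,
                    chunks=((1,),), dtype=np.float64)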

@crusaderky (Contributor) commented:

@jhamman happy to go with your version

expected = Variable('x', [1, 4])
b = a * a
# compute
b1 = b.compute(get=dask.multiprocessing.get)
@shoyer (Member):

The clean way to test this is probably with mock, e.g.,

In [63]: import numpy as np

In [64]: import dask.array as da

In [65]: from unittest import mock

In [66]: x = da.from_array(np.arange(3), chunks=(2,))

In [67]: with mock.patch.object(da.Array, 'compute', return_value=np.arange(3)) as mock_compute:
    ...:     x.compute(foo='bar')
    ...:

In [68]: mock_compute.assert_called_with(foo='bar')

In [69]: mock_compute.assert_called_with(bar='foo')
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-69-22937cf26ca1> in <module>()
----> 1 mock_compute.assert_called_with(bar='foo')

~/conda/envs/xarray-dev/lib/python3.5/unittest/mock.py in assert_called_with(_mock_self, *args, **kwargs)
    792         if expected != actual:
    793             cause = expected if isinstance(expected, Exception) else None
--> 794             raise AssertionError(_error_message()) from cause
    795
    796

AssertionError: Expected call: compute(bar='foo')
Actual call: compute(foo='bar')

unittest.mock is part of Python 3's standard library, but there's also a widely used Python 2 backport on PyPI. I think it would be perfectly fine to add it as a dependency for our test suite.

@jhamman (Member, Author):

Thanks @shoyer, I'll add mock as part of this PR.

@jhamman mentioned this pull request on Sep 5, 2017
@shoyer (Member) left a comment:

I think the docs also need an update to mention that mock is a test-time requirement on Python 2.

with mock.patch.object(Variable, method,
                       return_value=np.arange(3)) as mock_method:
    getattr(x, method)(foo='bar')
mock_method.assert_called_with(foo='bar')
@shoyer (Member):

I think you can skip this first test. It only verifies that Variable.compute() was called when you directly call Variable.compute().

@@ -4,6 +4,8 @@
 import pickle
 import numpy as np
 import pandas as pd
+import pytest
+import mock
@shoyer (Member):

On Python 3, use from unittest import mock to avoid adding the new dependency.
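A common shim for that (a sketch; the exact form the PR adopted may differ):

try:
    from unittest import mock  # Python 3: standard library
except ImportError:
    import mock  # Python 2: the PyPI backport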

@shoyer (Member) commented Sep 5, 2017

Can you also update tests_require in setup.py?

TESTS_REQUIRE = ['pytest >= 2.7.1']
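One way to express the Python-2-only mock requirement (a sketch, not necessarily what the PR did):

import sys

TESTS_REQUIRE = ['pytest >= 2.7.1']
if sys.version_info[0] < 3:
    # unittest.mock is in the standard library on Python 3; the backport
    # is only needed on Python 2
    TESTS_REQUIRE.append('mock')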


with suppress(ImportError):
    import dask
    import dask.array as da
    import dask.multiprocessing
@shoyer (Member):

No longer used

@jhamman jhamman merged commit 5472fb5 into pydata:master Sep 5, 2017
@jhamman jhamman deleted the feature/dask_compute_args branch September 5, 2017 19:55