NetCDF: Not a valid ID when trying to retrieve values from Dask array #2305


Closed

edougherty32 opened this issue Jul 23, 2018 · 15 comments

@edougherty32
edougherty32 commented Jul 23, 2018

Hi, I am attempting to pull values from an xarray dataset to accumulate rainfall at specific times over a large number of dimensions. The dataset, concat_floods_all, is as follows:

<xarray.Dataset>
Dimensions:  (south_north: 1015, west_east: 1359)
Coordinates:
    XLAT     (south_north, west_east) float32 dask.array<shape=(1015, 1359), chunksize=(1015, 1359)>
    XLONG    (south_north, west_east) float32 dask.array<shape=(1015, 1359), chunksize=(1015, 1359)>
Dimensions without coordinates: south_north, west_east

With 658 variables (all accumulated rainfall at different times over the same domain):

Data variables:
    var0     (south_north, west_east) float32 dask.array<shape=(1015, 1359), chunksize=(1015, 1359)>
    var1     (south_north, west_east) float32 dask.array<shape=(1015, 1359), chunksize=(1015, 1359)>
...
    var658    (south_north, west_east) float32 dask.array<shape=(1015, 1359), chunksize=(1015, 1359)>

The issue is when I sum all the variables up, using the following:

sum_floods = concat_floods_all.sum(skipna = True, dim='variable').compute()

I get the following error message:

RuntimeError: NetCDF: Not a valid ID

Based on #1001, I believe this error is due to opening numerous files found via a glob search and appending them to a list in a for loop (I chose this method over open_mfdataset because I needed to combine some files and delete redundant ones).

var_list = []
### Find files that match start and end year/month of flood (fflood_start_end) and subset based on flood duration

for c, item in enumerate(fflood_start_end):
    for name in glob.glob(os.path.join('/glade2/collections/rda/data/ds612.0/CTRL/**/wrf2d_d01_CTRL_' + var + '_*')):
        if item in name:
            wrf_match_fflood = xr.open_dataset(name, chunks={'Time': 10})
            # pull only times of floods (avoid rebinding `var`, the variable-name string)
            var_sel = wrf_match_fflood[var].sel(Time=slice(date_fflood_st_dt2[c], date_fflood_end_dt2[c]))
            var_list.append(var_sel)

I am wondering how to get the actual 1015x1359 array of values for sum_floods and work around this issue.
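
A minimal sketch of one way to make a sum over dim='variable' well defined, assuming all data variables share the same dimensions; the tiny arrays below are hypothetical stand-ins for the real 1015×1359 fields:

```python
import numpy as np
import xarray as xr

# Hypothetical stand-in for concat_floods_all: four variables on a 2x3 grid.
ds = xr.Dataset(
    {f"var{i}": (("south_north", "west_east"), np.full((2, 3), float(i)))
     for i in range(4)}
)

# to_array() stacks the data variables along a new 'variable' dimension,
# after which summing over that dimension is well defined.
stacked = ds.to_array(dim="variable")          # dims: (variable, south_north, west_east)
sum_floods = stacked.sum(dim="variable", skipna=True)

print(sum_floods.shape)        # (2, 3)
print(float(sum_floods[0, 0])) # 0 + 1 + 2 + 3 = 6.0
```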

Thanks!

@jhamman
Member

jhamman commented Jul 24, 2018

@edougherty32 - I'm not exactly sure what your problem is but I have some ideas for you:

  1. open_mfdataset can take a list of filenames in addition to a glob string. You may be able to filter your list of filenames ahead of time and give that to open_mfdataset.
  2. I'm thinking it may be possible to create a time dimension/coordinate in your dataset of length 659. This tends to be a more common workflow. It will also generally play nicer than 600 individual variables.
  3. If you can switch back to open_mfdataset, you may try the autoclose=True flag. This will help with the 'too many open files' problem.
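
The first suggestion can be sketched as follows; the filenames here are hypothetical stand-ins for the real ds612.0 files, and only the filtering step is shown running:

```python
# Hypothetical file names in the style of the dataset on disk; only the
# matching ones would be handed to xr.open_mfdataset.
candidates = [
    "wrf2d_d01_CTRL_PREC_ACC_200504-200506.nc",
    "wrf2d_d01_CTRL_PREC_ACC_200507-200509.nc",
    "wrf2d_d01_CTRL_PREC_ACC_200510-200512.nc",
]
flood_files = [f for f in candidates if "200507" in f]
print(flood_files)   # ['wrf2d_d01_CTRL_PREC_ACC_200507-200509.nc']

# With the filtered list in hand, open_mfdataset opens only those files, e.g.:
#   ds = xr.open_mfdataset(flood_files, chunks={"Time": 10})
```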

@edougherty32
Author

@jhamman–Thanks for the suggestions, I'll definitely try some of these out.

In regards to #2, I'm guessing that involves defining a new xarray in which a new coordinate is defined using variables 0–658?

@jhamman
Copy link
Member

jhamman commented Jul 24, 2018

In regards to #2, I'm guessing that involves defining a new xarray in which a new coordinate is defined using variables 0–658?

Yes. My reading of your comment above made me think that each variable corresponded to a specific event/time. If that is the case, you could populate your coordinate values with the corresponding time stamps.
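
A minimal sketch of this approach, assuming each variable corresponds to one event; the arrays and time stamps below are hypothetical stand-ins for var0–var658 and their event dates:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Hypothetical per-event arrays (stand-ins for var0, var1, ...) and their event times.
events = [xr.DataArray(np.full((2, 3), float(i)), dims=("south_north", "west_east"))
          for i in range(3)]
event_times = pd.to_datetime(["2001-05-01", "2003-06-15", "2004-07-20"])

# Concatenate along a new 'time' dimension and label it with the event stamps.
da = xr.concat(events, dim=pd.Index(event_times, name="time"))

print(da.dims)   # ('time', 'south_north', 'west_east')
```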

@edougherty32
Author

Thanks, @jhamman! I attempted using open_mfdataset, but that will not work for what I need to do with my data, even with filtering filenames ahead of time.

How would I go about defining a new coordinate and populating with corresponding time stamps? Would that involve using Dataset.set_coords() to create a new dataset with a coordinate of length 659 and using the data from the 659 variables?

I am still worried that this will not work, since I am still receiving the same error as before when loading data from dask arrays into numpy arrays, as shown by this test:

print(concat_floods_all[0].values)

Which results in the following error message:

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-49-a698f5985450> in <module>()
----> 1 print(eu_list_combine[0].values)

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/xarray/core/dataarray.py in values(self)
    403     def values(self):
    404         """The array's data as a numpy.ndarray"""
--> 405         return self.variable.values
    406 
    407     @values.setter

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/xarray/core/variable.py in values(self)
    385     def values(self):
    386         """The variable's data as a numpy.ndarray"""
--> 387         return _as_array_or_item(self._data)
    388 
    389     @values.setter

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/xarray/core/variable.py in _as_array_or_item(data)
    209     TODO: remove this (replace with np.asarray) once these issues are fixed
    210     """
--> 211     data = np.asarray(data)
    212     if data.ndim == 0:
    213         if data.dtype.kind == 'M':

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order)
    490 
    491     """
--> 492     return array(a, dtype, copy=False, order=order)
    493 
    494 

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/dask/array/core.py in __array__(self, dtype, **kwargs)
   1190 
   1191     def __array__(self, dtype=None, **kwargs):
-> 1192         x = self.compute()
   1193         if dtype and x.dtype != dtype:
   1194             x = x.astype(dtype)

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    152         dask.base.compute
    153         """
--> 154         (result,) = compute(self, traverse=False, **kwargs)
    155         return result
    156 

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    405     keys = [x.__dask_keys__() for x in collections]
    406     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 407     results = get(dsk, keys, **kwargs)
    408     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    409 

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
   2096             try:
   2097                 results = self.gather(packed, asynchronous=asynchronous,
-> 2098                                       direct=direct)
   2099             finally:
   2100                 for f in futures.values():

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1506             return self.sync(self._gather, futures, errors=errors,
   1507                              direct=direct, local_worker=local_worker,
-> 1508                              asynchronous=asynchronous)
   1509 
   1510     @gen.coroutine

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    613             return future
    614         else:
--> 615             return sync(self.loop, func, *args, **kwargs)
    616 
    617     def __repr__(self):

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    251             e.wait(10)
    252     if error[0]:
--> 253         six.reraise(*error[0])
    254     else:
    255         return result[0]

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/utils.py in f()
    236             yield gen.moment
    237             thread_state.asynchronous = True
--> 238             result[0] = yield make_coro()
    239         except Exception as exc:
    240             error[0] = sys.exc_info()

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1097 
   1098                     try:
-> 1099                         value = future.result()
   1100                     except Exception:
   1101                         self.had_exception = True

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1105                     if exc_info is not None:
   1106                         try:
-> 1107                             yielded = self.gen.throw(*exc_info)
   1108                         finally:
   1109                             # Break up a reference to itself

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1383                             six.reraise(type(exception),
   1384                                         exception,
-> 1385                                         traceback)
   1386                     if errors == 'skip':
   1387                         bad_keys.add(key)

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/dask/array/core.py in getter()
     74         c = a[b]
     75         if asarray:
---> 76             c = np.asarray(c)
     77     finally:
     78         if lock:

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/numpy/core/numeric.py in asarray()
    490 
    491     """
--> 492     return array(a, dtype, copy=False, order=order)
    493 
    494 

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/xarray/core/indexing.py in __array__()
    600 
    601     def __array__(self, dtype=None):
--> 602         return np.asarray(self.array, dtype=dtype)
    603 
    604     def __getitem__(self, key):

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/numpy/core/numeric.py in asarray()
    490 
    491     """
--> 492     return array(a, dtype, copy=False, order=order)
    493 
    494 

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/xarray/core/indexing.py in __array__()
    506     def __array__(self, dtype=None):
    507         array = as_indexable(self.array)
--> 508         return np.asarray(array[self.key], dtype=None)
    509 
    510     def transpose(self, order):

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/xarray/backends/netCDF4_.py in __getitem__()
     64         with self.datastore.ensure_open(autoclose=True):
     65             try:
---> 66                 array = getitem(self.get_array(), key.tuple)
     67             except IndexError:
     68                 # Catch IndexError in netCDF4 and return a more informative

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getitem__()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Dimension.__len__()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4._ensure_nc_success()

RuntimeError: NetCDF: Not a valid ID
Future exception was never retrieved
future: <Future finished exception=AllExit()>
Traceback (most recent call last):
  File "/glade/u/home/doughert/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "/glade/u/home/doughert/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/client.py", line 1357, in wait
    raise AllExit()
distributed.client.AllExit

I only get this error message when opening the files in a loop with chunking (which I need to do for efficiency and methodological purposes):

### Get subset of files based on flood duration 
var_list = []

### Find files that match start and end year/month of flood and subset based on flood duration
for c, item in enumerate(fflood_start_end_Ym2_unique):
    for name in glob.glob(os.path.join('/glade2/collections/rda/data/ds612.0/CTRL/**/wrf2d_d01_CTRL_' + var + '_*')):
        if item in name:
            wrf_match_fflood = xr.open_dataset(name, chunks={'Time': 10}, lock=False)
            # pull only times of floods (avoid rebinding `var`, the variable-name string)
            var_sel = wrf_match_fflood[var].sel(Time=slice(date_fflood_st_unique[c], date_fflood_end_unique[c]))
            var_list.append(var_sel)

Again, I'm not sure how to get around these issues in my current framework, so please let me know if you have any more suggestions!

@shoyer
Member

shoyer commented Jul 30, 2018

If you're interested in testing out development versions of xarray, there's a decent chance that this pull request will fix this issue:
#2261

I would be curious to know if this works.

@edougherty32
Author

@shoyer– Thanks, I'll try this out and let you know how it works.

@edougherty32
Author

@shoyer–Sorry for my own ignorance, but how do I implement the xarray.backends.file_manager within the framework of my code? Do I need to download the .py files to my directory and call them in my own script?

The functionality looks promising, but I admit that the use of these tools is somewhat new to me and I would appreciate any additional guidance. Thanks!

@jhamman
Member

jhamman commented Jul 31, 2018

@edougherty32 - from inside your environment (conda or virtual env), you'll want to run something like:

source activate pangeo
pip install --upgrade --no-deps git+git://github.com/shoyer/xarray@file-manager

@edougherty32
Author

Thanks @jhamman!

Then, to implement this in my script, should I follow something like xarray/tests/test_backends_file_manager.py under #2261, using test_file_manager_write_consecutive?

Again, I apologize for being new to this.

@jhamman
Member

jhamman commented Jul 31, 2018

@edougherty32 - I think using @shoyer's branch (as installed using the pip command above), you should just try rerunning your failing example. The pip command above should update your version of xarray to #2261.

@edougherty32
Author

Great, thanks @jhamman!

@edougherty32
Author

Hi @jhamman and @shoyer – updating my version of xarray to #2261 mostly solved the issue I mentioned above!

However, I am now having a new issue when trying to access values from a dataarray,
concat_floods_all_ar, with the following dimensions:

<xarray.DataArray 'stack-10cc15f05bda9db8e96c68120fb29f3a' (variable: 658, south_north: 1015, west_east: 1359)>
dask.array<shape=(658, 1015, 1359), dtype=float32, chunksize=(1, 1015, 1359)>
Coordinates:
    XLAT      (south_north, west_east) float32 dask.array<shape=(1015, 1359), chunksize=(1015, 1359)>
    XLONG     (south_north, west_east) float32 dask.array<shape=(1015, 1359), chunksize=(1015, 1359)>
  * variable  (variable) <U6 'var0' 'var1' 'var2' ... 'var655' 'var656' 'var657'
Dimensions without coordinates: south_north, west_east

Where each variable is accumulated precipitation over the U.S. for a particular flood case.

When I access the values of the first variable, var0:
print(var_val_conus[0].values)

I receive a numpy array, as expected (thus solving the issue from above).

 [nan nan nan ... nan nan nan]
 [nan nan nan ... nan nan nan]
 [nan nan nan ... nan nan nan]]

Yet, when I try to access other variables, I receive the following error message:
print(var_val_conus[542].values)

---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
<ipython-input-125-cc538b653f54> in <module>()
----> 1 test = concat_floods_all_ar[542].values
      2 print(np.shape(test))

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/xarray/core/dataarray.py in values(self)
    403     def values(self):
    404         """The array's data as a numpy.ndarray"""
--> 405         return self.variable.values
    406 
    407     @values.setter

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/xarray/core/variable.py in values(self)
    385     def values(self):
    386         """The variable's data as a numpy.ndarray"""
--> 387         return _as_array_or_item(self._data)
    388 
    389     @values.setter

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/xarray/core/variable.py in _as_array_or_item(data)
    209     TODO: remove this (replace with np.asarray) once these issues are fixed
    210     """
--> 211     data = np.asarray(data)
    212     if data.ndim == 0:
    213         if data.dtype.kind == 'M':

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/numpy/core/numeric.py in asarray(a, dtype, order)
    490 
    491     """
--> 492     return array(a, dtype, copy=False, order=order)
    493 
    494 

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/dask/array/core.py in __array__(self, dtype, **kwargs)
   1190 
   1191     def __array__(self, dtype=None, **kwargs):
-> 1192         x = self.compute()
   1193         if dtype and x.dtype != dtype:
   1194             x = x.astype(dtype)

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
    152         dask.base.compute
    153         """
--> 154         (result,) = compute(self, traverse=False, **kwargs)
    155         return result
    156 

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
    405     keys = [x.__dask_keys__() for x in collections]
    406     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 407     results = get(dsk, keys, **kwargs)
    408     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    409 

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, **kwargs)
   2096             try:
   2097                 results = self.gather(packed, asynchronous=asynchronous,
-> 2098                                       direct=direct)
   2099             finally:
   2100                 for f in futures.values():

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1506             return self.sync(self._gather, futures, errors=errors,
   1507                              direct=direct, local_worker=local_worker,
-> 1508                              asynchronous=asynchronous)
   1509 
   1510     @gen.coroutine

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    613             return future
    614         else:
--> 615             return sync(self.loop, func, *args, **kwargs)
    616 
    617     def __repr__(self):

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    251             e.wait(10)
    252     if error[0]:
--> 253         six.reraise(*error[0])
    254     else:
    255         return result[0]

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/utils.py in f()
    236             yield gen.moment
    237             thread_state.asynchronous = True
--> 238             result[0] = yield make_coro()
    239         except Exception as exc:
    240             error[0] = sys.exc_info()

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1097 
   1098                     try:
-> 1099                         value = future.result()
   1100                     except Exception:
   1101                         self.had_exception = True

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1105                     if exc_info is not None:
   1106                         try:
-> 1107                             yielded = self.gen.throw(*exc_info)
   1108                         finally:
   1109                             # Break up a reference to itself

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1383                             six.reraise(type(exception),
   1384                                         exception,
-> 1385                                         traceback)
   1386                     if errors == 'skip':
   1387                         bad_keys.add(key)

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/zict/buffer.py in __setitem__()
     80             self.fast[key] = value
     81         else:
---> 82             self.slow[key] = value
     83 
     84     def __delitem__(self, key):

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/zict/func.py in __setitem__()
     40 
     41     def __setitem__(self, key, value):
---> 42         self.d[key] = self.dump(value)
     43 
     44     def __contains__(self, key):

~/miniconda3/envs/pangeo/lib/python3.6/site-packages/zict/file.py in __setitem__()
     78 
     79     def __setitem__(self, key, value):
---> 80         with open(os.path.join(self.directory, _safe_key(key)), 'wb') as f:
     81             if isinstance(value, (tuple, list)):
     82                 for v in value:

FileNotFoundError: [Errno 2] No such file or directory: '/gpfs/fs1/work/doughert/flooddata/dask-worker-space/worker-ndjdgimu/storage/%28%27open_dataset-getitem-05742037c2598486787e2b5accee36e9%27%2C%200%2C%200%2C%200%29'

This only happens for variables for which I previously used the
xarray.Dataset.combine_first()
function to combine two consecutive files that contain information on the same flood (files are grouped in 3-month increments, so a flood starting at the end of May may end at the beginning of July, which falls in a different file).
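
For context, a minimal sketch of what combine_first does in this situation, with hypothetical in-memory stand-ins for the two quarterly files:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Hypothetical pieces of one flood split across two quarterly files:
# late-May values in one, early-July values in the other.
t1 = pd.date_range("2005-05-29", periods=3)
t2 = pd.date_range("2005-07-01", periods=3)
a = xr.Dataset({"PREC": (("Time",), np.arange(3.0))}, coords={"Time": t1})
b = xr.Dataset({"PREC": (("Time",), np.arange(3.0) + 10.0)}, coords={"Time": t2})

# combine_first fills values missing in `a` with values from `b`,
# producing a single dataset spanning both periods.
combined = a.combine_first(b)
print(combined.sizes["Time"])   # 6
```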

Is there a work-around on this new issue? Thanks.

@shoyer
Member

shoyer commented Aug 1, 2018

Thanks for testing it out!

That pull request still needs a bit of work with dask.distributed -- its own tests are still failing. When we get that working, it will probably be ready for another test.

@edougherty32
Author

edougherty32 commented Aug 2, 2018

@shoyer–No problem!

Ok, thanks for letting me know! Do you know when that pull request would be working for another test? No rush, but I'm just curious. Thanks again for all the help!

@shoyer
Member

shoyer commented Aug 20, 2018

@edougherty32 This took a while, but I think #2261 is ready for another test now.

@jhamman jhamman closed this as completed Jan 13, 2019