dataset.mean(), dask and performance

gai...@stcorp.no

Jan 18, 2017, 9:01:31 AM
to xarray, devel...@ccitools.org
Hi,

I'm trying to do daily -> monthly data aggregation with xarray, but I'm having performance problems. I would appreciate it if anyone could give me a hint about what I'm doing wrong and how I could improve my approach.

Current naive code:

===================
import xarray as xr
from datetime import datetime
import time

t = time.time()

sst = xr.open_mfdataset('/home/ccitbx/Desktop/sst/*.nc', chunks = {'lat':36, 'lon':72, 'time':31}, concat_dim = 'time')
sm = xr.open_mfdataset('/home/ccitbx/Desktop/sm/*.nc', chunks = {'lat':72, 'lon':144, 'time':31}, concat_dim = 'time')

sm_mean = sm.mean('time', keep_attrs=True, skipna=True)
sst_mean = sst.mean('time', keep_attrs=True, skipna=True)

t_dim = xr.DataArray([datetime(2000,1,1)], name='time')

sm_mean_time = xr.concat([sm_mean], t_dim)
sst_mean_time = xr.concat([sst_mean], t_dim)

ops.save_dataset(sm_mean_time, '/home/ccitbx/Desktop/sm_mean.nc')
ops.save_dataset(sst_mean_time, '/home/ccitbx/Desktop/sst_mean.nc')
print('Elapsed time: {}'.format(time.time()-t))
===================

The machine I'm running this on is an Ubuntu 16.04 VM with only 4GB RAM, but the tool is supposed to run on 'entry level' devices and just take longer there, which is also why we opted for xarray, as out-of-core computation should be possible.

Both datasets have a month of data (31 daily files). For the 'sm' dataset each file is around 0.5MB, for the 'sst' dataset each file is around 19MB. Both are lat/lon/time. Both contain multiple netCDF variables. sst has quite high spatial resolution (the grid is 100 times the chunk size above in each dimension), sm is coarser (the grid is 10 times the chunk size).

If I run the thing with default chunking (lat, lon, 1), the sm dataset would be done in under a minute, sst dataset would slowly eat away memory until the machine becomes unusable, indicating that the 'sst' dataset is larger than what my memory can hold. I guess that's due to compression, as the files themselves are not that large.

With chunking, the 'sm' dataset would be done in a few minutes, the 'sst' dataset would keep running for I don't know how long, I killed it after an hour, but at least it doesn't eat away memory and the machine stays usable.

I remember reading somewhere that methods like Dataset.mean, along with .apply and .groupby don't play nicely with dask. But I'm not even sure if that's my issue here. Or if it is, what would be the best way to solve it. I guess I could read in the dataset file by file and then do the mean manually, but that partly defeats the purpose of using xarray in the first place.

Subsetting the dataset to smaller temporal extents is not an option, as the tool I'm working with should be able to aggregate global datasets at original resolution.

Any help on what I'm doing wrong and how I could possibly approach this would be appreciated!

Best regards,
Janis

Ryan Abernathey

Jan 18, 2017, 10:46:09 AM
to xar...@googlegroups.com
Janis,

The workflow you are describing should definitely perform well with dask. It is one of the most standard use cases. Furthermore, it should work fine with the default chunking, as long as you can hold a few chunks in memory, which (at 19MB) you definitely can. The calculation should definitely not accumulate more and more memory. .mean should play very well with dask (although groupby does not, but that's not relevant here).

I can't immediately see what is wrong with your code. My one question is about the call to "ops.save_dataset". What is the "ops" module? This is not part of the xarray public api. The recommended way to save datasets is with the .to_netcdf() method. Could you try with that? (I doubt this will make a difference.)

You could also try calling sm_mean_time.load() directly. Do you still get the same performance problems?

-Ryan


--
You received this message because you are subscribed to the Google Groups "xarray" group.
To unsubscribe from this group and stop receiving emails from it, send an email to xarray+unsubscribe@googlegroups.com.
To post to this group, send email to xar...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/xarray/744f25ab-0648-45f7-b24d-2e88f8c91349%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

gai...@stcorp.no

Jan 18, 2017, 11:33:40 AM
to xarray
Ryan,

thank you for the quick answer! The 'ops' module comes from here (https://github.com/CCI-Tools/cate-core), the save_dataset function for netCDF files is just a thin wrapper around Dataset.to_netcdf. I tried with the native 'to_netcdf' with default chunking and the results were the same as previously. The 'sm' dataset (small one) finished quickly, while the 'sst' one eventually brought the system to a standstill.

Right now I'm trying to use .load explicitly (with default chunking). The 'sm' dataset finished quickly as expected, the 'sst' dataset does not kill the machine, but .load on 'sst' has now been running for 30 minutes.

Out of interest I used the 'ds.nbytes * (2 ** -30)' trick from (https://www.continuum.io/content/xray-dask-out-core-labeled-arrays-python) to try to estimate how much data it actually is. The output is slightly surprising.

For the small 'sm' dataset it's 1.915 GB, for the 'sst' dataset it's 29.935 GB. I don't pretend to understand how a dataset loaded from a 0.5GB directory becomes a 30GB dataset in memory. I guess it would explain the performance issues I'm seeing though.
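A rough back-of-the-envelope check of that expansion, using only the figures quoted in this thread (31 files of about 19MB each, and the 29.935 reported by the nbytes trick). The variable names are made up for illustration, and the on-disk size is only as accurate as the "around 19MB" estimate:

```python
# Figures quoted in the thread; variable names are illustrative only.
n_files = 31
file_size_mb = 19.0        # approximate on-disk size of one 'sst' file
in_memory_gib = 29.935     # ds.nbytes * 2**-30 for the whole 'sst' dataset

on_disk_gib = n_files * file_size_mb * 1e6 / 2**30
per_file_gib = in_memory_gib / n_files
expansion = in_memory_gib / on_disk_gib

print(f"on disk:   {on_disk_gib:.2f} GiB")
print(f"per file:  {per_file_gib:.2f} GiB once decoded")
print(f"expansion: about {expansion:.0f}x")
```

Under these assumptions each daily file decodes to roughly 1 GiB, so even a handful of time steps held at once would fill a 4GB machine.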

I'd really appreciate any other comments you or anyone else might have!

Regards,
Janis

Stephan Hoyer

Jan 18, 2017, 2:00:58 PM
to xarray
Can you look up whether and how the variables are chunked in the file? You can find this under ds['variable_name'].encoding after loading one of the individual netCDF files in `ds`. One possible explanation for the poor performance here is that you are using chunks with dask that do not line up well with the chunking in the file.
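A minimal sketch of that check. It assumes the netCDF4 backend, which, as far as I know, records the on-disk chunk shape under the 'chunksizes' key of each variable's encoding. The helper name report_disk_chunks and the stand-in dataset are hypothetical; in practice you would pass the result of xr.open_dataset on one of the daily files:

```python
from types import SimpleNamespace


def report_disk_chunks(ds):
    """Return {variable name: on-disk chunk shape, or None if not recorded}."""
    return {
        name: var.encoding.get("chunksizes")
        for name, var in ds.data_vars.items()
    }


# Stand-in for a dataset opened from a single file, so the sketch runs
# without data. Variable names and chunk shapes here are invented.
fake_ds = SimpleNamespace(data_vars={
    "var_a": SimpleNamespace(encoding={"chunksizes": (1, 1800, 900)}),
    "var_b": SimpleNamespace(encoding={}),
})

for name, chunks in report_disk_chunks(fake_ds).items():
    print(name, chunks)
```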

dask.array uses tree-reductions for aggregations like mean, which duplicates arrays in memory a few times for enhanced parallelism. The default value for split_every is 4. You might try setting it lower, e.g., try dask.set_options(split_every=1).
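For readers on newer dask releases: as far as I know, dask.set_options was later replaced by dask.config.set, and the reduction methods also accept split_every directly. The sketch below uses the keyword form on a small synthetic array; the array shape and values are invented and only stand in for 31 daily fields:

```python
import numpy as np
import dask.array as da

# Small synthetic stack standing in for 31 daily fields.
data = np.arange(31 * 4 * 6, dtype="float64").reshape(31, 4, 6)
x = da.from_array(data, chunks=(1, 4, 6))  # one chunk per time step

# split_every sets how many partial results are merged per tree level.
mean_default = x.mean(axis=0)                # library default
mean_narrow = x.mean(axis=0, split_every=2)  # narrower tree

result = mean_narrow.compute()
print(result.shape)
```

Both variants give the same numbers; only the shape of the task graph, and with it the number of intermediate results alive at once, should differ.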

gai...@stcorp.no

Jan 19, 2017, 3:17:37 PM
to xarray
Hi Stephan,

thank you for your answer! Sorry for not replying sooner, I wanted to try to investigate this a bit further to be able to come back with more meaningful information about the problem I'm facing.

I tried your suggestions. The chunksize differs between variables in the dataset, it's either one quarter or one third of lat/lon resolution. So (1,1800,900), or (1, 2400, 1200) for the ones I checked. Is there a good heuristic to choose dask chunk size for the whole dataset if the chunksize differs between variables?

I also tried changing the split_every setting. Didn't do much.

However, then I realized that I only have something like 9GB of free disk space on that machine. While checking available disk size using 'df' it didn't seem like it's being eaten away, but I increased it to having 50GB free disk space anyway. This then solved the problem of the VM becoming very sluggish while running the script, and it managed to get far enough to end in an error. On that note, is there a rule of thumb regarding how much free disk space one should have when working with xarray depending on dataset size?

So then, the error I got was this:

Traceback (most recent call last):
  File "daily_sbox.py", line 47, in <module>
    sst_mean_time.to_netcdf('/home/ccitbx/Desktop/sst_mean.nc')
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/core/dataset.py", line 782, in to_netcdf
    engine=engine, encoding=encoding)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/api.py", line 354, in to_netcdf
    dataset.dump_to_store(store, sync=sync, encoding=encoding)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/core/dataset.py", line 730, in dump_to_store
    store.sync()
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/netCDF4_.py", line 289, in sync
    super(NetCDF4DataStore, self).sync()
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/common.py", line 192, in sync
    self.writer.sync()
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/common.py", line 171, in sync
    da.store(self.sources, self.targets)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/array/core.py", line 712, in store
    Array._get(dsk, keys, **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/base.py", line 43, in _get
    return get(dsk2, keys, **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/threaded.py", line 57, in get
    **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/async.py", line 481, in get_async
    raise(remote_exception(res, tb))
dask.async.MemoryError: 

Traceback
---------
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/async.py", line 264, in execute_task
    result = _execute_task(task, data)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/async.py", line 246, in _execute_task
    return func(*args2)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/array/reductions.py", line 212, in mean_chunk
    total = sum(x, dtype=dtype, **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/numpy/lib/nanfunctions.py", line 513, in nansum
    return np.sum(a, axis=axis, dtype=dtype, out=out, keepdims=keepdims)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/numpy/core/fromnumeric.py", line 1835, in sum
    out=out, keepdims=keepdims)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/numpy/core/_methods.py", line 32, in _sum
    return umr_sum(a, axis, dtype, out, keepdims)

So, obviously a memory problem. I ran it again, this time monitoring with 'top'. Peak memory consumption of the process was 48% (out of 4GB). I again got a MemoryError, but with a slightly different stack trace:

Traceback (most recent call last):
  File "daily_sbox.py", line 47, in <module>
    sst_mean_time.to_netcdf('/home/ccitbx/Desktop/sst_mean.nc')
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/core/dataset.py", line 782, in to_netcdf
    engine=engine, encoding=encoding)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/api.py", line 354, in to_netcdf
    dataset.dump_to_store(store, sync=sync, encoding=encoding)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/core/dataset.py", line 730, in dump_to_store
    store.sync()
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/netCDF4_.py", line 289, in sync
    super(NetCDF4DataStore, self).sync()
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/common.py", line 192, in sync
    self.writer.sync()
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/common.py", line 171, in sync
    da.store(self.sources, self.targets)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/array/core.py", line 712, in store
    Array._get(dsk, keys, **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/base.py", line 43, in _get
    return get(dsk2, keys, **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/threaded.py", line 57, in get
    **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/async.py", line 481, in get_async
    raise(remote_exception(res, tb))
dask.async.MemoryError: 

Traceback
---------
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/async.py", line 264, in execute_task
    result = _execute_task(task, data)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/async.py", line 246, in _execute_task
    return func(*args2)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/array/reductions.py", line 214, in mean_chunk
    dtype=[('total', total.dtype), ('n', n.dtype)])

So I then tried custom chunking again. The code ran considerably slower. This consumed up to 80% of the memory and ended in a memory error, again, with a different stack trace:

Traceback (most recent call last):
  File "daily_sbox.py", line 47, in <module>
    sst_mean_time.to_netcdf('/home/ccitbx/Desktop/sst_mean.nc')
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/core/dataset.py", line 782, in to_netcdf
    engine=engine, encoding=encoding)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/api.py", line 354, in to_netcdf
    dataset.dump_to_store(store, sync=sync, encoding=encoding)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/core/dataset.py", line 730, in dump_to_store
    store.sync()
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/netCDF4_.py", line 289, in sync
    super(NetCDF4DataStore, self).sync()
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/common.py", line 192, in sync
    self.writer.sync()
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/common.py", line 171, in sync
    da.store(self.sources, self.targets)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/array/core.py", line 712, in store
    Array._get(dsk, keys, **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/base.py", line 42, in _get
    dsk2 = cls._optimize(dsk, keys, **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/array/optimization.py", line 24, in optimize
    dsk2 = cull(dsk, keys)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/optimize.py", line 39, in cull
    seen.update(nxt)
MemoryError

And the last thing I tried was again with default chunking, setting split_every=1 again. Consumed up to 48% of memory and ended in a stack trace:

Traceback (most recent call last):
  File "daily_sbox.py", line 47, in <module>
    sst_mean_time.to_netcdf('/home/ccitbx/Desktop/sst_mean.nc')
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/core/dataset.py", line 782, in to_netcdf
    engine=engine, encoding=encoding)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/api.py", line 354, in to_netcdf
    dataset.dump_to_store(store, sync=sync, encoding=encoding)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/core/dataset.py", line 730, in dump_to_store
    store.sync()
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/netCDF4_.py", line 289, in sync
    super(NetCDF4DataStore, self).sync()
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/common.py", line 192, in sync
    self.writer.sync()
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/xarray-0.8.2-py3.5.egg/xarray/backends/common.py", line 171, in sync
    da.store(self.sources, self.targets)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/array/core.py", line 712, in store
    Array._get(dsk, keys, **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/base.py", line 43, in _get
    return get(dsk2, keys, **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/threaded.py", line 57, in get
    **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/async.py", line 481, in get_async
    raise(remote_exception(res, tb))
dask.async.MemoryError: 

Traceback
---------
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/async.py", line 264, in execute_task
    result = _execute_task(task, data)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/async.py", line 246, in _execute_task
    return func(*args2)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/toolz/functoolz.py", line 381, in __call__
    ret = f(ret)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/dask/array/reductions.py", line 221, in mean_combine
    n = sum(pair['n'], **kwargs)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/numpy/lib/nanfunctions.py", line 513, in nansum
    return np.sum(a, axis=axis, dtype=dtype, out=out, keepdims=keepdims)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/numpy/core/fromnumeric.py", line 1835, in sum
    out=out, keepdims=keepdims)
  File "/home/ccitbx/miniconda3/envs/ect_env/lib/python3.5/site-packages/numpy/core/_methods.py", line 32, in _sum
    return umr_sum(a, axis, dtype, out, keepdims)


Sorry for all the stack traces. So now I don't really have any ideas left! I would appreciate any comments you might have. This is also not really just about this particular case. We're using xarray as a backend for a tool that we're working on. It has worked well up until now, but eventually we will have to deal with larger datasets, maybe ten years of monthly data, a few years of daily data, so I would really like to understand what's happening and to be able to solve it, as this will not be the only time we'll have to deal with something like this!

If anyone is interested in looking into this in more depth and has the time, I put the data, as well as the script I was using, here:

Thanks for the insights you've given already, xarray has a really nice community!

Best,
Janis



Stephan Hoyer

Jan 19, 2017, 4:10:46 PM
to xarray
On Thu, Jan 19, 2017 at 12:17 PM, <gai...@stcorp.no> wrote:
> Hi Stephan,
>
> thank you for your answer! Sorry for not replying sooner, I wanted to try to investigate this a bit further to be able to come back with more meaningful information about the problem I'm facing.
>
> I tried your suggestions. The chunksize differs between variables in the dataset, it's either one quarter or one third of lat/lon resolution. So (1,1800,900), or (1, 2400, 1200) for the ones I checked. Is there a good heuristic to choose dask chunk size for the whole dataset if the chunksize differs between variables?

Generally, a dask chunk should consist of multiple chunks on disk (adjust the number to optimize performance), and each chunk on disk should fall into exactly one dask chunk.
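One possible way to turn this rule into a heuristic when variables are chunked differently on disk is to take the least common multiple of the on-disk chunk sizes along each axis. This is only a sketch, not something proposed in the thread: the helper aligned_chunk is hypothetical, and the full grid size of (31, 7200, 3600) is an assumption derived from the "one quarter or one third" remark.

```python
from functools import reduce
from math import gcd


def lcm(a, b):
    return a * b // gcd(a, b)


def aligned_chunk(disk_chunks, dim_sizes):
    """Smallest chunk shape that is a whole multiple of every on-disk chunk.

    Each entry is capped at the dimension size, i.e. "do not split this axis".
    """
    shape = []
    for axis, size in enumerate(dim_sizes):
        step = reduce(lcm, (chunks[axis] for chunks in disk_chunks))
        shape.append(min(step, size))
    return tuple(shape)


# On-disk chunk shapes reported in the thread; the grid size is assumed.
disk_chunks = [(1, 1800, 900), (1, 2400, 1200)]
print(aligned_chunk(disk_chunks, dim_sizes=(31, 7200, 3600)))
```

For these two shapes the only chunk that lines up with both variables is a full spatial field per time step, so a finer split can be aligned with one chunking scheme but not with both.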

If the same chunk on disk falls into multiple dask chunks, performance will be very slow because you need to read and uncompress the same data multiple times to fill up different dask chunks.  I suspect that is what is happening here -- your dask chunking of 36x72 in space requires 625-fold redundant reads of the data from disk.
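The 625 figure can be reproduced with a little arithmetic. The sketch below assumes the reported on-disk shape (1, 1800, 900) is ordered (time, lon, lat), which is a guess, and pairs it with the dask chunks from the original script (lat 36, lon 72, time 31). The helper redundant_reads is hypothetical:

```python
from math import ceil


def redundant_reads(disk_chunk, dask_chunk):
    """Estimate how many dask chunks touch a single on-disk chunk."""
    factor = 1
    for dim, on_disk in disk_chunk.items():
        factor *= max(1, ceil(on_disk / dask_chunk[dim]))
    return factor


disk_chunk = {"time": 1, "lon": 1800, "lat": 900}  # assumed axis order
dask_chunk = {"time": 31, "lon": 72, "lat": 36}    # from the original script

print(redundant_reads(disk_chunk, dask_chunk))  # 25 * 25 = 625
```

With one time step per dask chunk and no split in space, the same function returns 1, meaning every on-disk chunk is decompressed once.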

In your case, I might chunk the arrays only along the time axis, one time step per chunk, with no chunking across space (e.g., chunks={'time': 1} in xarray). For calculating simple statistics such as the mean across time, this should work fine. If you really do need to load all times at once, you will need to rewrite the netCDF files to use a different chunking scheme that is better aligned with your access patterns.
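A minimal sketch of that layout on synthetic data, assuming xarray, numpy and dask are installed. The tiny grid and random values are invented; with real files the same chunks mapping would go to xr.open_mfdataset rather than to .chunk():

```python
import numpy as np
import xarray as xr

# Synthetic stand-in for a month of daily files (tiny grid, invented values).
times = np.arange("2000-01-01", "2000-02-01", dtype="datetime64[D]")
values = np.random.default_rng(0).random((times.size, 18, 36))
ds = xr.Dataset(
    {"sst": (("time", "lat", "lon"), values)},
    coords={"time": times},
)

# One dask chunk per time step, whole field in space.
lazy = ds.chunk({"time": 1})
monthly_mean = lazy.mean("time", skipna=True, keep_attrs=True)  # still lazy

result = monthly_mean.compute()
print(result["sst"].shape)
```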
 
> I also tried changing the split_every setting. Didn't do much.

If memory usage is still a concern, try adjusting this again with the differently aligned chunks. 

> However, then I realized that I only have something like 9GB of free disk space on that machine. While checking available disk size using 'df' it didn't seem like it's being eaten away, but I increased it to having 50GB free disk space anyway. This then solved the problem of the VM becoming very sluggish while running the script, and it managed to get far enough to end in an error. On that note, is there a rule of thumb regarding how much free disk space one should have when working with xarray depending on dataset size?

I think this is just a symptom of running out of memory -- your computer is trying to page to disk (which is extremely slow).

gai...@stcorp.no

Jan 31, 2017, 6:41:55 AM
to xarray
Hi Stephan et al.,

I just wanted to give a quick update on how I've been doing with this.

Yes, it turns out that this particular dataset has an insane compression level: a single 16MB file expands to about 1GB in memory. I verified this by (eventually) running the aggregation I wanted to do and saving the result. Without compression a single file is ~1GB. This is why the low-memory machine hiccups: a single time slice (1 file) is too large to use as a dask chunk. The initial chunking I chose was too fine, though, so it ran into performance problems due to redundant disk access, as you mentioned.

Reading the dataset with chunking (2x2) seems to be working well enough, the operation takes time, but memory usage stays in check.

The takeaway is that we just have to be a bit smart about opening our datasets: check how large the uncompressed data would be, and use some chunking heuristics to determine an appropriate dask chunk size.
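One way such a heuristic could look, as a sketch only: grow the dask chunk in whole multiples of the on-disk chunk along the spatial axes until the next step would exceed a memory budget. The function names, the 8-byte item size (values decoded to float64), the 64 MiB budget and the (31, 7200, 3600) grid are all assumptions made for illustration, not values confirmed in this thread.

```python
def chunk_nbytes(shape, itemsize):
    """Uncompressed size in bytes of one chunk of the given shape."""
    total = itemsize
    for n in shape:
        total *= n
    return total


def candidate_chunks(disk_chunk, dim_sizes, grow_axes):
    """Yield chunk shapes that are whole multiples of the on-disk chunk."""
    k = 1
    while True:
        shape = tuple(
            min(c * k, n) if axis in grow_axes else c
            for axis, (c, n) in enumerate(zip(disk_chunk, dim_sizes))
        )
        yield shape
        if all(shape[a] == dim_sizes[a] for a in grow_axes):
            return  # the grown axes already span the full grid
        k += 1


def pick_chunks(disk_chunk, dim_sizes, itemsize, budget_bytes, grow_axes=(1, 2)):
    """Largest candidate that fits the budget (the on-disk chunk if none do)."""
    best = tuple(disk_chunk)
    for shape in candidate_chunks(disk_chunk, dim_sizes, grow_axes):
        if chunk_nbytes(shape, itemsize) > budget_bytes:
            break
        best = shape
    return best


chosen = pick_chunks(
    disk_chunk=(1, 1800, 900),     # reported on-disk chunk shape
    dim_sizes=(31, 7200, 3600),    # assumed full grid
    itemsize=8,                    # assumed float64 after decoding
    budget_bytes=64 * 2**20,       # hypothetical per-chunk budget
)
print(chosen)
```

The budget applies to a single chunk of a single variable; with several variables and a tree reduction in flight, the real peak will be some multiple of it, so the budget should be set well below the available RAM.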

Thanks, this was super useful!

Janis 