What is the status of parallel split-apply-combine style of computation

Noah Brenowitz

Aug 22, 2017, 2:46:42 PM
to xarray
I would like to be able to map some function in a parallel fashion over some subsets of the data. This could either be achieved by a groupby/apply or a wrapper around dask's map_blocks function. Ideally, the inner function would take DataArray objects as input. However, there are a bunch of issues/pull requests that seem relevant to this, and I was having trouble parsing the discussion. Is there any basic support for embarrassingly parallel computations in xarray that are more complicated than simple arithmetic?

Thanks,
Noah

Stephan Hoyer

Aug 24, 2017, 1:34:42 AM
to xarray
The short answer is that nothing has been added into xarray proper yet. I think this GitHub issue (https://github.com/pydata/xarray/issues/585) is still up to date.

For cases where the inner function takes and returns numpy arrays, there is a good integration point with apply_ufunc (not yet public API). Your email prompted me to finish up this work into a pull request, which will probably make it into a future release: https://github.com/pydata/xarray/pull/1517
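
To make that concrete, here is a rough sketch of that integration point, using a hypothetical detrend_1d as the inner numpy function (names and keywords here follow the interface proposed in the PR and may shift before it is merged):

```python
import numpy as np
import xarray as xr

def detrend_1d(y):
    # inner function: plain 1-D numpy array in, numpy array out
    t = np.arange(y.shape[-1])
    slope, intercept = np.polyfit(t, y, 1)
    return y - (slope * t + intercept)

da = xr.DataArray(np.random.rand(4, 5, 100), dims=['lat', 'lon', 'time'])

detrended = xr.apply_ufunc(
    detrend_1d, da,
    input_core_dims=[['time']],   # the dimension the inner function consumes
    output_core_dims=[['time']],  # ...and the one it produces
    vectorize=True,               # loop the 1-D function over lat/lon
    # for dask-backed arrays, additionally:
    # dask='parallelized', output_dtypes=[da.dtype],
)
```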

Cases where the inner function takes and returns xarray objects are harder. The challenge is that for these cases all the xarray metadata (variable names, dimension names and sizes, index labels, attributes, etc.) needs to be known in order to construct the result out of dask.array objects. This could either be done by providing some sort of explicit schema (which is rather painful), or could perhaps be inferred from the result of evaluating the inner function on a single block.

I still think this could be a viable option, I just never got around to figuring it out. Pull requests would be welcome.

Best,
Stephan


Riley Brady

Jul 4, 2018, 6:08:08 PM
to xarray
Hi all,

Should parallelization now work for groupby() objects with apply_ufunc()? The apply_ufunc documentation suggests so, but I haven't really gotten it to work.

Here is an example gist that attempts to perform a linear regression on 100 grid cells of randomly generated data.

Using .stack(points=['lat', 'lon']).groupby('points').apply(linear_regression) takes 1 min on my MacBook (this version should not be parallelized). Using apply_ufunc on the grouped object (an attempt at parallelizing over each "point", i.e. grid cell) takes 1 min 18 s on my quad-core laptop. Four cores isn't many, but I would assume this should provide at least some speedup.

Any help would be appreciated. Apologies if this is redundant -- it seems to have been a hot topic over the past few years, but I haven't been able to find any groupby() parallelization examples in the various pull requests on apply_ufunc().

Best,
Riley


Alejandro Salazar Romero

Jul 5, 2018, 10:52:09 AM
to xarray
Hi Riley,

I was trying to achieve the same thing this past week for interpolation of time series of raster data. In this gist, I put the functions I used to parallelize the ufunc over an xr.DataArray with dask chunks. As I understand it, apply_ufunc computes the function in parallel on the dask-array chunks as np.ndarrays, so you can apply your regression function over the last axis (time) of each np.ndarray, without the need for stacking/unstacking.
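
As a minimal sketch of that idea (with a hypothetical slope_1d standing in for the regression; the gist has the full versions), the per-chunk numpy function can be applied along the last axis like this:

```python
import numpy as np

def slope_1d(y):
    # least-squares slope of a single time series
    t = np.arange(y.size)
    return np.polyfit(t, y, 1)[0]

data = np.random.rand(10, 10, 50)                 # (lat, lon, time)
slopes = np.apply_along_axis(slope_1d, -1, data)  # shape (10, 10)
```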

I hope this helps.

-Alejandro

Stephan Hoyer

Jul 5, 2018, 3:13:38 PM
to xar...@googlegroups.com
I think the short answer is that arrays of size 360 are too small to benefit from parallelization with dask. Dask works best with big chunk sizes (>1e6 elements), in part because each chunk adds additional overhead.

For a calculation like linear regression, ideally you would either have very large regression problems (e.g., 1e6 examples each) or solve many regression problems at once in a vectorized calculation using NumPy arrays. Then you could add dask on top of it, but I suspect you would be happy enough with the performance that you wouldn't even need to bother. I don't know if there are any existing Python packages that already do this vectorized calculation of many linear regressions, but it would not be terribly difficult to code from scratch.
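
For what it's worth, the closed-form OLS slope vectorizes over any number of regression problems with plain broadcasting (a sketch; vectorized_slopes is a made-up name):

```python
import numpy as np

def vectorized_slopes(y, t):
    # y: (..., n) stack of series sharing a time axis t of length n;
    # closed-form least-squares slope for every series at once
    t_dev = t - t.mean()
    y_dev = y - y.mean(axis=-1, keepdims=True)
    return (t_dev * y_dev).sum(axis=-1) / (t_dev ** 2).sum()

t = np.arange(50.0)
y = 3.0 * t + np.random.rand(10, 10, 50)  # 100 series, true slope 3
slopes = vectorized_slopes(y, t)          # shape (10, 10), all close to 3
```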

Indeed, in this case you could just use apply_ufunc over the time dimension instead of using groupby. With Alejandro's solution, this requires using np.apply_along_axis() internally, which is probably faster than using xarray's groupby, but slower than a function written in a vectorized way to handle full numpy arrays. Roughly speaking, operating on many small xarray objects is much slower than operating on many small numpy arrays, which in turn is much slower than a single operation on one large xarray/numpy object.

Riley Brady

Jul 9, 2018, 11:26:25 AM
to xarray
Thank you Stephan and Alejandro. All of this is very helpful. I see how dask is overkill when the chunks aren't large, but it's definitely an easy way to get a significant speedup out of the box.

Riley