Difficulties using multi-threading and multi-processing, and general speed problems.


Edward

Nov 12, 2016, 4:00:09 AM
to xarray
Hi

I have been having trouble getting some processing and computations to run at a reasonable speed and am struggling to improve them any further. I would be really grateful for your suggestions.

Problems:
  • Computations seem to take too long - between 14 and 27 hours for one dataset
  • Unsure whether I am effectively using the multi-processing/threading capability of the server (likely not)
  • My experiments with chunking have not yielded much either
  • Difficulty in efficiently excluding the all-NaN sections of arrays to cut down computation time
  • Can't find a way to see the CPU time (e.g. with %timeit) because I am using Windows (see the sketch after this list)
  • Computations seem to get slower the larger the array
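
A possible workaround for the CPU-time bullet above is the third-party psutil package, which can report per-process CPU time on Windows. A minimal sketch (not from the original post; psutil must be installed separately):

import psutil

proc = psutil.Process()                    # the current Python process
before = proc.cpu_times()

_ = sum(i * i for i in range(10 ** 6))     # stand-in for the real workload

after = proc.cpu_times()
print("user CPU seconds: %.2f" % (after.user - before.user))
print("system CPU seconds: %.2f" % (after.system - before.system))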
Steps
  1. open_mfdataset - 30 yrs (time: 10950 days), daily hydrological data at 0.5x0.5 degree resolution (lat: 360, lon: 720)
  2. resample to 5-day mean
  3. get annual maxima of the 5-day means (= 30 values in total)
  4. write out to netcdf  
  5. read small dataset back in (this saves  a lot of time)
  6. stack data into 'allpoints' 
  7. Chunk the data along allpoints
  8. apply custom function using groupby().apply()
  9. unstack data
  10. write to netcdf

Hardware/software:
  • Windows server
  • Intel, 32 cores / 64 logical processors
  • 128 GB RAM
  • Data stored on a network drive with a Gigabit Ethernet connection that typically maxes out around 500 Mbit/s read/write
  • Anaconda 2, Python 2.7
  • xarray 0.8.2

Previous similar operations using, for example, .apply(np.percentile), take 5 minutes to half an hour.


The custom function: use a scipy.stats fitting function to fit a distribution to an array of ~30 values (the annual maxima) along the time dimension
  • normally stats.genextreme.fit() - takes perhaps 250 ms per calculation
  • 360x720 = 259,200 cells; at 0.25-0.5 s each that is roughly 18-36 hours
  • this function calculates even when the array is all NaNs and takes the same amount of time; there is no skipna=True option
  • the data in question is land-only, so in reality we could/should cut this time to roughly a third if we could efficiently exclude the all-NaN cells
  • I have tried including a line to skip NaNs in the custom .apply() function (e.g. using np.nansum(x)==0.0), but this seems to increase the time - see the sketch after this list
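
One way to avoid paying the full fit cost for ocean cells is an early exit when the array is all NaNs. A rough sketch (not from the original post; the all-NaN branch must return the same variables and shapes that the rest of the pipeline expects):

import numpy as np
import xarray as xr
from scipy.stats import genextreme, kstest

T = np.array([1, 5, 10, 15, 20, 25, 30, 40, 50, 100])

def combogev_skipnan(x):
    values = np.asarray(x)
    # Early exit: an all-NaN cell (ocean) gets NaN results without running the fit
    if np.isnan(values).all():
        return xr.Dataset({'dssT': xr.DataArray(np.full(T.shape, np.nan)),
                           'dsk1': np.nan,
                           'dsk2': np.nan})
    mle = genextreme.fit(values, loc=0)
    sT = genextreme.isf(1. / T, mle[0], mle[1], mle[2])
    ks = kstest(values, 'genextreme', mle)
    return xr.Dataset({'dssT': xr.DataArray(sT), 'dsk1': ks[0], 'dsk2': ks[1]})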

Multi-threading
  • Can't find a way to see the CPU time (e.g. with %timeit) because I am using Windows
  • Have had varied success using dask.set_options(pool=ThreadPool(x))
    • The time hardly varies whether I set x to 1 or 100,000
  • When I look at Windows Task Manager, throughput on the processors is often <5% and normally at least half are idle
    • It seems all the main computation still occurs on one processor, which goes through periods at 100%, whilst the others flash at low values between 1-20%
    • Only half of the processors are ever used at any one time
  • I am a little confused as to whether I should be using multiple threads or multiple processes (see the sketch after this list):
    • Calculation of a grid cell has no dependency on the data/result of another grid cell, so we could/should be able to calculate them all in parallel
    • I read that multi-threading is better when using a shared memory space. Would a chunk constitute a shared memory space?
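
For reference, a minimal sketch of switching between dask's threaded and process-based schedulers, using the dask API that was current at the time of this thread (the multiprocessing scheduler requires the applied function and its arguments to be picklable, which is where pickle errors typically come from):

import dask
import dask.multiprocessing
import dask.threaded
from multiprocessing.pool import ThreadPool

# Threaded scheduler: workers share memory, but pure-Python code (such as a
# plain Python function wrapping a scipy fit) is serialised by the GIL
dask.set_options(get=dask.threaded.get, pool=ThreadPool(32))

# Process-based scheduler: sidesteps the GIL, but everything sent to the
# workers must pickle cleanly
# dask.set_options(get=dask.multiprocessing.get)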
Chunking:
  • Read-in/out etc. seems to be fine. Found that generally large chunk sizes for the read-in/out work best (e.g. lon: 180)
  • When the dataset is read back in and stacked to allpoints, the data gets chunked, with no perceptible impact on computation

Summary:
  • The computation appears to be the main bottleneck (unless it is possibly when the data is stacked?)
  • I don't think I am correctly managing to multi-thread. I tried multi-processing but this gave pickle errors
  • Processing a 60x120x30 array takes 14 minutes (7,200 cells in ~840 s, roughly 8.5 cells/s)
  • The whole process seems to get slower and slower the larger the array sizes:
    • With a 10x10 array we get ~10 cells/s throughput when calculating the function; with 100x140 this was down to ~5 cells/s
    • And that is excluding the time taken to write out the resampled dataset

CODE:
from datetime import datetime
import numpy as np
import scipy as sp
from scipy.stats import genextreme
from scipy.stats import kstest as kst
import xarray as xr
import dask
from multiprocessing.pool import ThreadPool

dask.set_options(pool=ThreadPool(64))

def combogev(x):
    mle = genextreme.fit(x, loc=0)
    T = np.array([1, 5, 10, 15, 20, 25, 30, 40, 50, 100])
    sT = genextreme.isf(1. / T, mle[0], mle[1], mle[2])
    dssT = xr.DataArray(sT)
    kstmp = kst(x, 'genextreme', mle)
    return xr.Dataset(data_vars={'dssT': dssT, 'dsk1': kstmp[0], 'dsk2': kstmp[1]})


start = datetime.now()
batstartTime = datetime.now()  # let's time this
print "Batch start time:"
print batstartTime

filechunk1 = 180   # Played around with different chunk sizes for the 1st data read-in, compared to the 2nd read-in of the resampled data
filechunk2 = 180
print filechunk1
print filechunk2

# STEP 1 - 2 SECONDS
# (listy is the list of input NetCDF file paths, defined elsewhere)
ds = xr.open_mfdataset(listy, chunks={'lon': filechunk1}, concat_dim='time')

# STEPS 2-3 - 12 SECONDS
#ds = ds.sel(lat=slice(16, 11), lon=slice(75.25, 80.2))    # 10x10
#ds = ds.sel(lat=slice(16, 6), lon=slice(75.25, 85.2))     # 20x10
ds = ds.sel(lat=slice(48, 18), lon=slice(75.25, 135))      # 60x120, almost all land  # <<<<< Let's use this 60x120 slice
#ds = ds.sel(lat=slice(48, 18), lon=slice(115.25, 175))    # 60x120, mostly sea (NaNs)

ds = ds.resample('5D', dim='time', how='mean')     # Resample to 5-day mean,    Step 2
ds = ds.dis.resample('AS', dim='time', how='max')  # Resample to annual maxima, Step 3

# (GCM and yrs are defined elsewhere)
tstr = 'N:\\Wat-Data\\ISI-MIP1\\Processed_test\\temp\\temp' + str(GCM) + str(yrs[0]) + '1.nc'  # make a string to save the resampled data

# STEP 4 - ~2 MINS for a 10x10 array, 4 hours for 360x720
ds.to_dataset().to_netcdf(tstr, format='NETCDF4_CLASSIC')
del ds

# STEP 5
ds = xr.open_dataset(tstr, chunks={'lon': filechunk2})

# STEPS 6 & 7
stacked = ds.stack(allpoints=['lon', 'lat']).squeeze()
stacked = stacked.reset_coords(drop=True)
# Note: .chunk() returns a new object, so the result must be reassigned
# (the original line dropped the result, so the chunk size could not have
# had any effect, whether set to 1 or 1000)
stacked = stacked.chunk({'allpoints': 10})

# STEP 8 - the main computation - 2 mins for a 10x10 area, 14 mins for 60x120, 45 mins for 100x140, 24 h+ for larger datasets
mle = stacked.dis.groupby('allpoints').apply(combogev)

# STEP 9
dsmle = mle.unstack('allpoints')

# STEP 10
# (dstr, the output path, is defined elsewhere)
dsmle.to_netcdf(dstr, format='NETCDF4_CLASSIC', encoding={'dssT': {'zlib': True, 'complevel': 5}})


Thanks for all your suggestions and let me know if/how I can provide more info and/or what to try.


Ryan Abernathey

Nov 12, 2016, 10:15:46 AM
to xar...@googlegroups.com
Edward,

We recently added some optimization tips to the docs:
But I see you are already implementing many of those suggestions (e.g. writing intermediate results to disk).

The fundamental problem here is that groupby operations don't actually parallelize. (This is a very common misunderstanding about how xarray works.) So your expensive step 8 will always scale linearly with the length of allpoints (or worse, because of the overhead of re-combining the output of your function).

The developers are working hard on addressing this problem. There are several open issues and pull requests right now which might help your case:

You could check out pull request 964 and see if you get improved performance using the new apply method.

Stephan will probably have some better suggestions...

Cheers,
Ryan






Stephan Hoyer

Nov 12, 2016, 1:57:30 PM
to xarray
On Sat, Nov 12, 2016 at 7:15 AM, Ryan Abernathey <ryan.ab...@gmail.com> wrote:
The fundamental problem here is that groupby operations don't actually parallelize. (This is a very common misunderstanding about how xarray works.) So your expensive step 8 will always scale linearly with the length of allpoints (or worse, because of the overhead of re-combining the output of your function).

Indeed, this probably needs some clarification in the docs.

To be clear: groupby operations work in parallel if you apply a function that *already* is hooked into dask. But they do not make an eager operation like scipy.stats.genextreme.isf automatically lazy. This is tricky to do automatically, for reasons that are outlined in the first GitHub issue Ryan links to.

To make a generic numpy/scipy function lazy, you need to wrap it with dask. Dask has a bunch of tools for doing this, but the ones I would most recommend are either dask.array.map_blocks or dask.array.atop (for cases where you preserve existing axes):

Or dask.array.from_delayed for slightly more complex cases, where you are happy to operate on whole arrays at a time, probably including this one:
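
For the simpler map_blocks route, a minimal illustration (not from this thread) of wrapping a plain NumPy function so that it runs chunk by chunk:

import dask.array as da
import numpy as np

x = da.random.random((360, 720), chunks=(90, 180))

# map_blocks applies the function independently to every chunk; here the
# output has the same shape and dtype as the input, so no extra arguments are needed
y = da.map_blocks(np.square, x, dtype=x.dtype)
y.compute()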

Here's how I would try this one:

from scipy.stats import genextreme
from scipy.stats import kstest
from dask import delayed
import dask.array as da
import numpy as np
import xarray as xr

def combogev(x):
    # need to pull out dask array for dask.delayed to work properly
    # (for now, at least until after https://github.com/dask/dask/pull/1068)
    data = x.data
    T = np.array([1,5,10,15,20,25,30,40,50,100])

    # pure=True means this fit will be cached when passed the same values
    mle = delayed(genextreme.fit, pure=True)(data, loc=0)
    # don't use pure=True here, because we want to generate new RNG values 
    sT_value = delayed(genextreme.isf)(1./T, mle[0], mle[1], mle[2])
    kstmp_value = delayed(kstest)(data, 'genextreme', mle)

    # now we need to turn these dask.delayed objects into dask.array objects
    sT = da.from_delayed(sT_value, T.shape, np.float64)
    ks_statistic = da.from_delayed(kstmp_value[0], (), np.float64)
    p_value = da.from_delayed(kstmp_value[1], (), np.float64)

    # put it all back into xarray
    return xr.Dataset({'samples': ('T', sT),
                       'ks_statistic': ks_statistic,
                       'p_value': p_value},
                      coords={'T': T})

Does it work?

In [5]: data = xr.DataArray(25 + 10 * np.random.randn(50, 100000), dims=['x', 'y']).chunk()

# construct the dask graph -- should be fast
In [6]: %time ds = data.groupby('x').apply(combogev)
CPU times: user 121 ms, sys: 2.55 ms, total: 124 ms
Wall time: 123 ms

# do the computation
In [7]: %time ds.load()
/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/scipy/stats/_continuous_distns.py:1776: RuntimeWarning: invalid value encountered in true_divide
  np.sign(c)*(-g3+(g2+2*g2mg12)*g1)/((g2mg12)**(3./2.)))
/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/scipy/stats/_continuous_distns.py:1781: RuntimeWarning: invalid value encountered in true_divide
  (g4+(-4*g3+3*(g2+g2mg12)*g1)*g1)/((g2mg12)**2))
/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/scipy/stats/_distn_infrastructure.py:1543: RuntimeWarning: divide by zero encountered in log
  return log(self._pdf(x, *args))
/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/scipy/optimize/optimize.py:462: RuntimeWarning: invalid value encountered in subtract
  numpy.max(numpy.abs(fsim[0] - fsim[1:])) <= ftol):
CPU times: user 2min 44s, sys: 18.2 s, total: 3min 2s
Wall time: 32.4 s
Out[7]:
<xarray.Dataset>
Dimensions:       (T: 10, x: 50)
Coordinates:
  * T             (T) int64 1 5 10 15 20 25 30 40 50 100
  * x             (x) int64 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 ...
Data variables:
    p_value       (x) float64 0.0 0.0 0.0 0.0 0.0 nan nan 0.0 0.0 0.0 0.0 ...
    ks_statistic  (x) float64 0.7445 0.9173 0.9162 0.9167 0.9876 nan nan ...
    samples       (x, T) float64 -inf 10.0 16.24 20.2 23.17 25.58 27.62 ...

Yes, it works, and we get a nice speedup! My 4 core laptop (with 2x hyper-threading) does 3 minutes worth of computation (600% CPU) in 30 seconds of wall clock time.

Note that this does require a very large number of samples (e.g., 10000+ points per chunk) to see the full CPU utilization -- this is pretty normal for dask in code with lots of Python overhead.

Edward

Nov 14, 2016, 10:13:56 AM
to xarray
Thanks so much Ryan and Stephan for your quick and helpful replies.

I have been trying this out mainly on small datasets and there seem to be improvements.

Stephan's example worked nicely and it was easy to see the process working in parallel in Task Manager. Just to note, however, the example is also slightly different from my case (see the sketch after this list):
  • groupby('x') in the example given calls the function 50 times, each with an array of 100,000 values
  • groupby('y') calls the function 100,000 times, each with an array of 50 values - which is more similar to the problem I am having and takes quite a lot more time: 30x10,000 took about 12 minutes wall time
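
For concreteness, a sketch of the two cases (reusing the synthetic data and the combogev function from Stephan's example above):

import numpy as np
import xarray as xr

# combogev as defined in Stephan's message above
data = xr.DataArray(25 + 10 * np.random.randn(50, 100000), dims=['x', 'y']).chunk()

ds_x = data.groupby('x').apply(combogev)  # 50 calls, 100,000 samples each
ds_y = data.groupby('y').apply(combogev)  # 100,000 calls, 50 samples each - dominated by per-call Python overhead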

Thanks for showing how to implement dask into the function. 
Yes, I will also try the new apply function. Linking for other users: https://gist.github.com/rabernat/a0ec6a7e947f2d928615a30f5cb91ee9 

Will update again soon. Cheers!

Edward

Nov 16, 2016, 6:13:14 AM
to xarray
Just want to update that there have been some good improvements, so I will relate some of what I have learned below, as it may be helpful for others:

  1. I have split the resampling and the groupby-apply processes into two completely separate procedures. The resampling in this case is a substantial task that can take on the order of 2 hours.
    1. I also experimented with saving the file to netcdf in between the two resampling procedures, but this was not productive.
    2. It would also appear (I don't know how much truth there is in this) that the original code above had substantial variable self-reassignment (e.g. ds = ds...), and that for some lines assigning new variables smoothed things out a bit (e.g. ds1 = ds...; ds2 = ds1...). This both reduced errors and increased speed.
  2. Including the dask wrapping in the function is definitely using the multiple cores on the machine, as hoped for. Characteristic of this problem is that it does not require loads of computation (very low core use, e.g. ~5%), but needs to be done many thousands of times. This is slightly different from the example Stephan gave above, which was computationally expensive (resulting in 100% core use) but was only run 50 times.
  3. Because of this, I can run multiple Python consoles all executing the same script at the same time and using all the cores, as in the photo. It still takes time, but I think computation time (for one run) has been cut by a factor of about 6 - obviously many more times if I run 10 scripts simultaneously. (A rough sketch of this is shown after this list.)
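
A rough sketch of that kind of manual splitting (the script name and command-line arguments are illustrative, not the actual code):

# Run the same script in several consoles, each on its own latitude band, e.g.:
#   python fit_gev.py 48 38
#   python fit_gev.py 38 28
#   python fit_gev.py 28 18
import sys
import xarray as xr

lat_hi, lat_lo = float(sys.argv[1]), float(sys.argv[2])

# tstr is the resampled annual-maxima file written by the original script above
ds = xr.open_dataset(tstr, chunks={'lon': 180}).sel(lat=slice(lat_hi, lat_lo))
# ... then stack, groupby('allpoints').apply(...), unstack and write out as before,
# appending the latitude band to the output filename so the runs do not collide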



Thanks Ryan and Stephan!


Stephan Hoyer

Nov 16, 2016, 11:47:48 AM
to xarray
Thanks for the update.

This appears to be a case where xarray's approach to parallelization with dask breaks down -- you need to parallelize xarray's Python operations, not just the wrapped NumPy/SciPy routines.

It would be nice if we had a better usability story that lets you orchestrate all of this from a single Python script (e.g., perhaps using dask-distributed to do the xarray operations in other processes), but we're not there yet.


Edward

Nov 21, 2016, 3:00:36 AM
to xarray
I appreciate the help. It's definitely working better now, and the resampling only needs to be a one-time process. ;)