from datetime import datetime
import numpy as np
import scipy as sp
from scipy.stats import genextreme
from scipy.stats import kstest as kst
import xarray as xr
import dask
from multiprocessing.pool import ThreadPool

dask.set_options(pool=ThreadPool(64))  # old-style dask config: run on a 64-thread pool
def combogev(x):
    """Fit a GEV to one series of annual maxima; return return levels and a KS goodness-of-fit test."""
    mle = genextreme.fit(x, loc=0)                            # (shape, loc, scale) MLE
    T = np.array([1, 5, 10, 15, 20, 25, 30, 40, 50, 100])     # return periods in years
    sT = genextreme.isf(1. / T, mle[0], mle[1], mle[2])       # return level for each period
    dssT = xr.DataArray(sT)
    kstmp = kst(x, 'genextreme', mle)                         # KS statistic and p-value
    return xr.Dataset(data_vars={'dssT': dssT, 'dsk1': kstmp[0], 'dsk2': kstmp[1]})
start = datetime.now()
batstartTime = datetime.now() # let's time this
print "Batch start time:"
print batstartTime
filechunk1 = 180  # chunk size for the first read-in; I tried several values here and for the second read-in of the resampled data
filechunk2 = 180  # chunk size for the second read-in
print filechunk1
print filechunk2
# STEP 1 - ~2 SECONDS: open the input files lazily (listy is the list of NetCDF paths, built earlier)
ds = xr.open_mfdataset(listy, chunks={'lon': filechunk1}, concat_dim='time')
# STEPS 2-3 - ~12 SECONDS
#ds = ds.sel(lat=slice(16, 11), lon=slice(75.25, 80.2))   # 10x10
#ds = ds.sel(lat=slice(16, 6), lon=slice(75.25, 85.2))    # 20x10
ds = ds.sel(lat=slice(48, 18), lon=slice(75.25, 135))     # 60x120, almost all land # <<<<< let's use this 60x120 slice
#ds = ds.sel(lat=slice(48, 18), lon=slice(115.25, 175))   # 60x120, mostly sea (NaNs)
ds = ds.resample('5D', dim='time', how='mean')      # Step 2: resample to 5-day means
ds = ds.dis.resample('AS', dim='time', how='max')   # Step 3: resample the 'dis' variable to annual maxima
tstr = 'N:\\Wat-Data\\ISI-MIP1\\Processed_test\\temp\\temp' + str(GCM) + str(yrs[0]) + '1.nc'  # path for the resampled data (GCM and yrs come from earlier in the script)
# STEP 4 - ~2 MINS for a 10x10 array, ~4 hours for 360x720: write the annual maxima to disk
ds.to_dataset().to_netcdf(tstr, format='NETCDF4_CLASSIC')
del ds
# STEP 5 - reopen the resampled file with dask chunks
ds = xr.open_dataset(tstr, chunks={'lon': filechunk2})
# STEPS 6 & 7 - stack lat/lon into a single 'allpoints' dimension
stacked = ds.stack(allpoints=['lon', 'lat']).squeeze()
stacked = stacked.reset_coords(drop=True)
stacked = stacked.chunk({'allpoints': 10})  # note: .chunk() returns a new object and must be assigned; that's why 1 vs 1000 seemed to make no difference
# STEP 8 - the main computation: ~2 mins for a 10x10 area, ~14 mins for 60x120, ~45 mins for 100x140, 24h+ for larger datasets
mle = stacked.dis.groupby('allpoints').apply(combogev)  # fit a GEV at every grid point
# STEP 9 - unstack back to lat/lon
dsmle = mle.unstack('allpoints')
# STEP 10 - write the results (dstr is the output path, defined elsewhere)
dsmle.to_netcdf(dstr, format='NETCDF4_CLASSIC', encoding={'dssT': {'zlib': True, 'complevel': 5}})
--
The fundamental problem here is that groupby operations don't actually parallelize. (This is a very common misunderstanding about how xarray works.) So your expensive step 8 will always scale linearly with the length of allpoints (or worse, because of the overhead of re-combining the output of your function).
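One way to get actual parallelism here (a minimal sketch, not the only option; the names gev_return_levels, levels and the return_period dimension are mine, while ds.dis and the return periods come from the script above, and the keyword for declaring the new output dimension's size depends on the xarray version) is to skip the stack/groupby roundtrip and push the per-point fit through xr.apply_ufunc with vectorize=True and dask='parallelized', so dask can run one batch of fits per chunk:

import numpy as np
import xarray as xr
from scipy.stats import genextreme

def gev_return_levels(x):
    # fit a GEV to one 1-D series of annual maxima and return its return levels
    T = np.array([1., 5., 10., 15., 20., 25., 30., 40., 50., 100.])
    c, loc, scale = genextreme.fit(x, loc=0)
    return genextreme.isf(1. / T, c, loc=loc, scale=scale)

# ds is the annual-maxima dataset reopened in step 5, chunked along 'lon' only,
# so 'time' sits in a single chunk (required for a core dimension here)
levels = xr.apply_ufunc(
    gev_return_levels,
    ds.dis,
    input_core_dims=[['time']],            # each call consumes the time axis
    output_core_dims=[['return_period']],  # ...and produces 10 return levels
    vectorize=True,                        # loop the scalar fit over lat/lon
    dask='parallelized',
    output_dtypes=[float],
    dask_gufunc_kwargs={'output_sizes': {'return_period': 10}},  # older xarray: pass output_sizes=... directly
)
levels = levels.compute()                  # fits run in parallel, one task per chunk

The KS statistic and p-value could be returned the same way by adding extra output_core_dims entries. Whether this actually keeps 64 threads busy also depends on how much of the scipy fit releases the GIL, so a process-based scheduler (e.g. dask.distributed) may still be needed for the fitting step.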