Read 3D NetCDF4 file in parallel


David Wang

Sep 9, 2016, 6:21:58 PM
to RBigDataProgramming
Hello pbdRers,

First, I'd like to applaud the developers for the impressive work that brings R to scale.

I currently need to load data from three large netcdf files and to apply SVD to the combined data matrix. Here is the metadata of one such netcdf file:

$ ncdump -h SLP.nc
netcdf SLP {
dimensions:
        Time = 8832 ;
        DateStrLen = 19 ;
        south_north = 283 ;
        west_east = 251 ;
variables:
        char Times(Time, DateStrLen) ;
        float SLP(Time, south_north, west_east) ;
                SLP:_FillValue = 9.96921e+36f ;
                SLP:units = "hPa" ;
                SLP:description = "Sea Level Pressure" ;

Since the size of the data is huge, I'd rather not load everything on processor 0 and then distribute. Instead, I wish to let every processor read a portion in parallel, compute the SVD with pbdDMAT::svd, and gather the right-singular vectors for output (I don't need the left-singular vectors). I have read the demos at https://github.com/RBigData/pbdDEMO/tree/master/demo but failed to get a clear idea of how to proceed, in particular how to read 3D data into a ddmatrix. For the SVD, the two spatial dimensions will be combined, so the global matrix will be 251*283 by 8832. I also have to partition by rows because I need to remove the (global) row means prior to the SVD.
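For concreteness, here is a toy serial sketch (plain R, with shrunken stand-in dimensions) of the reshape and centering I have in mind:

```r
# Stand-in sizes for west_east (251), south_north (283), Time (8832)
nx <- 3; ny <- 4; nt <- 5

# ncvar_get() returns dimensions reversed relative to the ncdump order,
# so SLP arrives as a west_east x south_north x Time array in R
x <- array(as.numeric(seq_len(nx * ny * nt)), dim = c(nx, ny, nt))

# Combine the two spatial dimensions: an (nx*ny) x nt matrix,
# i.e. 251*283 x 8832 for the real data
dim(x) <- c(nx * ny, nt)

# Remove the row (grid-point) means prior to the SVD
x <- x - rowMeans(x)

stopifnot(all(abs(rowMeans(x)) < 1e-12))
```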

I have successfully installed pbdNCDF4 and run the tests.

Can anyone give a pointer or two?

Thanks,
David

David Wang

Sep 16, 2016, 5:50:30 PM
to RBigDataProgramming
Well, it turns out that my netcdf files cannot be read in parallel:

nc.format = NC_FORMAT_NETCDF4_CLASSIC is not for parallel I/O.
Warning: Serial write need more caution in parallel.

Too bad...

Ostrouchov, George

Sep 16, 2016, 11:25:45 PM
to RBigDataProgramming
Use nc_open instead of nc_open_par. If you specify disjoint file chunks on different ranks, it will still read mostly in parallel. It is best to split along the slowest-changing dimension so each rank reads a contiguous piece of the file. The _par version adds more coordination and enables parallel writing to one file, but requires the newer HDF5-based NetCDF4 format.

But all this helps ONLY if you are working with a parallel file system. If you don’t have a parallel file system, it’s still best to read with one (sometimes two) cores and distribute.
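The indexing, in a sketch (slab() here is a hypothetical helper, not from any package, that computes a rank's contiguous piece of a dimension of length n):

```r
# Hypothetical helper: contiguous (start, count) slab along a dimension
# of length n for a given 0-based rank; a rank past the end gets count 0
slab <- function(n, rank, size) {
  per <- ceiling(n / size)
  st <- 1 + per * rank
  c(start = st, count = max(0, min(per, n - st + 1)))
}

# Splitting the 8832 time steps over 4 ranks:
slab(8832, 0, 4)  # start 1,    count 2208
slab(8832, 3, 4)  # start 6625, count 2208

# Each rank then does a plain serial open and reads only its slab.
# Note that in R, Time is the LAST index (ncvar_get reverses the ncdump
# order), while it is the slowest-changing dimension in the file, so
# these slabs are contiguous on disk:
#   nc <- nc_open("SLP.nc")
#   s  <- slab(nc$dim$Time$len, comm.rank(), comm.size())
#   x  <- ncvar_get(nc, "SLP",
#                   start = c(1, 1, s["start"]),
#                   count = c(-1, -1, s["count"]))
```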

George

--
Programming with Big Data in R
Simplifying Scalability
http://r-pbd.org/
---
You received this message because you are subscribed to the Google Groups "RBigDataProgramming" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rbigdataprogram...@googlegroups.com.
To post to this group, send email to rbigdatap...@googlegroups.com.
Visit this group at https://groups.google.com/group/rbigdataprogramming.
To view this discussion on the web visit https://groups.google.com/d/msgid/rbigdataprogramming/74c5051a-72f6-466d-a5d7-1f08c48a0ca4%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

David Wang

Sep 19, 2016, 12:52:36 PM
to RBigDataProgramming
Hello George,

Thanks for the response. How do I read with more than one core and distribute when I use nc_open? I realized that I don't really need parallel reading, just each rank reading its own chunk, so that process 0 won't need a huge amount of available memory. The code I put together below seems to still read chunks only on process 0, and the SVD is likely computed incorrectly. Do you spot any mistakes?

Thanks,
David

suppressMessages(library(pbdMPI, quietly = TRUE))
suppressMessages(library(pbdNCDF4, quietly = TRUE))
suppressMessages(library(pbdDMAT, quietly = TRUE))

init.grid()

rank <- comm.rank()
size <- comm.size()

nc <- nc_open("SLP.nc")

nrow <- nc$dim$west_east$len
ncol <- nc$dim$south_north$len
ntim <- nc$dim$Time$len

ncol.per.rank <- ceiling(ncol / size)
st <- 1 + ncol.per.rank * rank
co <- ncol.per.rank
if (st + co > ncol) co <- ncol - st + 1

x <- ncvar_get(nc, "SLP", start = c(1, st, 1), count = c(-1, co, -1))
dim(x) <- c(nrow * co, ntim)
x <- x - rowMeans(x)
nc_close(nc)

comm.print(dim(x), all.rank = TRUE)

dx <- as.ddmatrix(x, bldim = c(16, 16))
dsvd <- La.svd(dx)
svd <- lapply(dsvd, as.matrix)
if (comm.rank() == 0) print(dim(svd$v))

finalize()

Using 4x4 for the default grid size

[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 4518 8856
[1] 3263 8856
Warning message:
In base.rpdgesvd(jobu = jobu, jobvt = jobvt, m = m, n = n, a = x@Data,  :
  ScaLAPACK returned INFO=-3; returned solution is likely invalid

David Wang

Sep 20, 2016, 11:58:26 AM
to RBigDataProgramming
Well, I have made some progress, following the advanced example in the pbdDEMO guide: first read the data onto the first processor column in a new BLACS context, then redistribute onto the full processor grid. One thing I'm not entirely sure about: when creating the ddmatrix object in the new context, should bldim be identical for every rank? Here I set bldim = dim(x) because the number of rows of the global matrix is not a multiple of 6. Does anyone know? Thanks!

suppressMessages(library(pbdMPI, quietly = TRUE))
suppressMessages(library(pbdBASE, quietly = TRUE))
suppressMessages(library(pbdDMAT, quietly = TRUE))
library(ncdf4)

init.grid(NPROW = 6, NPCOL = 6)
newctxt <- base.minctxt()
init.grid(NPROW = 6, NPCOL = 1, ICTXT = newctxt)
grid <- blacs(ICTXT = newctxt)

nc <- nc_open("SLP.nc")

nrow <- nc$dim$west_east$len
ncol <- nc$dim$south_north$len
ntim <- nc$dim$Time$len

if (grid$MYROW == -1 || grid$MYCOL == -1) {
  x <- matrix(0)
} else {
  ncol.per.prow <- ceiling(ncol / 6)
  st <- 1 + (ncol.per.prow * grid$MYROW)
  co <- ncol.per.prow
  if (st + co > ncol) co <- ncol - st + 1
  x <- ncvar_get(nc, "SLP", start = c(1, st, 1), count = c(-1, co, ntim))
  dim(x) <- c(nrow*co, ntim)
  x <- x - rowMeans(x)
}
nc_close(nc)

dx <- new("ddmatrix", Data = x,
          dim = c(nrow*ncol, ntim), ldim = dim(x), bldim = dim(x),
          ICTXT = newctxt)
dx <- redistribute(dx, bldim = 256, ICTXT = 0)
print(dx)

dsvd <- svd(dx)
if (comm.rank() == 0) {
  v <- as.matrix(dsvd$v)
  d <- as.matrix(dsvd$d)
  save(v, d, file = "svdout.rda")
}

finalize()

David Wang

Sep 20, 2016, 2:05:21 PM
to RBigDataProgramming
It looks like bldim should be identical for every rank. So I now set bldim = c(nrow * ceiling(ncol / grid$NPROW), ntim). With that, I seem to be all set. :)

Ostrouchov, George

Sep 20, 2016, 8:00:44 PM
to RBigDataProgramming
Hi David,

The default init.grid() sets up contexts 0, 1, and 2. ICTXT=2 means your matrix is partitioned only across rows and all columns are present on every rank, 1 is the opposite, and 0 means partitioned across both rows and columns (default tries to be near square). The defaults are enough for most applications, I think including yours.

You may want to look at the RBigData/pbdIO package on GitHub, which is incomplete, but what is there works at this point. There is a function comm.chunk() that will do some of the needed indexing for you (use type="equal", form="IOpair", lo.side="right"). It returns a vector equivalent to your c(st, co) and takes care of the edge cases when things don't divide evenly.
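A sketch of what I mean, assuming the IOpair form returns c(start, count) as described (run under mpirun; check the pbdIO help page for the exact argument names):

```r
suppressMessages(library(pbdMPI, quietly = TRUE))
library(pbdIO)  # install from github.com/RBigData/pbdIO

init()

# south_north length, from the ncdump in the first message
ncol <- 283

# One (start, count) pair per rank, equivalent to the hand-rolled
# c(st, co), with uneven remainders handled automatically
chunk <- comm.chunk(ncol, type = "equal", form = "IOpair", lo.side = "right")

# nc <- nc_open("SLP.nc")
# x  <- ncvar_get(nc, "SLP", start = c(1, chunk[1], 1),
#                 count = c(-1, chunk[2], -1))

finalize()
```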
 
It looks like you are reading/creating blocks of rows of your final matrix, so when you use new() to glue things together, use ICTXT=2. As you mention, bldim should be identical on every rank, so you might need an allreduce(nrow(x), op="max") for its first component to handle the edge cases when things don't divide evenly.
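With your numbers (a 251 x 283 spatial grid read as 6 row blocks), the arithmetic works out as:

```r
nrow <- 251; ncol <- 283; nprow <- 6

# Local row counts of the 6 read-in blocks: 5 full blocks and a short one
cols.per.block <- ceiling(ncol / nprow)                          # 48
rows.per.block <- nrow * cols.per.block                          # 12048
rows.last      <- nrow * (ncol - (nprow - 1) * cols.per.block)   # 10793

# allreduce(nrow(x), op = "max") over the 6 ranks returns the larger
# of these, which is the bldim row component every rank must use
bldim.rows <- max(rows.per.block, rows.last)                     # 12048
```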

You might get better performance with a smaller bldim in redistribute(). Experiment.

George



David Wang

Sep 21, 2016, 10:13:52 AM
to RBigDataProgramming
Hi George,

Thanks for the informative response. My solution was basically adapted from an advanced example in pbdDMAT-guide.pdf (pages 16-17), where besides the default square grid (6 by 6 in my case), it creates a 6-by-1 column process grid, reads blocks of rows there, and redistributes to the default square grid. Based on your suggestion, I guess I could instead create a 36-by-1 column grid with ICTXT = 2 and use it to read blocks of rows.

comm.chunk() seems like a handy function that will simplify the manual work I have done with st and co. All my read-in row blocks have the same number of rows except the last, which has fewer, so allreduce(nrow(x), op="max") should equal the number of rows on any rank but the last.

I will experiment with bldim values. But my global matrix is massive (213099 x 8832). Does that mean I should choose a large bldim value?

Thanks,
David