a question and comment on pbdMPI

122 views
Skip to first unread message

marto

unread,
Mar 10, 2013, 5:32:55 AM3/10/13
to rbigdatap...@googlegroups.com
Dear all,

I am running pbdMPI-0.1-4 with R-2.15.2 on Slackware64-Linux-13.37 with kernel 2.6.38.4.

I would like to ask you a question about bootstrapping with pbdMPI. Is the idea of L'Ecuyer implemented, just as it is
in the parallel R package? More specifically, are the seeds 2^127 steps apart in the random number stream?
From the documentation of pbdMPI it seems the answer is yes, but I would like You too see my minimal
working example, just to make sure. The example involves generating random vectors and writing them parallelly to a netcdf file.
Actually I decided to use pbdMPI, just because pbdNCDF4 provides the only option for parallel access to a ncdf file. Otherwise I could have
happily worked with base parallel. So my minimal working example is:

library(pbdNCDF4, quiet=TRUE);
writeChunk <- function(jn, nc, ns, n) { # jn=job number: it is a vector in the general case
 R <- sum(ns[jn]); # the numbers of replications in this jn; jn is a vector when more than 1 processes are distributed to a core
 chunk <- ns[1L]; # size of the full chunk
 x <- replicate(n=R, expr=sample.int(n=n, replace=TRUE), simplify="array"); # generate R random vectors
 ncvar_put(vals=x, start=c(1, chunk*(jn[1L] - 1L) + 1L), count=c(-1, R), nc=nc, varid="indices"); # store them in the ncdf file
}

R <- 9L; # total number of replications, i.e processes
chunk <- 2L; # number of tasks per core, i.e. 'chunk' of processes per rank
n <- 10L; # length of each vector
init();
vecind <- ncdim_def(name="vecind", vals=seq_len(n), units="", create_dimvar=TRUE); # vector index
vecnum <- ncdim_def(name="vecnum", vals=seq_len(R), units="", create_dimvar=TRUE); # vector number
x_var <- ncvar_def(name="indices", dim=list(vecind, vecnum), units="", prec="integer");
nc <- nc_create_par(filename="indices.nc", vars=x_var);
nc_var_par_access(nc=nc, var="indices", collective=TRUE);
nchunks <- R%/%chunk; # total number of full chunks
nrem <- R - nchunks*chunk; # remaining processes
ns <- c(rep(x=chunk, times=nchunks), if(!nrem) NULL else nrem); # numbers of processes for all chunks
id <- as.integer(get.jid(n=length(ns), method="block")); # divide jobs
comm.set.seed(seed=rep(x=12345L, times=6L), diff=TRUE); # different ranks use different streams;
# are the seeds 2^127 steps apart in the random number stream, as suggested by L'Ecuyer?
writeChunk(jn=id, ns=ns, nc=nc, n=n);
comm.end.seed();
nc_sync(nc=nc);
nc_close(nc=nc);
finalize();

If You do not mind, I would also like to comment on my experience with pbdMPI.
What surprised me at first was that I do not need to loop with writeChunk or incorporate it within lapply. It seems
get.jid takes care of this. I run this script with 4 processors (mpiexec -np 4 Rscript this.script), that is I have ranks from 0 to 3. So rank 0 gets process 1,
rank 1 process 2, rank 2 process 3, and rank 3 takes processes 4 and 5. So id for rank=3 is actually a vector, containing processes 4 and 5.
If I incorporate writeChunk in a loop along id, then the program hangs: It seems the 5-th process never gets distributed. But if all cores get the same number of
processes, that is if I take R=8 and chunk=2, that is 8/2=4 processes, i.e. 1 process  per core, looping along id works.

If I run the script on one core (mpiexec -np 1 Rscript this.script) the time it takes is actually the same as with 4 cores, even a bit better.
I think this is due to the communication overhead for such a small job, but anyway I do need a confirmation.


Thank you very much for Your attention.

Best regards

Martin
Message has been deleted
Message has been deleted

Wei-Chen Chen

unread,
Mar 10, 2013, 3:08:59 PM3/10/13
to rbigdatap...@googlegroups.com
Dear Martin,

The next is some answers.

1.
Yes, I think I followed the original rlucyber's design.
If diff = FALSE (default), then all ranks generate the same random number
from the given seed.
If diff = TRUE, then all ranks have different random number (each rank take
a stream) from the given seed.

2.
The fixed code is attached.

I guess you want to divide "R" jobs rather "length(ns)" jobs, therefore, it
should be "get.jid(n = R, ...)".
You may use "comm.print(id, allrank = TRUE)" to show job distribution.

As in you case, "R <- 5" and "mpiexec -np 4 Rscript this.script", you
will have 1 in rank 0, 2 in rank 1, 3 in rank 2, and none in rank 3
(not even 4.)

The "writeChunk()" should adjust accordingly. I presume "n = R" is used, then
"chunk" will be "1" in that function. This is really dependent on how you
distribute your jobs.

3.
Your pbdNCDF4 code is correct. However, if you don't have a parallel file
system, it probably gain no performance in a nc file. At least, the code is
concise. Be careful, serial writing in parallel, you need
sync/close/barrier/reopen the same nc file sequentially rank by rank, or
the writing may not be correct. (usually wrong.)

Let us know if there are further questions.

Sincerely,
Wei-Chen Chen

### "20130310-Martin_fixed.r"

library(pbdNCDF4, quiet=TRUE)

writeChunk <- function(jn, nc, ns, n) {
  # jn=job number: it is a vector in the general case
  #WCC R <- sum(ns[jn])
  R <- length(jn)

  # the numbers of replications in this jn;
  # jn is a vector when more than 1 processes are distributed to a core
  #WCC chunk <- ns[1L]
  chunk <- 1

  # size of the full chunk
  x <- replicate(n = R, expr = sample.int(n = n, replace = TRUE),
                 simplify = "array")

  # generate R random vectors
  ncvar_put(vals = as.vector(x), start = c(1, chunk * (jn[1L] - 1L) + 1L),
            count = c(-1, R),
            nc = nc, varid = "indices")

  # store them in the ncdf file
}

R <- 5L      # total number of replications, i.e processes

chunk <- 2L  # number of tasks per core, i.e. 'chunk' of processes per rank
n <- 10L     # length of each vector
init()
vecind <- ncdim_def(name = "vecind", vals = seq_len(n), units = "",
                    create_dimvar = TRUE)                       # vector index

vecnum <- ncdim_def(name = "vecnum", vals = seq_len(R), units = "",
                    create_dimvar = TRUE)                       # vector number

x_var <- ncvar_def(name = "indices", dim = list(vecind, vecnum),
                   units = "", prec = "integer")
nc <- nc_create_par(filename = "indices.nc", vars = x_var)
nc_var_par_access(nc = nc, var = "indices", collective = TRUE)
nchunks <- R %/% chunk                     # total number of full chunks
nrem <- R - nchunks * chunk                # remaining processes

ns <- c(rep(x = chunk, times = nchunks),
        if(! nrem) NULL else nrem)         # numbers of processes for all chunks
#WCC id <- as.integer(get.jid(n = length(ns),
id <- as.integer(get.jid(n = R,
                 method = "block"))        # divide jobs
comm.print(id, all.rank = TRUE)            # print job distribution for all.
comm.set.seed(seed = rep(x = 12345L, times = 6L),
              diff = TRUE)

                                  # different ranks use different streams
                                  # are the seeds 2^127 steps apart in the
                                  # random number stream, as suggested
                                  # by L'Ecuyer?
writeChunk(jn = id, ns = ns, nc = nc, n = n)
comm.end.seed()
nc_sync(nc = nc)
nc_close(nc = nc)

finalize()


Reply all
Reply to author
Forward
0 new messages