Rewriting Rhipe to have an expressionless user interface.


Jeremiah Rounds

Mar 2, 2011, 6:42:14 PM
to rhipe
First, a caveat: I have very little real experience with Rhipe. My
first impression of using expressions as a data type in R is that if
you can avoid them, you probably should. The reason is that R does not
have good language tools for joining expressions, removing pieces of
expressions, etc. Your options are to cut and paste new and different
expressions or to do some fairly opaque manipulations. Environments
are a tad easier to manipulate.

So, as an extension to what I mentioned in a global setup thread, I
thought about how I could do away with all the expressions, and I
came up with rhmrEx. Ex stands for extended. The extension is that the
call checks for function and environment arguments in map, reduce,
and setup. If those are detected, it uses the setup as an environment
scheme to create the map and reduce expressions, thus hiding the
details of expressions from would-be Rhipe users.

If a map/reduce function has no arguments, it is assumed to be aware
of Rhipe's global environment variables. If it has a single argument,
it is assumed to want the values. If it has two, it is assumed to want
key, values. Environments are assumed to hold functions named like the
parts of the map and reduce expressions (e.g. pre/map/post). The
environment form of map and reduce operates just like the expression
form in that regard, except that everything in the map/reduce
environments comes along for the ride for no extra work.

The code goes like this, though it is really just a proof of concept
rather than a formal submission. I haven't extensively tested it, and
these things being what they are, I am sure it would break under some
non-trivial uses.
Note: one way it could break is that the environments for map and
reduce get attached before the map and reduce functions are called.
Depending on what is in those environments, that could be an issue.
Note: there is an attach in setup that never gets detached, which
causes warnings.
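
The argument-count rule described above can be tried in plain R
without Rhipe. This is only a sketch: call_by_arity is a hypothetical
helper, not part of rhmrEx or Rhipe, and it shows nothing more than
dispatching on length(formals(f)).

```r
# Hypothetical helper: call f with as many of (keys, values) as it declares.
call_by_arity <- function(f, keys, values) {
  n <- length(formals(f))   # formals() is NULL for a zero-argument function
  if (n == 0) {
    f()                     # function relies on variables it finds itself
  } else if (n == 1) {
    f(values)               # one argument: values only
  } else {
    f(keys, values)         # two arguments: keys, then values
  }
}

none.result <- call_by_arity(function() "no arguments", "k", "v")
one.result  <- call_by_arity(function(values) values, "k", "v")
two.result  <- call_by_arity(function(keys, values) paste(keys, values), "k", "v")
```

A function with three or more arguments falls into the last branch
here, which matches the else branch in the code further down.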



The uses look like these.
Worst count words ever:
------------------------------------------------------------------------------------------------------------------------------------------------------------------------

library(Rhipe)
rhinit()
source("rhmrExtended.R")
# countWordsExample
map = function(values){
  # tabulate every space-separated word in this batch of lines
  word.table = table(unlist(strsplit(unlist(values), " ")))
  words = names(word.table)
  counts = as.numeric(word.table)
  # seq_along avoids looping over 1:0 when the batch has no words
  for(i in seq_along(words)){
    rhcollect(words[i], counts[i])
  }
}

reduce = function(key, values){
  rhcollect(key, sum(unlist(values)))
}


DIR = getwd()
args = list()
args$map = map
args$reduce = reduce
args$ifolder = paste(DIR, "mobydick", sep="/")
args$ofolder = paste(DIR, "output", sep="/")
args$inout = c("text", "text")
args$mapred = list(mapred.job.tracker='local')
mr = do.call("rhmrEx", args)
rhex(mr)
------------------------------------------------------------------------------------------------------------------------

Here is the "get the airline data" example. This last example
demonstrates the flexibility of thinking of map as an environment.
Here we just say "oh, let's just put wget in the map environment to go
along for the ride." And it's as simple as that.

------------------------------------------------------------------------------------------------------------------------------
library(Rhipe)
rhinit(TRUE,TRUE)
rhoptions(runner=sprintf("%s/rhipe.runner.sh",Sys.getenv("HOME")))
source("rhmrExtended.R")
wget = function(on){
  system(sprintf("wget %s --directory-prefix ./tmp 2> ./errors",on))
  if(length(grep("(failed)|(unable)",readLines("./errors")))>0){
    stop(paste(readLines("./errors"),collapse="\n"))
  }
}

map = function(map.values){
  lapply(map.values,function(x){
    x = 1986 + x
    on <- sprintf("http://stat-computing.org/dataexpo/2009/%s.csv.bz2",x)
    fn <- sprintf("./tmp/%s.csv.bz2",x)
    rhstatus(sprintf("Downloading %s", on))
    wget(on)
    rhstatus(sprintf("Downloaded %s", on))
    system(sprintf('bunzip2 %s',fn))
    rhstatus(sprintf("Unzipped %s", on))
    rhcounter("FILES",x,1)
    rhcounter("FILES","_ALL_",1)
  })
}
DIR = "/wsc/tmp/jrounds"
#DIR = getwd()
args = list()
args$map = new.env()
args$map$map = map
args$map$wget = wget
args$ofolder = paste(DIR,"airline", sep="/")
args$inout = c("lapply")
args$N = length(1987:2008)
args$mapred = list(mapred.reduce.tasks=0,mapred.task.timeout=0)
args$copyFiles = TRUE
mr = do.call("rhmrEx", args)
rhex(mr)

------------------------------------------------------------------------------------------------------------------------------------------
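
Why wget is visible inside map in the example above can be shown in
plain R, without Rhipe. This is a minimal sketch; the names helpers,
double_it, map_fn, and helpers.sketch are made up for illustration.
It assumes only base R's attach() and detach(), which is what the
generated pre and post blocks call.

```r
# Sketch only: plain base R, no Rhipe. All names below are made up.
helpers <- new.env()
helpers$double_it <- function(x) 2 * x
# map_fn never receives double_it as an argument; it finds it on the
# search path once the environment has been attached.
helpers$map_fn <- function(values) lapply(values, double_it)

attach(helpers, name = "helpers.sketch")          # like the pre block
doubled <- helpers$map_fn(list(1, 2, 3))
detach("helpers.sketch", character.only = TRUE)   # like the post block
```

One thing to keep in mind is that attach() puts a copy of the
environment on the search path, so assignments made to the attached
copy do not flow back into the original environment.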

The code to make them work looks like this:
####################################################################################################
#
# handleSetup
# mc is call to rhmrExtended.
# if setup is a function it makes an environment call.setup and calls
#   the function with no arguments.
# if setup is an environment it attaches it before map/reduce
# if setup is anything else it leaves it alone
####################################################################################################
handleSetup = function(mc, call.setup){
  if(is.null(call.setup)) return(mc)
  long.name.setup.function = function(){}
  if(is.function(call.setup)){
    # function form: keep the function and start from an empty environment
    long.name.setup.function = call.setup
    call.setup = new.env()
  }
  if(is.environment(call.setup)){
    call.setup$long.name.rhmr.Extended.setup.function =
      long.name.setup.function
    # ship the whole environment to the job as an ASCII-serialized string
    env.setup = rawToChar(serialize(call.setup,NULL,ascii=TRUE))
    if(is.null(mc$mapred)){
      mc$mapred = list()
    }
    mc$mapred$env.setup = env.setup
    mc$setup = expression({
      env.setup = unserialize( charToRaw( Sys.getenv("env.setup") ))
      attach(env.setup)
      long.name.rhmr.Extended.setup.function()
    })
  }
  return(mc)
}

################################################################################################
#
# handleMap
# mc is a call to rhmrExtended
# if map is a function it assumes it wants to be called once with
#   map.keys, map.values arguments
# if map is an environment it checks for functions pre/map/post and
#   executes those
# in the two cases above it wants mc$setup to be NULL or an
#   environment and alters it with extra variables
# if map is anything else it leaves it alone
################################################################################################
handleMap = function(mc, map){
  if(is.null(map)) return(mc)
  if(is.null(mc$setup)){
    mc$setup = new.env()
  }
  if(is.function(map)){
    # function form: wrap the function in a one-entry environment
    map.env = new.env()
    map.env$map = map
    map = map.env
  }
  if(is.environment(map)){
    if(!is.environment(mc$setup)){
      cat("Error: in rhmrExtended::handleMap expected setup to be environment.\n")
      return(NULL)
    }
    if(is.null(map$map)){
      cat("Error: expected map function in map environment\n")
    }
    mc$setup$rhmrExtended.map = map
    mc$map = expression(
      pre = {
        attach(rhmrExtended.map)
        if(!is.null(rhmrExtended.map$pre)) rhmrExtended.map$pre()
      },
      map = {
        # dispatch on the number of arguments the map function declares
        rhmrExtended.map.map.nformals = length(formals(rhmrExtended.map$map))
        if(rhmrExtended.map.map.nformals == 0)
          rhmrExtended.map$map()
        else if (rhmrExtended.map.map.nformals == 1)
          rhmrExtended.map$map(map.values)
        else
          rhmrExtended.map$map(map.keys, map.values)
      },
      post = {
        if(!is.null(rhmrExtended.map$post)) rhmrExtended.map$post()
        detach(rhmrExtended.map)
      }
    )
  }
  return(mc)
}

################################################################################################
#
# handleReduce
# mc is a call to rhmrExtended
# mirror image of handleMap
# literally: text replace on handleMap was used to write it.
################################################################################################
handleReduce = function(mc, reduce){
  if(is.null(reduce)) return(mc)
  if(is.null(mc$setup)){
    mc$setup = new.env()
  }
  if(is.function(reduce)){
    # function form: wrap the function in a one-entry environment
    reduce.env = new.env()
    reduce.env$reduce = reduce
    reduce = reduce.env
  }
  if(is.environment(reduce)){
    if(!is.environment(mc$setup)){
      cat("Error: in rhmrExtended::handleReduce expected setup to be environment.\n")
      return(NULL)
    }
    if(is.null(reduce$reduce)){
      cat("Error: expected reduce function in reduce environment\n")
    }
    mc$setup$rhmrExtended.reduce = reduce
    mc$reduce = expression(
      pre = {
        attach(rhmrExtended.reduce)
        if(!is.null(rhmrExtended.reduce$pre)) rhmrExtended.reduce$pre()
      },
      reduce = {
        # dispatch on the number of arguments the reduce function declares
        rhmrExtended.reduce.reduce.nformals =
          length(formals(rhmrExtended.reduce$reduce))
        if(rhmrExtended.reduce.reduce.nformals == 0)
          rhmrExtended.reduce$reduce()
        else if (rhmrExtended.reduce.reduce.nformals == 1)
          rhmrExtended.reduce$reduce(reduce.values)
        else
          rhmrExtended.reduce$reduce(reduce.key, reduce.values)
      },
      post = {
        if(!is.null(rhmrExtended.reduce$post)){
          rhmrExtended.reduce.post.nformals =
            length(formals(rhmrExtended.reduce$post))
          if(rhmrExtended.reduce.post.nformals == 0)
            rhmrExtended.reduce$post()
          else
            rhmrExtended.reduce$post(reduce.key, reduce.values)
        }
        detach(rhmrExtended.reduce)
      }
    )
  }
  return(mc)
}


rhmrExtended = function(...){
  mc = match.call()
  mc = as.list(mc)
  mc[[1]] = NULL # removes the function name from the call list
  mc = handleMap(mc, mc$map)
  mc = handleReduce(mc, mc$reduce)
  mc = handleSetup(mc, mc$setup)
  return(do.call("rhmr", mc))
}
rhmrEx = function(...) rhmrExtended(...)
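
The transport trick in handleSetup (serialize an environment to an
ASCII string, then rebuild it on the other side) can be checked
locally. This is a sketch under assumptions: setup.env, multiplier,
and rescale are made-up names, and the step through the job
configuration and Sys.getenv is left out, so only the round trip
itself is shown.

```r
# Sketch only: round-trip an environment through a character string.
setup.env <- new.env()
setup.env$multiplier <- 3

# Make the function's enclosing environment the one being shipped, so it
# can still find 'multiplier' after being rebuilt elsewhere.
rescale <- function(x) x * multiplier
environment(rescale) <- setup.env
setup.env$rescale <- rescale

# Same calls as handleSetup: raw bytes in ASCII form, viewed as one string.
payload <- rawToChar(serialize(setup.env, NULL, ascii = TRUE))

# What the generated setup expression does with the string it reads back.
restored <- unserialize(charToRaw(payload))
rescaled <- restored$rescale(2)
```

The restored environment is a new object rather than the original
one, so changes made to it on a worker would not be seen by the
client session.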

Saptarshi Guha

Mar 4, 2011, 7:52:05 PM
to rh...@googlegroups.com
> First, a caveat: I have very little real experience with Rhipe. My
> first impression of using expressions as a data type in R is that if
> you can avoid them, you probably should. The reason is that R does not
> have good language tools for joining expressions, removing pieces of
> expressions, etc. Your options are to cut and paste new and different
> expressions or to do some fairly opaque manipulations. Environments
> are a tad easier to manipulate.


Not sure about that.

x <- expression({ foo +1 })
as.list(x[[1]])

You can then swap in parts and modify expressions. Also, see [1]
for some examples of quote replacement.
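
As a rough illustration of swapping parts in and out, here is a
minimal sketch in base R. The variable names and the value given to
foo are made up; only the indexing of the call object matters.

```r
x <- expression({ foo + 1 })
body.call <- x[[1]]            # the `{` call: list(`{`, foo + 1)

# Swap a part: position 2 is `foo + 1`, whose third element is the 1.
body.call[[2]][[3]] <- 10
x[[1]] <- body.call            # x now holds { foo + 10 }

# Join: append another statement to the same block.
extended <- as.call(c(as.list(body.call), list(quote(foo * 2))))

foo <- 5
swapped.value  <- eval(x[[1]])     # value of foo + 10
extended.value <- eval(extended)   # value of the last statement, foo * 2
```

This works, though it does depend on knowing where each piece sits
inside the call, which is the kind of opaqueness the original post
was complaining about.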

However, I have spoken to some people and expressions are eschewed.

Also, I notice this

rhmrExtended.reduce$reduce()

If I create a variable in reduce$setup, using your approach
can I make persistent modifications in reduce$pre() and reduce$reduce()
so that e.g. reduce$post() will see them?

I think I made use of this somewhere in the manual. It is not an
often-used construct, so if your method doesn't support it, it's not a
big deal.

We could include it.

Regards

Saptarshi

[1] https://github.com/saptarshiguha/RHIPE-Additions/blob/master/rhipe.lm.R

Jeremiah Rounds

Mar 5, 2011, 5:20:13 PM
to rhipe
> rhmrExtended.reduce$reduce()
> If I create a variable in reduce$setup, using your approach
> can I make persistent modifications in reduce$pre() and reduce$reduce()
> so that e.g. reduce$post() will see them?

Thanks for the question as it reminded me I was unclear on a point. I
take it the issue is that there can be many different R instances
between a pre call and a post call? Is that the issue?



Jeremiah Rounds

Mar 5, 2011, 6:01:51 PM
to rhipe
I had come to terms with map operations and reduce operations, but not
with pre and post yet. I think it's the case in Rhipe that every key in
a Hadoop reduce gets its own R instance. The pre expression gets run
once before reducing and the post expression gets run once after the
keys are run. The reduce expression is run in a single R instance
multiple times if the size of the values for a single key is huge.

Assuming that is the case, the question is about scope and not about
instances of R, machines, etc. So yes, there is the added
complication of a scope issue when using functions for map/reduce.
There are two ways to deal with the scope issue.
For example, let us consider the code from [1] (not the hard code, but
the easy code used to demonstrate the hard code):

reduce <- expression(
  pre = { total <- 0 },
  reduce = { total <- total + sum(unlist(reduce.values)) },
  post = { rhcollect(reduce.key, total) }
)


The two ways that should do the same work (these are untested) (here
args is just my style of forming rhmr arguments prior to the call)

1) Use "<<-" to create and alter a variable in the global scope
(assuming it doesn't run into a variable with the same name sooner).
args = list()
args$reduce = new.env()
args$reduce$pre = function() { total <<- 0}
args$reduce$reduce = function() { total <<- total +
sum(unlist(reduce.values))}
args$reduce$post = function() {rhcollect(reduce.key, total)}

2) Add a working environment to the reduce environment and make
changes to it
args = list()
args$reduce = new.env()
args$reduce$working = new.env()
args$reduce$pre = function() {working$total = 0}
args$reduce$reduce = function() {working$total = working$total +
sum(unlist(reduce.values))}
args$reduce$post = function() {rhcollect(reduce.key, working$total)}
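
The second approach can be tried without Hadoop. This is a sketch
under assumptions, not the rhmrEx code path: rhcollect is replaced by
a local stand-in that just records what was emitted, run_key is a
made-up driver playing the role of Rhipe for one key, and the
functions take their inputs as arguments and reach working through
their enclosing environment instead of through attach().

```r
# Hypothetical stand-in for Rhipe's rhcollect(): records emitted pairs.
emitted <- new.env()
rhcollect <- function(key, value) assign(as.character(key), value, envir = emitted)

make_reduce_env <- function() {
  env <- new.env()
  env$working <- new.env()     # mutable state shared by pre/reduce/post
  local({
    pre    <- function() working$total <- 0
    reduce <- function(reduce.values)
      working$total <- working$total + sum(unlist(reduce.values))
    post   <- function(reduce.key) rhcollect(reduce.key, working$total)
  }, envir = env)
  env
}

# Made-up driver: pre once, reduce once per chunk of values, then post.
run_key <- function(env, key, chunks) {
  env$pre()
  for (chunk in chunks) env$reduce(chunk)
  env$post(key)
}

reduce.env <- make_reduce_env()
run_key(reduce.env, "whale", list(list(1, 2), list(3)))
run_key(reduce.env, "ship", list(list(10)))
```

Because environments are modified in place, the total written by pre
and reduce is the same object that post reads, with no need for <<-.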


If the issue is subtler than that then we have exposed something I
need to learn! Thanks for the offer to consider adding it. I got
some major Rhipe work coming. There is a good chance I will tweak as I
get experience. This is just an idea, and some hours of chasing it.
In the end, the toughest selling point is there is nothing you can do
with environments that you can't do with expressions.

Saptarshi Guha

Mar 5, 2011, 6:15:03 PM
to rh...@googlegroups.com, Jeremiah Rounds
> I had come to terms with map operations and reduce operations, but not
> with pre and post yet. I think it's the case in Rhipe that every key in
> a Hadoop reduce gets its own R instance. The pre expression gets run
> once before reducing and the post expression gets run once after the
> keys are run. The reduce expression is run in a single R instance
> multiple times if the size of the values for a single key is huge.

Nope, for every /block/ of data there is an R instance. (Note: in the
JNI version (on github) you can have one JVM start which will process
a series of blocks; in this case one R instance trundles through
those blocks.) So, when a JVM is assigned a new block (which could
contain thousands of key-value pairs), an R instance is started. For
the map: setup is called once, then the keys/values are buffered and
the map expression is called repeatedly until the key-values have been
processed. Once done, the cleanup/close is called.

For reduce: each JVM is assigned a series of keys (and their
corresponding values) to process. R is started and the setup for
reduce is called. The pre is called for each new key assigned to that
reducer (which depends on the partitioner), then the reduce is called
repeatedly, and then post. Then back to pre for the next key. Finally
the cleanup/close is called.

So, if I create/load a data set in setup in reduce, it is visible to
all the subsequent keys. And loading a data set once is more
efficient. This holds true in your modification. However, I can change
a variable in pre and the pre for subsequent keys will pick this up
(and post and reduce). This can be used for, say, computing differences
or weird stuff (see manual); admittedly I doubt many will need this.
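
The call order described above, and state carrying over from one key
to the next, can be mimicked in plain R. This is only a sketch:
simulate_reducer is a made-up local driver, not Rhipe code, and a
single environment stands in for the global environment of the R
instance. The reducer emits the difference between each key's total
and the previous key's total.

```r
# Expression-form reducer. 'previous' is set in setup and updated in post,
# so each key sees what the key before it left behind.
reducer <- expression(
  setup  = { previous <- NA },
  pre    = { total <- 0 },
  reduce = { total <- total + sum(unlist(reduce.values)) },
  post   = { out[[reduce.key]] <- total - previous; previous <- total }
)

# Made-up driver: setup once, then pre / reduce (repeated) / post per key.
simulate_reducer <- function(reducer, keyed.chunks) {
  inst <- new.env()            # plays the role of the R instance
  inst$out <- list()
  eval(reducer[["setup"]], inst)
  for (key in names(keyed.chunks)) {
    inst$reduce.key <- key
    eval(reducer[["pre"]], inst)
    for (chunk in keyed.chunks[[key]]) {
      inst$reduce.values <- chunk
      eval(reducer[["reduce"]], inst)
    }
    eval(reducer[["post"]], inst)
  }
  inst$out
}

differences <- simulate_reducer(
  reducer,
  list(a = list(list(1, 2), list(3)), b = list(list(10)))
)
```

The first key has nothing to compare against, so its difference is
NA; the second key reports its total minus the first key's total.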


> reduce <- expression(
>   pre = { total <- 0 },
>   reduce = { total <- total + sum(unlist(reduce.values)) },
>   post = { rhcollect(reduce.key, total) }
> )
>
>
> The two ways that should do the same work (these are untested) (here
> args is just my style of forming rhmr arguments prior to the call)

This works since each expression is run in the global environment.

>
> 1) Use "<<-" to create and alter a variable in the global scope
> (assuming it doesn't run into a variable with the same name sooner).

I abhor the <<- operator. It is not nice to modify parent scopes.


>
> 2) Add a working environment to the reduce environment and make
> changes to it
> args = list()
> args$reduce = new.env()
> args$reduce$working = new.env()
> args$reduce$pre = function() {working$total = 0}
> args$reduce$reduce = function() {working$total = working$total +
> sum(unlist(reduce.values))}
> args$reduce$post = function() {rhcollect(reduce.key, working$total)}
>

That works. The global environment is the default environment.
