First, a caveat: I have little to no real experience
with Rhipe. My first impression of using expressions as a data type in
R is that if you can avoid them, you probably should. The reason is
that R has no good language tools for joining expressions, removing
pieces of expressions, etc. Your options are to cut and paste new
and different expressions, or to do some fairly opaque manipulations.
Environments are a bit easier to manipulate.
So, as an extension to what I mentioned in the global setup thread, I
thought about how I could do away with all the expressions, and I
came up with rhmrEx. "Ex" stands for "extended". The extension is to
make the call check for function and environment arguments in map,
reduce, and setup; if those are detected, it uses setup as an
environment and generates the map and reduce expressions itself--thus hiding
the details of expressions from would-be Rhipe users. If a map/reduce
function has no arguments, it is assumed to be aware of Rhipe's
global environment variables. If it has a single argument, it is assumed
you want the values. If it has two, you are assumed to want
the key and the values. Environments are expected to contain functions named
after the parts of the map and reduce expressions (e.g. pre/map/post). The
environment form of map and reduce works just like the expression
form in that regard, except that everything in the map/reduce
environments comes along for the ride at no extra cost. The code
follows, though it is really just a proof of concept rather than
a formal submission. I haven't tested it extensively, and these
things being what they are, I am sure it would break under some
non-trivial uses. Note: one way it could break is that the environments for map and
reduce get attached before the map and reduce functions are called.
Depending on what is in those environments, that could be an issue.
Note: there is an attach in setup that is never detached, which causes
warnings.
Example uses follow.
Worst word count ever:
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
library(Rhipe)
rhinit()
source("rhmrExtended.R")

# countWordsExample
# map has one formal, so rhmrEx calls it with the map values only. Each value
# is split on single spaces, and every distinct word in the chunk is emitted
# once together with its count.
map <- function(values) {
  word.table <- table(unlist(strsplit(unlist(values), " ")))
  words <- names(word.table)
  counts <- as.numeric(word.table)
  # seq_along() rather than 1:length(): for a chunk with no words, 1:0 would
  # iterate over c(1, 0) and emit bogus (NA / empty) pairs.
  for (i in seq_along(words)) {
    rhcollect(words[i], counts[i])
  }
}

# reduce has two formals, so rhmrEx calls it with the key and its values;
# it emits the total count for that word.
reduce <- function(key, values) {
  rhcollect(key, sum(unlist(values)))
}

DIR <- getwd()
args <- list()
args$map <- map
args$reduce <- reduce
args$ifolder <- paste(DIR, "mobydick", sep = "/")
args$ofolder <- paste(DIR, "output", sep = "/")
args$inout <- c("text", "text")
args$mapred <- list(mapred.job.tracker = "local")
mr <- do.call("rhmrEx", args)
rhex(mr)
------------------------------------------------------------------------------------------------------------------------
Here is the example that fetches the airline data. This last example
demonstrates the flexibility of treating map as an environment.
Here we just say, "oh, let's just put wget in the map environment to go
along for the ride," and it's as simple as that.
------------------------------------------------------------------------------------------------------------------------------
library(Rhipe)
rhinit(TRUE, TRUE)
# The runner path must be one string; a line break inside the literal would
# become part of the path.
rhoptions(runner = sprintf("%s/rhipe.runner.sh", Sys.getenv("HOME")))
source("rhmrExtended.R")

# Download `on` into ./tmp with wget, redirecting wget's stderr to ./errors.
# Stops with the captured stderr when it mentions "failed" or "unable".
# The command must be a single line: a newline inside it would make the shell
# run `2> ./` and `errors` as separate commands.
wget <- function(on) {
  system(sprintf("wget %s --directory-prefix ./tmp 2> ./errors", on))
  errors <- readLines("./errors")
  if (length(grep("(failed)|(unable)", errors)) > 0) {
    stop(paste(errors, collapse = "\n"))
  }
}

# map has one formal, so rhmrEx calls it with the map values. Each value x is
# turned into the year 1986 + x (presumably x runs over 1..N in "lapply"
# mode -- confirm), and that year's file is downloaded, unzipped and counted.
map <- function(map.values) {
  lapply(map.values, function(x) {
    x <- 1986 + x
    on <- sprintf("http://stat-computing.org/dataexpo/2009/%s.csv.bz2", x)
    fn <- sprintf("./tmp/%s.csv.bz2", x)
    rhstatus(sprintf("Downloading %s", on))
    wget(on)
    rhstatus(sprintf("Downloaded %s", on))
    system(sprintf("bunzip2 %s", fn))
    rhstatus(sprintf("Unzipped %s", on))
    rhcounter("FILES", x, 1)
    rhcounter("FILES", "_ALL_", 1)
  })
}

DIR <- "/wsc/tmp/jrounds"
# DIR <- getwd()
args <- list()
# wget lives in the map environment so it is shipped to the tasks along with
# map and is visible to it once the environment is attached.
args$map <- new.env()
args$map$map <- map
args$map$wget <- wget
args$ofolder <- paste(DIR, "airline", sep = "/")
args$inout <- c("lapply")
args$N <- length(1987:2008)
args$mapred <- list(mapred.reduce.tasks = 0, mapred.task.timeout = 0)
args$copyFiles <- TRUE
mr <- do.call("rhmrEx", args)
rhex(mr)
------------------------------------------------------------------------------------------------------------------------------------------
The code to make them work looks like this:
####################################################################################################
#
# handleSetup
# mc is the argument list of a call to rhmrExtended; call.setup is its setup.
# If setup is a function, it is stored in a fresh environment and called with
#   no arguments by the generated setup expression.
# If setup is an environment, a copy of it is attached by the generated setup
#   expression (i.e. before map/reduce run).
# In both cases the environment is serialized (ASCII) into mc$mapred$env.setup
#   and mc$setup is replaced by the generated expression.
# If setup is NULL or anything else (e.g. an expression), mc is returned as is.
####################################################################################################
handleSetup <- function(mc, call.setup) {
  setup.function <- function() {}
  if (is.function(call.setup)) {
    setup.function <- call.setup
    # Parent is the global env (serialized by reference) rather than this
    # function's frame, which would drag `mc` into the serialized payload.
    call.setup <- new.env(parent = globalenv())
  } else if (is.environment(call.setup)) {
    # Work on a copy so the caller's environment is not modified.
    call.setup <- list2env(as.list(call.setup, all.names = TRUE),
                           envir = new.env(parent = globalenv()))
  } else {
    return(mc)
  }
  call.setup$long.name.rhmr.Extended.setup.function <- setup.function
  env.setup <- rawToChar(serialize(call.setup, NULL, ascii = TRUE))

  mapred <- mc[["mapred"]]
  if (is.null(mapred)) mapred <- list()
  mapred$env.setup <- env.setup
  mc[["mapred"]] <- mapred

  # NOTE(review): this reads the payload back with Sys.getenv("env.setup"),
  # presumably because Rhipe exposes mapred entries to the task as
  # environment variables -- confirm. The attach is never detached.
  mc[["setup"]] <- expression({
    env.setup <- unserialize(charToRaw(Sys.getenv("env.setup")))
    attach(env.setup)
    long.name.rhmr.Extended.setup.function()
  })
  mc
}
################################################################################################
#
# handleMap
# mc is the argument list of a call to rhmrExtended; map is its map argument.
# If map is a function, it is wrapped in an environment under the name `map`.
# If map is an environment, it must contain a `map` function and may contain
#   `pre` and `post`; mc$map becomes a pre/map/post expression that attaches
#   the environment, calls pre(), calls map as map(), map(map.values) or
#   map(map.keys, map.values) depending on its number of formals, then calls
#   post() and detaches.
# In those two cases setup must be NULL (a new environment is created) or an
#   environment; the map environment is stored in it as rhmrExtended.map.
#   NOTE: a caller-supplied setup environment is modified in place.
# If map is anything else (e.g. an expression), mc is returned unchanged.
# Errors are signalled with stop().
################################################################################################
handleMap <- function(mc, map) {
  if (is.function(map)) {
    map.env <- new.env(parent = globalenv())
    map.env$map <- map
    map <- map.env
  }
  if (!is.environment(map)) return(mc)
  if (!is.function(map$map)) {
    stop("rhmrExtended::handleMap: expected a map function in the map environment",
         call. = FALSE)
  }
  setup <- mc[["setup"]]
  if (is.null(setup)) setup <- new.env(parent = globalenv())
  if (!is.environment(setup)) {
    stop("rhmrExtended::handleMap: expected setup to be NULL or an environment",
         call. = FALSE)
  }
  setup$rhmrExtended.map <- map
  mc[["setup"]] <- setup
  mc[["map"]] <- expression(
    pre = {
      attach(rhmrExtended.map)
      if (!is.null(rhmrExtended.map$pre)) rhmrExtended.map$pre()
    },
    map = {
      rhmrExtended.map.map.nformals <- length(formals(rhmrExtended.map$map))
      if (rhmrExtended.map.map.nformals == 0) {
        rhmrExtended.map$map()
      } else if (rhmrExtended.map.map.nformals == 1) {
        rhmrExtended.map$map(map.values)
      } else {
        rhmrExtended.map$map(map.keys, map.values)
      }
    },
    post = {
      if (!is.null(rhmrExtended.map$post)) rhmrExtended.map$post()
      detach(rhmrExtended.map)
    }
  )
  mc
}
################################################################################################
#
# handleReduce
# mc is the argument list of a call to rhmrExtended; reduce is its reduce
#   argument. Mirror image of handleMap.
# If reduce is a function, it is wrapped in an environment under the name
#   `reduce`.
# If reduce is an environment, it must contain a `reduce` function and may
#   contain `pre` and `post`; mc$reduce becomes a pre/reduce/post expression
#   that attaches the environment, calls pre(), calls reduce as reduce(),
#   reduce(reduce.values) or reduce(reduce.key, reduce.values) depending on
#   its number of formals, calls post() or post(reduce.key, reduce.values)
#   likewise, and detaches.
# In those two cases setup must be NULL (a new environment is created) or an
#   environment; the reduce environment is stored in it as
#   rhmrExtended.reduce. NOTE: a caller-supplied setup environment is
#   modified in place.
# If reduce is anything else (e.g. an expression), mc is returned unchanged.
# Errors are signalled with stop().
################################################################################################
handleReduce <- function(mc, reduce) {
  if (is.function(reduce)) {
    reduce.env <- new.env(parent = globalenv())
    reduce.env$reduce <- reduce
    reduce <- reduce.env
  }
  if (!is.environment(reduce)) return(mc)
  if (!is.function(reduce$reduce)) {
    stop("rhmrExtended::handleReduce: expected a reduce function in the reduce environment",
         call. = FALSE)
  }
  setup <- mc[["setup"]]
  if (is.null(setup)) setup <- new.env(parent = globalenv())
  if (!is.environment(setup)) {
    stop("rhmrExtended::handleReduce: expected setup to be NULL or an environment",
         call. = FALSE)
  }
  setup$rhmrExtended.reduce <- reduce
  mc[["setup"]] <- setup
  mc[["reduce"]] <- expression(
    pre = {
      attach(rhmrExtended.reduce)
      if (!is.null(rhmrExtended.reduce$pre)) rhmrExtended.reduce$pre()
    },
    reduce = {
      rhmrExtended.reduce.reduce.nformals <-
        length(formals(rhmrExtended.reduce$reduce))
      if (rhmrExtended.reduce.reduce.nformals == 0) {
        rhmrExtended.reduce$reduce()
      } else if (rhmrExtended.reduce.reduce.nformals == 1) {
        rhmrExtended.reduce$reduce(reduce.values)
      } else {
        rhmrExtended.reduce$reduce(reduce.key, reduce.values)
      }
    },
    post = {
      if (!is.null(rhmrExtended.reduce$post)) {
        rhmrExtended.reduce.post.nformals <-
          length(formals(rhmrExtended.reduce$post))
        if (rhmrExtended.reduce.post.nformals == 0) {
          rhmrExtended.reduce$post()
        } else {
          rhmrExtended.reduce$post(reduce.key, reduce.values)
        }
      }
      detach(rhmrExtended.reduce)
    }
  )
  mc
}
################################################################################################
#
# rhmrExtended
# Wrapper around rhmr that also accepts functions/environments for map,
# reduce and setup. The arguments are rewritten by handleMap, handleReduce
# and handleSetup (in that order) and then forwarded to rhmr.
################################################################################################
rhmrExtended <- function(...) {
  # list(...) evaluates the arguments, so the handlers see the actual
  # functions/environments whether this is called directly, through rhmrEx or
  # via do.call(); match.call() would hand them unevaluated symbols (or a
  # literal `...`) in the first two cases.
  mc <- list(...)
  # Guard against handlers that signal failure by returning NULL.
  checked <- function(result, step) {
    if (is.null(result)) {
      stop(sprintf("rhmrExtended: %s failed", step), call. = FALSE)
    }
    result
  }
  # [[ ]] instead of $: `mc$map` would partially match `mapred` when no map
  # argument was supplied.
  mc <- checked(handleMap(mc, mc[["map"]]), "handleMap")
  mc <- checked(handleReduce(mc, mc[["reduce"]]), "handleReduce")
  mc <- checked(handleSetup(mc, mc[["setup"]]), "handleSetup")
  # Arguments are already evaluated; quote = TRUE keeps any language objects
  # (e.g. a `{` map body) from being evaluated a second time.
  do.call("rhmr", mc, quote = TRUE)
}
# rhmrEx: short alias for rhmrExtended; every argument is forwarded unchanged
# and rhmrExtended's result is returned.
rhmrEx <- function(...) {
  rhmrExtended(...)
}