If Ryan reads this, I would be curious whether there is a datadr idiom for merging ddo objects, but at the moment I have no pressing need. What I did is shown below; both dxsummary_2 and ili_dmisid_mmwr_moving_average were ddo storage directories.
# HDFS locations: two input directories that are read together, and the
# directory the merged result is written to.
path1 <- "/user/roun308/chiron/dxsummary_2"
path2 <- "/user/roun308/chiron/ili_dmisid_mmwr_moving_average"
output <- "/user/roun308/chiron/all_mmwr"

# Both inputs are handed to a single 'map'-type input format so one job reads
# them side by side; the output uses the same 'map' type.
minput <- rhfmt(
  type = 'map',
  folder = c(path1, path2)
)
moutput <- rhfmt(
  type = 'map',
  folder = output
)
# Map step: wrap every incoming value in a one-element named list whose name
# records which kind of input it is, then emit it under its original key.
#   - values inheriting from data.frame are labelled "total_visits"
#   - plain lists are labelled "ili_visits"
#   - anything else aborts the task with an error naming the offending class
# Reads map.keys / map.values and emits with rhcollect().
map = expression({
  for (i in seq_along(map.values)) {
    v = map.values[[i]]
    k = map.keys[[i]]
    # Kind of hacky, but the class of the map value tells us which input it is
    # from. class(v) can have more than one element (e.g. a data.frame
    # subclass), which switch() rejects, so membership is tested with %in%.
    # data.frame is checked first so a data frame is never labelled as a list.
    cls = class(v)
    if ("data.frame" %in% cls) {
      type_of = "total_visits"
    } else if ("list" %in% cls) {
      type_of = "ili_visits"
    } else {
      stop("Unknown value class: ", paste(cls, collapse = "/"))
    }
    new.value = list()
    new.value[[type_of]] = v
    rhcollect(k, new.value)
  }
})
# Reduce step: accumulate every value received for a key into one list and
# emit that list under the key.
#   pre    - reset the accumulator
#   reduce - add the current batch in reduce.values to the accumulator
#   post   - emit the accumulated list under reduce.key
# NOTE(review): each element of the emitted list is itself the one-element
# named list produced by the map step - confirm this nesting is intended.
reduce = expression(
  pre = {
    merged = list()
  },
  reduce = {
    merged = c(merged, reduce.values)
  },
  post = {
    rhcollect(reduce.key, merged)
  }
)
# Job configuration handed to rhwatch() through its `mapred` argument.
# The last entry must NOT be followed by a comma: list() treats a trailing
# comma as an empty argument and fails with "argument 9 is empty".
mapred = list(
  # map side: RHIPE buffer size, memory setting, JVM options
  rhipe_map_buff_size = 100,
  mapreduce.map.memory.mb = 4000,
  mapreduce.map.java.opts = "-Xmx2000M",
  # reduce side: same three settings
  rhipe_reduce_buff_size = 100,
  mapreduce.reduce.memory.mb = 4000,
  mapreduce.reduce.java.opts = "-Xmx2000M",
  # requested number of map and reduce tasks
  mapreduce.job.maps = 100,
  mapreduce.job.reduces = 100
)
# Run the merge job, then open its output directory as a datadr ddo.
ret = rhwatch(map, reduce, input=minput, output=moutput, mapred=mapred)
# Reuse `output` (the folder moutput was built from) rather than repeating the
# path literal, so the connection always points at what the job just wrote.
hdfs_mmwr = hdfsConn(output)
ddo_mmwr = ddo(hdfs_mmwr)