Thanks for the response. I'm now using Hadoop 0.20.2 with CDH3u4; there are 1 NameNode and 10 DataNodes in my cluster, and RHadoop is installed with rmr2 2.2.2.
My code is here:
library(topicmodels)
library(rmr2)
library(tm)
library(slam)
# Fit an LDA topic model on comma-separated term strings and return the top
# terms of each topic.
#
# Args:
#   x:       character vector; every element is split on ",". The list
#            returned by strsplit() is wrapped with VectorSource()/Corpus().
#   k:       number of topics handed to LDA(). Defaults to 3, the value that
#            used to be hard-coded.
#   n_terms: number of terms requested from terms(). Defaults to 10, the
#            value that used to be hard-coded.
#
# Returns:
#   The result of terms() on the fitted model.
#
# Errors raised by tm / topicmodels are not handled here; callers that need
# a fallback should wrap the call (see try_lda in this script).
tm_mapreduce <- function(x, k = 3, n_terms = 10) {
  words <- strsplit(x, ",")
  corpus <- Corpus(VectorSource(words))
  # Lower word-length bound of 1 -- presumably so that very short terms are
  # not dropped by tm's default filter (TODO confirm against tm defaults).
  sample_dtm <- DocumentTermMatrix(
    corpus,
    control = list(wordLengths = c(1, Inf))
  )
  # NOTE(review): this registers a NULL -> "CTM_VEMcontrol" coercion on every
  # call. It looks like a workaround for passing control = NULL, but LDA()
  # is fitted below, not CTM() -- confirm it is still needed.
  setAs("NULL", "CTM_VEMcontrol", function(from, to) new(to))
  fit <- LDA(sample_dtm, k = k, control = NULL)
  terms(fit, n_terms)
}
# Run tm_mapreduce() on `x`, turning any error into the sentinel "ERROR".
#
# Args:
#   x: input forwarded unchanged to tm_mapreduce().
#
# Returns:
#   The value of tm_mapreduce(x), or the string "ERROR" if it raised an
#   error. The failure is reported through message() as a single line.
try_lda <- function(x) {
  tryCatch(
    tm_mapreduce(x),
    error = function(e) {
      # conditionMessage() extracts only the error text. Pasting the
      # condition object itself also pastes the deparsed call and
      # duplicates the prefix. A multi-element x is collapsed so the log
      # stays on one line.
      message(
        "it seems error on: ", paste(x, collapse = ", "),
        " message: ", conditionMessage(e)
      )
      "ERROR"
    }
  )
}
# Extract the `ind`-th field of `x` using the literal separator `split`,
# turning any error into the sentinel "ERROR".
#
# Args:
#   x:     character input handed to word().
#   ind:   position of the field to extract.
#   split: separator; wrapped in fixed() before being passed as `sep`.
#
# Returns:
#   The value of word(x, ind, sep = fixed(split)), or the string "ERROR" if
#   that call raised an error. The failure is reported through message().
#
# NOTE(review): word() and fixed() are not defined in this script and look
# like stringr functions; no library(stringr) call is visible, so they are
# presumably attached by another package -- confirm, otherwise every call
# ends in the "ERROR" branch.
try_word <- function(x, ind, split) {
  tryCatch(
    word(x, ind, sep = fixed(split)),
    error = function(e) {
      # conditionMessage() extracts only the error text. Pasting the
      # condition object itself also pastes the deparsed call and
      # duplicates the prefix. A multi-element x is collapsed so the log
      # stays on one line.
      message(
        "it seems split error on: ", paste(x, collapse = ", "),
        " message: ", conditionMessage(e)
      )
      "ERROR"
    }
  )
}
# Stores the result of tm_mapreduce(x) or, if the call fails, the caught
# error condition itself (error = identity hands the condition back).
# NOTE(review): no top-level `x` is defined anywhere in the visible script,
# so this looks like a leftover debugging line -- confirm before keeping it.
tmp <- tryCatch( tm_mapreduce(x), error = identity )
# Launch the map-reduce job that groups field 5 of every record under
# field 3 of the same record.
#
# Args:
#   input:  input location forwarded to mapreduce().
#   output: output location forwarded to mapreduce().
#
# Returns:
#   Whatever mapreduce() returns.
keyword <- function(input, output) {
  # Mapper: fields are separated by "\001"; field 3 becomes the key and
  # field 5 the value. (Debugging alternative once used here: keyval(1, v).)
  emit_fields <- function(k, v) {
    keyval(try_word(v, 3, "\001"), try_word(v, 5, "\001"))
  }

  # Reducer: for each distinct key, gather its values as one character
  # vector and emit the keys together with the list of those vectors.
  gather_terms <- function(k, vv) {
    pairs <- data.frame(k, vv)
    acc_nbr <- as.character(unique(pairs$k))
    term <- lapply(acc_nbr, function(key) {
      rows <- which(pairs$k == key)
      as.character(pairs$vv[rows])
    })
    keyval(acc_nbr, term)
  }

  mapreduce(
    input = input,
    output = output,
    map = emit_fields,
    reduce = gather_terms
  )
}
# Launch the map-only job that runs try_lda() on every element of each
# value batch.
#
# Args:
#   input:  input location forwarded to mapreduce().
#   output: output location forwarded to mapreduce().
#
# Returns:
#   Whatever mapreduce() returns.
tm_lda <- function(input, output) {
  mapreduce(
    input = input, output = output,
    map = function(k, v) {
      # seq_along() instead of 1:length(v): for an empty v the latter gives
      # c(1, 0) and indexes out of bounds; now an empty list is emitted.
      val <- lapply(seq_along(v), function(i) try_lda(v[[i]]))
      keyval(k, val)
    },
    # Raises the Hadoop task timeout -- presumably milliseconds, i.e. 10 h,
    # so long-running model fits are not killed (confirm against Hadoop docs).
    backend.parameters = list(
      hadoop = list(D = 'mapred.task.timeout=36000000')
    )
  )
}
# Step 1: group the raw Hive records into per-key term lists.
keyword(
  input = '/user/hive/warehouse/tmp_hfh_keyword2',
  output = '/rhipe/keyword'
)
# Step 2: run the topic model over the output written by step 1.
tm_lda(
  input = '/rhipe/keyword',
  output = '/rhipe/lda'
)
-------
Are there any problems?
在 2013年8月13日星期二UTC+8上午1时12分08秒,Antonio Piccolboni写道: