RHadoop and Linear Regression PMML creation


Felix Becker

Oct 29, 2015, 11:20:38 AM
to rha...@googlegroups.com
Hello community,

I'm working on a project that builds a new application to optimize a currently running prediction application.

I got some R scripts from my business team, written in plain R.
Because we are running Hadoop with a lot of nodes, I want to use RHadoop to run my calculations distributed across the cluster.

I already ran some wordcount tests, so my connection to Hadoop is working.
My project calculation will be a linear regression on a data.frame that contains all input variables, such as y, x1, x2, x3, x4 ...

I already built a distributed linear regression for y and x1, where the inputs of the mappers are lists.
To reach my project goals, I need to run the linear regression on a data.frame, covering every input variable.

Currently I have the following mapper methods:
 - calculate X*X, i.e. t(X) %*% X -> MapXtX
 - calculate X*y, i.e. t(X) %*% y -> MapXty
 - calculate sum of X -> MapSumList
 - calculate sum of y -> MapSumList

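# Calculates t(X) %*% X for one chunk
MapXtX = function(., Xi) {
  rmr.str(Xi)   # debug output: prints the structure of the chunk
  Xi = Xi[,1]   # use only the first column (x1)
  keyval(1, list(t(Xi) %*% Xi))
}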

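# Calculates t(X) %*% y for one chunk
MapXty = function(., Xi) {
  # NOTE: this always takes rows 1..nrow(Xi) of y, so it only lines up
  # with Xi when the whole input arrives as a single chunk
  yi = y[as.numeric(seq(1, nrow(Xi))),]
  Xi = Xi[,1]   # use only the first column (x1)

  keyval(1, list(t(Xi) %*% yi))
}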

# Sums up the partial results (used as reducer and combiner)
Sum = function(.,YY) {
  keyval(1, list(Reduce('+', YY)))
}

# MapReduce call for XtX
XtX = values(
  from.dfs(
    mapreduce(
      input = X.index,
      map = MapXtX,
      reduce = Sum,
      combine = TRUE
    )))[[1]]

# MapReduce call for Xty
Xty = values(
  from.dfs(
    mapreduce(
      input = X.index,
      map = MapXty,
      reduce = Sum,
      combine = TRUE
    )))[[1]]

# Emits each chunk of the list under a single key
MapSumList = function(., Listi) {
  keyval(1, list(t(Listi)))
}

# Sums all values collected for the key
SumList = function(., YY) {
  keyval(1, sum(unlist(YY)))
}

# Sum of the x values
XSum = values(
  from.dfs(
    mapreduce(
      input = X.index,
      map = MapSumList,
      reduce = SumList,
      combine = TRUE
    )))[[1]]

# Sum of the y values
YSum = values(
  from.dfs(
    mapreduce(
      input = y.index,
      map = MapSumList,
      reduce = SumList,
      combine = TRUE
    )))[[1]]


Finally, the intercept (a) and the coefficient for x1 (b) are calculated as follows:

a = ((YSum %*% XtX) - (XSum * Xty))/(nrow(X) * XtX - (XSum*XSum))
b = ((nrow(X) * Xty) - (XSum * YSum))/(nrow(X) * XtX - (XSum*XSum))
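
The two formulas above can be checked locally, without Hadoop. The following is only a sketch in plain R: the sample vectors x and y are made up for illustration, and the four sums are computed directly instead of through MapReduce jobs. The result is compared against lm().

```r
# Made-up sample data, only for illustration
x <- c(1, 2, 3, 4, 5)
y <- c(2.1, 3.9, 6.2, 7.8, 10.1)
n <- length(x)

# The same quantities the MapReduce jobs produce
XtX  <- sum(x * x)   # sum of x^2
Xty  <- sum(x * y)   # sum of x*y
XSum <- sum(x)
YSum <- sum(y)

# Closed-form simple linear regression
denom <- n * XtX - XSum * XSum
a <- (YSum * XtX - XSum * Xty) / denom   # intercept
b <- (n * Xty - XSum * YSum) / denom     # slope for x

# Reference fit with base R
fit <- coef(lm(y ~ x))
print(c(a = a, b = b))
print(fit)
```

If the two printed vectors agree, the closed form is right and any mismatch on the cluster comes from how the sums are collected.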



----------------------------------------------------------------------------------------------------------------------------------------------------------

My question now is: how can I change my MapReduce jobs to work on a data.frame, so that the X*X and X*y calculations and the final coefficient calculation cover all input variables?

df = data.frame(y, x1, x2, x3, x4)
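
The computation being asked for can be sketched locally as follows. This is plain R that uses split() to imitate the chunks a mapper would receive; it does not call rmr2. The data values, the number of chunks, and the helper names map_chunk and reduce_sum are assumptions for illustration only. Each chunk contributes its own t(X) %*% X and t(X) %*% y, the partial results are added, and solve() then returns the intercept and all coefficients at once.

```r
set.seed(1)
# Made-up data with the column layout from the question
n  <- 200
df <- data.frame(x1 = rnorm(n), x2 = rnorm(n), x3 = rnorm(n), x4 = rnorm(n))
df$y <- 1 + 2 * df$x1 - df$x2 + 0.5 * df$x3 + 3 * df$x4 + rnorm(n, sd = 0.1)

# "Map" step (hypothetical helper): per-chunk cross products;
# the column of 1s produces the intercept
map_chunk <- function(chunk) {
  X <- cbind(Intercept = 1, as.matrix(chunk[, c("x1", "x2", "x3", "x4")]))
  y <- as.matrix(chunk$y)
  list(XtX = t(X) %*% X, Xty = t(X) %*% y)
}

# "Reduce" step (hypothetical helper): element-wise sum of partial matrices
reduce_sum <- function(p, q) {
  list(XtX = p$XtX + q$XtX, Xty = p$Xty + q$Xty)
}

# Imitate four input splits
chunks <- split(df, rep(1:4, length.out = n))
total  <- Reduce(reduce_sum, lapply(chunks, map_chunk))

# Solve the normal equations for all coefficients at once
beta <- solve(total$XtX, total$Xty)
print(beta)
print(coef(lm(y ~ x1 + x2 + x3 + x4, data = df)))
```

Because every chunk carries its own y column, no row index is needed to match X and y across chunks, which is the main difference from the list-based mappers.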


Regards,