K-Means with RHIPE


Marek Bejda

Apr 27, 2015, 12:54:44 PM
to rh...@googlegroups.com
Hello All, 


I am trying to run K-means with RHIPE but it's giving me issues. I have some old code that I'm assuming used to work on an older version of RHIPE, before it was changed over to rhwatch. 

I was able to swap out the rhmr call and save the generated pairs into HDFS, but I wasn't sure what kind of values are passed into the mapper function (k/v pairs or lists), and it started getting a little complicated. So instead, here is the original code; it might be more useful than my modified version. 

Does anybody know what changes I would have to make to get it working with a newer version of RHIPE? Also, any advice on how to test the mapper/reducer functions in a way that somewhat mimics Hadoop? 

For example, when building a Streaming mapper/reducer you can just run cat ./input | ./mapper.R | sort -k1,1 | ./reducer.R. Is there any such combination I can use within R? 
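One way to approximate that Streaming pipeline entirely inside R is a tiny driver that calls a map function on key/value lists, groups the emitted pairs by key (the role of sort -k1,1 plus Hadoop's shuffle), and then calls a reduce function once per key. This is a minimal sketch under stated assumptions: run_local_mapreduce, wc_map and wc_reduce are hypothetical helpers, not RHIPE functions, and unlike real RHIPE code they return pairs instead of calling rhcollect.

```r
# Hypothetical local driver (not part of RHIPE): map -> sort/group by key -> reduce,
# roughly what `cat input | mapper | sort -k1,1 | reducer` does for Streaming.
run_local_mapreduce <- function(pairs, map_fn, reduce_fn) {
  keys   <- lapply(pairs, `[[`, 1)  # the role RHIPE's map.keys plays
  values <- lapply(pairs, `[[`, 2)  # the role RHIPE's map.values plays
  # Map: each call returns a list of list(key, value) pairs; flatten one level.
  emitted  <- unlist(Map(map_fn, keys, values), recursive = FALSE)
  out_keys <- vapply(emitted, function(p) as.character(p[[1]]), character(1))
  out_vals <- lapply(emitted, `[[`, 2)
  # Shuffle/sort: group the emitted values by key (split sorts the keys).
  groups <- split(out_vals, out_keys)
  # Reduce: one call per key with all of that key's values; result is named by key.
  Map(reduce_fn, names(groups), groups)
}

# Word count, the usual Streaming example.
wc_map <- function(k, v) {
  lapply(strsplit(v, " ", fixed = TRUE)[[1]], function(w) list(w, 1))
}
wc_reduce <- function(k, vals) sum(unlist(vals))

input  <- list(list(1, "Hello World"), list(2, "Hello Long horn"))
counts <- run_local_mapreduce(input, wc_map, wc_reduce)
str(counts)
```

Keeping the map and reduce logic as plain functions makes them easy to call directly; only the thin RHIPE wrapper (map.keys/map.values in, rhcollect out) then needs a real cluster.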



The original source can be found here: 


kmeans_test <- function(data_size, num_clusters, num_iterations, directory) {
    #Create data and move data to HDFS
    input <- do.call(rbind, rep(list(matrix(rnorm(10*data_size, sd=10), ncol=2)), 20)) +
        matrix(rnorm(200), ncol=2)
    rhwrite(input, directory) 
    
    #Helper function: squared Euclidean distance
    dist.fun <- function(C, P) {
        apply(C, 1,
          function(x)
            colSums((t(P) - x)^2))
    }

    #Map function: Compute distances of points to centroids
    kmeans.map <- expression(lapply(map.values, function(P) {
        nearest = {
            if(is.null(C))
                sample(1:num_clusters, nrow(P), replace = TRUE)
            else {
                distance <- dist.fun(C, P)
                nearest <- max.col(-distance)
            }
        }
        rhcollect(nearest, P)}))

    #Reduce: Compute new centroids
    kmeans.reduce <- expression(pre = {total = NULL}, 
        reduce = {
            # reduce.values is a list; stack its elements before summing columns
            vals <- do.call(rbind, reduce.values)
            total <- if (is.null(total)) t(as.matrix(apply(vals, 2, sum)))
                else t(as.matrix(apply(rbind(total, vals), 2, sum)))}, 
        post={rhcollect(reduce.key, total)})
    
    C = NULL
    for(i in 1:num_iterations ) {
        job <- rhmr(map=kmeans.map,
            reduce = kmeans.reduce,
            inout=c("text","sequence"),
            ifolder=directory, ofolder="/output",
            mapred=mapred, jobname="kmeans") 
        rhex(job)  
        C = rhread("/output")
    }
    C
}
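Before porting, it may help to check the assignment/update logic in plain R with no Hadoop at all. This is a sketch assuming the points are rows of an n x 2 matrix. It keeps the dist.fun idea from the code above (squared distances, max.col(-d) to pick the nearest centroid), wraps the result in matrix() so a single point or centroid still gives a matrix, and, unlike the sums in the original reduce, divides by counts so the new centroids are means. local_kmeans is a hypothetical helper, and clusters that receive no points simply drop out.

```r
# Squared Euclidean distances from every point (rows of P) to every centroid
# (rows of C), as an nrow(P) x nrow(C) matrix; matrix() keeps that shape even
# when there is a single point or a single centroid.
dist.fun <- function(C, P) {
  matrix(apply(C, 1, function(x) colSums((t(P) - x)^2)), nrow = nrow(P))
}

# Hypothetical helper: plain Lloyd iterations mirroring the map/reduce split.
local_kmeans <- function(P, num_clusters, num_iterations, C = NULL) {
  for (i in seq_len(num_iterations)) {
    # "Map": assign each point to a cluster (random if no centroids yet).
    nearest <- if (is.null(C)) {
      sample(seq_len(num_clusters), nrow(P), replace = TRUE)
    } else {
      max.col(-dist.fun(C, P), ties.method = "first")
    }
    # "Reduce": per-cluster column sums divided by per-cluster counts.
    C <- rowsum(P, nearest) / as.vector(table(nearest))
  }
  C
}

set.seed(1)
P <- rbind(matrix(rnorm(100, mean = 0), ncol = 2),
           matrix(rnorm(100, mean = 10), ncol = 2))
# Seed the centroids with one point from each blob (rows 1 and 51).
C <- local_kmeans(P, num_clusters = 2, num_iterations = 5, C = P[c(1, 51), ])
print(C)
```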

Thank you! 
Marek

Marek Bejda

May 12, 2015, 12:46:40 PM
to rh...@googlegroups.com
Greetings! I've been very much enjoying my time with RHIPE.  

Here is the working, updated version of the K-means code. 

#!/usr/bin/env Rscript
############################################################################################
## K-Means
############################################################################################

library(Rhipe)
rhinit()
rhoptions(runner = 'sh /home/smokey/R/lib64/R/library/Rhipe/bin/RhipeMapReduce.sh')


kmeans_test <- function(data_size, num_clusters, num_iterations, directory) {
  #Create data and move data to HDFS
  # input <- do.call(rbind,rep(list(matrix(rnorm(10*data_size, sd=10), ncol=2)), 20))    + matrix(rnorm(200), ncol=2)
  set.seed(2)
  input<-list()
  input <- lapply(seq_len(10*data_size), function(i) list(rnorm(1, sd=10), rnorm(1, sd=10)))
  rhwrite(input, file="hdfs:///user/smokey/centerInput") 
  
  setup=expression(
    map={
      load("centers.Rdata")
    }, 
    reduce={
      
    }) 
  
  mapper <- expression({  
    dist.fun <- function(C, P) {
      sapply(C,
             function(x){
               rowSums((as.data.frame(P) - as.data.frame(x))^2)
             })
    }
    
    for(i in seq_along(map.keys)) {
      k <- map.keys[[i]]
      r <- map.values[[i]]
      P <- data.frame(X=k,Y=r)
      
      nearest = {
        if(is.null(C))
          sample(1:num_clusters, nrow(P), replace = T)
        else {
          distance <- dist.fun(C, P)
          nearest <- max.col(t(-distance))
        }
      }     
      rhcollect(nearest,P)
    }
  })
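  #Reduce: Compute new centroids
  reducer <- expression(
    pre = {
      # running column sums and point count for this key (cluster id)
      total <- c(X = 0, Y = 0)
      n <- 0
    },
    reduce = {
      # reduce.values may be delivered in several chunks, so accumulate here
      pts <- do.call(rbind, reduce.values)
      total <- total + colSums(pts[, c("X", "Y")])
      n <- n + nrow(pts)
    }, 
    post = {
      # emit the centroid as key = X, value = Y, as before
      centroid <- total / n
      rhcollect(centroid[["X"]], centroid[["Y"]])
    }
  )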

  C<-NULL
  rhsave(C,num_clusters, file="hdfs:///user/smokey/centers.Rdata") 
  
  for(i in 1:num_iterations ){
    job <- rhwatch(
      map=mapper,
      reduce=reducer,
      input=rhfmt("hdfs:///user/smokey/centerInput",type="sequence"), 
      output="hdfs:///user/smokey/kmeansout",
      setup=setup,
      mapred=NULL,
      shared=c("hdfs:///user/smokey/centers.Rdata"),
      readback=TRUE,
      jobname="kmeans",
      verbose=TRUE) 

    C = rhread("hdfs:///user/smokey/kmeansout")
    rhsave(C,num_clusters,file="hdfs:///user/smokey/centers.Rdata")
  }
  C
}

directory<-"hdfs:///user/smokey/kmeans"
kmeans_test(1,3,2, directory)
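To exercise RHIPE-style expressions themselves, rather than a rewritten function, one option is to eval() them in an environment where map.keys and map.values (or reduce.key and reduce.values) are set by hand, with rhcollect replaced by a local stub that records what would be emitted. This is a hedged sketch: the rhcollect defined here is a stand-in, not RHIPE's function; save()/load() on a temp file stands in for rhsave() plus the setup's load("centers.Rdata"); the mapper is a simplified copy of the one above with the distance written out directly; the reduce is a one-shot version that averages all of a key's values at once; and it does not model RHIPE possibly handing reduce.values to the reduce block in several batches.

```r
# Local stand-in for RHIPE's rhcollect(): append each (key, value) pair to a list.
collected <- list()
rhcollect <- function(key, value) {
  collected[[length(collected) + 1]] <<- list(key, value)
  invisible(NULL)
}

# Stand-in for rhsave(...) plus the setup's load(): a local temp file.
C <- list(list(0, 0), list(10, 10))  # centroids as list(X, Y), like rhread() output
num_clusters <- 2
centers_file <- tempfile(fileext = ".Rdata")
save(C, num_clusters, file = centers_file)

map_env <- new.env()
load(centers_file, envir = map_env)
# Hand-made inputs in the shape RHIPE exposes to a map expression.
map_env$map.keys   <- list(0.5, -0.5, 9.5)
map_env$map.values <- list(-0.2, 0.4, 10.1)

# Simplified copy of the mapper: one point per key/value pair.
mapper <- expression({
  for (i in seq_along(map.keys)) {
    P <- data.frame(X = map.keys[[i]], Y = map.values[[i]])
    d <- sapply(C, function(cc) (P$X - cc[[1]])^2 + (P$Y - cc[[2]])^2)
    rhcollect(which.min(d), P)
  }
})
eval(mapper, map_env)
map_out <- collected

# One-shot reduce: average all values of a key, emit key = X, value = Y.
reducer <- expression(
  reduce = {
    centroid <- lapply(do.call(rbind, reduce.values), mean)
    rhcollect(centroid$X, centroid$Y)
  })

# Shuffle/sort: group the map output by key, then run the reduce once per key.
collected <- list()
map_keys <- vapply(map_out, function(p) as.integer(p[[1]]), integer(1))
for (k in sort(unique(map_keys))) {
  red_env <- new.env()
  red_env$reduce.key    <- k
  red_env$reduce.values <- lapply(map_out[map_keys == k], `[[`, 2)
  eval(reducer[["reduce"]], red_env)
}
new_centers <- collected
str(new_centers)
```

Since the expressions only see names that are looked up at evaluation time, swapping in a stub rhcollect is enough to inspect exactly what a job would emit for a handful of inputs.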

A less polished version is available on my GitHub at 


I think the biggest challenge for me with RHIPE was figuring out how to test functionality without running an actual Hadoop job. 

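# Fake the inputs a map expression sees: a list of key/value pairs
data<-list(list(1,"Hello World"),list(2,"Long horn"))
map.keys<-lapply(data,"[[",1)
map.values<-lapply(data,"[[",2)

# Map logic as an ordinary function, so it can be called directly
wordcount= function(map.keys, map.values){
  # split every value into words; each word becomes a key with value 1
  keys <- unlist(strsplit(unlist(map.values), split=' '))
  value <- 1
  lapply(keys, FUN=paste, value=value)
}

wordcount(map.keys,map.values)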

and going through some of the RHIPE documentation at



Saptarshi Guha

May 12, 2015, 3:15:03 PM
to rh...@googlegroups.com
Nice. One suggestion you can try:

set the reduce to the expression that follows (not sure you can, since you're not using a combiner; to use a combiner, your map output shape should be the same as your reduce output shape). Using a combiner is strongly advised.
Once you do, look at rhoptions()$templates.

structure(rhoptions()$templates$raggregate({
      centroid<-lapply(do.call(rbind, adata), mean)
      rhcollect(centroid$X,centroid$Y)
}),combiner=TRUE)
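The shape requirement matters because Hadoop may run a combiner zero, one, or several times on partial groups of a key's values before the reducer sees them. For a centroid, averaging partial averages is generally wrong when the partial groups have different sizes, whereas carrying (sum, count) pairs, which have the same shape going in and coming out, stays correct however the values are grouped. A small base-R illustration with made-up 1-D numbers; combine_sum_count is a hypothetical helper, not a RHIPE template.

```r
# Values one cluster might receive, split into two partial groups
# the way a combiner might see them.
part1 <- c(1, 2, 3)
part2 <- c(10)

# Mean of partial means: not the true mean when group sizes differ.
mean_of_means <- mean(c(mean(part1), mean(part2)))  # (2 + 10) / 2 = 6
true_mean     <- mean(c(part1, part2))              # 16 / 4 = 4

# Hypothetical combiner: input and output are both c(sum, count), so it can be
# applied any number of times, to any grouping, before the final division.
combine_sum_count <- function(...) Reduce(`+`, list(...))
partial1 <- c(sum(part1), length(part1))            # c(6, 3)
partial2 <- c(sum(part2), length(part2))            # c(10, 1)
total    <- combine_sum_count(partial1, partial2)   # c(16, 4)
combined_mean <- total[1] / total[2]                # 4
```

In the K-means job this would mean emitting per-cluster sums and counts from the map (or combiner) and dividing only in the final reduce.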

