Shekhar
unread,Apr 29, 2011, 3:52:04 AM4/29/11Sign in to reply to author
Sign in to forward
You do not have permission to delete messages in this group
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to Bangalore R Users - BRU
Hi,
Now i hope you are getting the hang of what are the capabilities of R
and the Rhipe package. Here i present another most daunting problem
which is both data intensive and computational intensive i.e. k-means
clustering algorithm. The main application of this algorithm could be
in ad-analysis, sentimental analysis, identifying trends in the market
etc.
Basically K-means clustering is a data mining / machine learning
algorithm used to cluster observations into groups of related
observations without any prior knowledge of those relationships. More
specifically it can be defined as the method of cluster analysis which
aims to partition n observations into k clusters in which each
observation belongs to the cluster with the nearest mean. K-means
employs an iterative refinement technique, where new centers are
calculated iteratively until the centers are tightly coupled to a set
of points.
Now lets define the problem:
Given a set of 2D points and initial cluster / centers, group the
points tightly to the given centers using minimum square distance /
Euclidean distance criteria. For simplicity the first, second and
third quartile of the data set is taken as initial centers.
As you can see that its an iterative map reduce job , but Rhipe
doesn't support this.
As a workaround we will run the script iteratively.
K-Means clustering attempts to group the set of points to centers
iteratively but the RHIPE library doesn’t support iterative map reduce
jobs. Such jobs can be accomplished by calling the R script
iteratively. At every iteration a new job object is created.
The script is executed iteratively till the condition when the
previous and current centers are stable i.e. the Euclidean distance
between them is less than defined threshold value.
The following RHIPE and R functions are used for accomplishing
iterative MapReduce job on R:
(1)Rhsave
This is a RHIPE function which is used for saving the R variables into
HDFS. These variable can beloaded into the R session during the map
reduce job through the function load.
(2)Load
It’s a R function whose syntax is “load (file, envir =
parent.frame())”, which basically loads the R object.
(3)Setup
In RHIPE, each task is a sequence of many thousands of key, value
pairs. Before running the map and reduce expression (and before any
key, value pairs have been read), RHIPE will evaluate expressions in
setup. This is very suitable place where if a file has to be read
before the map expression. The Setup expression is similar to
DistributedFileCache class in Java, where the user can provide the
jar, files, etc to be uploaded before the job is executed.
Here the initial centers are stored on HDFS and this is loaded during
every map tasks.
(4)Shared
This is a character vector of files located on the HDFS. At the
beginning of the MapReduce job, these files will be copied to the
local hard disks of the Tasktrackers (cluster computers on which the
compute nodes/cores are located). User provided R code can read these
files from the current directory (which is located on the local hard
disk). For example, if /path/to/file.Rdata is located on the HDFS and
shared, it is possible to read it in the R expressions as
load(’file.Rdata’). Note, there is no need for the full path; the file
is copied to the current directory of the R process.
NOTE
I.There is no condition check for the termination of the algorithm,
but we are running for 10 iterations only
II. Here two scripts are used namely “iter.R” and” kmean.R”. The
script “iter.R” calls the “kmean.R” iteratively.
---------------------------------iter.R
Algorithm-------------------------------------
1. Save the initial centers to the HDFS in a file ”centers.Rdata”
using the RHIPE function rhsave.
2. Write the initial centers to a file also for final output purpose.
3. Start a loop
4. Delete the “kmean.R” output .If this is first iteration don’t
delete the” kmean.R” output (otherwise the map reduce job won’t be
executed since the error is thrown by the RHIPE so it will abort the
job).
5. Call the script kmean.R
6. Read the output file
7. Calculate the new mean
8. Replace the old centers with the new centers.
9. Write the new centers to file
10. Delete the “centers.Rdata’ file using rhdel function
11. Write the new centers to “centers.Rdata” file.
12. Go to Step 4 until the condition is satisfied.
-----------------------------Kmean.R
Algorithm----------------------------------------
1. Read the 2D points from the hadoop which comes to the RHIPE in the
variable map.values, and split the string into vector and form the
matrix which is stored in the variable y. Column 1 in the matrix “y”
consist of the “x” coordinates and the column 2 consist of “y”
coordinates.
2. Assign the names to the columns of the matrix that consist of x and
y coordinates.
3. Assign the initial centers c1,c2,c3
4. Create the data frame with the columns created from the step 2.
5. Create 3 empty data frame for each of the centers.
6. Iterate through each of the rows in the data frame and calculate
the Euclidean distance from each of the centers c1, c2 and c3, using R
“dist” function.
Find the minimum distance and assigned it to the respective data frame
7. Emit the centers c1 , c2 and c3 as the keys and their respective
data frames d1, d2 and d3 as their values
8. In the reducer part, collect the entire data frame from the
different map outputs and post the data frame.
--------------Iter.R
Code---------------------------------------------------------------------
library(Rhipe)
# providing the initial centres
firstCenter<-c(16929,28295)
secondCenter<-c(34357,44202)
thirdCenter<-c(47667,64978)
#writing the centers to a file
write("Initial Centers",file="IterativeMR/centers",append=TRUE)
write(firstCenter,file=" /IterativeMR/centers",append=TRUE)
write(secondCenter,file="IterativeMR/centers",append=TRUE)
write(thirdCenter,file="IterativeMR/centers",append=TRUE)
write("---------------------------------------------",file="IterativeMR/
centers",append=TRUE)
rhsave(firstCenter,secondCenter,thirdCenter,file="data/
centers.Rdata")
for( j in 1:10)
{
#deleting the output file of the map reduce job
# rhdel("/out01") if the out01 doesnt exit and if trying to
delete then it will throw the error and doesnt continue with the next
statement.Need to delete this file for subsequent iteration so for the
first iteration don’t delete the file. Also preserve this file for
the final output
if( j > 1)
rhdel("/out01")
source("/home/drilldown/RKmeans/IterativeMR/kmd.R")
# reading the output and calculating the new centers and
replacing the previous center
output<-rhread("/out01/part-r-00000")
centers<-list(output[[1]][[1]],output[[2]][[1]],output[[3]][[1]])
center1<-c(mean(output[[1]][[2]]$col.x),mean(output[[1]][[2]]$col.y))
center2<-c(mean(output[[2]][[2]]$col.x),mean(output[[2]][[2]]$col.y))
center3<-c(mean(output[[3]][[2]]$col.x),mean(output[[3]][[2]]$col.y))
for(i in 1:length(centers))
{
if ( identical(centers[[i]],firstCenter))
{
firstCenter = center1
}
else if(identical(centers[[i]], secondCenter))
{
secondCenter = center2
}
else if (identical(centers[[i]],thirdCenter))
{
thirdCenter = center3
}
}
#writing the new centers to a file
write(paste("Center After iteration",j,sep=":"),file="IterativeMR/
centers",append=TRUE) write(firstCenter,file="IterativeMR/
centers",append=TRUE)
write(secondCenter,file="IterativeMR/centers",append=TRUE)
write(thirdCenter,file="IterativeMR/centers",append=TRUE)
write("---------------------------------------------",file="/home/
drilldown/RKmeans/IterativeMR/centers",append=T RUE)
#deleting the previous centers and output
rhdel("data/centers.Rdata")
#saving the new centers
rhsave(firstCenter,secondCenter,thirdCenter,file="data/centers.Rdata")
}
--------------------------------------
Kmean.R---------------------------------------------------
library(Rhipe)
rhinit()
map.setup = expression({
load("centers.Rdata") # no need to give full path
})
map<-expression({
y<-do.call("rbind",lapply(map.values,function(r){
as.numeric(strsplit(r," ")[[1]])
}))
if(nrow(y) > 0)
{
col.x = y[,1]
col.y = y[,2]
c1<-firstCenter
c2<-secondCenter
c3<-thirdCenter
centerMat<-rbind(c1,rbind(c2,c3))
#forming the full data frame
d<-data.frame(col.x=col.x,col.y=col.y,stringsAsFactors=FALSE)
#Appeding the center matrix to the top of the data frame
dmat<-rbind(centerMat,as.matrix(d))
#Finding the euclidean distance
reqMat<-(as.matrix(dist(dmat,method="euclidean")))[4:nrow(y),1:3]
#creating three data frame for three different centers
d1<-data.frame()#data frame for centre1
d2<-data.frame()#data frame for centre2
d3<-data.frame()#data frame for centre3
for( i in 1:nrow(reqMat) )
{
minimum = which.min(reqMat[i,])
if(minimum==1) d1<-rbind(d1,d[i, ])
else if(minimum==2) d2<-rbind(d2,d[i, ])
else if(minimum==3) d3<-rbind(d3,d[i, ])
}
rhcollect(c1,d1)
rhcollect(c2,d2)
rhcollect(c3,d3)
}
})
reduce<-expression(
pre = { collect<-NULL } ,
reduce = {
collect<-rbind(collect,do.call("rbind",reduce.values))
},
post = {
rhcollect(reduce.key,collect)
}
)
mapred<-
list(rhipe_map_buff_size=20,mapred.task.timeout=0,mapred.reduce.tasks=)
ifolder="/km"
ofolder="/out01"
z<-
rhmr(map=map,reduce=reduce,inout=c("text","sequence"),ifolder=ifolder,
ofolder=ofolder,setup=map.setup,
shared=c("data/centers.Rdata"),mapred=mapred,jobname="kairo")
66 rhex(z)
Table 9: K-means Script "kmean.R"
The Output for three iteration is as follows:
1 Initial Centers
2 16929 28295
3 34357 44202
4 47667 64978
5 ---------------------------------------------
6 Center After iteration:1
7 16611.42 29098.74
8 51817.52 54263.62
9 37140.38 43023.43
10 ---------------------------------------------
11 Center After iteration:2
12 15167.49 31598.07
13 54047.03 49126.61
14 35697.03 42651.44
So that is how we accomplished k-means clustering.
Regards,
Som Shekhar