# Run a toy k-means (Lloyd's algorithm) on Hadoop through RHIPE.
#
# Generates synthetic 2-D points: 5 * data_size true centers (sd = 10), each
# repeated 20 times with unit Gaussian noise added, i.e. 100 * data_size
# points in total. The points are written to HDFS as 20 (key, matrix)
# records, then `num_iterations` MapReduce jobs are run, each recomputing the
# centroids from the previous ones.
#
# Arguments:
#   data_size      - scale factor for the synthetic data set.
#   num_clusters   - number of clusters used for the random initial
#                    assignment in the first iteration.
#   num_iterations - number of MapReduce iterations to run (0 is allowed).
#   directory      - HDFS folder the input points are written to.
#   output         - HDFS folder prefix for job output; iteration i writes
#                    to file.path(output, i). Those folders must not exist
#                    beforehand (Hadoop refuses to overwrite job output).
#   mapred         - optional list of Hadoop options forwarded to rhmr().
#
# Returns a numeric matrix with one centroid per row, ordered by cluster id,
# or NULL when num_iterations is 0. A cluster that receives no points in an
# iteration is dropped, so the result can have fewer than num_clusters rows.
kmeans_test <- function(data_size, num_clusters, num_iterations, directory,
                        output = "/output", mapred = NULL) {
  stopifnot(data_size >= 1, num_clusters >= 1, num_iterations >= 0)

  # Create data and move it to HDFS ----
  # One record per replica of the centers, so every map.value is a matrix
  # of points (rows) with two coordinate columns.
  n_centers <- 5 * data_size
  centers <- matrix(rnorm(2 * n_centers, sd = 10), ncol = 2)
  records <- lapply(seq_len(20), function(i) {
    noise <- matrix(rnorm(2 * n_centers), ncol = 2)
    list(i, centers + noise)
  })
  rhwrite(records, directory)

  # Reduce: compute new centroids ----
  # Each value is c(coordinate sums, point count) for one chunk of one
  # cluster. RHIPE may call `reduce` several times per key, so the running
  # total is kept between `pre` and `post`, and the mean is emitted once.
  kmeans_reduce <- expression(
    pre = {
      totals <- 0
    },
    reduce = {
      totals <- totals + Reduce(`+`, reduce.values)
    },
    post = {
      n_col <- length(totals)
      rhcollect(reduce.key, totals[-n_col] / totals[n_col])
    }
  )

  C <- NULL
  for (i in seq_len(num_iterations)) {
    # Map: assign points to their nearest centroid ----
    # The map expression is evaluated in remote R sessions that cannot see
    # this function's locals, so the current centroids and cluster count
    # are spliced into the expression with bquote(). On the first pass
    # (no centroids yet) points are assigned to clusters at random.
    kmeans_map <- as.expression(bquote({
      C <- .(C)
      k <- .(num_clusters)
      for (P in map.values) {
        nearest <- if (is.null(C)) {
          sample.int(k, nrow(P), replace = TRUE)
        } else {
          # Squared Euclidean distances: rows = points, columns = centroids.
          # Always a matrix, even for a single point or a single centroid.
          d2 <- outer(rowSums(P^2), rowSums(C^2), "+") - 2 * tcrossprod(P, C)
          max.col(-d2, ties.method = "first")
        }
        # Emit partial sums and counts per cluster, not raw points.
        for (cl in unique(nearest)) {
          pts <- P[nearest == cl, , drop = FALSE]
          rhcollect(cl, c(colSums(pts), nrow(pts)))
        }
      }
    }))

    # A fresh output folder per iteration: Hadoop will not overwrite one.
    iter_output <- file.path(output, i)
    job <- rhmr(
      map = kmeans_map,
      reduce = kmeans_reduce,
      inout = c("sequence", "sequence"),
      ifolder = directory,
      ofolder = iter_output,
      mapred = mapred,
      jobname = "kmeans"
    )
    rhex(job)

    # rhread() returns a list of (key, value) pairs: (cluster id, centroid).
    results <- rhread(iter_output)
    ids <- vapply(results, function(kv) as.integer(kv[[1]]), integer(1))
    C <- do.call(rbind, lapply(results, `[[`, 2))[order(ids), , drop = FALSE]
  }
  C
}
Thank you!
Marek