package com.twitter.scalding.geohash
import com.twitter.scalding._
import com.twitter.scalding.Csv
import scala.io.Source
import com.github.davidmoten.geo.GeoHash
import com.javadocmd.simplelatlng._
import com.javadocmd.simplelatlng.util.LengthUnit
import java.io.PrintWriter
/**
 * Reassigns every geohash in the input to its nearest cluster center after
 * the newly selected center (read from the previous job's output) is added
 * to the existing set of centers.
 *
 * Fixes vs. original:
 *  - new_cluster_center was a Unit-returning procedure (body opened with "{"
 *    instead of "= {"), so the mapped field 'newClusterCenter was always ().
 *  - It used 1-based indexing (skipping updatedNodes(0)) and iterated
 *    "2 to length", indexing updatedNodes(length) on the final iteration
 *    (ArrayIndexOutOfBoundsException).
 */
class RedistributeCluster(args : Args, nodes : Array[String], clusterSize : Int) extends Job(args){
  // Head row of the previous job's output is the point farthest from all
  // existing centers, i.e. the newly chosen center.
  val newNode : String = Tsv(args("output")).readAtSubmitter[String].head
  println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"+newNode)

  val updatedNodes = nodes :+ newNode

  val csvSchema = List("gHash","clusterCenter")
  Csv(args("input") ,separator = ",", fields = csvSchema).read
    .map('gHash -> 'newClusterCenter) { gHash: String => new_cluster_center(gHash) }
    .write(Csv(args("input"+clusterSize),",", fields=('gHash, 'newClusterCenter)))

  // Chain back to the selection job with the enlarged center set.
  override def next : Option[Job] = Some(new CalculateClusterNode(args, clusterSize , updatedNodes))

  /** Returns the center in updatedNodes nearest to the given geohash. */
  def new_cluster_center(geohash: String): String =
    updatedNodes.minBy(node => distance(geohash, node))

  /** Great-circle distance in meters between the centers of two geohash cells. */
  def distance(geohash1: String, geohash2: String): Double = {
    val latLong1 = GeoHash.decodeHash(geohash1)
    val latLong2 = GeoHash.decodeHash(geohash2)
    val pos1 = new LatLng(latLong1.getLat(), latLong1.getLon())
    val pos2 = new LatLng(latLong2.getLat(), latLong2.getLon())
    LatLngTool.distance(pos1, pos2, LengthUnit.METER)
  }
}
// Reads geohash rows from the input CSV and selects the next cluster center
// (the point farthest from all existing centers — farthest-first traversal).
/**
 * Selects the next cluster center: for each input geohash, computes its
 * distance to the nearest existing center, then sorts descending so the
 * farthest point (the next center) is the head row of the output.
 *
 * Fixes vs. original:
 *  - bare `exit` (deprecated/removed Predef member) replaced by sys.exit(0),
 *    preserving the original terminate-when-done behavior.
 *  - `next` was unconditional, so it scheduled RedistributeCluster even when
 *    clustering was already complete; it is now guarded.
 *  - `next` re-parsed args("clusterSize") instead of using the clusterSize
 *    constructor parameter this job already holds.
 */
class CalculateClusterNode(args : Args, clusterSize : Int ,nodes : Array[String]) extends Job(args) {
  // "input" is a command-line parameter: folder containing CSV files of geohashes.
  if (nodes.length < clusterSize) {
    val csvSchema = List("geohash","clusterCenter")
    Csv( args("input"),separator = ",", fields = csvSchema ).read
      .map('geohash -> 'farthest) { line : String => farthest_from_existing_centers(line) }
      .groupAll { _.sortBy('farthest).reverse }  // descending: farthest point first
      .write( Tsv( args("output") ) )
  } else {
    // All clusterSize centers have been chosen; stop the job chain.
    // NOTE(review): exiting from a Job constructor is crude — a Job with no
    // source taps cannot be planned, so without this the planner would throw
    // "source taps are required".
    sys.exit(0)
  }

  // Only continue the chain while more centers are still needed.
  override def next : Option[Job] =
    if (nodes.length < clusterSize) Some(new RedistributeCluster(args, nodes, clusterSize))
    else None

  /** Great-circle distance in meters between the centers of two geohash cells. */
  def distance(geohash1: String, geohash2: String): Double = {
    val latLong1 = GeoHash.decodeHash(geohash1)
    val latLong2 = GeoHash.decodeHash(geohash2)
    val pos1 = new LatLng(latLong1.getLat(), latLong1.getLon())
    val pos2 = new LatLng(latLong2.getLat(), latLong2.getLon())
    LatLngTool.distance(pos1, pos2, LengthUnit.METER)
  }

  /** Distance from geohash to its nearest existing center; maximizing this
   *  across all points yields the farthest-first choice of the next center. */
  def farthest_from_existing_centers(geohash: String): Double =
    nodes.map(node => distance(geohash, node)).max
}
// Entry-point job for the k-center clustering chain: seeds the center set
// with a single hard-coded geohash ("9q8up7n") and hands off to
// CalculateClusterNode, which picks the remaining clusterSize-1 centers.
// NOTE(review): this Job defines no source/sink taps of its own, which is
// exactly what Cascading rejects with "source taps are required" (see the
// PlannerException trace pasted at the bottom of this file) — confirm how
// this job is intended to be planned, or give it a trivial pipe.
class KcenterClustering(args: Args) extends Job(args){
override def next : Option[Job] = Some(new CalculateClusterNode(args, args("clusterSize").toInt , Array("9q8up7n")))
}
/* NOTE(review): the following runtime stack trace was pasted directly into the
   source file and breaks compilation; it is preserved here as a comment.
   It shows Cascading's "source taps are required" PlannerException, which is
   raised when a Job is planned without defining any source taps (see the
   review note on KcenterClustering above).

Exception in thread "main" cascading.flow.planner.PlannerException: source taps are required
at cascading.flow.planner.FlowPlanner.verifyTaps(FlowPlanner.java:170)
at cascading.flow.planner.FlowPlanner.verifyAssembly(FlowPlanner.java:121)
at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:57)
at cascading.flow.FlowConnector.connect(FlowConnector.java:454)
at com.twitter.scalding.Job.buildFlow(Job.scala:91)
at com.twitter.scalding.Job.run(Job.scala:124)
at com.twitter.scalding.Tool.start$1(Tool.scala:105)
at com.twitter.scalding.Tool.run(Tool.scala:121)
at com.twitter.scalding.Tool.run(Tool.scala:72)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at com.twitter.scalding.Tool$.main(Tool.scala:128)
at com.twitter.scalding.Tool.main(Tool.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:186)
*/