completely stuck with source tab error at runtime


Neeraj Nagi

Feb 8, 2014, 9:48:49 AM
to cascadi...@googlegroups.com
I am running a simple algorithm using Scalding. The code is:

package com.twitter.scalding.geohash

import com.twitter.scalding._
import com.github.davidmoten.geo.GeoHash
import com.javadocmd.simplelatlng._
import com.javadocmd.simplelatlng.util.LengthUnit

class RedistributeCluster(args: Args, nodes: Array[String], clusterSize: Int) extends Job(args) {
  // Pick up the newly chosen cluster node (written by CalculateClusterNode) at submit time.
  val newNode: String = Tsv(args("output")).readAtSubmitter[String].head
  println("^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^" + newNode)
  val updatedNodes = nodes :+ newNode

  // Reassign every geohash to its nearest cluster center and write the result out.
  val csvSchema = List("gHash", "clusterCenter")
  Csv(args("input"), separator = ",", fields = csvSchema).read
    .map('gHash -> 'newClusterCenter) { gHash: String => new_cluster_center(gHash) }
    .write(Csv(args("input" + clusterSize), ",", fields = ('gHash, 'newClusterCenter)))

  override def next: Option[Job] = Some(new CalculateClusterNode(args, clusterSize, updatedNodes))

  // Return the cluster node nearest to the given geohash.
  def new_cluster_center(geohash: String): String =
    updatedNodes.minBy(node => distance(geohash, node))

  // Great-circle distance in metres between the centre points of two geohashes.
  def distance(geohash1: String, geohash2: String): Double = {
    val latLong1 = GeoHash.decodeHash(geohash1)
    val pos1 = new LatLng(latLong1.getLat(), latLong1.getLon())
    val latLong2 = GeoHash.decodeHash(geohash2)
    val pos2 = new LatLng(latLong2.getLat(), latLong2.getLon())
    LatLngTool.distance(pos1, pos2, LengthUnit.METER)
  }
}


// Reads the geohash CSV and ranks each geohash by its distance from the existing
// cluster centers, so the farthest one can become the next center.
class CalculateClusterNode(args: Args, clusterSize: Int, nodes: Array[String]) extends Job(args) {
  // --input is a command-line parameter: the CSV file (or folder of CSV files) of geohashes.
  if (nodes.length < clusterSize) {
    val csvSchema = List("geohash", "clusterCenter")
    Csv(args("input"), separator = ",", fields = csvSchema).read
      .map('geohash -> 'farthest) { geohash: String => farthest_from_existing_centers(geohash) }
      .groupAll { _.sortBy('farthest).reverse }
      .write(Tsv(args("output")))
  } else {
    sys.exit()
  }

  override def next: Option[Job] = Some(new RedistributeCluster(args, nodes, args("clusterSize").toInt))

  // Great-circle distance in metres between the centre points of two geohashes.
  def distance(geohash1: String, geohash2: String): Double = {
    val latLong1 = GeoHash.decodeHash(geohash1)
    val pos1 = new LatLng(latLong1.getLat(), latLong1.getLon())
    val latLong2 = GeoHash.decodeHash(geohash2)
    val pos2 = new LatLng(latLong2.getLat(), latLong2.getLon())
    LatLngTool.distance(pos1, pos2, LengthUnit.METER)
  }
  // Distance from this geohash to the existing cluster centers (used to rank candidates).
  def farthest_from_existing_centers(geohash: String): Double =
    nodes.map(node => distance(geohash, node)).max
}

class KcenterClustering(args: Args) extends Job(args) {
  override def next: Option[Job] =
    Some(new CalculateClusterNode(args, args("clusterSize").toInt, Array("9q8up7n")))
}




--------------------------

Upon executing:
 hadoop jar target/scalding-1.0-SNAPSHOT.jar  com.twitter.scalding.geohash.KcenterClustering --local --input ../scalding_data/input/1.csv --output ../scalding_data/output --clusterSize 5

I get this error:

Exception in thread "main" cascading.flow.planner.PlannerException: source taps are required
        at cascading.flow.planner.FlowPlanner.verifyTaps(FlowPlanner.java:170)
        at cascading.flow.planner.FlowPlanner.verifyAssembly(FlowPlanner.java:121)
        at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:57)
        at cascading.flow.FlowConnector.connect(FlowConnector.java:454)
        at com.twitter.scalding.Job.buildFlow(Job.scala:91)
        at com.twitter.scalding.Job.run(Job.scala:124)
        at com.twitter.scalding.Tool.start$1(Tool.scala:105)
        at com.twitter.scalding.Tool.run(Tool.scala:121)
        at com.twitter.scalding.Tool.run(Tool.scala:72)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at com.twitter.scalding.Tool$.main(Tool.scala:128)
        at com.twitter.scalding.Tool.main(Tool.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:186)


Any suggestion is most welcome.

Oscar Boykin

Feb 8, 2014, 6:00:54 PM
to cascadi...@googlegroups.com
You can't have an empty Job. Every Job must do something.

So, KcenterClustering is invalid.

Also, if (nodes.length < clusterSize), then RedistributeCluster is invalid. As I mentioned in the other email, check out the PageRank example.
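
For reference, a minimal non-empty Job wires at least one source tap to a sink, roughly like this (a sketch only; the argument names and fields here are illustrative, not from your code):

import com.twitter.scalding._

class MinimalJob(args: Args) extends Job(args) {
  // At least one read and one write must be wired up, otherwise the planner
  // has nothing to plan and fails with "source taps are required".
  TextLine(args("input"))
    .read
    .mapTo('line -> 'length) { line: String => line.length }
    .write(Tsv(args("output")))
}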


--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/e09fb7d9-63bf-49e4-88c0-1ff57f80a1ac%40googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.



--
Oscar Boykin :: @posco :: http://twitter.com/posco