scalding: flow planner exception

Miguel Ping

Nov 13, 2013, 7:45:08 AM
to cascadi...@googlegroups.com
I'm hitting a strange error with the code below. It seems to be related to our cluster upgrade (we're using Cloudera), since I could run this script before the upgrade.
The error is a little strange; I'm guessing it has to do with my case class. I checked the "GUESS" in the error message, but I don't think my problem is the one described at the link it mentions.

We're using Hadoop 2.0.0-cdh4.4.0. Do you guys have any idea? Thanks!

import com.twitter.scalding._
import cascading.pipe.Pipe
import cascading.tuple._

import scala.collection.mutable

case class KVTextLine(p: String, fields: Fields = Fields.ALL) extends FixedPathSource(p) with TextLineScheme {

  import Dsl._

  //converts our pig format of type K<tab>V<tab> (contentHost<tab>www.google.pt<tab>contentPath<tab>...)
  //to cascading's format, accepting the supplied fields
  override def transformForRead(pipe: Pipe) = pipe.mapTo('line -> fields) {
    line: String =>
      // pair up consecutive tokens as key/value; a trailing key with no value maps to null
      val map = mutable.Map[String, String]()
      line.split("\t").sliding(2, 2).foreach { (x: Array[String]) =>
        x.length match {
          case 2 => map(x(0)) = x(1)
          case 1 => map(x(0)) = null
          case _ => // NOP, ignore
        }
      }

      // emit one value per requested field, in field order (null if the key is absent)
      val values = mutable.MutableList[String]()
      val iterator = fields.iterator()
      while (iterator.hasNext) {
        values += map.getOrElse(iterator.next().toString, null)
      }

      new cascading.tuple.Tuple(values: _*)
  }

  override def toString = "KVTextLine(" + p + ", " + fields.toString + ")"
}

class SimpleCountJob(args: Args) extends Job(args) {

  //scala has a limit of 22 fields per tuple, we may use a list
  val waSchema = List('contentHost, 'processingTime) //actually there are more fields

  KVTextLine(args("input"), waSchema)
    .project('contentHost, 'processingTime)
    .groupBy('contentHost) {
      _.sizeAveStdev('processingTime -> ('count, 'mean, 'stdev))
    }
    .write(Tsv(args("output")))
}
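The key/value parsing inside transformForRead can be exercised without Hadoop or Cascading. Below is a minimal plain-Scala sketch of the same logic, assuming the K<tab>V<tab> layout described in the code comments. The object and method names (KVLineSketch, parse, project) are hypothetical and are not part of Scalding.

```scala
// Plain-Scala sketch (no Cascading/Hadoop) of the parsing done in
// KVTextLine.transformForRead. Hypothetical helper, for local testing only.
object KVLineSketch {
  // Splits "k1\tv1\tk2\tv2..." into a Map. A trailing key with no value
  // maps to null, and a repeated key keeps its last value, as in the original.
  def parse(line: String): Map[String, String] =
    line.split("\t").sliding(2, 2).foldLeft(Map.empty[String, String]) {
      case (acc, Array(k, v)) => acc.updated(k, v)
      case (acc, Array(k))    => acc.updated(k, null: String)
      case (acc, _)           => acc // NOP, ignore
    }

  // Returns one value per requested field, in field order,
  // using null for fields that do not appear in the line.
  def project(line: String, fields: Seq[String]): Seq[String] = {
    val kv = parse(line)
    fields.map(f => kv.getOrElse(f, null))
  }
}
```

For example, KVLineSketch.project("contentHost\twww.google.pt\tprocessingTime\t42", Seq("contentHost", "processingTime")) yields Seq("www.google.pt", "42"), the same values the mapTo above would put into the tuple for those two fields.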


13/11/13 12:41:22 INFO util.HadoopUtil: resolving application jar from found main method on: com.twitter.scalding.Tool$
13/11/13 12:41:22 INFO planner.HadoopPlanner: using application jar: /home/mping/scalding-core-assembly-0.9.0rc4.jar
13/11/13 12:41:22 INFO property.AppProps: using app.id: 121717910EA64318974CF905E9967174
Exception in thread "main" java.lang.Throwable: GUESS: Cascading requires all sources to have final sinks on disk.
If you know what exactly caused this error, please consider contributing to GitHub via following link.
        at com.twitter.scalding.Tool$.main(Tool.scala:154)
        at com.twitter.scalding.Tool.main(Tool.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
Caused by: cascading.flow.planner.PlannerException: could not build flow from assembly: [null]
        at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:576)
        at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:263)
        at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:80)
        at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
        at com.twitter.scalding.Job.buildFlow(Job.scala:205)
        at com.twitter.scalding.Job.runFlow(Job.scala:226)
        at com.twitter.scalding.Job.run(Job.scala:232)
        at com.twitter.scalding.Tool.start$1(Tool.scala:114)
        at com.twitter.scalding.Tool.run(Tool.scala:132)
        at com.twitter.scalding.Tool.run(Tool.scala:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at com.twitter.scalding.Tool$.main(Tool.scala:140)
        ... 6 more
Caused by: java.lang.NullPointerException
        at cascading.tap.hadoop.io.MultiInputFormat.addInputFormat(MultiInputFormat.java:76)
        at cascading.flow.hadoop.HadoopFlowStep.initFromSources(HadoopFlowStep.java:345)
        at cascading.flow.hadoop.HadoopFlowStep.getInitializedConfig(HadoopFlowStep.java:99)
        at cascading.flow.hadoop.HadoopFlowStep.createFlowStepJob(HadoopFlowStep.java:201)
        at cascading.flow.hadoop.HadoopFlowStep.createFlowStepJob(HadoopFlowStep.java:69)
        at cascading.flow.planner.BaseFlowStep.getFlowStepJob(BaseFlowStep.java:676)
        at cascading.flow.BaseFlow.initializeNewJobsMap(BaseFlow.java:1181)
        at cascading.flow.BaseFlow.initialize(BaseFlow.java:199)
        at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:257)
        ... 16 more

Miguel Ping

Nov 13, 2013, 8:19:36 AM
to cascadi...@googlegroups.com
We're also hitting the same error with a simple Cascalog job, so I suspect some weird dependency mismatch between version numbers.

Oscar Boykin

Nov 13, 2013, 10:54:13 AM
to cascadi...@googlegroups.com
How are you getting Cascading running with Hadoop 2?

Are you excluding the Cascading pulled in by Scalding and using the Hadoop 2 version instead? That is the way it has to be done.

Chris posted about Hadoop 2 yesterday.
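For reference, a rough sbt sketch of what that exclusion might look like. It assumes sbt 0.13-era build.sbt syntax, that the Scalding artifact is scalding-core, that the Hadoop 1 planner arrives as cascading:cascading-hadoop, and that Cascading 2.5's Hadoop 2 build is published as cascading:cascading-hadoop2-mr1 on the conjars repository (URL as it was at the time). Verify each of these against your own dependency tree.

```scala
// build.sbt sketch. Assumptions (not verified against a specific release):
// artifact names, the conjars URL, and the exact 2.5.x patch version.
resolvers += "Concurrent Maven Repo" at "http://conjars.org/repo"

val cascadingVersion = "2.5.0" // assumption: use the 2.5.x release you target

libraryDependencies ++= Seq(
  // Keep Scalding, but drop the Hadoop 1 Cascading planner it pulls in.
  ("com.twitter" %% "scalding-core" % "0.8.11")
    .exclude("cascading", "cascading-hadoop"),
  // Add the Hadoop 2 build of Cascading in its place.
  "cascading" % "cascading-hadoop2-mr1" % cascadingVersion
)
```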
--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/21eafc3d-7980-479d-b387-586bd6cea3ab%40googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.


--
Oscar Boykin :: @posco :: http://twitter.com/posco

Koert Kuipers

Nov 13, 2013, 11:11:46 AM
to cascadi...@googlegroups.com
Are you using cdh4-mr1 or cdh4-yarn?

Chris K Wensel

Nov 13, 2013, 7:09:04 PM
to cascadi...@googlegroups.com
If you are on the equivalent of Apache Hadoop 2.2, it's likely you will get an NPE, as Hadoop 2 doesn't set the "mapred.job.tracker" property.

I'll update Cascading 2.2 to issue a proper warning, but you should be using Cascading 2.5.

ckw

Miguel Ping

Nov 14, 2013, 6:17:09 AM
to cascadi...@googlegroups.com
Thanks for the tips guys.

It seems that I'm in fact using Cascading 2.2; the sbt "dependency-tree" plugin reports just that.
I followed your advice and it's working. I had to do a few more tricks:

If I use Scalding 0.8.11, I also have to exclude kryo (which is 2.17) and manually include kryo 2.21.
If I try to use Scalding 0.9.0rc4, I get a CNFE (ClassNotFoundException) on "com.twitter.scalding.LowPriorityConversions", so I rolled back to 0.8.11, which is working.
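Miguel did this with Leiningen; for comparison, here is a sketch of the same kryo swap in sbt syntax. It assumes Kryo is published as com.esotericsoftware.kryo:kryo and that the 2.17 jar arrives transitively through scalding-core; confirm both with your dependency tree before relying on it.

```scala
// build.sbt sketch of the kryo swap. Assumptions: Kryo's coordinates and
// that kryo 2.17 is pulled in transitively by scalding-core 0.8.11.
libraryDependencies ++= Seq(
  ("com.twitter" %% "scalding-core" % "0.8.11")
    .exclude("com.esotericsoftware.kryo", "kryo"), // drop the transitive kryo 2.17
  "com.esotericsoftware.kryo" % "kryo" % "2.21"    // pin kryo 2.21 explicitly
)
```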

I ended up using Leiningen for the dependencies; I'm still struggling with sbt and scald.rb.

Anyway, I'm good to go. I'm pretty sure the Cascalog version can also be fixed easily.
BTW, if you're new to Hadoop, this whole versioning thing is a mess: MR1 vs. YARN, Hadoop 1 vs. 2, jars with "0.20" as the version... I'm never quite sure which versions I'm using :\ I'd pay a (remote) pizza and a beer if someone wrote a blog post explaining this mess.

Thanks!

Oscar Boykin

Nov 14, 2013, 11:19:30 AM
to cascadi...@googlegroups.com
Scalding 0.9.0 will, at the very least, be binary incompatible with all prior releases, and in a few cases it is also source incompatible.

(For example, plus was deprecated and unified with sum.)

Until 1.0.0, we bump the minor version on any binary incompatibility and the patch version on any API addition or bug fix.
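The pre-1.0 rule can be made concrete with a small sketch. The helper below is hypothetical (not part of Scalding) and only encodes the policy as stated: two 0.x releases are expected to be binary compatible only when their major and minor components match. It is a reading aid, not a compatibility guarantee.

```scala
// Hypothetical helper encoding the stated pre-1.0 Scalding versioning rule:
// minor bump = binary incompatibility, patch bump = API addition or bug fix.
object PreOneVersioning {
  final case class Version(major: Int, minor: Int, patch: Int)

  // Parses "0.8.11" or "0.9.0rc4"; a non-digit suffix on the patch
  // component (such as "rc4") is dropped.
  def parse(s: String): Version = {
    val parts = s.split("\\.", 3)
    require(parts.length == 3, s"expected major.minor.patch, got: $s")
    Version(parts(0).toInt, parts(1).toInt, parts(2).takeWhile(_.isDigit).toInt)
  }

  // Only models the 0.x rule: same major and minor means "expected compatible".
  def expectBinaryCompatible(a: String, b: String): Boolean = {
    val (va, vb) = (parse(a), parse(b))
    va.major == vb.major && va.minor == vb.minor
  }
}
```

Under this rule, 0.8.10 and 0.8.11 are expected to be binary compatible, while 0.8.11 and 0.9.0rc4 are not, consistent with the note that 0.9.0 breaks binary compatibility with all prior releases.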