Scalding Execution and PlannerException: source taps are required


ravi kiran holur vijay

Oct 7, 2016, 2:02:17 PM
to cascading-user
Hello,

I am trying to run a barebones Scalding job using Execution, but I am running into a PlannerException. Oddly, the map-reduce job itself finishes successfully and I can see the output on S3. Can any of you please share any fixes you are aware of for this?

Scalding Job:

class TestJobFailure(args: Args) extends Job(args) {

  val inputFile = "s3://xxx/testdata/latest"
  val outputFile = "s3://xxx/delete"

  val job1 = TypedPipe
    .from(TextLine(inputFile))
    .sample(0.01)
    .writeExecution(TypedTsv(outputFile))

  val u: Unit = job1.waitFor(scaldingConfig, mode).get
}

Exception:

App > Exception in thread "main" java.lang.Throwable: GUESS: Cascading requires all sources to have final sinks on disk.
App > If you know what exactly caused this error, please consider contributing to GitHub via following link.
App > https://github.com/twitter/scalding/wiki/Common-Exceptions-and-possible-reasons#cascadingflowplannerplannerexception
App > at com.twitter.scalding.Tool$.main(Tool.scala:130)
App > at com.twitter.scalding.Tool.main(Tool.scala)
App > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
App > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
App > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
App > at java.lang.reflect.Method.invoke(Method.java:606)
App > at org.apache.hadoop.util.RunJar.main(RunJar.java:163)
App > Caused by: cascading.flow.planner.PlannerException: source taps are required
App > at cascading.flow.planner.FlowPlanner.verifyTaps(FlowPlanner.java:234)
App > at cascading.flow.planner.FlowPlanner.verifyAllTaps(FlowPlanner.java:175)
App > at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:242)
App > at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:80)
App > at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
App > at com.twitter.scalding.ExecutionContext$class.buildFlow(ExecutionContext.scala:53)
App > at com.twitter.scalding.ExecutionContext$$anon$1.buildFlow(ExecutionContext.scala:100)
App > at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:225)
App > at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:225)
App > at scala.util.Success.flatMap(Try.scala:230)
App > at com.twitter.scalding.Job.buildFlow(Job.scala:225)
App > at com.twitter.scalding.Job.run(Job.scala:295)
App > at com.twitter.scalding.Tool.start$1(Tool.scala:102)
App > at com.twitter.scalding.Tool.run(Tool.scala:118)
App > at com.twitter.scalding.Tool.run(Tool.scala:66)
App > at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
App > at com.twitter.scalding.Tool$.main(Tool.scala:126)
App > ... 6 more


ravi kiran holur vijay

Oct 7, 2016, 6:36:19 PM
to cascading-user
In case anyone else runs into this issue, you can fix it with the workaround below. As far as I can tell, the problem is that writeExecution plans and runs its own flow, so the enclosing Job's flow ends up with no source or sink taps at all, and the Cascading planner rejects the empty flow with "source taps are required". Adding a dummy .write to the Job gives the planner something to plan:

class TestJobFailure(args: Args) extends Job(args) {

  val inputFile = "s3://xxx/testdata/latest"
  val outputFile = "s3://xxx/delete"
  val outputFile2 = "s3://xxx/delete2"

  // Run the real work as an Execution, same as before.
  TypedPipe
    .from(TextLine(inputFile))
    .sample(0.01)
    .writeExecution(TypedTsv(outputFile))
    .waitFor(scaldingConfig, mode)

  println("Finishing up ...")

  // Dummy write so the Job's own flow has a source and a sink,
  // which satisfies the Cascading planner.
  TypedPipe
    .from(Seq(1, 2, 3))
    .write(TypedTsv(outputFile2))
}
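
An alternative that avoids the dummy write entirely (a sketch, not tested here; TestExecutionJob and the paths are illustrative) is to not mix Execution with Job at all: Scalding's ExecutionApp trait is meant for Execution-based jobs, so the planner never sees an empty Job flow:

```scala
import com.twitter.scalding._

// Sketch: ExecutionApp runs the Execution directly, so there is no
// enclosing Job flow to trip the "source taps are required" check.
object TestExecutionJob extends ExecutionApp {
  def job: Execution[Unit] =
    TypedPipe
      .from(TextLine("s3://xxx/testdata/latest"))
      .sample(0.01)
      .writeExecution(TypedTsv("s3://xxx/delete"))
}
```

You would launch it the same way as a Job, via hadoop jar and com.twitter.scalding.Tool's usual entry points, since ExecutionApp provides its own main.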