class TestJobFailure(args: Args) extends Job(args) {
val inputFile = "s3://xxx/testdata/latest"
val outputFile = "s3://xxx/delete"
val job1 = TypedPipe.from(TextLine(inputFile)).sample(0.01).writeExecution(TypedTsv(outputFile))
val u: Unit = job1.waitFor(scaldingConfig, mode).get
}
App > Exception in thread "main" java.lang.Throwable: GUESS: Cascading requires all sources to have final sinks on disk.
App > If you know what exactly caused this error, please consider contributing to GitHub via following link.
App > at com.twitter.scalding.Tool$.main(Tool.scala:130)
App > at com.twitter.scalding.Tool.main(Tool.scala)
App > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
App > at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
App > at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
App > at java.lang.reflect.Method.invoke(Method.java:606)
App > at org.apache.hadoop.util.RunJar.main(RunJar.java:163)
App > Caused by: cascading.flow.planner.PlannerException: source taps are required
App > at cascading.flow.planner.FlowPlanner.verifyTaps(FlowPlanner.java:234)
App > at cascading.flow.planner.FlowPlanner.verifyAllTaps(FlowPlanner.java:175)
App > at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:242)
App > at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:80)
App > at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
App > at com.twitter.scalding.ExecutionContext$class.buildFlow(ExecutionContext.scala:53)
App > at com.twitter.scalding.ExecutionContext$$anon$1.buildFlow(ExecutionContext.scala:100)
App > at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:225)
App > at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:225)
App > at scala.util.Success.flatMap(Try.scala:230)
App > at com.twitter.scalding.Job.buildFlow(Job.scala:225)
App > at com.twitter.scalding.Job.run(Job.scala:295)
App > at com.twitter.scalding.Tool.start$1(Tool.scala:102)
App > at com.twitter.scalding.Tool.run(Tool.scala:118)
App > at com.twitter.scalding.Tool.run(Tool.scala:66)
App > at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
App > at com.twitter.scalding.Tool$.main(Tool.scala:126)
App > ... 6 more
class TestJobFailure(args: Args) extends Job(args) {
val inputFile = "s3://xxx/testdata/latest"
val outputFile = "s3://xxx/delete"
val outputFile2 = "s3://xxx/delete2"
TypedPipe
.from(TextLine(inputFile)).sample(0.01)
.writeExecution(TypedTsv(outputFile))
.waitFor(scaldingConfig, mode)
println("Finishing up ...")
TypedPipe
.from(Seq(1, 2, 3))
.write(TypedTsv(outputFile2))
}