Scalding: Handling errors in input file using TypedJson


Lakshmi

Jul 7, 2015, 4:32:51 PM
to cascadi...@googlegroups.com
Hi,

I would like to understand the best way to handle input data errors. My JSON input file (data exported from a database) has several rows in an older format that I don't want to include in the map/reduce job. These rows are missing some fields or otherwise contain bad data, so I would like to add error handling that skips a bad row and moves on to the next one. However, I am not sure how to make this work with my TypedJson input.

My current workaround is to remove the bad rows in a sanitization step outside of the job. Is there another recommended approach?

val workoutPipe = TypedPipe.from(TypedJson[Workout](args("input")))

Here is a sample error:

Exception in thread "main" cascading.flow.FlowException: local step failed
at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:230)
at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:150)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: cascading.pipe.OperatorException: [com.twitter.scalding.T...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:474)] operator Each failed executing operation
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
... 4 more
Caused by: com.twitter.bijection.InversionFailure: Failed to invert: {"EntryTime":1211394503127,"Id":"w2kr01","LocalEntryTime":"2008-05-21T10:28:23.127-08:00","startTime":"2008-05-21T10:28:23.127-08:00"}
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:43)
at com.twitter.bijection.InversionFailure$$anonfun$partialFailure$1.applyOrElse(InversionFailure.scala:42)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at scala.util.Failure.recoverWith(Try.scala:202)
at com.twitter.bijection.Inversion$.attempt(Inversion.scala:30)
at com.twitter.scalding.TypedJson$$anon$1.invert(TypedJson.scala:31)
at com.twitter.scalding.TypedJson$$anon$1.invert(TypedJson.scala:28)
at com.twitter.scalding.TypedJson$$anonfun$transformForRead$3.apply(TypedJson.scala:52)
at com.twitter.scalding.TypedJson$$anonfun$transformForRead$3.apply(TypedJson.scala:52)
at com.twitter.scalding.MapFunction.operate(Operations.scala:54)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
... 8 more
Caused by: org.json4s.package$MappingException: No usable value for X
Did not find value which can be converted into java.lang.String
at org.json4s.reflect.package$.fail(package.scala:96)
at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$buildCtorArg(Extraction.scala:462)
at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$14.apply(Extraction.scala:482)
at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$14.apply(Extraction.scala:482)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$instantiate(Extraction.scala:470)
at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:515)
at org.json4s.Extraction$ClassInstanceBuilder$$anonfun$result$6.apply(Extraction.scala:512)
at org.json4s.Extraction$.org$json4s$Extraction$$customOrElse(Extraction.scala:524)
at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:512)
at org.json4s.Extraction$.extract(Extraction.scala:351)
at org.json4s.Extraction$.extract(Extraction.scala:42)
at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
at org.json4s.native.Serialization$.read(Serialization.scala:71)
at com.twitter.scalding.TypedJson$$anon$1$$anonfun$invert$1.apply(TypedJson.scala:31)
at com.twitter.scalding.TypedJson$$anon$1$$anonfun$invert$1.apply(TypedJson.scala:31)
at com.twitter.bijection.Inversion$$anonfun$attempt$1.apply(Inversion.scala:30)
at scala.util.Try$.apply(Try.scala:191)
... 15 more
Caused by: org.json4s.package$MappingException: Did not find value which can be converted into java.lang.String
at org.json4s.Extraction$.convert(Extraction.scala:603)
at org.json4s.Extraction$.extract(Extraction.scala:350)
at org.json4s.Extraction$ClassInstanceBuilder.org$json4s$Extraction$ClassInstanceBuilder$$buildCtorArg(Extraction.scala:450)
... 36 more



Thanks,
Lakshmi

Oscar Boykin

Jul 7, 2015, 4:44:57 PM
to cascadi...@googlegroups.com
TypedJson was implemented mainly for output, not input, so input sanitization was not considered.

You could subclass it and modify the decoding to ignore a certain number of errors, or you can keep the separate sanitization phase you are using now. Up to you.

Cleaning the data is generally a good idea, in my view.
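
A minimal sketch of the "ignore bad rows" idea, not TypedJson's actual implementation: wrap the throwing decoder in Try and turn a failure into None, so that flatMap drops the bad row. The Workout fields, the LenientDecode object, and the comma-separated stand-in parser are all assumptions made to keep the example free of dependencies. In the real job the decoder would be the json4s Serialization.read call visible in the stack trace above, applied to rows read as plain text (for example with Scalding's TextLine source).

```scala
import scala.util.Try

// Hypothetical record type standing in for the poster's Workout class.
final case class Workout(id: String, startTime: String)

object LenientDecode {
  // Stand-in for the real JSON decoder: expects "id,startTime" and throws
  // on anything else, just as the json4s extraction throws on a bad row.
  def parse(line: String): Workout = {
    val parts = line.split(",", -1)
    require(parts.length == 2 && parts.forall(_.nonEmpty), s"bad row: $line")
    Workout(parts(0), parts(1))
  }

  // Turn any throwing decoder into one that returns None for a bad row.
  def lenient[A](decode: String => A): String => Option[A] =
    line => Try(decode(line)).toOption

  val decodeOption: String => Option[Workout] =
    lenient[Workout](line => parse(line))

  def main(args: Array[String]): Unit = {
    val lines = List("w1,2008-05-21", "w2kr01", "w3,2008-05-22")
    // flatMap keeps the Some values and silently drops the None for "w2kr01".
    val good = lines.flatMap(line => decodeOption(line))
    println(good)
  }
}
```

The same function shape (String => Option[A]) can be passed to flatMap on a typed pipe of strings. Note that this version drops every bad row silently, with no limit on how many are skipped.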

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/7575635d-b035-4720-806f-e7a47898f203%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Oscar Boykin :: @posco :: http://twitter.com/posco

Lakshmi

Jul 7, 2015, 4:54:07 PM
to cascadi...@googlegroups.com
Thanks, Oscar. I was considering a subclass too and will use that approach if my input files turn out to be too large for a simple shell-command cleanup. When you say "a certain number of errors", do you mean a fail-fast approach, so the job doesn't go too far if there are too many errors?

Lakshmi

Oscar Boykin

Jul 7, 2015, 5:35:01 PM
to cascadi...@googlegroups.com
Yes. Keep a local counter that allows something like 100 or 1,000 error records, and throw after that. If you have too much bad data, your assumptions are often wrong.
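
The counter described above could look roughly like the following sketch. ErrorBudget, maxErrors, and errorCount are hypothetical names, and the integer decoder in the demo merely stands in for the JSON decoding step. The count lives in the instance, so in a distributed run each task would presumably keep its own tally rather than share one job-wide total.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper: wraps a throwing decoder, skips bad records, and
// fails the job once more than maxErrors bad records have been seen.
final class ErrorBudget[A](maxErrors: Int, decode: String => A) extends Serializable {
  // Local count of bad records seen by this instance.
  private var errors = 0

  def apply(line: String): Option[A] = Try(decode(line)) match {
    case Success(value) => Some(value)
    case Failure(cause) =>
      errors += 1
      if (errors > maxErrors)
        throw new IllegalStateException(
          s"more than $maxErrors bad records; last input: $line", cause)
      None // still within budget: skip this record
  }

  def errorCount: Int = errors
}

object ErrorBudgetDemo {
  def main(args: Array[String]): Unit = {
    // Tolerate at most 2 bad records; the decoder throws on non-numeric input.
    val decodeInt = new ErrorBudget[Int](maxErrors = 2, decode = s => s.trim.toInt)
    val parsed = List("1", "x", "2", "y", "3").flatMap(s => decodeInt(s))
    println(parsed)               // the two bad records are skipped
    println(decodeInt.errorCount) // bad records seen so far
    // A third bad record exceeds the budget and throws.
    println(Try(decodeInt("z")).isFailure)
  }
}
```

Keeping the original exception as the cause preserves the underlying decoding error in the stack trace when the budget is exhausted.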




Lakshmi

Jul 7, 2015, 5:38:29 PM
to cascadi...@googlegroups.com
Got it, thanks!

Lakshmi