kryo stack overflow when using scalding


Jiang Mingwei

Jun 1, 2013, 3:14:07 PM
to cascadi...@googlegroups.com

It seems to happen when I do the join. I tried both Kryo 2.17 and 2.21. For Scalding, I'm using scalding-core_2.9.2 version 0.8.4.


cascading.pipe.OperatorException: [TextLine(/apps/product...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:403)] operator Each failed executing operation
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
    at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:80)
    at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:93)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:86)
    at com.twitter.scalding.FlatMapFunction$$anonfun$operate$1.apply(Operations.scala:60)
    at com.twitter.scalding.FlatMapFunction$$anonfun$operate$1.apply(Operations.scala:58)
    at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
    at scala.collection.immutable.List.foreach(List.scala:76)
    at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:58)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
    at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
    at cascading.flow.stream.SourceStage.run(SourceStage.java:58)
    at cascading.flow.hadoop.FlowMapper.run(FlowMapper.java:127)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1178)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: cascading.flow.stream.DuctException: internal error: ['330649308445', 'Dupes(m-wave    189841126    330649308445    0    9    false    panasonic sc-ptx7 home theater system dvd player, 3.1 speakers black    2011-11-27    2013-05-20    2013-05-20 19:16:54    175711    13    1    441.13    33.99    0    1000        mp9Xw4uzffNzkTpHVysFvnA    v    AspectBloomFilter_ver -> 2.0.3, AttributeExtractionClassifier_ver -> 1.0.50, BmpnProductClassifer_ver -> 1.0.2, Brand -> Panasonic, Brand_ae -> Panasonic, BuyerSegmentClassifier_ver -> 1.0.13, CoinsClassifier_ver -> 1.0.9, EServicesClassifier_ver -> 2.0.0, EZMAPR_ver -> 1.0.5, FlowersClassifier_ver -> 1.0.10, FragrancesClassifier_ver -> 1.0.9, GenuineOEM_ver -> 1.0.2, JewelryClassifier_ver -> 1.0.6, LHExposeToTSE -> mslocal, ListQuality -> 0, ListQuality_ver -> 1.0.4, MT -> dvd player, MergedNamespaceNames -> Brand LHExposeToTSE Type, MisCat_ver -> 1.0.1, ProductType_ver -> 1.0.6, SellerMissProductizationClassifier_ver -> 1.0.19, TicketsClassifier_ver -> 2.0.0, Tickets_ver -> 1.1.1, ToysAgeClassifierBundle_ver -> 1.0.16, Type -> DVD Player, UnifiedBrand_ver -> 1.0.5, csatagger_ver -> 1.0.6, gtin_ver -> 1.1.9    All returns accepted -> Returns Accepted, Item must be returned within -> 14 Days, Refund will be given as -> Money back or exchange (buyer's choice), Restocking Fee -> No, Return shipping will be paid by -> Buyer            0.0    89924.0    1.1120378092855157E-8    0    293    Consumer Electronics    1    330649308445    1    Y    203269.0    item-price)']
    at cascading.flow.hadoop.stream.HadoopCoGroupGate.receive(HadoopCoGroupGate.java:98)
    at cascading.flow.hadoop.stream.HadoopCoGroupGate.receive(HadoopCoGroupGate.java:43)
    at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:80)
    at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:93)
    at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:86)
    at com.twitter.scalding.MapFunction.operate(Operations.scala:71)
    at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
    ... 21 more
Caused by: java.lang.StackOverflowError
    at scala.collection.mutable.ListBuffer.<init>(ListBuffer.scala:45)
    at scala.collection.immutable.List$.newBuilder(List.scala:461)
    at scala.collection.generic.GenericTraversableTemplate$class.genericBuilder(GenericTraversableTemplate.scala:68)
    at scala.collection.immutable.List.genericBuilder(List.scala:76)
    at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:49)
    at scala.collection.generic.GenTraversableFactory$GenericCanBuildFrom.apply(GenTraversableFactory.scala:44)
    at scala.collection.immutable.List.$plus$plus(List.scala:170)
    at com.twitter.scalding.Args$$anonfun$toList$1.apply(Args.scala:130)
    at com.twitter.scalding.Args$$anonfun$toList$1.apply(Args.scala:121)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:143)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:143)
    at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:178)
    at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:347)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:143)
    at scala.collection.immutable.HashMap.foldLeft(HashMap.scala:38)
    at com.twitter.scalding.Args.toList(Args.scala:121)
    at com.twitter.scalding.Args.toString(Args.scala:136)
    at com.twitter.scalding.serialization.ArgsSerializer.write(KryoSerializers.scala:73)
    at com.twitter.scalding.serialization.ArgsSerializer.write(KryoSerializers.scala:69)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:487)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:474)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
   
The same frames keep repeating from there, so it looks like a recursive call.


Ken Krugler

Jun 1, 2013, 3:17:03 PM
to cascadi...@googlegroups.com
On Jun 1, 2013, at 12:14pm, Jiang Mingwei wrote:


It seems to happen when I do the join. I tried both Kryo 2.17 and 2.21. For Scalding, I'm using scalding-core_2.9.2 version 0.8.4.

This particular exception typically indicates a circular reference problem. One or more of your fields is probably infinitely deep.

Do you have any details of what you're trying to serialize in the fields of your Tuples?
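
To make the failure mode concrete, here is a minimal sketch in plain Scala of why a field-by-field walk overflows the stack on a cyclic object graph, and how remembering visited objects by identity avoids it. It only illustrates the general idea and is not Kryo's actual implementation; `Node`, `naiveDepth`, `trackedDepth`, and `overflows` are hypothetical names invented for this example.

```scala
import java.util.{Collections, IdentityHashMap}

// Hypothetical two-field object; `next` can point back to an earlier node.
final class Node(val name: String) { var next: Node = null }

object CycleDemo {
  // Naive walk: follows `next` with no memory of what it has already seen.
  // On a cyclic graph this recursion never terminates.
  def naiveDepth(n: Node): Int =
    if (n == null) 0 else 1 + naiveDepth(n.next)

  // Tracked walk: stops as soon as it meets an object it has already visited
  // (compared by identity, not by equals).
  def trackedDepth(n: Node): Int = {
    val seen = Collections.newSetFromMap(new IdentityHashMap[Node, java.lang.Boolean]())
    def go(cur: Node): Int =
      if (cur == null || !seen.add(cur)) 0 else 1 + go(cur.next)
    go(n)
  }

  // Reports whether the naive walk blows the stack for the given graph.
  def overflows(n: Node): Boolean =
    try { naiveDepth(n); false }
    catch { case _: StackOverflowError => true }

  def main(args: Array[String]): Unit = {
    val a = new Node("a")
    val b = new Node("b")
    a.next = b
    b.next = a // cycle: a -> b -> a
    println(trackedDepth(a)) // prints 2
    println(overflows(a))
  }
}
```

The repeating `FieldSerializer.write` / `Kryo.writeObject` frames in the trace above have the same shape as `naiveDepth`: each nested field adds more frames until the stack runs out.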

-- Ken

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user?hl=en.
For more options, visit https://groups.google.com/groups/opt_out.
 
 

--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





Jiang Mingwei

Jun 1, 2013, 3:34:35 PM
to cascadi...@googlegroups.com
Here's the piece of code I think is suspicious:

val joined =
     dupes.groupBy(dupe => dupe.group)
             .join(dwListing.groupBy(dwItem => dwItem.item_id))
             .values.map { case (dupe, item) => DupesDW(item, dupe)}

dupes is of type TypedPipe[Dupes], and dwListing is of type TypedPipe[DwLstgItem].

After the join, I'm trying to map the values to another case class with some selected fields, as defined below.

case class DupesDW(dwItem: DwLstgItem, dupe: Dupes) {
    def slrId = dupe.slr_id
    def criteria = dupe.criteria
    def endDt = dwItem.end_dt_raw
}


Should I just pass the individual fields as class parameters instead of passing in the whole objects?



Ken Krugler

Jun 1, 2013, 4:55:52 PM
to cascadi...@googlegroups.com
On Jun 1, 2013, at 12:34pm, Jiang Mingwei wrote:

Here's the piece of code I think is suspicious:

val joined =
     dupes.groupBy(dupe => dupe.group)
             .join(dwListing.groupBy(dwItem => dwItem.item_id))
             .values.map { case (dupe, item) => DupesDW(item, dupe)}

dupes is of type TypedPipe[Dupes], and dwListing is of type TypedPipe[DwLstgItem].

What are Dupes and DwLstgItem?

After the join, I'm trying to map the values to another case class with some selected fields, as defined below.

case class DupesDW(dwItem: DwLstgItem, dupe: Dupes) {
    def slrId = dupe.slr_id
    def criteria = dupe.criteria
    def endDt = dwItem.end_dt_raw
}


Should I just pass the individual fields as class parameters instead of passing in the whole objects?

Wish I knew Scalding better...

Oscar?

-- Ken


Oscar Boykin

Jun 1, 2013, 4:56:15 PM
to cascadi...@googlegroups.com
Are any of your case classes inner classes?

If so, this causes all sorts of problems, but it is usually easy to fix: move them outside of the class or object they are defined in.
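
As a rough sketch of that fix, the case classes from this thread can be declared at the top level of the file rather than inside the job. The field names below come from the thread; the field types, `ExampleJob`, `InnerRecord`, and `OuterCaptureDemo` are assumptions made up for illustration. The small reflection helper lists the JVM-level fields of an instance, which are the fields a reflection-based serializer could end up walking.

```scala
// Top-level case classes: instances hold only their declared fields.
case class Dupes(group: Long, slr_id: Long, criteria: String)
case class DwLstgItem(item_id: Long, end_dt_raw: String)
case class DupesDW(dwItem: DwLstgItem, dupe: Dupes) {
  def slrId = dupe.slr_id
  def criteria = dupe.criteria
  def endDt = dwItem.end_dt_raw
}

// Hypothetical stand-in for a job class; `args` plays the role of the job's arguments.
class ExampleJob(val args: Map[String, String]) {
  // Inner case class: an instance may carry a hidden reference back to its
  // enclosing ExampleJob, and through it to everything the job holds.
  case class InnerRecord(id: Long)
  def make(id: Long): InnerRecord = InnerRecord(id)
}

object OuterCaptureDemo {
  // Names of the fields declared on the runtime class of `x`.
  def fieldNames(x: AnyRef): List[String] =
    x.getClass.getDeclaredFields.toList.map(_.getName)

  def main(args: Array[String]): Unit = {
    val top = DupesDW(DwLstgItem(1L, "2013-05-20"), Dupes(1L, 2L, "example"))
    println(fieldNames(top)) // only the declared fields, no outer reference

    val inner = new ExampleJob(Map("input" -> "/tmp/in")).make(1L)
    // With Scala 2 compilers this typically also lists a `$outer` field;
    // the exact result depends on the compiler version.
    println(fieldNames(inner))
  }
}
```

Comparing the two printed lists on your own build is a quick way to check whether a given class is dragging its enclosing instance along.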




--
Oscar Boykin :: @posco :: http://twitter.com/posco

Jiang Mingwei

Jun 1, 2013, 5:37:14 PM
to cascadi...@googlegroups.com
Yeah, these are inner classes. I'll try moving them outside and will report back on how it goes. I spent almost a day trying to nail down this issue. Working on a project in a completely new language is a pain. :-)



Oscar Boykin

Jun 1, 2013, 5:56:24 PM
to cascadi...@googlegroups.com
We have at least a plan to fix this, but really the Scala compiler should not capture the outer reference if it is not needed.

The plan is to null out the outer objects before serialization using ASM. We used to have code that did this (originally from Spark), but the compiler changed the bytecode it emits and that ASM code no longer does the job.



Jiang Mingwei

Jun 1, 2013, 9:38:28 PM
to cascadi...@googlegroups.com
Thanks guys! It worked after moving the case classes outside. Appreciate it!



Philippe Laflamme

Jun 3, 2013, 4:14:37 PM
to cascading-user
Does this only affect the Typed API? We use Thrift structs compiled by Scrooge and this problem only seems to happen with the Typed API.

Thanks,
Philippe

Oscar Boykin

Jun 3, 2013, 4:17:33 PM
to cascadi...@googlegroups.com
It can happen either way, but the details are slightly different in each case.

If you report an issue on GitHub, we can at least track it. The Scrooge case is a little easier to deal with, as we should be able to detect that the classes are Thrift.

Philippe Laflamme

Jun 4, 2013, 9:29:24 AM
to cascading-user
https://github.com/twitter/scalding/issues/449

Sorry for the lack of an example; I got rid of the offending code. To reproduce: use Scrooge to generate classes, then use an instance of one of those classes in a simple M/R step.

Philippe

Christopher Severs

Jun 4, 2013, 2:39:28 PM
to cascadi...@googlegroups.com
I've seen similar things with really complicated Avro specific records. I ended up just writing an AvroSerialization class in cascading.avro which uses Avro itself to do the serialization. The user can then include it in the config if they need it. Maybe the same thing would work for Thrift since it already knows how to do serialization.