Scalding/Twitter Bijection Issue


Brian Arnold

Feb 24, 2015, 6:37:41 PM
to bije...@googlegroups.com
Hi,

I recently upgraded our Scalding codebase from Scalding 0.8.11 on Scala 2.9.3 to Scalding 0.13.1 on Scala 2.10. I have fixed every issue except one with the Avro bijection we were using.

I am now getting the error below when serializing Avro objects to a byte array using the BinaryAvroCodec in Twitter Bijection (this worked in our old codebase). The Avro object being serialized contains nested Avro objects. If I comment out the setters for the nested objects (see below), my job works fine, but if I set those three nested fields before serializing, it blows up with the error below. I am also unable to reproduce the issue locally using runHadoop in my Scalding tests: the bijection works fine in all my local tests but blows up when running on the cluster. Does anyone have an idea what I might be running into?

Thanks!
Brian

import java.util.{List => JList}

private def buildProduct(productNbr: String, description: String, reviewCount: Int,
                         averageRating: String, prices: List[RecommendationPrice],
                         variations: String, skuMetaData: JList[SkuMetaData],
                         promotions: List[Promotions]): Product = {
    Product.newBuilder()   // <--- Product is also an avro object
      .setProductNbr(productNbr)
      .setDescription(description)
      .setReviewCount(reviewCount)
      .setAverageRating(averageRating)
      //.setPrices(prices)              // <--- list of avro objects
      .setPColorVariations(variations)
      //.setSkuMetaData(skuMetaData)    // <--- list of avro objects
      //.setPromotions(promotions)      // <--- list of avro objects
      .build
  }

  import com.google.common.io.BaseEncoding
  import com.twitter.bijection.{AbstractBijection, Bijection, Injection}

  type Codec[T] = Injection[T, Array[Byte]]

  // A plain wrapper for a base64-encoded payload. (It cannot extend Guava's
  // BaseEncoding, which is abstract and has no public constructor.)
  case class Base64String(str: String)

  implicit lazy val bytes2Base64: Bijection[Array[Byte], Base64String] =
    new AbstractBijection[Array[Byte], Base64String] {
      def apply(bytes: Array[Byte]): Base64String =
        Base64String(BaseEncoding.base64().encode(bytes))

      override def invert(b64: Base64String): Array[Byte] =
        BaseEncoding.base64().decode(b64.str)
    }
  
  // Defined on our pipe wrapper (per the stack trace below this lives in
  // com.company.pipes.EncoderPipe, so `pipe` is the wrapped cascading Pipe).
  def encodeBase64[T <: SpecificRecordBase: ClassTag](fs: (Fields, Fields))(avroInj: => Codec[T]): EncoderPipe = {
    val result = pipe.mapTo(fs) { in: (String, T) =>
      val (key, value) = in
      val base64Inj = implicitly[Injection[Array[Byte], Base64String]]
      val compose = avroInj andThen base64Inj
      (key, compose(value).str)
    }
    EncoderPipe(result)
  }



Product.avdl file

record Product {
    string productNbr;
    union { string, null } description = "null";
    union { int, null } reviewCount = 0;
    union { string, null } averageRating = "null";
    // note: a union's default must match its first branch, so the
    // null-defaulted array fields list null first
    union { null, array<RecommendationPrice> } prices = null;
    union { string, null } pColorVariations = "NONE";
    union { null, array<SkuMetaData> } skuMetaData = null;
    union { null, array<Promotions> } promotions = null;
}

I am also using the following dependency versions:

avro: "1.7.5",
hadoop: "2.4.0.2.1.2.0-402",
scalding: "0.13.1",
scala: "2.10"

bijectionAvro: "com.twitter:bijection-avro_$versions.scala:0.7.2",
hadoopCommon: "org.apache.hadoop:hadoop-common:$versions.hadoop",
hadoopMapredClientCore: "org.apache.hadoop:hadoop-mapreduce-client-core:$versions.hadoop",
hadoopMiniCluster: "org.apache.hadoop:hadoop-minicluster:$versions.hadoop", 
scalaLibrary: "org.scala-lang:scala-library:2.10.4",
scalding_core: "com.twitter:scalding-core_$versions.scala:$versions.scalding",
scalding_avro: "com.twitter:scalding-avro_$versions.scala:$versions.scalding",
scalding_commons: "com.twitter:scalding-commons_$versions.scala:$versions.scalding",
avro: "org.apache.avro:avro:$versions.avro",
avroCompiler: "org.apache.avro:avro-compiler:$versions.avro" 

cascading.pipe.OperatorException: [_pipe_19*_pipe_20][mapTo() @ com.company.pipes.EncoderPipe.encodeBase64(EncoderPipe.scala:22)] operator Each failed executing operation
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
	at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:80)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
	at com.twitter.scalding.MapFunction.operate(Operations.scala:60)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
	at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:80)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
	at cascading.operation.NoOp.operate(NoOp.java:47)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
	at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:45)
	at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:28)
	at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:93)
	at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:136)
	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1557)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.ClassCastException: cascading.tuple.Tuple cannot be cast to org.apache.avro.generic.IndexedRecord
	at org.apache.avro.generic.GenericData.getField(GenericData.java:537)
	at org.apache.avro.generic.GenericData.getField(GenericData.java:552)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:131)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:106)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:131)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:106)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:106)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:131)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:106)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
	at com.twitter.bijection.avro.BinaryAvroCodec.apply(AvroCodecs.scala:271)
	at com.twitter.bijection.avro.BinaryAvroCodec.apply(AvroCodecs.scala:267)
	at com.twitter.bijection.Injection$$anon$1.apply(Injection.scala:41)
	at com.company.pipes.EncoderPipe$$anonfun$1.apply(EncoderPipe.scala:29)
	at com.company.pipes.EncoderPipe$$anonfun$1.apply(EncoderPipe.scala:23)
	at com.twitter.scalding.MapFunction.operate(Operations.scala:59)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
	... 24 more

Oscar Boykin

Feb 24, 2015, 6:56:27 PM
to Brian Arnold, Christopher Severs, Mansur Ashraf, bije...@googlegroups.com
Hello Brian.

Sorry the upgrade is not going smoothly. In fact, I have never used Avro once in my life; we accepted that contribution from outside of Twitter, but I can't support it as well as it may need to be.

I've CC'ed Chris Severs as well as Mansur Ashraf (now at Twitter). Both know a lot more about Avro and this code than I do. I hope we can get you going.

--
You received this message because you are subscribed to the Google Groups "bijection" group.
To unsubscribe from this group and stop receiving emails from it, send an email to bijection+...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Oscar Boykin :: @posco :: http://twitter.com/posco

Christopher Severs

Feb 24, 2015, 7:19:58 PM
to bije...@googlegroups.com
That this is happening only on the cluster is really odd. It doesn't look like the Avro error I see most often, which is Kryo having trouble serializing nested Avro records between the map and reduce phases.

Is it possible that the Hadoop vs. local difference is a red herring, and the issue is that your cluster holds a couple of actual pieces of data that are rare (so you don't see them in the local test set) but trigger this problem? I definitely ran into such issues with the Kryo behavior mentioned above. Do some of the Hadoop examples work before it chokes at some point, or does this happen right away?

Mansur Ashraf

Feb 25, 2015, 2:15:39 AM
to bije...@googlegroups.com
Brian,

It seems like *somehow* a few of your Avro records contain Cascading tuples? I would suggest two things (sketched below):

1) Try using a Cascading trap to capture the record that is causing the exception to be thrown.
2) Remove the Avro bijection, use a plain Avro writer, and see if the error goes away.

I honestly don't think it's an Avro issue but a bad-data issue.
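
A rough sketch of both suggestions, assuming Scalding's RichPipe.addTrap for (1) and the stock Avro 1.7 SpecificDatumWriter API for (2); the trap path and helper name are illustrative:

import com.twitter.scalding._
import java.io.ByteArrayOutputStream
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.{SpecificDatumWriter, SpecificRecordBase}

// 1) Route tuples that throw inside the map into a trap sink instead of
//    failing the job, so the offending records can be inspected afterwards.
val trapped = pipe.addTrap(Tsv("bad_records"))

// 2) Serialize with a plain Avro writer, bypassing bijection entirely.
def plainAvroBytes[T <: SpecificRecordBase](record: T): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  val writer = new SpecificDatumWriter[T](record.getSchema)
  val encoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(record, encoder)
  encoder.flush()
  out.toByteArray
}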

Brian Arnold

Feb 25, 2015, 12:34:45 PM
to Oscar Boykin, Christopher Severs, Mansur Ashraf, bije...@googlegroups.com
That's alright, thanks Oscar! I have already spoken with Mansur a little about this issue. Chris, do you have any thoughts on it?

Thanks
Brian

Chris Severs

Feb 26, 2015, 3:10:01 AM
to Brian Arnold, Oscar Boykin, bije...@googlegroups.com, Mansur Ashraf
Hi Brian,

I made a post on the bijection Google Group. Basically, I was wondering whether the failure on the cluster was happening for every piece of data or only some.

Brian Arnold

Feb 26, 2015, 5:59:21 PM
to Chris Severs, Oscar Boykin, bije...@googlegroups.com, Mansur Ashraf
Hey Chris,

Yes, it is happening to all the data, not just a few records. As a total hack for now, I actually got this to work by serializing the Avro object map-side with the bijection, then deserializing it reduce-side where I need to reference the fields of the Avro object (rough shape below). So somehow something is transforming my Avro object, or some of the fields it contains, into a Cascading Tuple.
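
Roughly the shape of that hack (a sketch only; SpecificAvroCodecs.toBinary, the fields-API calls, and the field names are assumptions, not the exact code):

import com.twitter.scalding._
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.SpecificAvroCodecs

val productCodec: Injection[Product, Array[Byte]] =
  SpecificAvroCodecs.toBinary[Product]

val decoded = pipe
  // map-side: carry the record across the shuffle as opaque bytes
  .map('product -> 'productBytes) { p: Product => productCodec(p) }
  .discard('product)
  .groupBy('key) { _.toList[Array[Byte]]('productBytes -> 'bytesList) }
  // reduce-side: invert the bytes back into records where fields are needed
  .map('bytesList -> 'products) { bs: List[Array[Byte]] =>
    bs.map(b => productCodec.invert(b).get)
  }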

Thanks

Christopher Severs

Mar 9, 2015, 1:21:35 PM
to bije...@googlegroups.com, chris....@gmail.com, os...@twitter.com, mansoor...@gmail.com
I wish I knew more about what bijection is doing in order to help better. My guess is that, since this isn't happening during serialization to or from disk, there is an implicit conversion lurking somewhere that is being a bit too zealous. It sounds like doing the conversions you need explicitly is probably the best answer for now.
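
For what it's worth, a hypothetical sketch of that, assuming bijection's Injection.build helper and bijection-avro's SpecificAvroCodecs.toBinary (everything named here is illustrative, not the thread's actual code):

import scala.util.Try
import com.google.common.io.BaseEncoding
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.SpecificAvroCodecs

// Name each injection explicitly so no overly-eager implicit gets picked up.
val avroInj: Injection[Product, Array[Byte]] =
  SpecificAvroCodecs.toBinary[Product]

val b64Inj: Injection[Array[Byte], Base64String] =
  Injection.build[Array[Byte], Base64String] { bytes =>
    Base64String(BaseEncoding.base64().encode(bytes))
  } { b64 => Try(BaseEncoding.base64().decode(b64.str)) }

// Compose explicitly; no implicitly[...] lookup involved.
val productToBase64: Injection[Product, Base64String] = avroInj andThen b64Inj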