Scalding Twitter Bijection Issue


Brian Arnold

Feb 24, 2015, 6:35:18 PM
to cascadi...@googlegroups.com
Hi,

I recently upgraded our Scalding codebase from Scalding 0.8.11 on Scala 2.9.3 to Scalding 0.13.1 on Scala 2.10. I have fixed every issue except one involving the Avro bijection we were using.

I am now receiving the following error when serializing Avro objects to a byte array using the BinaryAvroCodec in Twitter Bijection (this worked in our old codebase). The Avro object we are serializing contains nested Avro objects. If I comment out the setters for the nested Avro objects (see below), my job works fine, but if I set these three nested Avro objects prior to serialization, it blows up with the error below. I am also unable to reproduce the issue locally using runHadoop in my Scalding tests: the bijection works fine in all of my local tests but blows up when running on the cluster. Does anyone have any idea what issue I may be running into?

Thanks!
Brian

private def buildProduct(productNbr: String, description: String, reviewCount: Int,
                         averageRating: String, prices: List[RecommendationPrice],
                         variations: String, skuMetaData: JList[SkuMetaData],
                         promotions: List[Promotions]): Product = {
    Product.newBuilder()   // <--- Product is also an avro object
      .setProductNbr(productNbr)
      .setDescription(description)
      .setReviewCount(reviewCount)
      .setAverageRating(averageRating)
      //.setPrices(prices)        // <--- list of avro objects
      .setPColorVariations(variations)
      //.setSkuMetaData(skuMetaData)      // <--- list of avro objects
      //.setPromotions(promotions)    // <--- list of avro objects
      .build
  }  
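For what it's worth, when I do set the nested lists (which is what triggers the failure), the generated builder wants java.util.List, so the calls look roughly like this (a sketch, assuming the usual JavaConverters conversion; setPrices/setSkuMetaData/setPromotions are the generated setters shown commented out above):

  import scala.collection.JavaConverters._

  // Sketch: the Avro-generated setters take java.util.List, so the Scala
  // lists are converted first (skuMetaData is already a java.util.List).
  Product.newBuilder()
    .setProductNbr(productNbr)
    .setPrices(prices.asJava)
    .setSkuMetaData(skuMetaData)
    .setPromotions(promotions.asJava)
    .build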

  type Codec[T] = Injection[T, Array[Byte]]
  
  case class Base64String(str: String)

  
  implicit lazy val bytes2Base64: Bijection[Array[Byte], Base64String] =
    new AbstractBijection[Array[Byte], Base64String] {
      def apply(bytes: Array[Byte]): Base64String = Base64String(BaseEncoding.base64().encode(bytes))

      override def invert(b64: Base64String): Array[Byte] = BaseEncoding.base64().decode(b64.str)
    }
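
The Base64 leg round-trips fine on its own; a quick sketch of what I mean:

  // Sketch: round-tripping the Base64 bijection in isolation.
  val raw: Array[Byte] = "hello".getBytes("UTF-8")
  val b64: Base64String = bytes2Base64(raw)
  assert(bytes2Base64.invert(b64).sameElements(raw))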
  
  // `pipe` is the underlying cascading Pipe wrapped by EncoderPipe
  def encodeBase64[T <: SpecificRecordBase: ClassTag](fs: (Fields, Fields))(avroInj: => Codec[T]): EncoderPipe = {
    val result = pipe.mapTo(fs) {
      in: (String, T) =>
        val (key, value) = in
        val base64Inj = implicitly[Injection[Array[Byte], Base64String]]
        val compose = avroInj andThen base64Inj
        (key, compose(value).str)
    }
    EncoderPipe(result)
  }
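
The call site looks roughly like this (a sketch: the field names are placeholders, and toBinary is the SpecificAvroCodecs factory in bijection-avro, which is where the BinaryAvroCodec in the stack trace comes from):

  import com.twitter.scalding.Dsl._
  import com.twitter.bijection.avro.SpecificAvroCodecs

  // Sketch: invoking encodeBase64 with a specific-record binary codec.
  // Field names ('key, 'product, 'encoded) are placeholders.
  val encoded: EncoderPipe =
    encodeBase64[Product](('key, 'product) -> ('key, 'encoded)) {
      SpecificAvroCodecs.toBinary[Product]
    }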



Product.avdl file

record Product {
  string productNbr;
  union {string, null} description = "null";
  union {int, null} reviewCount = 0;
  union {string, null} averageRating = "null";
  union {array<RecommendationPrice>, null} prices = "null";
  union {string, null} pColorVariations = "NONE";
  union {array<SkuMetaData>, null} skuMetaData = "null";
  union {array<Promotions>, null} promotions = "null";
}

I am also using the following dependency versions:

avro: "1.7.5",
hadoop: "2.4.0.2.1.2.0-402",
scalding: "0.13.1",
scala: "2.10"

bijectionAvro: "com.twitter:bijection-avro_$versions.scala:0.7.2",
hadoopCommon: "org.apache.hadoop:hadoop-common:$versions.hadoop",
hadoopMapredClientCore: "org.apache.hadoop:hadoop-mapreduce-client-core:$versions.hadoop",
hadoopMiniCluster: "org.apache.hadoop:hadoop-minicluster:$versions.hadoop", 
scalaLibrary: "org.scala-lang:scala-library:2.10.4",
scalding_core: "com.twitter:scalding-core_$versions.scala:$versions.scalding",
scalding_avro: "com.twitter:scalding-avro_$versions.scala:$versions.scalding",
scalding_commons: "com.twitter:scalding-commons_$versions.scala:$versions.scalding",
avro: "org.apache.avro:avro:$versions.avro",
avroCompiler: "org.apache.avro:avro-compiler:$versions.avro" 

cascading.pipe.OperatorException: [_pipe_19*_pipe_20][mapTo() @ com.company.pipes.EncoderPipe.encodeBase64(EncoderPipe.scala:22)] operator Each failed executing operation
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
	at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:80)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
	at com.twitter.scalding.MapFunction.operate(Operations.scala:60)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
	at cascading.flow.stream.FunctionEachStage$1.collect(FunctionEachStage.java:80)
	at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
	at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
	at cascading.operation.NoOp.operate(NoOp.java:47)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
	at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:45)
	at cascading.flow.stream.OpenDuct.receive(OpenDuct.java:28)
	at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:93)
	at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:136)
	at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:444)
	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:415)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1557)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.lang.ClassCastException: cascading.tuple.Tuple cannot be cast to org.apache.avro.generic.IndexedRecord
	at org.apache.avro.generic.GenericData.getField(GenericData.java:537)
	at org.apache.avro.generic.GenericData.getField(GenericData.java:552)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:131)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:106)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:131)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:106)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:106)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:131)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:68)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:106)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
	at com.twitter.bijection.avro.BinaryAvroCodec.apply(AvroCodecs.scala:271)
	at com.twitter.bijection.avro.BinaryAvroCodec.apply(AvroCodecs.scala:267)
	at com.twitter.bijection.Injection$$anon$1.apply(Injection.scala:41)
	at com.company.pipes.EncoderPipe$$anonfun$1.apply(EncoderPipe.scala:29)
	at com.company.pipes.EncoderPipe$$anonfun$1.apply(EncoderPipe.scala:23)
	at com.twitter.scalding.MapFunction.operate(Operations.scala:59)
	at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
	... 24 more