[scalding] how to flatten an avro array


Felix

Jun 30, 2014, 5:56:45 PM
to cascadi...@googlegroups.com
I have an Avro file with the following definition:
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"UserProfileFlatAggregate\",\"namespace\":\"com.bluekai.analytics.jobs\",\"fields\":[{\"name\":\"uuid\",\"type\":\"string\"},{\"name\":\"profile\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"SiteCatNsTuple\",\"fields\":[{\"name\":\"site\",\"type\":\"int\"},{\"name\":\"ns\",\"type\":\"int\"},{\"name\":\"cat\",\"type\":\"int\"}]}},\"aliases\":[\"bag\"]}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public java.lang.CharSequence uuid;
  @Deprecated public java.util.List<com.bluekai.analytics.jobs.SiteCatNsTuple> profile;

In my code I am trying to flatten the array and generate (uuid, cat) pairs:
  val avroPipe = UnpackedAvroSource[UserProfileFlatAggregate](args("input")).read
    .flatMapTo('profile -> ('uuid, 'cat)) {
      userProfile: UserProfileFlatAggregate =>
        val uuid = userProfile.getUuid
        val profile = userProfile.getProfile
        for (t <- profile.asScala) yield (uuid, t.getCat)
    }

This gives the following exception:
Caused by: cascading.pipe.OperatorException: [com.twitter.scalding.a...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:480)] operator Each failed executing operation
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
... 4 more
Caused by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to com.bluekai.analytics.jobs.UserProfileFlatAggregate
at com.bluekai.ds.Categories$$anonfun$4.apply(CategorisJob.scala:36)
at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:39)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
... 8 more

I am wondering what is the proper way to flatten that array.

Thanks,

Christopher Severs

Jun 30, 2014, 6:01:16 PM
to cascadi...@googlegroups.com
Hi Felix,

UnpackedAvroSource already does some unpacking work for you, so you get back a tuple rather than a UserProfileFlatAggregate. If you use PackedAvroSource, it should behave more like what you're expecting.
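To make the unpacked case concrete, here is a minimal pure-Scala sketch of the function shape it calls for. It assumes, as the `java.util.ArrayList` in the trace suggests, that each top-level Avro field becomes its own tuple field, so `'profile` holds the array itself rather than a `UserProfileFlatAggregate`. `SiteCatNs` and `uuidCatPairs` are hypothetical stand-ins for the generated `SiteCatNsTuple` and for the body of the `flatMapTo` closure; the Scalding wiring in the comment is an untested assumption, and whether the list elements arrive as the specific class is exactly what the rest of this thread is about.

```scala
import scala.collection.JavaConverters._

object UnpackedFlatten {
  // Hypothetical stand-in for the generated nested record SiteCatNsTuple.
  final case class SiteCatNs(site: Int, ns: Int, cat: Int)

  // With UnpackedAvroSource the closure receives unpacked field values,
  // so it takes the uuid and the profile list instead of the whole record.
  // Roughly how it might be wired up in a job (untested assumption):
  //   .flatMapTo(('uuid, 'profile) -> ('uuid, 'cat)) {
  //     in: (CharSequence, java.util.List[SiteCatNsTuple]) => ...
  //   }
  def uuidCatPairs(uuid: CharSequence, profile: java.util.List[SiteCatNs]): List[(String, Int)] =
    profile.asScala.toList.map(t => (uuid.toString, t.cat))

  def main(args: Array[String]): Unit = {
    val profile = new java.util.ArrayList[SiteCatNs]()
    profile.add(SiteCatNs(site = 1, ns = 2, cat = 30))
    profile.add(SiteCatNs(site = 1, ns = 2, cat = 31))
    println(uuidCatPairs("user-1", profile)) // List((user-1,30), (user-1,31))
  }
}
```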

------
Chris

Felix

Jun 30, 2014, 6:18:09 PM
to cascadi...@googlegroups.com
Thanks, Chris, for the quick response. I changed the code to use PackedAvroSource, and now I get the following error:
 val avroPipe = PackedAvroSource[UserProfileFlatAggregate](args("input")).read
        .flatMapTo('UserProfileFlatAggregate ->('uuid, 'cat)) {
        userProfile: UserProfileFlatAggregate =>
          val uuid = userProfile.getUuid
          val tuples = userProfile.getProfile
          for (t <- tuples.asScala) yield (uuid, t.getCat)
      }


Caused by: cascading.pipe.OperatorException: [com.twitter.scalding.a...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:480)] operator Each failed executing operation
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
... 4 more
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.bluekai.analytics.jobs.UserProfileFlatAggregate
at com.bluekai.ds.Categories$$anonfun$19.apply(CategorisJob.scala:75)
at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:39)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
... 8 more




Christopher Severs

Jun 30, 2014, 6:42:14 PM
to cascadi...@googlegroups.com
Is the Avro class autogenerated or hand-written?

Felix

Jun 30, 2014, 6:46:30 PM
to cascadi...@googlegroups.com
It is autogenerated with avro-tools.jar.

Christopher Severs

Jun 30, 2014, 7:04:12 PM
to cascadi...@googlegroups.com
Sorry, a couple more questions: which version of Scalding (and scalding-avro) are you using, which version of Avro, and which version was the tools jar from?

Thanks,
Chris

Felix

Jun 30, 2014, 7:25:19 PM
to cascadi...@googlegroups.com
Sorry, I should have included that information earlier.
 // Scalding
        compile( 'com.twitter:scalding-core_2.9.3:0.9.0rc4' )
        compile( 'com.twitter:scalding-avro_2.9.3:0.9.0rc4' )

The class is generated with avro-tools-1.7.6.jar, and the Avro version used on my local machine is 1.7.4.

Jonathan Coveney

Jun 30, 2014, 7:56:13 PM
to cascadi...@googlegroups.com
Can you try 0.10.0? I don't know that it will fix this, but 0.9.0rc4 has some known bugs, some of them pretty bad.

Felix

Jun 30, 2014, 8:15:48 PM
to cascadi...@googlegroups.com
I tried
        compile( 'com.twitter:scalding-core_2.9.3:0.10.0' )
        compile( 'com.twitter:scalding-avro_2.9.3:0.10.0' )
and still get the same exception:
4/06/30 17:14:17 ERROR stream.SourceStage: caught throwable
cascading.pipe.OperatorException: [com.twitter.scalding.a...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:494)] operator Each failed executing operation
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.bluekai.analytics.jobs.UserProfileFlatAggregate
at com.bluekai.ds.Categories$$anonfun$19.apply(CategoriesJob.scala:73)
at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:46)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
... 8 more

Christopher Severs

Jul 1, 2014, 1:49:05 PM
to cascadi...@googlegroups.com
I suspect some oddness with Avro, but I'm not entirely sure. Can you cut this down to a minimal example so I can run it on my machine and fix it?

Thanks,
Chris

Felix

Jul 1, 2014, 2:20:37 PM
to cascadi...@googlegroups.com
Sure thing, Chris.


I have attached some sample data.

I ran the job as:
  hadoop jar build/libs/Categories.jar com.bluekai.ds.Test --local --input sample_30days_agg_10.avro

Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.bluekai.analytics.jobs.UserProfileFlatAggregate
at com.bluekai.ds.Test$$anonfun$3.apply(Test.scala:21)
at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:46)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
... 8 more
sample_30days_agg_10.avro

Christopher Severs

Jul 1, 2014, 2:43:32 PM
to cascadi...@googlegroups.com
Perfect, thanks! I'll get on it and hopefully we can squash whatever is going wrong.

Felix

Jul 3, 2014, 12:56:58 PM
to cascadi...@googlegroups.com
Chris,

Not to rush you or anything, but did you find out what caused the exception?

Thanks,

Felix

Christopher Severs

Jul 7, 2014, 4:34:36 PM
to cascadi...@googlegroups.com
Hi Felix,

Apologies, I didn't get to this yet. I had some family issues come up. I should be able to take a look tomorrow or the next day though.

Thanks,
Chris

Felix

Jul 7, 2014, 5:33:39 PM
to cascadi...@googlegroups.com
Chris,

Thanks for getting back to me. No problem at all; family first.

Best,

Felix

Felix

Jul 17, 2014, 1:00:08 PM
to cascadi...@googlegroups.com
Chris,

Any updates? Do you have a workaround I could use?

thanks,

Felix

Christopher Severs

Jul 22, 2014, 12:40:37 PM
to cascadi...@googlegroups.com
Hi Felix,

Hopefully in the next couple days. I blocked off time on Thursday afternoon to work on this.

Sorry for the delay,
Chris

Felix

Aug 15, 2014, 4:40:24 PM
to cascadi...@googlegroups.com
Chris,

Any updates on this issue?

Thanks,

Felix

Christopher Severs

Aug 20, 2014, 2:43:06 PM
to cascadi...@googlegroups.com
Hey Felix,

Nothing yet, sorry. My normal day job has pulled me away from Scalding stuff for a bit, so it's pretty far down my list. If you, or someone else, want to take a stab at fixing it, I would be really happy to offer assistance. Outside of that, I'll get to it when I can, but as you've noticed, that might be a while.

Sorry I don't have a better answer for this. 

-------
Chris

dlaba...@shutterstock.com

Mar 2, 2015, 6:04:51 PM
to cascadi...@googlegroups.com
Was there any progress on this? I believe I'm having the same issue.
I'm using 
 avro 1.7.7
 scala 2.10.4
 scalding 2.11
I generated a Java class for my Avro schema with avro-tools.

My scalding code is just

val packedData = new PackedAvroSource[DetailPage](Seq("detailPage.avro"))
packedData.read
.filter[DetailPage]('DetailPage){dp:DetailPage => dp.getLanguage == "fr"}
.write(Tsv("./detail.txt"))

And I get:
Caused by: cascading.pipe.OperatorException: [com.twitter.scalding.a...][filter() @ com.shutterstock.scalding.DetailPageTx.<init>(DetailPageTx.scala:65)] operator Each failed executing operation
at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:81)
at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:34)
at cascading.flow.stream.SourceStage.map(SourceStage.java:102)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
... 4 more
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.shutterstock.avro.schema.DetailPage
at com.shutterstock.scalding.DetailPageTx$$anonfun$1.apply(DetailPageTx.scala:65)
at com.twitter.scalding.FilterFunction.isRemove(Operations.scala:260)
at cascading.flow.stream.FilterEachStage.receive(FilterEachStage.java:70)
... 8 more

I can dig into the code myself if someone gives me direction.

David
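Separate from the ClassCastException, one more thing can bite once the record type is right. With Avro 1.7 defaults, generated classes type string fields as `CharSequence`, and values read from disk are usually `org.apache.avro.util.Utf8`, whose `equals` only matches another `Utf8`. So a filter like `dp.getLanguage == "fr"` can be false even when the field holds "fr". The sketch below uses `java.lang.StringBuilder` as a dependency-free stand-in for `Utf8` (an assumption made only to keep it runnable), and the helper name `sameText` is hypothetical; converting with `toString` before comparing sidesteps the issue.

```scala
object CharSequenceEquality {
  // Compares an Avro-style CharSequence field with a Scala String.
  // Calling toString first makes the result independent of the concrete
  // CharSequence class (Utf8, String, StringBuilder, ...).
  def sameText(field: CharSequence, expected: String): Boolean =
    field != null && field.toString == expected

  def main(args: Array[String]): Unit = {
    // StringBuilder stands in for org.apache.avro.util.Utf8 here.
    val language: CharSequence = new java.lang.StringBuilder("fr")
    println(language == "fr")         // false: StringBuilder.equals is identity-based
    println(sameText(language, "fr")) // true
  }
}
```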

Oscar Boykin

Mar 3, 2015, 3:39:35 PM
to cascadi...@googlegroups.com, Ken Krugler
I wonder if this is related:

https://issues.apache.org/jira/browse/AVRO-1240

here is a similar case on Hadoop:
http://stackoverflow.com/questions/15150040/error-using-avro-1-7-4-specific-api-with-mapreduce

And another thread:
http://apache-avro.679487.n3.nabble.com/Type-cast-exception-td756884.html

The above says we should be using SpecificDatumReader, which we are attempting to use:
https://github.com/twitter/scalding/blob/develop/scalding-avro/src/main/scala/com/twitter/scalding/avro/SchemaType.scala#L73

and is mentioned here:
https://github.com/ScaleUnlimited/cascading.avro/blob/master/scheme/src/main/java/cascading/avro/AvroScheme.java#L346

The bottom line is that your specific type is being lost after you read off disk, and you are left with org.apache.avro.generic.GenericData$Record.

Maybe Ken has some pointers? I don't use Avro; we use Thrift and Scrooge at Twitter.
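One way to probe the "specific type is lost" theory is to check whether the generated class can be loaded under the name recorded in the schema. My understanding, not verified against the Avro 1.7 source here, is that the specific reader looks the class up by the schema's full name (namespace + "." + name) and falls back to GenericData$Record when that lookup fails, for example if the namespace in the file differs from the generated class's package, or if the reader uses a classloader that cannot see the job jar. The hypothetical helper below uses only JDK reflection; running it inside the job, not just on the client, would show what the task's classloaders actually see.

```scala
object SpecificClassCheck {
  // Full name derived from an Avro schema: namespace + "." + name.
  def fullName(namespace: String, name: String): String =
    if (namespace == null || namespace.isEmpty) name else s"$namespace.$name"

  // True if `className` can be found by `loader` (a null loader means bootstrap).
  // initialize = false: we only check visibility, we do not run static init.
  def isLoadable(className: String, loader: ClassLoader): Boolean =
    try { Class.forName(className, false, loader); true }
    catch { case _: ClassNotFoundException | _: LinkageError => false }

  def main(args: Array[String]): Unit = {
    val name = fullName("com.bluekai.analytics.jobs", "UserProfileFlatAggregate")
    val loaders = Seq(
      "context" -> Thread.currentThread.getContextClassLoader,
      "this class" -> getClass.getClassLoader
    )
    for ((label, loader) <- loaders)
      println(s"$name visible to $label classloader: ${isLoadable(name, loader)}")
  }
}
```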

--
Oscar Boykin :: @posco :: http://twitter.com/posco

Christopher Severs

Mar 9, 2015, 1:04:59 PM
to cascadi...@googlegroups.com, kkru...@scaleunlimited.com
Let me double-check that we're not doing something strange when the fields API is being used. I can't see why it would be a problem, but maybe there is a conversion to GenericData$Record happening that shouldn't be occurring.

Christopher Severs

Mar 9, 2015, 1:14:18 PM
to cascadi...@googlegroups.com
Looking at this a bit more: can you double-check which version of Avro is actually getting loaded on the classpath (or at least packed into your jar)?
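A quick way to answer that from inside the running job is to ask the JVM where it loaded a class from. The sketch below uses only JDK reflection (`getProtectionDomain.getCodeSource.getLocation`); pointing it at `org.apache.avro.Schema` should show which Avro jar won on the classpath, and the jar file name usually carries the version. The object and method names are illustrative, not part of Scalding or Avro.

```scala
object WhichJar {
  // Returns the jar or directory a class was loaded from, if the JVM knows it.
  // JDK classes from the bootstrap loader have no CodeSource, hence Option.
  def locationOf(className: String): Option[String] =
    try {
      val cls = Class.forName(className, false, Thread.currentThread.getContextClassLoader)
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
    } catch {
      case _: ClassNotFoundException => None
    }

  def main(args: Array[String]): Unit =
    for (name <- Seq("org.apache.avro.Schema", "org.apache.avro.specific.SpecificDatumReader"))
      println(s"$name -> ${locationOf(name).getOrElse("not found or bootstrap")}")
}
```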

Joey Guinta

Oct 6, 2015, 3:37:25 PM
to cascading-user
Hi Chris,

Was this problem ever resolved? I am also running into the same issue. I'd be happy to provide details if you need them.

Thanks!