scalding: serialization issue during join


jeffo

May 21, 2013, 2:39:27 PM
to cascadi...@googlegroups.com
I am attempting to join a TSV source with an avro source.

I am using scalding 0.8.2 and the scalding.avro project (in this specific job, I'm using the PackedAvroSource).

When I attempt to run the job on my cluster, it throws the following exception during initialization:

Exception in thread "main" cascading.flow.planner.PlannerException: could not build flow from assembly: [unable to pack object: cascading.flow.hadoop.HadoopFlowStep]
at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:533)
at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:237)
at cascading.flow.FlowConnector.connect(FlowConnector.java:454)
at com.twitter.scalding.Job.buildFlow(Job.scala:91)
at com.twitter.scalding.Job.run(Job.scala:124)
at com.twitter.scalding.Tool.start$1(Tool.scala:105)
at com.twitter.scalding.Tool.run(Tool.scala:121)
at com.twitter.scalding.Tool.run(Tool.scala:72)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at com.twitter.scalding.Tool$.main(Tool.scala:128)
at com.twitter.scalding.Tool.main(Tool.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:156)
Caused by: cascading.flow.FlowException: unable to pack object: cascading.flow.hadoop.HadoopFlowStep
at cascading.flow.hadoop.HadoopFlowStep.pack(HadoopFlowStep.java:195)
at cascading.flow.hadoop.HadoopFlowStep.getInitializedConfig(HadoopFlowStep.java:170)
at cascading.flow.hadoop.HadoopFlowStep.createFlowStepJob(HadoopFlowStep.java:201)
at cascading.flow.hadoop.HadoopFlowStep.createFlowStepJob(HadoopFlowStep.java:69)
at cascading.flow.planner.BaseFlowStep.getFlowStepJob(BaseFlowStep.java:680)
at cascading.flow.BaseFlow.initializeNewJobsMap(BaseFlow.java:1148)
at cascading.flow.BaseFlow.initialize(BaseFlow.java:198)
at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:231)
... 14 more
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
at java.util.ArrayList.writeObject(ArrayList.java:570)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:940)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1469)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
at java.util.HashMap.writeObject(HashMap.java:1001)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:940)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1469)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
at cascading.flow.hadoop.util.JavaObjectSerializer.serialize(JavaObjectSerializer.java:57)
at cascading.flow.hadoop.util.HadoopUtil.serializeBase64(HadoopUtil.java:264)
at cascading.flow.hadoop.HadoopFlowStep.pack(HadoopFlowStep.java:191)
... 21 more

I have not changed the Job configuration; everything should be at its default values.

It looks as if org.apache.avro.Schema$RecordSchema is being serialized with Java serialization, even though the io.serializations configuration key is set to org.apache.hadoop.io.serializer.WritableSerialization,cascading.tuple.hadoop.TupleSerialization,com.twitter.scalding.serialization.KryoHadoop.

Any help is greatly appreciated.

Oscar Boykin

May 21, 2013, 2:51:07 PM
to cascadi...@googlegroups.com
Cascading (and thus Scalding) uses Java serialization to serialize the job steps. The RecordSchema is getting captured by your Job, so it has to be serialized as well.

You have a few easy options:

1) Make the RecordSchema a lazy val; if it is only used in the mappers/reducers, things will be fine.
2) Don't capture this instance in a val. Instead, create the real object you need in a def and return that (avoid gratuitous vals).
3) Try com.twitter.chill.MeatLocker( ) to box this instance in something that is Java serializable but uses Kryo internally.
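
To make options 1 and 2 concrete, here is a minimal, self-contained sketch that uses only the JDK and the Scala standard library. FakeSchema is a hypothetical stand-in for org.apache.avro.Schema, and the holder classes are made up for illustration; none of this is Scalding or Avro code. The lazy val is also marked @transient, which is a common variant of option 1 (an assumption on my part, not something stated above) so the field is skipped even if it has already been forced.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.avro.Schema: deliberately NOT Serializable.
class FakeSchema(val name: String)

// Captures the schema in a plain val, so Java serialization must write it.
class EagerHolder extends Serializable {
  val schema = new FakeSchema("Bar")
}

// Option 1: a @transient lazy val is skipped on write and built on first use.
class LazyHolder extends Serializable {
  @transient lazy val schema = new FakeSchema("Bar")
}

// Option 2: a def stores no field at all; the object is created on each call.
class DefHolder extends Serializable {
  def schema = new FakeSchema("Bar")
}

object SerializationDemo {
  // Returns true when plain Java serialization of `obj` succeeds.
  def javaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(javaSerializable(new EagerHolder)) // false
    println(javaSerializable(new LazyHolder))  // true
    println(javaSerializable(new DefHolder))   // true
  }
}
```

The trade-off between the two: the def rebuilds the object on every call, while the lazy val builds it once per deserialized instance, so the lazy val is usually the better fit when construction is expensive.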

Serialization is a pain.


--
Oscar Boykin :: @posco :: http://twitter.com/posco

jeffo

May 21, 2013, 3:13:14 PM
to cascadi...@googlegroups.com
Thanks for the quick response.

I'd like to try option 1 or 2, but I need a bit of clarification, since my code never references RecordSchema explicitly.

My job (with name substitutions) is simply:

class JoinExample(args: Args) extends Job(args) {

  val foos = Tsv(args("foo"), ('id, 'x, 'y))

  val bars = PackedAvroSource[Bar](args("bar"))

  val joinableBars = bars.map('Bar -> ('barId, 'fooId)) { bar: Bar =>
    (bar.getBarId, bar.getFooId)
  }

  val result = foos.joinWithSmaller(
    'id -> 'fooId,
    joinableBars,
    joiner = new OuterJoin
  )

  result.write(Tsv(args("output")))
}

So somewhere along the way, Cascading asks for RecordSchema to be serialized using Java serialization. To clarify further: this only happens when I add a join to the job. Previously, using the PackedAvroSource and UnpackedAvroSource without joins did not cause this exception to be thrown.

My current plan is to dig through the source of the scalding.avro and cascading.avro projects to see where they reference this class, and then attempt to implement your suggestions. After reviewing the code sample above, please let me know if this does not sound like the right direction to take.

Thanks!

Oscar Boykin

May 21, 2013, 3:48:00 PM
to cascadi...@googlegroups.com
Try not capturing bars.  Just do:

  val joinableBars = PackedAvroSource[Bar](args("bar")).map('Bar -> ('barId, 'fooId)) { bar: Bar =>
    (bar.getBarId, bar.getFooId)
  }

See if that works.
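
For readers wondering why dropping the intermediate val could matter, here is a toy model of the capture described earlier in the thread. It is only a sketch under stated assumptions: ToySchema, ToySource, ToyPipe, and both job classes are hypothetical stand-ins, not Scalding, Cascading, or Avro classes, and the real planner code path may well differ. It shows only the general Java serialization rule that everything reachable from a serialized object's fields must itself be serializable.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins; none of these are Scalding, Cascading, or Avro classes.
class ToySchema(val name: String) // deliberately NOT Serializable
class ToyPipe(val fields: List[String]) extends Serializable
class ToySource(val schema: ToySchema) extends Serializable {
  // Derives a pipe that no longer refers back to the source or its schema.
  def project(fields: String*): ToyPipe = new ToyPipe(fields.toList)
}

// Keeps the source in a field, so the schema stays reachable from the job.
class CapturingJob extends Serializable {
  val bars = new ToySource(new ToySchema("Bar"))
  val joinableBars = bars.project("barId", "fooId")
}

// Builds the source inline; only the derived pipe is stored in a field.
class ChainedJob extends Serializable {
  val joinableBars = new ToySource(new ToySchema("Bar")).project("barId", "fooId")
}

object CaptureDemo {
  // Returns the name of the class that blocked Java serialization, if any.
  def blockingClass(obj: AnyRef): Option[String] =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      None
    } catch {
      case e: NotSerializableException => Option(e.getMessage)
    }

  def main(args: Array[String]): Unit = {
    println(blockingClass(new CapturingJob)) // defined: names the schema class
    println(blockingClass(new ChainedJob))   // None
  }
}
```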

Christopher Severs

May 21, 2013, 4:08:22 PM
to cascadi...@googlegroups.com
I'm not sure why it's trying to serialize RecordSchema. Can you try adding cascading.avro.serialization.AvroSpecificRecordSerialization to io.serializations from inside your job and see if that fixes it? I don't think it gets picked up automatically in Scalding (and it normally isn't needed). If neither that nor Oscar's suggestion works, let me know and I can look at it in more depth to see whether the problem is in cascading.avro or scalding.avro.

-----
Chris


jeffo

May 21, 2013, 6:11:10 PM
to cascadi...@googlegroups.com
Thanks Oscar, this worked.

jeffo

May 21, 2013, 6:13:05 PM
to cascadi...@googlegroups.com
I also tried to independently make the change that Chris offered by adding cascading.avro.serialization.AvroSpecificRecordSerialization to io.serializations in an overridden config definition, but that did not work. It reported the same error as previous attempts.

Christopher Severs

May 21, 2013, 9:03:26 PM
to cascadi...@googlegroups.com
I'll try and track down where the issue is and see if I can fix it.