unable to load deserializer for: java.util.ArrayList in the reduce phase


Asaf Mesika
Mar 2, 2015, 12:29:57 AM
to cascadi...@googlegroups.com
Hi,

I'm in the process of writing a custom Tap and Scheme. I have an existing InputFormat which essentially returns a record reader on AvroKey<GenericRecord>. 
I've registered both AvroSerialization and KryoSerialization using CommonConfigurationKeys.IO_SERIALIZATIONS_KEY on my JobConf.

I'm converting to Tuple using AvroToCascading.parseRecord.
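For context, the gist of that conversion is walking the record's fields in schema order and coercing each Avro value into something Cascading can carry in a Tuple. A rough, simplified sketch in plain Java (a LinkedHashMap stands in for GenericRecord here; the real AvroToCascading also handles unions, nested records, maps, etc.):

```java
import java.util.*;

public class AvroishToTuple {
    // Simplified stand-in for AvroToCascading.parseRecord: walk the record's
    // fields in order and coerce each value into a tuple-friendly type.
    static List<Object> parseRecord(LinkedHashMap<String, Object> record) {
        List<Object> tuple = new ArrayList<>();
        for (Object value : record.values())
            tuple.add(fromAvro(value));
        return tuple;
    }

    static Object fromAvro(Object value) {
        // Avro hands back Utf8 for strings; normalize to java.lang.String.
        if (value instanceof CharSequence)
            return value.toString();
        // Avro arrays come through as java.util.List; copying into an
        // ArrayList is exactly how an ArrayList ends up inside the Tuple --
        // the type the reducer later fails to find a deserializer for.
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object item : (List<?>) value)
                copy.add(fromAvro(item));
            return copy;
        }
        return value; // numbers, booleans, byte buffers, null pass through
    }
}
```

This is just to illustrate why a non-primitive like ArrayList reaches the tuple serialization layer at all.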

The weird exception I'm stuck on happens in the reduce phase. I wrote a small Cascading flow which retrieves data from this Tap and does a simple group-by. In the reduce task I'm getting this exception:


 

2015-03-02 07:09:46,777 WARN org.apache.hadoop.mapred.Child: Error running child
cascading.CascadingException: unable to load deserializer for: java.util.ArrayList from: org.apache.hadoop.io.serializer.SerializationFactory
    at cascading.tuple.hadoop.TupleSerialization.getNewDeserializer(TupleSerialization.java:464)
    at cascading.tuple.hadoop.TupleSerialization$SerializationElementReader.getDeserializerFor(TupleSerialization.java:654)
    at cascading.tuple.hadoop.TupleSerialization$SerializationElementReader.read(TupleSerialization.java:621)
    at cascading.tuple.hadoop.io.HadoopTupleInputStream.readType(HadoopTupleInputStream.java:105)
    at cascading.tuple.hadoop.io.HadoopTupleInputStream.getNextElement(HadoopTupleInputStream.java:52)
    at cascading.tuple.io.TupleInputStream.readTuple(TupleInputStream.java:78)
    at cascading.tuple.io.TupleInputStream.readTuple(TupleInputStream.java:67)
    at cascading.tuple.hadoop.io.TupleDeserializer.deserialize(TupleDeserializer.java:38)
    at cascading.tuple.hadoop.io.TupleDeserializer.deserialize(TupleDeserializer.java:28)
    at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1261)
    at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1199)
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:255)
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:251)
    at cascading.flow.hadoop.util.TimedIterator.next(TimedIterator.java:74)
    at cascading.flow.hadoop.HadoopGroupByClosure$1.next(HadoopGroupByClosure.java:113)
    at cascading.flow.hadoop.HadoopGroupByClosure$1.next(HadoopGroupByClosure.java:71)
    at cascading.pipe.joiner.InnerJoin$JoinIterator.initLastValues(InnerJoin.java:152)
    at cascading.pipe.joiner.InnerJoin$JoinIterator.next(InnerJoin.java:184)
    at cascading.pipe.joiner.InnerJoin$JoinIterator.next(InnerJoin.java:68)
    at cascading.tuple.TupleEntryChainIterator.next(TupleEntryChainIterator.java:79)
    at cascading.tuple.TupleEntryChainIterator.next(TupleEntryChainIterator.java:32)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:49)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
    at cascading.flow.hadoop.stream.HadoopGroupGate.run(HadoopGroupGate.java:93)
    at cascading.flow.hadoop.FlowReducer.reduce(FlowReducer.java:133)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:506)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:447)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.hadoop.mapred.Child.main(Child.java:262)

2015-03-02 07:09:46,783 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task


Any clue?

Asaf Mesika
Mar 2, 2015, 2:09:14 AM
to cascadi...@googlegroups.com
I've managed to solve this, but the solution seems ugly to me.

Currently, in my custom Scheme implementation, I have the following code in the sourceConfInit method:

        Collection<String> serializations = conf.getStringCollection(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY);

        if (!serializations.contains(AvroSerialization.class.getName())) {
            serializations.add(AvroSerialization.class.getName());
        }
        if (!serializations.contains(KryoSerialization.class.getName())) {
            serializations.add(KryoSerialization.class.getName());
        }

        conf.setStrings(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, serializations.toArray(new String[serializations.size()]));

It only started working once I also added the following properties in my Cascading unit test, when creating the Hadoop2MR1FlowConnector:

        Collection<String> serializations = Lists.newArrayList();
        serializations.add(KryoSerialization.class.getName());
        properties.put(CommonConfigurationKeys.IO_SERIALIZATIONS_KEY, Joiner.on(',').join(serializations));

        Hadoop2MR1FlowConnector flowConnector = new Hadoop2MR1FlowConnector(properties);

Why can't the Scheme say: "I use ArrayList in the Tuple, so here are the serializers the whole Cascading flow needs"?
Currently, the developer of any Cascading flow that uses my Tap & Scheme needs to add KryoSerialization to the config if they decide to move around fields that are not primitive. It somehow seems wrong to me, but bear in mind I'm not that familiar with Cascading yet.
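One way I found to make this less ugly is to centralize the merge logic in a single helper that both sourceConfInit and the test setup call, so the registration lives next to the Scheme instead of being duplicated. A rough stdlib-only sketch (the class and method names are mine; I also believe Cascading ships cascading.tuple.hadoop.TupleSerializationProps.addSerialization for the properties side, worth checking):

```java
import java.util.*;

public class SerializationRegistrar {
    // Hadoop's CommonConfigurationKeys.IO_SERIALIZATIONS_KEY
    static final String IO_SERIALIZATIONS = "io.serializations";

    // Merge the given serialization class names into the comma-separated
    // io.serializations property, preserving whatever is already registered
    // (e.g. Hadoop's default WritableSerialization).
    public static void register(Map<Object, Object> properties, String... classNames) {
        List<String> merged = new ArrayList<>();
        Object current = properties.get(IO_SERIALIZATIONS);
        if (current != null)
            merged.addAll(Arrays.asList(current.toString().split("\\s*,\\s*")));
        for (String name : classNames)
            if (!merged.contains(name))
                merged.add(name);
        properties.put(IO_SERIALIZATIONS, String.join(",", merged));
    }

    public static void main(String[] args) {
        Map<Object, Object> props = new HashMap<>();
        register(props, "cascading.kryo.KryoSerialization");
        System.out.println(props.get(IO_SERIALIZATIONS));
    }
}
```

The Scheme and the unit test would then both call SerializationRegistrar.register(...) instead of each rebuilding the list by hand.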

Thanks!

Asaf

Ken Krugler
Mar 2, 2015, 9:38:59 AM
to cascadi...@googlegroups.com
Hi Asaf,

You can take a look at how we handle conversion from Avro to Cascading Tuples in the cascading.avro scheme.

E.g. the AvroToCascading class.

-- Ken



--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr







