Problem with avro part file compaction using Cascading


HIMANSHU VERMA

Dec 3, 2018, 4:07:21 AM
to cascading-user
Hi,
I am trying to write a part-file compactor in Cascading that reads thousands of Avro files and reduces the number of files to 50. The output has the same schema as the input, and the Avro schema contains a map field. I get the following error when I run the compaction:

"error":"Error: cascading.CascadingException: unable to compare stream elements in position: 0
cascading.tuple.hadoop.util.DeserializerComparator.compareUnTypedTuples(DeserializerComparator.java:239)
cascading.tuple.hadoop.util.DeserializerComparator.compareTuples(DeserializerComparator.java:194)
cascading.tuple.hadoop.util.BaseTupleComparator$StreamComparison.compare(BaseTupleComparator.java:97)
cascading.tuple.hadoop.util.BaseTupleComparator.compare(BaseTupleComparator.java:74)
org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:1265)
org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:74)
org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:126)
org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:129)
org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:129)
org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:126)
org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:126)
org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:126)
org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:126)
org.apache.hadoop.util.QuickSort.sort(QuickSort.java:63)
org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1593)
org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1482)
org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:457)
org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:165)
java.security.AccessController.doPrivileged(Native Method)
javax.security.auth.Subject.doAs(Subject.java:422)
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1635)
org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:160)
Caused by: shaded.org.apache.avro.AvroRuntimeException: Can't compare maps!
shaded.org.apache.avro.generic.GenericData.compare(GenericData.java:984)
shaded.org.apache.avro.specific.SpecificData.compare(SpecificData.java:333)
shaded.org.apache.avro.generic.GenericData.compare(GenericData.java:988)
shaded.org.apache.avro.specific.SpecificData.compare(SpecificData.java:333)
shaded.org.apache.avro.generic.GenericData.compare(GenericData.java:961)
shaded.org.apache.avro.specific.SpecificData.compare(SpecificData.java:333)
shaded.org.apache.avro.generic.GenericData.compare(GenericData.java:946)
shaded.org.apache.avro.specific.SpecificRecordBase.compareTo(SpecificRecordBase.java:81)
shaded.org.apache.avro.specific.SpecificRecordBase.compareTo(SpecificRecordBase.java:30)
cascading.tuple.hadoop.util.TupleElementComparator$1.compare(TupleElementComparator.java:49)
cascading.tuple.hadoop.util.TupleElementComparator$1.compare(TupleElementComparator.java:36)
cascading.tuple.hadoop.util.TupleElementComparator.compare(TupleElementComparator.java:87)


Sample Code:
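// inputPipe reads thousands of part files from the file system.
// GroupBy(Pipe) with no grouping fields groups on Fields.ALL, so every field,
// including the Avro record itself, is compared during the shuffle sort.
Pipe outputPipe = new GroupBy(inputPipe);
// Request 50 reducers, i.e. 50 output part files.
outputPipe.getStepConfigDef().setProperty("mapreduce.job.reduces", String.valueOf(50));
return outputPipe;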


I understand why this can happen during the shuffle sort. Is there any workaround for this?

Thanks in advance.

Regards,
Himanshu

Ken Krugler

Dec 3, 2018, 10:15:54 AM
to cascadi...@googlegroups.com
Why are you comparing Avro records? Are you using the record as part of a grouping key?

Asking because the typical workarounds are a bit painful…

1. Write your own comparator, which you can set on the grouping Fields.

2. Unpack the Avro record into a Tuple (which contains embedded Tuples, and so on).
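A minimal sketch of the core of workaround 1, assuming the only non-comparable part of the record is a map whose keys are strings (Avro map keys always are). The comparator gives maps a deterministic, if arbitrary, order by comparing their entries in sorted-key order, with values compared by their text. `MapOrderComparator` is a hypothetical name, not a Cascading or Avro class. A full record-level comparator would compare the record's other fields first and delegate to this for the map field, and would then be registered with something like `groupFields.setComparator("record", recordComparator)` on the grouping Fields; it is Serializable because Cascading ships comparators to the cluster along with the flow.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical comparator that imposes a deterministic order on maps.
// Assumes string-like keys (as in Avro) and compares values by String.valueOf().
final class MapOrderComparator implements Comparator<Map<?, ?>>, Serializable {
    private static final long serialVersionUID = 1L;

    @Override
    public int compare(Map<?, ?> a, Map<?, ?> b) {
        List<Map.Entry<String, String>> ea = canonicalEntries(a);
        List<Map.Entry<String, String>> eb = canonicalEntries(b);
        int shared = Math.min(ea.size(), eb.size());
        for (int i = 0; i < shared; i++) {
            // Compare keys first, then values, entry by entry in key order.
            int c = ea.get(i).getKey().compareTo(eb.get(i).getKey());
            if (c != 0) {
                return c;
            }
            c = ea.get(i).getValue().compareTo(eb.get(i).getValue());
            if (c != 0) {
                return c;
            }
        }
        // All shared entries are equal: the map with fewer entries sorts first.
        return Integer.compare(ea.size(), eb.size());
    }

    // Copies entries into key-sorted order so that HashMap iteration order,
    // which is unspecified, cannot influence the result.
    private static List<Map.Entry<String, String>> canonicalEntries(Map<?, ?> m) {
        TreeMap<String, String> sorted = new TreeMap<>();
        for (Map.Entry<?, ?> e : m.entrySet()) {
            sorted.put(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
        }
        return new ArrayList<>(sorted.entrySet());
    }
}
```

Comparing entries in sorted-key order is what makes two maps with the same contents compare equal regardless of how they were built.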

But in general, comparing maps (Avro or not) is conceptually odd: what would make one map sort before or after another?
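Because a compaction job does not care how records are ordered, another route, offered as an assumption rather than one of the two workarounds listed above, is to avoid sorting on the record at all: add a small integer bucket field and group only on that, so the shuffle compares ints instead of Avro records. The sketch below shows just the bucket computation in plain Java; `CompactionBuckets` and `bucketFor` are hypothetical names. In Cascading the value would presumably be emitted by a small custom Function applied with an Each, followed by something like `new GroupBy(pipe, new Fields("bucket"))` with 50 reducers.

```java
import java.util.Objects;

// Hypothetical helper (not a Cascading or Avro API) that maps each record to one
// of numBuckets buckets, so grouping never has to compare the records themselves.
final class CompactionBuckets {
    private CompactionBuckets() {
    }

    // Returns a bucket in [0, numBuckets). Assumes record.hashCode() depends only
    // on the record's contents, so a retried task assigns the same buckets.
    static int bucketFor(Object record, int numBuckets) {
        if (numBuckets <= 0) {
            throw new IllegalArgumentException("numBuckets must be positive: " + numBuckets);
        }
        // floorMod, unlike %, never returns a negative value for a negative hashCode.
        return Math.floorMod(Objects.hashCode(record), numBuckets);
    }
}
```

Hashing keeps the bucket assignment deterministic across task attempts; a random bucket would spread records just as well but could change if a map task is re-run.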

— Ken



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra


