Lost lines with CoGroup and Tez


Pierre-Antoine MARC

May 12, 2017, 11:38:10 AM
to cascading-user
Hi,

We're having some issues with the CoGroup operation when used with Tez. In our assembly, we have two source taps: one reading an Avro file (small dataset) and the other an ORC file (big dataset). When we join the two sets, there should not be many rejects, yet we lose about 1/3 of the initial lines (sometimes more when we change the parallelization). If we use the HashJoin operation instead of CoGroup, we get the expected number of lines. After a bit of digging, it seems that some reducers just don't write any data (e.g. if we set 7 reducers, only 3 of them produce output). We also noticed that when we apply an operation to the join fields (like a trim), the CoGroup operation works fine. We don't have this issue when running on MapReduce.

public static void main(String... args) throws IOException {
    Configuration hadoopConfiguration = new Configuration();
    Properties properties = new Properties();
    CascadingUtils.configureYarnQueue(properties, hadoopConfiguration);

    OrcTapFactory orcTapFactory = new OrcTapFactory();

    Pipe pipeInAvro = new Pipe("inAvro");
    Pipe pipeInOrc = new Pipe("inOrc");

    String nomFichierAvro = "/projets/avroin/archive.avro";
    Tap inAvro = new Hfs(new AvroScheme(SCHEMA_JSON), nomFichierAvro);

    String inOrcPath = "/projets/orcin";
    Tap inOrc = orcTapFactory.createSource(FIELDS_OUT, inOrcPath);

    String outDirectory = "/projets/out";
    Tap out = new Hfs(new TextDelimited(FIELDS_OUT, ";"), outDirectory);

    SubAssembly assembly = new CoGroupAssembly(pipeInOrc, pipeInAvro);

    FlowDef flowDef = FlowDef.flowDef()
            .setName("TestCoGroup")
            .setRunID("TestCoGroup")
            .addSource(pipeInAvro, inAvro)
            .addSource(pipeInOrc, inOrc)
            .addTailSink(assembly, out);

    // The number of gather partitions (reducers) is passed as the first argument.
    properties.setProperty(FlowRuntimeProps.GATHER_PARTITIONS, args[0]);

    new Hadoop2TezFlowConnector(properties).connect(flowDef).complete();

    // Count the lines written to the output directory.
    long result = 0L;
    FileSystem fs = FileSystem.get(hadoopConfiguration);
    RowFileCounter rowFileCounter = new RowFileCounter(outDirectory, fs);
    Map<String, RowFileCounter.EntityContent> countResult = rowFileCounter.count(false, null, false);
    result = rowFileCounter.sumEntityContents(countResult);

    System.out.println("Number of lines in the output file: " + result);

    exit(0);
}

private static class CoGroupAssembly extends SubAssembly {
    public CoGroupAssembly(Pipe inOrc, Pipe inAvro) {
        setPrevious(inOrc, inAvro);

        inOrc = new CoGroup(inOrc, new Fields("id"), inAvro, new Fields(FIELD_ID));
        inOrc = new Retain(inOrc, FIELDS_OUT);

        setTails(inOrc);
    }
}

The code above is what we use.

Is there a way to fix this issue?
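
For anyone who wants to check the expected row count independently of the cluster, here is a minimal sketch in plain Java (no Cascading involved). It assumes the CoGroup uses its default inner join and that the join keys of both sides fit in memory. The class and method names (JoinCountSketch, expectedInnerJoinRows) and the sample keys are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class JoinCountSketch {

    /** Counts how many times each key occurs in the given list. */
    static Map<String, Long> countKeys(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        for (String key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }

    /**
     * Rows an inner join should emit: for every key present on both sides,
     * (occurrences on the left) * (occurrences on the right).
     */
    static long expectedInnerJoinRows(List<String> leftKeys, List<String> rightKeys) {
        Map<String, Long> left = countKeys(leftKeys);
        Map<String, Long> right = countKeys(rightKeys);
        long rows = 0L;
        for (Map.Entry<String, Long> entry : left.entrySet()) {
            Long rightCount = right.get(entry.getKey());
            if (rightCount != null) {
                rows += entry.getValue() * rightCount;
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Made-up sample keys standing in for the ORC and Avro join fields.
        List<String> orcIds = Arrays.asList("A1", "A1", "B2", "C3");
        List<String> avroIds = Arrays.asList("A1", "B2", "D4");
        System.out.println(expectedInnerJoinRows(orcIds, avroIds)); // prints 3
    }
}
```

Comparing this number with the count of lines actually written by the flow gives the size of the loss without relying on the HashJoin run as the reference.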

Chris K Wensel

May 12, 2017, 12:15:47 PM
to cascadi...@googlegroups.com
What version of Cascading are you running?

Note we are at 3.2 now, which includes numerous planner fixes.





Pierre-Antoine MARC

May 12, 2017, 12:28:42 PM
to cascading-user
We are using version 3.1.1 in production, but after upgrading the tests to 3.2.1 we still see the same behaviour.

Chris K Wensel

May 12, 2017, 1:29:31 PM
to cascadi...@googlegroups.com
ok. the quickest way for me to test this is for you to create a new platform test in a pull request.


there is no need for a contributor agreement for tests. 

if you can use data already in the project, that’s also best (vs adding new files).

ckw

Pierre-Antoine MARC

May 16, 2017, 4:58:03 AM
to cascading-user
I ran a few more tests and it seems the problem comes from the ORC scheme we are using (https://github.com/HotelsDotCom/corc/, latest version). I tried the scheme developed by eBay (https://fr.hortonworks.com/blog/using-orcfile-cascading-apache-crunch/) and the CoGroup works fine.

Is there a library you would recommend for reading ORC files with Cascading, though? They don't all appear to behave the same!

Patrick Duin

May 16, 2017, 6:54:53 AM
to cascading-user
Hi Pierre-Antoine,

If you have some more info on what you think the problem is with the CORC scheme, could you please open a ticket on our github (https://github.com/HotelsDotCom/corc/)?

Thanks,
 Patrick (hotels.com)

Pierre-Antoine MARC

May 16, 2017, 7:50:41 AM
to cascading-user
Hi Patrick,

A ticket has been created: https://github.com/HotelsDotCom/corc/issues/21

Pierre-Antoine MARC

May 18, 2017, 4:54:29 AM
to cascading-user
After some tests, I think we found what causes the issue. We are joining two sets on a string key that is supposed to have the same length for every value. Some keys can be malformed and have a different length, and that's when the join fails. It also seems to depend on the types declared in the fields.
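
To spot keys whose length differs from the rest, a histogram of key lengths is usually enough. Below is a minimal sketch in plain Java, assuming the join keys have been extracted into a list of strings. The class name KeyLengthSketch and the sample keys are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class KeyLengthSketch {

    /** Maps each key length to the number of keys that have that length. */
    static Map<Integer, Long> lengthHistogram(List<String> keys) {
        Map<Integer, Long> histogram = new TreeMap<>();
        for (String key : keys) {
            histogram.merge(key.length(), 1L, Long::sum);
        }
        return histogram;
    }

    public static void main(String[] args) {
        // Made-up keys: three have the expected length 6, one is too short.
        List<String> keys = Arrays.asList("ABC123", "DEF456", "GHI78", "JKL012");
        System.out.println(lengthHistogram(keys)); // prints {5=1, 6=3}
    }
}
```

Any length with a small count next to one dominant length points at the suspicious keys.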

I put some test code here: https://github.com/Pierre-Antoine/CoGroupIssue
It's possible to review our tests here: https://github.com/HotelsDotCom/corc/issues/21

It looks like the regression was introduced in Cascading 3.1.0. Maybe it has to do with tuple serialization?
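
To make the suspicion concrete, here is a plain-Java illustration of the general mechanism only. It is not a confirmed diagnosis and not Cascading's or Tez's actual partitioning code. A reduce-side join only works if equal keys from both sides are routed to the same partition. If the two sides hash the same characters through different representations, the rows can land on different reducers and never meet. The partitioner below (hash modulo partition count) and the String versus byte-array contrast are assumptions chosen for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class PartitionMismatchSketch {

    // Hypothetical partitioner (an assumption, not Cascading's code):
    // non-negative hash modulo the number of partitions.
    static int partition(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    // Partition chosen when the key is hashed as a java.lang.String.
    static int partitionAsString(String key, int numPartitions) {
        return partition(key.hashCode(), numPartitions);
    }

    // Partition chosen when the same characters are hashed as raw UTF-8 bytes.
    static int partitionAsBytes(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return partition(Arrays.hashCode(bytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 7; // 7 reducers, as in the first message
        String key = "ABC123"; // made-up key
        System.out.println("as String: partition " + partitionAsString(key, numPartitions));
        System.out.println("as bytes:  partition " + partitionAsBytes(key, numPartitions));
    }
}
```

In this toy setup the two calls return different partitions for the same key. That would be consistent with a map-side HashJoin being unaffected, since it does not partition by key, but whether anything like this happens in the real flow remains to be verified.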

Chris K Wensel

May 18, 2017, 12:30:40 PM
to cascadi...@googlegroups.com
I’m a little confused following the threads…

if this can be reproduced with just cascading and no orc taps, please drop me a PR with a test case (and data if that’s necessary to reproduce). Hopefully you can simply bend one of the existing tests.


I do not need a contrib agreement for the PR.

ckw
