Hi,
I often have a situation where I have a single large data source from which I need to produce multiple outputs (cubes), each rolled up on a different combination of columns, each using a different set of columns for the secondary sort, and each with different logic implemented in the reducer.
For example, in Pig this is quite simple:
inpt = load 'data' using PigStorage(',') as (d1, d2, d3, d4, d5, d6, d7, v1:float, v2:float);

grp1 = group inpt by (d1, d3, d5);
res1 = foreach grp1 {
    uniq = DISTINCT inpt.d6;
    generate FLATTEN(group), COUNT(uniq), SUM(inpt.v1);
}

grp2 = group inpt by (d2, d4, d5);
res2 = foreach grp2 {
    sorted = ORDER inpt BY v2 DESC;  -- DESC so that LIMIT keeps the top 10
    top = LIMIT sorted 10;
    generate FLATTEN(group), FLATTEN(top), SUM(inpt.v1);
}
... many more ...
STORE res1 INTO …
...
STORE resN INTO …
This compiles into a single MapReduce job. But it gets more complicated when I also need joins (both map-side and reduce-side) and extra ETL logic, which adds to the execution time, the job compilation overhead and the number of MapReduce jobs. Data skew and DISTINCT operations cause extra issues as well.
I know how to avoid the extra jobs by implementing this in Pangool using secondary sorting, but unfortunately it is not possible to specify multiple group-by combinations in a single job, which would remove the extra MR jobs and the orchestration logic around them.
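For reference, a single Pangool job today looks roughly like this, with exactly one group-by and one order-by per job (a minimal sketch based on the documented TupleMRBuilder API; the schema, field names, ParsingMapper and Report1Reducer are placeholders of mine):

Configuration conf = new Configuration();
TupleMRBuilder mr = new TupleMRBuilder(conf, "single group-by");
// One intermediate schema, one group-by and one order-by for the whole job.
mr.addIntermediateSchema(new Schema("rec",
    Fields.parse("d1:string, d3:string, d5:string, d6:string, v1:float")));
mr.setGroupByFields("d1", "d3", "d5");
// Secondary sort within each group, e.g. on d6:
mr.setOrderBy(new OrderBy().add("d1", Order.ASC).add("d3", Order.ASC)
    .add("d5", Order.ASC).add("d6", Order.ASC));
mr.addInput(new Path(args[0]), new HadoopInputFormat(TextInputFormat.class), new ParsingMapper());
mr.setTupleReducer(new Report1Reducer());
mr.setOutput(new Path(args[1]), new HadoopOutputFormat(TextOutputFormat.class),
    ITuple.class, NullWritable.class);
mr.createJob().waitForCompletion(true);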
I have started to look into Pangool's source code and my feeling is that it is theoretically possible, but it will require some changes to the comparators, partitioners and proxy Reducers. The configuration objects would need small changes as well, but the rest is already there.
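To make the idea concrete, one possible approach (just a sketch of mine, not how Pangool works today; the "streamId" tag field, the per-stream lookup and the class name are hypothetical, and Pangool's real intermediate key is a wrapper type, simplified here to ITuple) is to tag every intermediate tuple with a stream id and make the partitioner dispatch on it:

import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
import com.datasalt.pangool.io.ITuple;

public class MultiStreamPartitioner extends Partitioner<ITuple, NullWritable> {

  // Hypothetical: deserialized from the job configuration,
  // one entry per stream declared on the builder.
  private Map<Integer, List<String>> groupByFieldsByStream;

  @Override
  public int getPartition(ITuple tuple, NullWritable value, int numPartitions) {
    int streamId = (Integer) tuple.get("streamId");  // hypothetical stream tag field
    int hash = streamId;
    // Partition on this stream's own group-by fields, so every reducer
    // receives complete groups for each stream it serves.
    for (String field : groupByFieldsByStream.get(streamId)) {
      hash = hash * 31 + tuple.get(field).hashCode();
    }
    return (hash & Integer.MAX_VALUE) % numPartitions;
  }
}

The sort comparator would similarly compare the stream id first and then that stream's order-by fields, the group comparator would stop at the stream's group-by prefix, and the proxy Reducer would dispatch each incoming group to the TupleReducer registered for its stream.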
My suggestion would be to have something like this:
TupleMRBuilder mr = new TupleMRBuilder(conf, "Name");
mr.addInput(new Path(args[0]), inputFormat, new MapperImplementation());

MRStream stream1 = mr.addStream("report 1");
stream1.addIntermediateSchema(schema_1);
stream1.addIntermediateSchema(schema_2);
stream1.setFieldAliases("sc1", new Aliases().add("local", "d1"));
stream1.setGroupByFields("d1", "d2", "d3");
stream1.setOrderBy(new OrderBy().add("d1", Order.ASC).addSchemaOrder(Order.DESC));
stream1.setTupleReducer(new ReportReducer_1());

MRStream stream2 = mr.addStream("report 2");
stream2.addIntermediateSchema(schema_1);
stream2.setGroupByFields("d2", "d4", "d5");
stream2.setTupleReducer(new ReportReducer_2());

MRStream stream3 = mr.addStream("report 3");
stream3.addIntermediateSchema(schema_1);
stream3.setGroupByFields("d6", "d7");
stream3.setTupleReducer(new ReportReducer_3());

mr.addNamedOutput(FailureSchema.outputFolderName, new HadoopOutputFormat(TextOutputFormat.class), ITuple.class, NullWritable.class);
mr.addNamedOutput(failure_dir, new HadoopOutputFormat(TextOutputFormat.class), ITuple.class, NullWritable.class);
mr.addNamedOutput(output_dir, new PangoolSequenceOutputFormat("\u0001"), ITuple.class, NullWritable.class);
mr.setOutput(new Path(args[2]), this.getOutputFormat(), ITuple.class, NullWritable.class);
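Each stream's reducer would then stay a plain TupleReducer. For example, a sketch of what ReportReducer_1 could look like for the first Pig report above (count of distinct d6 plus SUM(v1) per group; it assumes the stream is secondary-sorted on d6, like the order-by in the earlier sketch, so equal values arrive adjacent; the output tuple construction is omitted):

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import com.datasalt.pangool.io.ITuple;
import com.datasalt.pangool.tuplemr.TupleMRException;
import com.datasalt.pangool.tuplemr.TupleReducer;

public class ReportReducer_1 extends TupleReducer<ITuple, NullWritable> {
  @Override
  public void reduce(ITuple group, Iterable<ITuple> tuples,
      TupleMRContext context, Collector collector)
      throws IOException, InterruptedException, TupleMRException {
    long distinctD6 = 0;
    double sumV1 = 0;
    String lastD6 = null;
    for (ITuple tuple : tuples) {
      // toString() copies the value, since tuple instances are reused between iterations
      String d6 = tuple.get("d6").toString();
      if (!d6.equals(lastD6)) {  // distinct values are adjacent thanks to the sort on d6
        distinctD6++;
        lastD6 = d6;
      }
      sumV1 += (Float) tuple.get("v1");
    }
    // build an output tuple with (group fields, distinctD6, sumV1)
    // and emit it via the collector; omitted here for brevity
  }
}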
The main idea is to be able to use the same Mapper (or Mappers) with multiple Reducers and different group-by fields, thus creating different data streams between the Mappers and the Reducers and avoiding the need for multiple MapReduce jobs.
Another benefit is modularity: it allows simple addition and removal of first-wave reducers over the same data sources.
What would be your opinion on that?
I also would not mind trying to make the changes myself, as I will need this kind of behaviour in the future and Pangool would be the perfect platform for it.
Thanks,
Alexei