Multiple group-bys in the same MR job


Alexei Perelighin

Mar 28, 2014, 12:46:09 PM
to pangoo...@googlegroups.com

Hi,


I often have a situation where a single large data source must produce multiple outputs (cubes), each rolled up on a different combination of columns, each using a different set of columns for the secondary sort, and each with different logic in the reducer.


For example, in Pig this is quite simple:

inpt = LOAD 'data' USING PigStorage(',') AS (d1, d2, d3, d4, d5, d6, d7, v1:float, v2:float);

grp1 = GROUP inpt BY (d1, d3, d5);
res1 = FOREACH grp1 {
    uniq = DISTINCT inpt.d6;
    GENERATE FLATTEN(group), COUNT(uniq), SUM(inpt.v1);
};

grp2 = GROUP inpt BY (d2, d4, d5);
res2 = FOREACH grp2 {
    sorted = ORDER inpt BY v2;
    top = LIMIT sorted 10;
    GENERATE FLATTEN(group), FLATTEN(top), SUM(inpt.v1);
};

… many more

STORE res1 INTO …
STORE res2 INTO …
…
STORE resN INTO …


This compiles into a single MapReduce job. But things get more complicated when I also need joins (both map-side and reduce-side) and extra ETL logic, which adds execution time, job compilations, and extra MapReduce jobs. Data skew and distinct operations cause further issues.


I know how to avoid the extra jobs by implementing this in Pangool with secondary sorting, but unfortunately it is not possible to specify multiple group-by combinations, which would cut down the extra MR jobs and the orchestration logic.


I have started looking into Pangool's source code, and my feeling is that this is theoretically possible, but it will require some changes to the comparators, partitioners, and proxy Reducers. The configuration objects would need small changes as well; the rest is already there.


My suggestion would be to have something like this:


TupleMRBuilder mr = new TupleMRBuilder(conf, "Name");
mr.addInput(new Path(args[0]), inputFormat, MapperImplementation);

MRStream stream1 = mr.addStream("report 1");
stream1.addIntermediateSchema(schema_1);
stream1.addIntermediateSchema(schema_2);
stream1.setFieldAliases("sc1", new Aliases().add("local", "d1"));
stream1.setGroupByFields("d1", "d2", "d3");
stream1.setOrderBy(new OrderBy().add("d1", Order.ASC).addSchemaOrder(Order.DESC));
stream1.setTupleReducer(new ReportReducer_1());

MRStream stream2 = mr.addStream("report 2");
stream2.addIntermediateSchema(schema_1);
stream2.setGroupByFields("d2", "d4", "d5");
stream2.setTupleReducer(new ReportReducer_2());

MRStream stream3 = mr.addStream("report 3");
stream3.addIntermediateSchema(schema_1);
stream3.setGroupByFields("d6", "d7");
stream3.setTupleReducer(new ReportReducer_3());

mr.addNamedOutput(FailureSchema.outputFolderName, new HadoopOutputFormat(TextOutputFormat.class), ITuple.class, NullWritable.class);
mr.addNamedOutput(failure_dir, new HadoopOutputFormat(TextOutputFormat.class), ITuple.class, NullWritable.class);
mr.addNamedOutput(output_dir, new PangoolSequenceOutputFormat("\u0001"), ITuple.class, NullWritable.class);
mr.setOutput(new Path(args[2]), this.getOutputFormat(), ITuple.class, NullWritable.class);


The main idea is to be able to use the same Mapper (or Mappers) with multiple Reducers and different group-by fields, creating different data streams between the Mapper and the Reducers and avoiding the need for multiple MapReduce jobs.
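To make the intended data flow concrete, here is a minimal sketch of a mapper feeding several such streams. The stream-aware collector.write(String, ITuple) overload is hypothetical, part of the proposed API rather than anything in Pangool today; the rest follows the existing TupleMapper signature.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import com.datasalt.pangool.io.Schema;
import com.datasalt.pangool.io.Tuple;
import com.datasalt.pangool.tuplemr.TupleMapper;

// Sketch only: one input record is projected into the intermediate schema
// and emitted once per stream, so each stream can be partitioned, sorted
// and grouped independently on its own group-by fields.
public class MultiStreamMapper extends TupleMapper<LongWritable, Text> {

    private final Schema schema1; // intermediate schema shared by the streams
    private Tuple tuple;

    public MultiStreamMapper(Schema schema1) {
        this.schema1 = schema1;
    }

    @Override
    public void map(LongWritable key, Text value, TupleMRContext context,
            Collector collector) throws IOException, InterruptedException {
        if (tuple == null) {
            tuple = new Tuple(schema1); // reused across records
        }
        String[] cols = value.toString().split(",");
        tuple.set("d1", cols[0]);
        tuple.set("d2", cols[1]);
        // ... fill the remaining fields ...

        // Hypothetical overload: tag the tuple with its destination stream.
        collector.write("report 1", tuple);
        collector.write("report 2", tuple);
    }
}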


Another benefit is modularity: first-wave reducers over the same data sources can be added or removed easily.


What would be your opinion on that?

I also would not mind trying to make the changes myself, as I will need this kind of behaviour in the future and Pangool would be the perfect platform for it.


Thanks,

Alexei

Pere Ferrera

Mar 29, 2014, 6:17:27 AM
to pangoo...@googlegroups.com
Hello Alexei,

Your proposal is very interesting. I will take a deeper look when I have some more time, so that I can give an informed opinion.
It seems to me that Pangool could quite easily support arbitrary group-bys (with assignable reducers) per intermediate schema, because the schema id is already used for sorting and we have "specific order by". This would be a first step, although not exactly what you are asking for.
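For context, a rough, untested sketch of how that looks with today's API (schema and field names invented, and assuming a Hadoop conf in scope): two intermediate schemas, a common group-by, the schema order made explicit, and then "specific order by" per schema.

import com.datasalt.pangool.io.Fields;
import com.datasalt.pangool.io.Schema;
import com.datasalt.pangool.tuplemr.Criteria.Order;
import com.datasalt.pangool.tuplemr.OrderBy;
import com.datasalt.pangool.tuplemr.TupleMRBuilder;

// Two intermediate schemas sharing the group-by field "d1".
Schema schema1 = new Schema("schema1", Fields.parse("d1:string, v1:float"));
Schema schema2 = new Schema("schema2", Fields.parse("d1:string, v2:float"));

TupleMRBuilder mr = new TupleMRBuilder(conf, "specific-order-by-example");
mr.addIntermediateSchema(schema1);
mr.addIntermediateSchema(schema2);

// Common group-by across all schemas.
mr.setGroupByFields("d1");

// Common sort: first by d1, then by schema id ("schema order")...
mr.setOrderBy(new OrderBy().add("d1", Order.ASC).addSchemaOrder(Order.ASC));

// ...then per-schema ("specific") criteria on the remaining fields.
mr.setSpecificOrderBy("schema1", new OrderBy().add("v1", Order.DESC));
mr.setSpecificOrderBy("schema2", new OrderBy().add("v2", Order.ASC));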

On the other hand, of course we would be more than keen to receive your additions to the code!

Alexei Perelighin

Mar 29, 2014, 8:25:37 AM
to pangoo...@googlegroups.com
Hi Pere,

I think adding a stream id on top of the schema id for sorting would be better, as the order of intermediate schemas might differ between group-by combinations, and it would ensure better isolation of the streams.

Thanks,
Alexei

Pere Ferrera

Apr 1, 2014, 5:06:11 AM
to pangoo...@googlegroups.com, alex...@googlemail.com
Hi Alexei,

You're right, a stream id is needed. But actually, it should already be possible to add the same schema twice with a different name (that would do the trick).
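A sketch of the trick, assuming an already-built schema1 and a TupleMRBuilder mr in scope: the name is what distinguishes intermediate schemas (and their schema ids), so reusing the same field list under two names yields two separable streams.

import com.datasalt.pangool.io.Schema;

// Same field layout, two names: Pangool registers two distinct intermediate
// schemas (and two schema ids), so the tuples can be told apart when sorting.
Schema stream1Schema = new Schema("stream1", schema1.getFields());
Schema stream2Schema = new Schema("stream2", schema1.getFields());
mr.addIntermediateSchema(stream1Schema);
mr.addIntermediateSchema(stream2Schema);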

There is one possibility to explore for implementing what you need: to "group by" an Avro field (http://pangool.net/userguide/custom_serialization.html) that is a union of schemas. That way the Avro field could contain different combinations of types. But I think you would need to implement a Comparator, as in the AvroTopicalWordCount example (https://github.com/datasalt/pangool/blob/master/examples/src/main/java/com/datasalt/pangool/examples/avro/AvroTopicalWordCount.java).

Another possibility would be to have a "common group by" (maybe just a source id), duplicate schemas under different names (maybe using the new Mutator class: http://pangool.net/apidocs/com/datasalt/pangool/io/Mutator.html) if needed, and use the specific order-by per intermediate schema for the extra grouping and sorting. In this case you would need to implement the narrow grouping in the reducer itself, maybe using the Rollup API. A sketch of that combination follows.
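The sketch below assumes a baseSchema and a TupleMRBuilder mr in scope, invents the field names, and assumes Mutator.superSetOf keeps roughly the signature in the javadoc linked above: a synthetic stream_id is the only common group-by field, each report's real grouping fields go into its specific order-by, and the reducer detects group boundaries itself. Note that grouping only by stream_id funnels each stream into a single reducer group, so real common fields should be added where possible.

import com.datasalt.pangool.io.Mutator;
import com.datasalt.pangool.io.Schema;
import com.datasalt.pangool.io.Schema.Field;
import com.datasalt.pangool.io.Schema.Field.Type;
import com.datasalt.pangool.tuplemr.Criteria.Order;
import com.datasalt.pangool.tuplemr.OrderBy;

// Duplicate the base schema under per-report names, each extended with the
// shared stream_id field (assumed Mutator.superSetOf(newName, schema, fields...)).
Schema report1 = Mutator.superSetOf("report1", baseSchema,
        Field.create("stream_id", Type.INT));
Schema report2 = Mutator.superSetOf("report2", baseSchema,
        Field.create("stream_id", Type.INT));

mr.addIntermediateSchema(report1);
mr.addIntermediateSchema(report2);

// The only common group-by is the synthetic stream id...
mr.setGroupByFields("stream_id");
mr.setOrderBy(new OrderBy().add("stream_id", Order.ASC).addSchemaOrder(Order.ASC));

// ...and each report's real grouping fields become its specific sort, so the
// reducer sees tuples in order and can do the narrow grouping itself.
mr.setSpecificOrderBy("report1", new OrderBy().add("d1", Order.ASC).add("d3", Order.ASC));
mr.setSpecificOrderBy("report2", new OrderBy().add("d2", Order.ASC).add("d4", Order.ASC));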

None of the above is as easy and integrated a solution as the one you propose. But I'm not quite sure about redesigning the API to support this; it is a very advanced use case. Obviously, it would be nice for Pangool to support it, but maybe not through a redesign of the existing API: perhaps through a new Builder, so that TupleMRBuilder and MapOnlyJobBuilder remain backwards compatible and the new Builder can be used for these purposes.

Finally, I would like to know if you have numbers showing that a single MR job is really more performant for this use case. In my experience, trying to make everything run in a single MR job is not always best. Generally speaking, if the input paths can be read relatively quickly in the map stage, it can be better to have separate jobs than one big job with a huge intermediate output. The intuition is simple: if Hadoop is basically sorting a big list, it is better to sort two independent lists of length n than a single list of length 2n.
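For what it's worth, the back-of-the-envelope arithmetic behind that intuition (comparison-sort model, constants ignored):

\underbrace{2 \cdot n\log_2 n}_{\text{two jobs of } n}
\quad\text{vs}\quad
\underbrace{2n\log_2(2n) = 2n\log_2 n + 2n}_{\text{one job of } 2n}

So the single big job pays an extra additive term of roughly 2n comparisons, and in practice the larger cost is that the whole 2n-record intermediate output must be spilled, shuffled and merged as one stream.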

Let us know, and I encourage you again to contribute code if you need and want to. If that's the case, I would say let's open a ticket in Github for it.

Cheers!

Alexei Perelighin

Apr 1, 2014, 6:15:56 AM
to Pere Ferrera, pangool-user
Hi Pere,

Thanks for the info. I will think about it. I had played with something similar to Mutator, making each intermediate schema carry all the columns from all of the group-by definitions plus one extra schema-id column, but it got quite messy, with a lot of boilerplate code.

Backwards compatibility is important; I will keep that in mind.

The purpose of this is to have less boilerplate code and more flexibility, which reduces the programming and maintenance effort of using Java for MapReduce. For example, managing intermediate outputs and keeping track of them adds complexity: you need extra tooling such as Oozie to orchestrate everything, and when things change, adding extra steps can be a pain (carving existing logic out of one MR job and moving it into another).

On the other hand, the principle of "read once, use multiple times" pays a lot of dividends when:
1) there is a lot of ETL over machine-produced data;
2) you have relatively small clusters (3 to 10 datanodes), or virtualized clusters that can be bandwidth-starved because the disks are not collocated with the CPUs (this happens a lot when there is no dedicated cluster and you get what you get). In one such environment I reduced execution time from one hour to 7 minutes just by merging 9 Pig scripts that read the same data into one;
3) algorithmic thinking (heaps, maps, bloom filters, hierarchical structures) lets you cut corners compared to relational thinking (SQL and joins).

A use case:
I work with payment transactions, and there are two types: single-message transactions and multi-message transactions (reserve & confirm, or auth & settlement). Usually 80% of all messages are single-message transactions, the reports over them use the same combinations of group-by fields, and each combination has a field saying whether the transaction is single-message or multi-message. But I cannot include data from a reserve message when there is no corresponding confirm, which can arrive much later and would not go to the same mapper. In parallel it is also necessary to produce distinct lists of clients, devices, services, etc.

One solution is an MR job that splits the singles into an intermediate folder during the map stage and joins the multis in the reduce stage, followed by a second job that reads both outputs and does all of the group-bys. In this case 80% of the data is re-written to and re-read from the cluster just to fit the problem.

An alternative would still be 2 MR jobs, but the first would apply all the group-bys to the 80% of the data that is single-message while joining the multi-message types, and the second would repeat the same group-bys on the remaining 20%.

I will definitely try to code something (subject to time availability), and it would be nice to have a Github ticket for it.

Thanks,
Alexei

Pere Ferrera

Apr 3, 2014, 1:06:04 PM
to pangoo...@googlegroups.com, Pere Ferrera, alex...@googlemail.com
Opened an issue in Github so we can keep track of it there: https://github.com/datasalt/pangool/issues/39