I have a custom Aggregator which I need to run across multiple reduce tasks for performance reasons (by setting mapred.reduce.tasks via the pipe's StepConfigDef). However, when I do this, tuples belonging to the same group key are not all being sent to the same reduce task, so the output of my aggregation is incorrect. Everything works correctly with a single reducer (mapred.reduce.tasks = 1). The group key is a composite key of 8-9 fields, each of which is a Long, a String, or an Enum. I have registered serializations for all the Enum classes used via TupleSerializationProps. Is this a serialization/comparison problem, or is there something wrong with using an Aggregator in this manner?
Fields groupFields = new Fields("a", "b", "c", "d", "e");
pipe = new GroupBy(pipe, groupFields);
Aggregator<Context> aggregator = new CustomAggregator<Context>(new Fields("z"));
pipe = new Every(pipe, new Fields("f"), aggregator, new Fields("a", "b", "c", "d", "e", "z"));
pipe.getStepConfigDef().setProperty("mapred.reduce.tasks", "10");
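For context on why I suspect serialization/comparison: my understanding is that with multiple reducers, Hadoop's default HashPartitioner routes each key to a reduce task based on the key's hashCode. This is a plain-Java sketch of that routing (PartitionSketch and partitionFor are names I made up, not Cascading or Hadoop APIs), showing why every field of the composite group key must hash identically for equal values in every mapper JVM:

```java
// Illustrative sketch, not Cascading's actual code: Hadoop's default
// HashPartitioner picks the reduce task as
// (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
public class PartitionSketch {

    static int partitionFor(Object key, int numReduceTasks) {
        // Mask off the sign bit so the result is a valid task index.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // Distinct but equal String instances have value-based hashCodes,
        // so they are always routed to the same reduce task.
        String k1 = new String("a|b|c");
        String k2 = new String("a|b|c");
        System.out.println(partitionFor(k1, 10) == partitionFor(k2, 10)); // true

        // A key field whose hashCode is identity-based (e.g. the default
        // Object.hashCode) would hash differently in each mapper JVM, and
        // equal group keys would then land on different reducers.
    }
}
```

If one of my Enum fields (or its registered serialization) ends up contributing a non-value-based hash to the composite key, that would explain why things only work with a single reducer, since with one reduce task the partition is always 0 regardless of the hash.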