Mike
Nov 5, 2009, 8:27:28 PM
to cascading-user
Hello,
I'm new to Cascading and seem to have run into a problem doing a
Cartesian product: two reducers are stuck at 100% CPU and their memory
consumption keeps increasing. Any help would be appreciated.
I have 100k rows of data that look something like the following:
13992,196,2009-04-27 20:00:00
13992,92209,2009-04-27 20:15:00
13992,219377,2009-04-27 20:15:00
13992,360314,2009-04-27 20:00:00
13992,360318,2009-04-27 20:00:00
13992,379429,2009-04-27 20:00:00
13992,381434,2009-04-27 20:00:00
13992,381434,2009-04-27 20:15:00
14472,29015,2009-04-27 20:00:00
14472,315950,2009-04-27 20:00:00
...
41017912,289566,2009-04-27 20:15:00
41017912,320185,2009-04-27 20:15:00
41017912,355199,2009-04-27 20:15:00
41020656,89259,2009-04-27 20:15:00
41020656,380496,2009-04-27 20:15:00
41020656,384279,2009-04-27 20:15:00
41022232,23732,2009-04-27 20:00:00
41022232,328388,2009-04-27 20:00:00
41022232,332792,2009-04-27 20:00:00
41022232,380914,2009-04-27 20:00:00
I'm running the following Cascading program against the data. The
program reads the data line by line and does a self join on the third
column, the timestamp value (there are only 2 distinct values in the
dataset). So it's essentially doing a Cartesian product per timestamp,
after which the program gets a count for each group. I do intend to do
a Cartesian product here, because I eventually want this program to
operate on the first and second column values instead of counting. In
either case, the self join seems to be the issue: it causes my 3-node
Hadoop cluster to get stuck on two reducers in the first of two jobs:
count-per-group[(1/2)
TempHfs["SequenceFile[['id_1', 'dt_1', 'id_2', 'dt_2']]"][assembly/
52825/]].
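For a sense of scale, here is a minimal sketch of how many tuples a self join like this emits. It is plain Java, not Cascading, and the class and method names are made up for illustration. The even 50k/50k split between the two timestamps is an assumption; the actual distribution in the dataset may differ.

```java
// Hypothetical helper, not part of Cascading: estimates the output size of a
// self join where every row is paired with every row sharing its join key.
final class JoinSizeEstimate
{
    // rowsPerKey[i] is the number of rows carrying the i-th distinct key.
    // A self join emits n * n tuples for a key that appears n times.
    static long selfJoinSize(long[] rowsPerKey)
    {
        long total = 0L;
        for (long n : rowsPerKey)
        {
            total += n * n;
        }
        return total;
    }

    public static void main(String[] args)
    {
        // Assumption: 100k rows split evenly across the 2 distinct timestamps.
        long[] rowsPerTimestamp = {50_000L, 50_000L};
        System.out.println(selfJoinSize(rowsPerTimestamp));
    }
}
```

Under that assumed split the join would emit 5,000,000,000 tuples, and since there are only two join keys, at most two reducers would receive any of the work.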
package wordcount;
import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.aggregator.Count;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
public final class Main
{
    public static void main(String[] args)
    {
        // read each input line into a single field, "m1"
        Scheme sourceScheme1 = new TextLine(new Fields("m1"));
        Tap source1 = new Hfs(sourceScheme1, args[0]);
        Scheme sinkScheme = new TextLine();
        Tap sink = new Hfs(sinkScheme, args[1], SinkMode.REPLACE);
        Pipe assembly = new Pipe("assembly");
        // split each line into the fields "id" and "dt"
        assembly = new Each(assembly, new Fields("m1"), new CSVSplitFunction());
        // self join the pipe with itself once on "dt"
        Fields common = new Fields("dt");
        Fields declared = new Fields("id_1", "dt_1", "id_2", "dt_2");
        assembly = new CoGroup(assembly, common, 1, declared);
        // count the joined tuples in each (id_1, id_2, dt_1) group
        assembly = new GroupBy(assembly, new Fields("id_1", "id_2", "dt_1"));
        Aggregator count = new Count(new Fields("count"));
        assembly = new Every(assembly, count);
        // set the current job jar
        Properties properties = new Properties();
        FlowConnector.setApplicationJarClass( properties, Main.class );
        FlowConnector flowConnector = new FlowConnector(properties);
        Flow flow = flowConnector.connect("count-per-group", source1, sink, assembly);
        flow.complete();
    }

    private static class CSVSplitFunction extends BaseOperation implements Function
    {
        public CSVSplitFunction()
        {
            // one argument field in, two declared fields out
            super(1, new Fields("id", "dt"));
        }

        @Override
        public void operate(FlowProcess arg0, FunctionCall arg1)
        {
            TupleEntry arguments = arg1.getArguments();
            Tuple result = new Tuple();
            String line = arguments.getString("m1");
            String[] fields = line.split(",");
            // the second column becomes "id", the third (timestamp) becomes "dt"
            result.addAll(fields[1], fields[2]);
            arg1.getOutputCollector().add(result);
        }
    }
}
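To make the intended result concrete, here is a rough plain-Java emulation of what the assembly above computes, small enough to run on a handful of rows. It is only a sketch and does not use Cascading; the class name, method name, and the comma-joined string key are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the CoGroup + GroupBy + Count steps.
final class SelfJoinSketch
{
    // Returns a count per "id_1,id_2,dt" key, pairing every row with every
    // row that shares its timestamp (including itself).
    static Map<String, Long> countPairs(List<String> lines)
    {
        // Same split as CSVSplitFunction: column 2 is the id, column 3 the dt.
        Map<String, List<String>> idsByDt = new HashMap<>();
        for (String line : lines)
        {
            String[] fields = line.split(",");
            idsByDt.computeIfAbsent(fields[2], k -> new ArrayList<>()).add(fields[1]);
        }

        // Cartesian product within each timestamp, then count per group.
        Map<String, Long> counts = new HashMap<>();
        for (Map.Entry<String, List<String>> group : idsByDt.entrySet())
        {
            for (String id1 : group.getValue())
            {
                for (String id2 : group.getValue())
                {
                    counts.merge(id1 + "," + id2 + "," + group.getKey(), 1L, Long::sum);
                }
            }
        }
        return counts;
    }
}
```

The nested loop is the part that grows quadratically: a timestamp shared by n rows produces n * n pairs before any counting happens.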
The last few log lines from the stalled reducers look something like
this:
2009-11-05 17:09:22,212 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 0 files left.
2009-11-05 17:09:22,212 INFO org.apache.hadoop.mapred.ReduceTask: Initiating final on-disk merge with 1 files
2009-11-05 17:09:22,236 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
2009-11-05 17:09:22,242 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 2427490 bytes
2009-11-05 17:09:22,406 INFO cascading.tuple.SpillableTupleList: spilling tuple list to file number 1
2009-11-05 17:09:22,647 INFO cascading.tuple.SpillableTupleList: spilling tuple list to file number 2
2009-11-05 17:09:22,823 INFO cascading.tuple.SpillableTupleList: spilling tuple list to file number 3
2009-11-05 17:09:22,998 INFO cascading.tuple.SpillableTupleList: spilling tuple list to file number 4
2009-11-05 17:09:23,161 INFO cascading.tuple.SpillableTupleList: spilling tuple list to file number 5
2009-11-05 17:09:23,324 INFO cascading.tuple.SpillableTupleList: spilling tuple list to file number 6