Reducers stalled while doing a Cartesian product

Mike

Nov 5, 2009, 8:27:28 PM
to cascading-user
Hello,

I'm new to Cascading and seem to have run into a problem doing a
Cartesian product: two reducers are stuck at 100% CPU and their
memory consumption keeps increasing. Any help would be
appreciated.

I have 100k rows of data that look something like the following:

13992,196,2009-04-27 20:00:00
13992,92209,2009-04-27 20:15:00
13992,219377,2009-04-27 20:15:00
13992,360314,2009-04-27 20:00:00
13992,360318,2009-04-27 20:00:00
13992,379429,2009-04-27 20:00:00
13992,381434,2009-04-27 20:00:00
13992,381434,2009-04-27 20:15:00
14472,29015,2009-04-27 20:00:00
14472,315950,2009-04-27 20:00:00
...
41017912,289566,2009-04-27 20:15:00
41017912,320185,2009-04-27 20:15:00
41017912,355199,2009-04-27 20:15:00
41020656,89259,2009-04-27 20:15:00
41020656,380496,2009-04-27 20:15:00
41020656,384279,2009-04-27 20:15:00
41022232,23732,2009-04-27 20:00:00
41022232,328388,2009-04-27 20:00:00
41022232,332792,2009-04-27 20:00:00
41022232,380914,2009-04-27 20:00:00

And I'm running the following Cascading program against the data. The
program just reads the data line by line and does a self-join on
the third column, the timestamp value (there are only 2 distinct
values in the dataset). So it's essentially doing a Cartesian product
per timestamp, and then the program gets a count of each group. I
actually intend to do a Cartesian product here because I
eventually want this program to operate on the first and second column
values instead of counting. In either case, the self-join seems to be
the issue, because my 3-node Hadoop cluster gets stuck on two reducers
in the first of two jobs:

count-per-group[(1/2) TempHfs["SequenceFile[['id_1', 'dt_1', 'id_2', 'dt_2']]"][assembly/52825/]]

package wordcount;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.aggregator.Count;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;

public final class Main
{
  public static void main(String[] args)
  {
    Scheme sourceScheme1 = new TextLine(new Fields("m1"));
    Tap source1 = new Hfs(sourceScheme1, args[0]);

    Scheme sinkScheme = new TextLine();
    Tap sink = new Hfs(sinkScheme, args[1], SinkMode.REPLACE);

    Pipe assembly = new Pipe("assembly");
    assembly = new Each(assembly, new Fields("m1"), new CSVSplitFunction());

    Fields common = new Fields("dt");
    Fields declared = new Fields("id_1", "dt_1", "id_2", "dt_2");
    assembly = new CoGroup(assembly, common, 1, declared);

    assembly = new GroupBy(assembly, new Fields("id_1", "id_2", "dt_1"));

    Aggregator count = new Count(new Fields("count"));
    assembly = new Every(assembly, count);

    // set the current job jar
    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass( properties, Main.class );
    FlowConnector flowConnector = new FlowConnector(properties);

    Flow flow = flowConnector.connect("count-per-group", source1, sink, assembly);
    flow.complete();
  }

  private static class CSVSplitFunction extends BaseOperation implements Function
  {
    public CSVSplitFunction()
    {
      super(1, new Fields("id", "dt"));
    }

    @Override
    public void operate(FlowProcess arg0, FunctionCall arg1)
    {
      TupleEntry arguments = arg1.getArguments();

      Tuple result = new Tuple();

      String line = arguments.getString("m1");
      String[] fields = line.split(",");
      result.addAll(fields[1], fields[2]);

      arg1.getOutputCollector().add(result);
    }
  }
}
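
To make concrete what the self-join on the timestamp produces, here is a minimal plain-Java sketch that mimics it in memory on the first eight sample rows above. It does not use Cascading; the class name SelfJoinSketch and the method pairCounts are made up for illustration, and it counts joined tuples per timestamp rather than per (id_1, id_2, dt_1) group as the program does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: an in-memory stand-in for the CoGroup self-join.
final class SelfJoinSketch
{
  /** Groups ids by timestamp, then pairs every id with every id in the same group. */
  static Map<String, Long> pairCounts(List<String> csvLines)
  {
    Map<String, List<String>> idsByTimestamp = new LinkedHashMap<>();
    for (String line : csvLines)
    {
      String[] fields = line.split(",");
      // fields[1] is the id, fields[2] is the timestamp (same columns CSVSplitFunction keeps)
      idsByTimestamp.computeIfAbsent(fields[2], k -> new ArrayList<>()).add(fields[1]);
    }

    Map<String, Long> counts = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> group : idsByTimestamp.entrySet())
    {
      long pairs = 0;
      for (String left : group.getValue())
        for (String right : group.getValue())
          pairs++; // one joined tuple (left, dt, right, dt)
      counts.put(group.getKey(), pairs);
    }
    return counts;
  }

  public static void main(String[] args)
  {
    List<String> sample = Arrays.asList(
      "13992,196,2009-04-27 20:00:00",
      "13992,92209,2009-04-27 20:15:00",
      "13992,219377,2009-04-27 20:15:00",
      "13992,360314,2009-04-27 20:00:00",
      "13992,360318,2009-04-27 20:00:00",
      "13992,379429,2009-04-27 20:00:00",
      "13992,381434,2009-04-27 20:00:00",
      "13992,381434,2009-04-27 20:15:00");
    System.out.println(pairCounts(sample));
  }
}
```

With five rows at 20:00:00 and three at 20:15:00, the sketch yields 25 and 9 joined tuples respectively, so the output grows with the square of the number of rows sharing a timestamp.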

The last couple lines of the stalled reducers look something like
this:

2009-11-05 17:09:22,212 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 0 files left.
2009-11-05 17:09:22,212 INFO org.apache.hadoop.mapred.ReduceTask: Initiating final on-disk merge with 1 files
2009-11-05 17:09:22,236 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
2009-11-05 17:09:22,242 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 2427490 bytes
2009-11-05 17:09:22,406 INFO cascading.tuple.SpillableTupleList: spilling tuple list to file number 1
2009-11-05 17:09:22,647 INFO cascading.tuple.SpillableTupleList: spilling tuple list to file number 2
2009-11-05 17:09:22,823 INFO cascading.tuple.SpillableTupleList: spilling tuple list to file number 3
2009-11-05 17:09:22,998 INFO cascading.tuple.SpillableTupleList: spilling tuple list to file number 4
2009-11-05 17:09:23,161 INFO cascading.tuple.SpillableTupleList: spilling tuple list to file number 5
2009-11-05 17:09:23,324 INFO cascading.tuple.SpillableTupleList: spilling tuple list to file number 6

Chris K Wensel

Nov 6, 2009, 11:42:37 AM
to cascadi...@googlegroups.com

You might try wip-1.1 and see if memory fares better. otherwise you
should allocate more memory (you very well could be paging).

also, are you running with speculative execution on? am wondering why
you have two reducers.

note you can use RegexSplitter to do the split, instead of writing a
function.

ckw
--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com

Mike

Nov 6, 2009, 9:30:41 PM
to cascading-user
Hi Chris,

I tried wip-1.1 w/ hadoop 0.19.2 as you suggested, but the
problem still persists with the two reducer tasks. To clarify, there
are 96 reduce tasks in this job and 94 of them finish, but these last
two don't. I don't think it's a memory issue because I've allocated
1GB of heap to each child process (mapred.child.java.opts), and each
of these reducer processes is using only ~400MB of memory. Each machine
has 12GB of memory total. A theory I have is that it's trying to do a
full Cartesian product in each reducer for each distinct timestamp.
Does this sound like something the Cascading planner would do?

Thanks.

Chris K Wensel

Nov 7, 2009, 11:30:40 AM
to cascadi...@googlegroups.com
you might try setting the CoGroupClosure.SPILL_THRESHOLD property to a
very large number and see what happens.

and maybe CoGroupClosure.SPILL_COMPRESS to false.

ckw

Mike

Nov 9, 2009, 9:35:41 PM
to cascading-user
Hi Chris,

I just want to confirm: is this the correct way to set these
properties before passing them to the FlowConnector constructor?

properties.setProperty(CoGroupClosure.SPILL_COMPRESS, "false");

or

properties.setProperty(CoGroupClosure.SPILL_THRESHOLD, "100000000");

I tried each of these, and it still performs badly.

And when I increased the SPILL_THRESHOLD, it actually yielded an
additional warning like the following:

2009-11-09 18:23:23,822 WARN cascading.pipe.cogroup.CoGroupClosure: codecs set, but unable to load any: org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec

Thanks.

Chris K Wensel

Nov 10, 2009, 12:04:33 PM
to cascadi...@googlegroups.com
Yes, that's how you set the properties, and then pass them to the
FlowConnector.

Since your codecs aren't loading, it makes me wonder if your cluster
is configured properly or on an supported platform (windows).

chris

Mike

Nov 10, 2009, 2:52:05 PM
to cascading-user
Oh - I'm running it on a linux cluster. Is cascading only supported
on windows?

Chris K Wensel

Nov 10, 2009, 2:58:52 PM
to cascadi...@googlegroups.com
sorry, typo. meant 'unsupported platform'.

Cascading supports whatever Hadoop supports (it's just a library).

Hadoop historically hasn't worked well (at all) on windows.

ckw

Chris K Wensel

Nov 10, 2009, 4:22:24 PM
to cascadi...@googlegroups.com
fwiw, I just ran a 40 node cluster in EMR (Cascading 1.0 and hadoop
0.18) where one of the grouping keys required about 6k spills. was
slow, but completed without issues. (had about 160 reducers)

ckw

Chris K Wensel

Nov 12, 2009, 2:33:02 PM
to cascading-user
Hopefully I can get to this relatively soon. Thanks for sending it.

ckw

On Nov 11, 2009, at 2:53 PM, Defenestrator wrote:

Hi Chris,

I've tried it again and confirmed that the cluster was working properly for other workloads.  It works for a much smaller data set (10 rows), but I've had no luck with the 100k data set.

I've attached the program and the 100k dataset that demonstrates what I think is a problem with the cascading planner.

Here are the steps I use to run this; I basically just replaced the Main.java in the wordcount example with my own:

hadoop dfs -copyFromLocal /tmp/m1_100k.csv m1

ant -Dcascading.home=/data/defenestrator/cascading-1.1.0-62-hadoop-0.19.0+ -Dbuild.number=62 jar

/data/defenestrator/hadoop/bin/hadoop jar ./build/wordcount.jar hdfs:///user/defenestrator/m1 hdfs:///user/defenestrator/m1_cartesian_count

I would appreciate any insights into this.  Thanks.
<m1_100k.csv><Main.java>

Chris K Wensel

Nov 15, 2009, 6:36:40 PM
to Defenestrator, cascading-user
I can't seem to find any evidence this is "hanging".

Do note that you are self-joining against two keys, where one of them has 60k values and the other 40k values.

> cat m1_100k.csv | cut -d',' -f3 | sort | uniq -c
60046 2009-04-27 20:00:00
39954 2009-04-27 20:15:00

60,046 * 60,046 = 3,605,522,116 tuples

this likely takes time and much disk space to generate.

ckw
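
The arithmetic above can be checked with a small plain-Java sketch. The class and method names (JoinSize, selfJoinTuples) are invented for illustration; the two row counts are the ones reported by the uniq -c output in this message.

```java
// Hypothetical illustration only: size of a self-join for a key that occurs n times.
final class JoinSize
{
  /** Number of tuples a self-join emits for a key that occurs n times (n * n). */
  static long selfJoinTuples(long n)
  {
    // long is required: 60,046 squared already exceeds Integer.MAX_VALUE
    return n * n;
  }

  public static void main(String[] args)
  {
    long first = selfJoinTuples(60046);  // rows with 2009-04-27 20:00:00
    long second = selfJoinTuples(39954); // rows with 2009-04-27 20:15:00
    System.out.println(first + " + " + second + " = " + (first + second) + " tuples");
  }
}
```

The second key adds 39,954 * 39,954 = 1,596,322,116 tuples, so the two reducers together have to generate 5,201,844,232 joined tuples from a 100k-row input.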

Defenestrator

Nov 18, 2009, 8:15:20 PM
to Chris K Wensel, cascading-user
Hi Chris,

As I noted in the original post, it is a self-join against two distinct join keys.  Is there a reason why the Cascading planner doesn't parallelize the self-join across the available reducers, instead of having a single reducer per join key?

Thanks.

Chris K Wensel

Nov 18, 2009, 9:44:35 PM
to Defenestrator, cascading-user
yes, and that is your problem. you are doing a join against 60k values on each side.

all joins are parallelized via the hash of the joining key. if there is only 1 key value, there is only one hash value, and so only one reducer will be used.

ckw
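
A minimal sketch of why only two of the 96 reducers get any work, assuming a plain hash partitioner of the form (hashCode & Integer.MAX_VALUE) % numReducers, which is how Hadoop's default HashPartitioner assigns keys. The String hash below is only a stand-in: Cascading groups on Tuple keys with their own hash, so the actual reducer indexes will differ. The names PartitionSketch, partition and busyReducers are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration only: how distinct join keys map onto reducers.
final class PartitionSketch
{
  /** Maps a key to a reducer index the way a plain hash partitioner would. */
  static int partition(Object key, int numReducers)
  {
    // Mask the sign bit so the result is never negative
    return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
  }

  /** Returns the distinct reducer indexes that receive at least one key. */
  static Set<Integer> busyReducers(Iterable<?> keys, int numReducers)
  {
    Set<Integer> busy = new TreeSet<>();
    for (Object key : keys)
      busy.add(partition(key, numReducers));
    return busy;
  }

  public static void main(String[] args)
  {
    // The only two join key values present in the 100k-row dataset
    List<String> keys = Arrays.asList("2009-04-27 20:00:00", "2009-04-27 20:15:00");
    System.out.println("reducers with work: " + busyReducers(keys, 96));
  }
}
```

However many reducers are configured, the number that receive data can never exceed the number of distinct key values, which is two here. That matches the 94 reduce tasks that finish quickly and the two that keep running.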

Defenestrator

Nov 20, 2009, 7:32:45 PM
to Chris K Wensel, cascading-user
Is there a fundamental reason for this behavior?  Theoretically, it should be possible to parallelize the self join of even a single join key, right?
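
Whether this is possible in principle can be illustrated with key salting, a general technique that is not proposed anywhere in this thread and is not claimed here to be a Cascading feature. The idea, sketched in plain Java under those assumptions, is to give each left-side row one of N salt values, copy each right-side row once per salt value, and join on (key, salt) so that the work for a single key spreads over N groups. The names SaltedSelfJoinSketch and join are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: splitting one key's self-join across salt buckets.
final class SaltedSelfJoinSketch
{
  /** Returns the joined pairs for a single join key, grouped by salt bucket. */
  static List<List<String>> join(List<String> ids, int buckets)
  {
    List<List<String>> pairsByBucket = new ArrayList<>();
    for (int b = 0; b < buckets; b++)
      pairsByBucket.add(new ArrayList<>());

    for (int i = 0; i < ids.size(); i++)
    {
      int salt = i % buckets;  // left side: each row lands in exactly one bucket
      for (String right : ids) // right side: every row is replicated into every bucket
        pairsByBucket.get(salt).add(ids.get(i) + "," + right);
    }
    return pairsByBucket;
  }

  public static void main(String[] args)
  {
    List<List<String>> result = join(Arrays.asList("a", "b", "c", "d", "e", "f"), 3);
    for (int b = 0; b < result.size(); b++)
      System.out.println("bucket " + b + ": " + result.get(b).size() + " pairs");
  }
}
```

Every (left, right) pair is still produced exactly once, but each bucket handles only about 1/N of them. The cost is that the right side is shuffled N times, so the approach trades extra network and disk traffic for parallelism.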