scalability problem

62 views
Skip to first unread message

Nan

unread,
Apr 19, 2011, 3:19:52 PM4/19/11
to cascading-user
Hi,

I am testing a table as the following. The last column denotes an id
attribute. The first row is for "a1,b1,c1,d1", and all other rows are
"a1,b1,c2,d1" but with unique ids.

------------------
a1,b1,c1,d1,1
a1,b1,c2,d1,2
a1,b1,c2,d1,3
a1,b1,c2,d1,4
a1,b1,c2,d1,5
a1,b1,c2,d1,6
... ... ... ...
------------------

I write a simple code as the following:

------------------------------------------------------------------
Properties properties = new Properties();
FlowConnector.setApplicationJarClass(properties, Main.class);
FlowConnector flowConnector = new FlowConnector(properties);

Scheme sourceScheme = new TextDelimited(fields_cols, false, ",");
Tap source = new Hfs(sourceScheme, g_input_file);
Pipe assembly = new Pipe("CFD-violation");

assembly = new Each(assembly, fields_cfd, new FilterCFD(cfd_id,
g_cfd_file));
assembly = new GroupBy(assembly, fields_cfd_lhs);
assembly = new Every(assembly, fields_cfd, new AggreViolations(cfd_id,
g_cfd_file), Fields.ALL);
Scheme sinkScheme = new TextDelimited(Fields.ALL, false, ",");
Tap sink = new Hfs(sinkScheme, output_path, SinkMode.REPLACE);

Flow flow = flowConnector.connect("cfd-detect", source, sink,
assembly);
flow.writeDOT("myCascade.dot");
flow.complete();
------------------------------------------------------------------

It works fine when the file is about 1.5K (100 rows). However, when I
generated a file for 15.5K (1000 rows). The following errors happened.
Please let me know why this happened.

------------------------------------------------------------------
11/04/19 20:17:12 FATAL conf.Configuration: error parsing conf file:
java.io.FileNotFoundException: /disk/scratch/
workspace/.metadata/.plugins/org.apache.hadoop.eclipse/hadoop-
conf-5567698613354107885/core-site.xml (Too many open files)
11/04/19 20:17:12 WARN mapred.LocalJobRunner: job_local_0001
cascading.pipe.OperatorException: [CFD-violation]
[edinburgh.datacleaning.Main.CFD_Violations(Main.java:171)] operator
Each failed executing operation
at cascading.pipe.Each$EachHandler.operate(Each.java:486)
at
cascading.flow.stack.EachMapperStackElement.operateEach(EachMapperStackElement.java:
94)
at
cascading.flow.stack.EachMapperStackElement.collect(EachMapperStackElement.java:
82)
at cascading.flow.stack.FlowMapperStack.map(FlowMapperStack.java:220)
at cascading.flow.FlowMapper.map(FlowMapper.java:75)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
at org.apache.hadoop.mapred.LocalJobRunner
$Job.run(LocalJobRunner.java:177)
Caused by: java.lang.RuntimeException: java.io.FileNotFoundException: /
disk/scratch/workspace/.metadata/.plugins/org.apache.hadoop.eclipse/
hadoop-conf-5567698613354107885/core-site.xml (Too many open files)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:
1162)
at
org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:
1030)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:
980)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:436)
at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:103)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:95)
at edinburgh.datacleaning.CFDHandler.loadCFDs(CFDHandler.java:18)
at edinburgh.datacleaning.FilterCFD.isRemove(FilterCFD.java:26)
at cascading.pipe.Each.applyFilter(Each.java:372)
at cascading.pipe.Each.access$300(Each.java:53)
at cascading.pipe.Each$EachFilterHandler.handle(Each.java:558)
at cascading.pipe.Each$EachHandler.operate(Each.java:478)
... 8 more
Caused by: java.io.FileNotFoundException: /disk/scratch/
workspace/.metadata/.plugins/org.apache.hadoop.eclipse/hadoop-
conf-5567698613354107885/core-site.xml (Too many open files)
at java.io.FileInputStream.open(Native Method)
at java.io.FileInputStream.<init>(FileInputStream.java:106)
at java.io.FileInputStream.<init>(FileInputStream.java:66)
at
sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:
70)
at
sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:
161)
at
com.sun.org.apache.xerces.internal.impl.XMLEntityManager.setupCurrentEntity(XMLEntityManager.java:
653)
at
com.sun.org.apache.xerces.internal.impl.XMLVersionDetector.determineDocVersion(XMLVersionDetector.java:
186)
at
com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:
772)
at
com.sun.org.apache.xerces.internal.parsers.XML11Configuration.parse(XML11Configuration.java:
737)
at
com.sun.org.apache.xerces.internal.parsers.XMLParser.parse(XMLParser.java:
119)
at
com.sun.org.apache.xerces.internal.parsers.DOMParser.parse(DOMParser.java:
235)
at
com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderImpl.parse(DocumentBuilderImpl.java:
284)
at javax.xml.parsers.DocumentBuilder.parse(DocumentBuilder.java:180)
at
org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:
1079)
... 19 more
11/04/19 20:17:13 INFO mapred.LocalJobRunner:
hdfs://hcrc1425n30.inf.ed.ac.uk/user/ntang/dcinput/1g.csv:0+15892
11/04/19 20:17:16 WARN flow.FlowStep: [cfd-detect] task completion
events identify failed tasks
11/04/19 20:17:16 WARN flow.FlowStep: [cfd-detect] task completion
events count: 0
11/04/19 20:17:16 WARN flow.Flow: stopping jobs
11/04/19 20:17:16 INFO flow.FlowStep: [cfd-detect] stopping: (1/1)
Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["/user/ntang/output/CFD1"]"]
11/04/19 20:17:16 WARN flow.Flow: stopped jobs
11/04/19 20:17:16 WARN flow.Flow: shutting down job executor
11/04/19 20:17:16 WARN flow.Flow: shutdown complete
11/04/19 20:17:16 INFO hadoop.Hadoop18TapUtil: deleting temp path /
user/ntang/output/CFD1/_temporary
Exception in thread "main" cascading.flow.FlowException: step failed:
(1/1) Hfs["TextDelimited[[UNKNOWN]->[ALL]]"]["/user/ntang/output/
CFD1"]"], with job id: job_local_0001, please see cluster logs for
failure messages
at cascading.flow.FlowStepJob.blockOnJob(FlowStepJob.java:173)
at cascading.flow.FlowStepJob.start(FlowStepJob.java:138)
at cascading.flow.FlowStepJob.call(FlowStepJob.java:127)
at cascading.flow.FlowStepJob.call(FlowStepJob.java:39)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor
$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor
$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)

Chris Curtin

unread,
Apr 19, 2011, 3:46:45 PM4/19/11
to cascadi...@googlegroups.com
Hi,

your OS restrictions are too tight look at this:

(Too many open files)

see what your ulimit is set to, assuming linux/unix.

You need to jump it up (on all machines) or have your admin, then restart the cluster.

Hope this helps.

Chris

Nan

unread,
Apr 20, 2011, 8:50:47 AM4/20/11
to cascading-user
Hi,

I am wondering that why the code runs extremely slow.

Basically what I did is to:
(1) group a table (18MB) using one column, and the table contains 5
columns.
(2) for an extreme test case, I let all tuples in one group.
(3) for each group (in my test case, just 1 group), write an aggregate
function,
to count sth in linear time.
(4) output the aggregated results, which should be less than 10 MB.

I did the test on eclipse, running cascading on a 50+ nodes hadoop
cluster. It takes more than one hour to finish. I am now testing
another case with a 189MB table, which seems take forever to run.

Do I need to configure cascading or hadoop to make it run normally? In
a centralized environment, the 18MB file could be dealt with in a
minute.

Another question is, what is the size limit (in MB) that a group can
handle for cascading?

Best,
Joe

Chris Curtin

unread,
Apr 20, 2011, 3:09:21 PM4/20/11
to cascadi...@googlegroups.com
This definitely doesn't sound right. I do GB+ files on a 4 node cluster all the time in minutes.

I'd start with the Hadoop UI for the JobTracker and see what things Hadoop is actually doing. For example, how many mappers and reducers are you seeing in the job? What are the metrics for each mapper or reducer (drill into the UI and look to the right). Does the # of bytes/tuples look correct for your source data?. Look at the logs for each job and see if anything jumps out.

I'd also look to see if the admins have you deliberately restricted for some reason if they are using one of the schedulers.

Finally, the # of MB for the group by is technically limited by your disk space. By default I think Hadoop keeps 100MB in memory, but that is a tuning parameter. The rest is spilled to disk. This probably doesn't explain your performance issues unless your disks are really, really bad.

Chris

Nan

unread,
Apr 20, 2011, 4:30:58 PM4/20/11
to cascading-user
Hi Chris,

At least I can point out one thing that is wrong: using eclipse has
problems with Cascading, although running Hadoop has no problem.

The strange thing is that: I can run Cascading in Eclipse for small
input, I can get output result, but I cannot see any record of my
Hadoop jobs.

With larger documents (still quite small 189MB), there are 10 reduces
but one seems take very long to finish. I will further investigate it,
but the eclipse thing is really weird.

Best,
Joe

Nan

unread,
Apr 20, 2011, 5:12:58 PM4/20/11
to cascading-user
Hi,

I figured out the problem that why it was so slow. I need to maintain
some data during the customized aggregator which might be very large.
For example, if there are 10^6 tuples in a group, we might need to
record 10^6 ids for my purpose. This is definitely not scalable :( The
extreme case is that I may need to check the last tuple to see whether
the others are required or not. I will think or maybe re-design it
then :)

Best,
Joe

Ted Dunning

unread,
Apr 20, 2011, 5:21:43 PM4/20/11
to cascadi...@googlegroups.com, Nan
Lots of problems that involve highly skewed groups like that are susceptible to sampling.  Interestingly, sampling is also
often subject to combiner-like parallel reduction.  You have to retain information about what you have deleted, but it isn't hard.

Often, this sampling can make a huge difference in scalability with little or no impact on function.  

Examples of such applications include co-occurrence counting for recommendations.  Top-40 lists also fit into this category.

Is your application subject to this kind of sub-sampling.


--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To post to this group, send email to cascadi...@googlegroups.com.
To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.


Nan

unread,
Apr 20, 2011, 5:32:25 PM4/20/11
to cascading-user
Hi Ted,

Thanks and I realize that highly skewed groups are a big problem, or
alternatively writing codes directly in Hadoop and there is a huge
value list to reduce cannot remove this problem.

My application is not about sampling. Simply speaking, it is to find
all tuples in a relation that against some properties. The worst case
is that the group could be arbitrarily large and all tuple ids need to
be reported :(

Best,
Nan

On Apr 20, 10:21 pm, Ted Dunning <ted.dunn...@gmail.com> wrote:
> Lots of problems that involve highly skewed groups like that are susceptible
> to sampling.  Interestingly, sampling is also
> often subject to combiner-like parallel reduction.  You have to retain
> information about what you have deleted, but it isn't hard.
>
> Often, this sampling can make a huge difference in scalability with little
> or no impact on function.
>
> Examples of such applications include co-occurrence counting for
> recommendations.  Top-40 lists also fit into this category.
>
> Is your application subject to this kind of sub-sampling.
>
Reply all
Reply to author
Forward
0 new messages