cascading.flow.FlowException: step failed: (1/1)


Aceeca

Jun 13, 2010, 7:06:26 PM
to cascading-user
Hello:

I am having an issue where a cascade fails for large data sets. A
brief summary of the environment I'm running on:

Cascading version: 1.08
Hadoop version: 0.18 on AWS Elastic MapReduce

Background of the problem: we have an application that produces
tab-delimited log files. These files are parsed for certain fields, and
the extracted fields are grouped using the built-in Count aggregator.
The input is on S3 and the output is written to S3. The Cascading code
I have written to compute the aggregate looks like this:

public static void main(String[] args)
{
    String inputPath = args[0];
    String outputPath = args[1];

    Fields allFields = new Fields("timestamp", "field1", "field2", "field3",
        "field4", "field5", "field6", "field7");

    // each input line arrives as a single "tuple" field
    TextLine scheme = new TextLine(new Fields("tuple"));
    Tap logTap = new Hfs(scheme, inputPath);

    String delimiter = "\t";

    Pipe assembly = new Pipe("testpipe");

    // split each line into the eight named fields
    Function parser = new RegexSplitter(allFields, delimiter);
    assembly = new Each(assembly, new Fields("tuple"), parser);

    // reformat the timestamp into a yyyy-MM-dd date string
    assembly = new Each(assembly, new Fields("timestamp"),
        new DateFormatter(new Fields("date"), "yyyy-MM-dd"),
        new Fields("date", "field1", "field2", "field3", "field5", "field7"));

    assembly = new Each(assembly, new Fields("field2"),
        new CustomFunction2(new Fields("newfield2")),
        new Fields("date", "field1", "newfield2", "field3", "field5", "field7"));

    assembly = new Each(assembly, new Fields("field5"),
        new CustomFunction5(new Fields("newfield5")),
        new Fields("date", "field1", "newfield2", "field3", "newfield5", "field7"));

    // group on the six remaining fields and count each group
    assembly = new GroupBy(assembly, new Fields("date", "field1", "newfield2",
        "field3", "newfield5", "field7"));

    Aggregator count = new Count(new Fields("count"));
    assembly = new Every(assembly, count);

    Tap remoteLogTap = new Hfs(scheme, outputPath);

    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass(properties, CascadingJob.class);

    Flow parsedLogFlow = new FlowConnector(properties)
        .connect(logTap, remoteLogTap, assembly);

    parsedLogFlow.start();
    parsedLogFlow.complete();
}

Now, the issue is that when I run the code on small data sets (~5
million lines), it executes correctly to completion. However, if I run
it on large data sets (~100 million lines), it fails. I also tried
increasing the memory allocated to the JVM to ~5 GB; with that, the
100M-line run succeeds, but if I increase the data set to 500M lines,
the code fails again.
The exception I get is:
cascading.flow.FlowException: step failed: (1/1)
Hfs["TextLine[['tuple']->[ALL]]"]["s3://<bucket-name>/"]
at cascading.flow.FlowStep$FlowStepJob.call(FlowStep.java:493)
at cascading.flow.FlowStep$FlowStepJob.call(FlowStep.java:422)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)

I Googled for this exception but could not get much from it. Since I'm
doing something very straightforward, and the exception turns up so few
results, I'm led to believe I may be doing something drastically wrong.

Thanks in advance

Chris K Wensel

Jun 13, 2010, 7:13:36 PM
to cascadi...@googlegroups.com
You need to look at the actual exception on the cluster, in the cluster logs. Hadoop does not send them back; we only know the job failed.

If the failure is in the mapper, it's likely your custom functions are accumulating state (and memory), something that tends not to scale well.

Also, you might find upgrading to Cascading 1.1.x a good idea.
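A hypothetical illustration of that failure mode (the class and method names below are invented, not from this thread): a function that caches every distinct value it sees ends up holding the task's entire key space in memory, so heap use grows linearly with distinct inputs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical anti-pattern: a per-instance cache that is never evicted.
// On a 100M-line input, this map holds millions of entries per map task.
public class LeakyNormalizer
{
    private final Map<String, String> seen = new HashMap<String, String>();

    public String normalize(String raw)
    {
        String cached = seen.get(raw);
        if (cached == null)
        {
            cached = raw.trim().toLowerCase();
            seen.put(raw, cached);  // never removed: unbounded growth
        }
        return cached;
    }

    public int cacheSize()
    {
        return seen.size();
    }

    public static void main(String[] args)
    {
        LeakyNormalizer n = new LeakyNormalizer();
        for (int i = 0; i < 1000; i++)
            n.normalize("Value-" + i);
        System.out.println(n.cacheSize());  // 1000 distinct keys retained
    }
}
```

A stateless version (compute and emit, keep nothing) stays flat in memory no matter how many lines pass through.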

cheers,
chris

> --
> You received this message because you are subscribed to the Google Groups "cascading-user" group.
> To post to this group, send email to cascadi...@googlegroups.com.
> To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
> For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.
>

--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com

Aceeca

Jun 13, 2010, 7:34:57 PM
to cascading-user
Hello Chris,

Thank you for your reply. You are correct: the issue is related to
memory. Below is the exception I get:

2010-06-13 18:54:37,287 INFO org.apache.hadoop.mapred.TaskInProgress (IPC Server handler 0 on 9001): Error from attempt_201006131636_0001_m_000017_0: java.io.IOException: Cannot run program "bash": java.io.IOException: error=12, Cannot allocate memory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:459)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:149)
    at org.apache.hadoop.util.Shell.run(Shell.java:134)
    at org.apache.hadoop.fs.DF.getAvailable(DF.java:73)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:329)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:124)
    at org.apache.hadoop.mapred.MapOutputFile.getOutputFileForWrite(MapOutputFile.java:61)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.mergeParts(MapTask.java:1495)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1171)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:365)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:312)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)
Caused by: java.io.IOException: java.io.IOException: error=12, Cannot allocate memory
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:148)
    at java.lang.ProcessImpl.start(ProcessImpl.java:65)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:452)
    ... 11 more


The function I have is straightforward:

@Override
public void operate(FlowProcess flowProcess, FunctionCall functionCall)
{
    Tuple output = new Tuple();
    String urlString = functionCall.getArguments().getString(0);

    try
    {
        URL url = new URL(urlString);
        output.add(url.getHost());
    }
    catch (Exception ex)  // MalformedURLException or anything else: emit a placeholder
    {
        output.add("Unknown");
    }

    functionCall.getOutputCollector().add(output);
}
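For what it's worth, that extraction logic is stateless. Pulled out of Cascading into a stdlib-only sketch (the class and method names here are invented for illustration, not from the thread), it behaves like this:

```java
import java.net.MalformedURLException;
import java.net.URL;

// Stand-alone version of the host-extraction logic, using only java.net.URL.
public class HostDemo
{
    static String hostOf(String urlString)
    {
        try
        {
            return new URL(urlString).getHost();
        }
        catch (MalformedURLException e)
        {
            return "Unknown";  // same fallback as the operate() above
        }
    }

    public static void main(String[] args)
    {
        System.out.println(hostOf("http://example.com/path?q=1"));  // example.com
        System.out.println(hostOf("not a url"));                    // Unknown
    }
}
```

Since nothing is retained between calls, this function alone should not grow the heap across 100M lines.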


So the thing that gets accumulated in memory is the tuple and its count?
I'm a little too new to this, so sorry if the answers are very obvious:

(i) Will increasing the number of instances help?
(ii) Is there a fundamental flaw in the design that does not lend
itself to being scalable?

Thanks in advance.

Chris K Wensel

Jun 13, 2010, 8:09:53 PM
to cascadi...@googlegroups.com
These types of issues are best answered on the Hadoop list.

But it looks to me as if your child JVMs (map tasks) are using so much memory they are preventing new child JVMs from spawning.

You might reduce the amount of memory your child JVMs can use, so the GC kicks in instead of the heap just growing and preventing other tasks from spawning.

Or it could be something entirely different. Sorry.
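On Hadoop 0.18, the child-task heap is set via the mapred.child.java.opts property in hadoop-site.xml. A sketch of that suggestion (the 512m cap is an illustrative value, not a recommendation from this thread):

```xml
<property>
  <name>mapred.child.java.opts</name>
  <!-- illustrative cap: small enough that GC runs instead of the heap growing -->
  <value>-Xmx512m</value>
</property>
```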

ckw


Ken Krugler

Jun 13, 2010, 10:29:24 PM
to cascadi...@googlegroups.com
One explanation of that error is that you've run out of
available swap space.

When Hadoop has to execute a shell command (via bash), this triggers a
fork of the current process.

The current process is the Java JVM running the task. The fork
initially uses the same size heap as the parent process. If you're
running with a big Java heap size, this effectively means double that
amount of memory.

This typically works even when you don't have that much memory,
because Linux supports over-commit of memory, where if the swap space
is available, it pretends that the new process can be launched.

But if you've exhausted swap space, the process fork will fail.

-- Ken


--------------------------------------------
Ken Krugler
+1 530-210-6378
http://bixolabs.com
e l a s t i c w e b m i n i n g


Ken Krugler

Jun 13, 2010, 10:31:09 PM
to cascadi...@googlegroups.com
One other minor issue is that you're creating a new Tuple with every
call to your operator.

You could instead reuse a tuple, to avoid lots of object allocation/
deallocation (followed by GCs).
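A sketch of that reuse against the Cascading 1.x API (not compiled here; the class name and constructor are assumed, and it presumes the output collector copies/serializes the tuple on add, which is what makes reuse safe):

```java
public class HostExtractor extends BaseOperation implements Function
{
    // one Tuple per operation instance, refilled on every call
    private final Tuple result = new Tuple((Object) null);

    public HostExtractor(Fields fieldDeclaration)
    {
        super(1, fieldDeclaration);
    }

    @Override
    public void operate(FlowProcess flowProcess, FunctionCall functionCall)
    {
        try
        {
            result.set(0, new URL(functionCall.getArguments().getString(0)).getHost());
        }
        catch (Exception e)
        {
            result.set(0, "Unknown");
        }

        // the collector consumes the tuple's current contents, so it can be reused
        functionCall.getOutputCollector().add(result);
    }
}
```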

-- Ken


Ted Dunning

Jun 13, 2010, 11:27:34 PM
to cascadi...@googlegroups.com
Is that still true?  I thought copy-on-write vfork was standard any more.

Ken Krugler

Jun 15, 2010, 10:38:49 AM
to cascadi...@googlegroups.com
On Jun 13, 2010, at 8:27pm, Ted Dunning wrote:

> Is that still true? I thought copy-on-write vfork was standard any more.

From what I've been reading, it's highly dependent on your Linux distribution/version. Some interesting comments I've found (not verified):

- Most Java implementations use fork (not vfork), as fork doesn't cause all threads to hang while the fork is getting started.

- But using fork instead of vfork is bad, since it requires copying the page tables, which can get big when you're using a 64-bit machine with lots of memory and a big JVM.

- In any case, the real issue is overcommit support (for either fork or vfork). You're still asking for a big hunk of memory, and the Linux kernel needs to decide whether you have enough. If the overcommit setting is the default heuristic mode (0 in `cat /proc/sys/vm/overcommit_memory`), you can get an IOException. One explanation:

 Normally the kernel will allow a certain amount of overcommit of memory (in the default, heuristic mode - mode 0 if you `cat /proc/sys/vm/overcommit_memory`). It takes into consideration the available memory in the system and also gives a little more leeway to root. If it thinks you're making a request it cannot possibly fulfill, however, the fork will fail. Adding more swap space makes the kernel think that your request to fork isn't so outlandish and will give it a green light.

- Solaris 10 doesn't support overcommit, so it requires the full amount of memory (typically via swap space) to be available. This either totally sucks or is the right thing to do, depending on who you're reading.

- For a more thorough discussion of this exact issue in Hadoop, with shell commands, read through the issue & comments for:


-- Ken



Aceeca

Jun 16, 2010, 11:02:44 AM
to cascading-user
Thanks, everyone, for your input and hints.

Here's how I solved the issue:
(i) I reduced the number of mappers and reducers per node to a
reasonable number, since my mappers are very memory-intensive. Also,
AWS EMR does not have swap configured on its servers.
(ii) I added the JVM params -XX:+UseConcMarkSweepGC -XX:+UseParNewGC
-server to all the child processes.

I know I can tweak the number of mappers and reducers, along with the
GC params, to get optimal performance, but since I previously could
not get the process to run on a reasonable amount of data at all, this
is a milestone for me.
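For later readers: on Hadoop 0.18 both fixes map onto hadoop-site.xml properties. The property names are real for that version, but the values below are illustrative, not necessarily what was actually used here:

```xml
<!-- (i) fewer concurrent child JVMs per node -->
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>2</value>
</property>
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>1</value>
</property>

<!-- (ii) GC flags for every child JVM -->
<property>
  <name>mapred.child.java.opts</name>
  <value>-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -server</value>
</property>
```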

Thanks once again.

Esé

Jun 17, 2010, 7:58:44 PM
to cascading-user
Hey folks,

The code Aceeca posted looks innocuous enough; no gotchas in there for
out-of-memory-type errors. Even though Aceeca did find a workable
solution, I am curious about the settings he was using in the first
place, since I'm guessing that's what caused the issue.

Aceeca, were you running lots and lots of mappers per node with a very
high memory setting per JVM, way beyond the EMR defaults? If you simply
ran the same code on a bunch of c1.medium boxes with standard EMR
settings, I'd expect you'd be able to process 500M lines without any
problem. Have you tried that?