Unexplained error - scaling problem?

Simon Radford

Mar 8, 2012, 10:08:44 PM
to cascading-user
Hey there,

I've been getting a strange failure that is almost impossible to
debug. I've included the stderr of my Cascading job below, but the log
has almost no useful information. It simply says "task completion
events identify failed tasks", yet no tasks were ever created.

If I run my job on a smaller set of input data (1/4 the size),
everything works fine. Also, if I break my input data into multiple
smaller streams, and then merge them together with a GroupBy,
everything works. So it seems like some sort of obscure scaling
problem with large numbers of input files on a single source tap.

I'm using a GlobHfs, and I know the glob pattern works, since the
number of input paths is correct.

I am at a complete loss to explain this. Anyone have any ideas? I can
do the multiple smaller sources merged together trick, but I shouldn't
have to.

Thanks a lot!
Simon Radford

PS. I'm using PyCascading on Amazon EMR with S3 as input.

Stderr log follows:

12/03/08 01:42:32 INFO flow.MultiMapReducePlanner: using application jar: /home/hadoop/pycascading/deploys/bANYJl/sources/../deploy.jar
12/03/08 01:42:32 INFO cascade.Cascade: Concurrent, Inc - Cascading 1.2.5 [hadoop-0.19.2+]
12/03/08 01:42:32 INFO flow.Flow: [sink/393:ad_network_pe...] starting
12/03/08 01:42:32 INFO flow.Flow: [sink/393:ad_network_pe...] source: Hfs["TextDelimited[['adunit_id', 'app_id', 'name']]"]["s3://simon-testing/mappings/mapping_auid_appid_name.txt"]"]
12/03/08 01:42:32 INFO flow.Flow: [sink/393:ad_network_pe...] source: Hfs["TextDelimited[['creative_id', 'adgroup_id', 'network_name']]"]["s3://simon-testing/mappings/mapping_cid_adgroupid_adnetwork.txt"]"]
12/03/08 01:42:32 INFO flow.Flow: [sink/393:ad_network_pe...] source: GlobHfs[s3://XXXX:XXXX@simon-testing/sample_loglines/sample-????-????/part-*]
12/03/08 01:42:32 INFO flow.Flow: [sink/393:ad_network_pe...] sink: Hfs["MetaScheme[[UNKNOWN]->[ALL]]"]["s3://simon-testing/sample_loglines_listified/sample_listified-2012-0201--2012-0228"]"]
12/03/08 01:57:55 INFO s3native.NativeS3FileSystem: Delete called for 's3://XXXX:XXXX@simon-testing/sample_loglines_listified/sample_listified-2012-0201--2012-0228' but file does not exist, so returning false
12/03/08 01:57:55 INFO flow.Flow: [sink/393:ad_network_pe...] parallel execution is enabled: true
12/03/08 01:57:55 INFO flow.Flow: [sink/393:ad_network_pe...] starting jobs: 3
12/03/08 01:57:55 INFO flow.Flow: [sink/393:ad_network_pe...] allocating threads: 3
12/03/08 01:57:56 INFO flow.FlowStep: [sink/393:ad_network_pe...] starting step: (2/3) ... 'filename', 'file_offset', 'logline']]"][each/376:ad_network_perf./51730/]
12/03/08 01:59:54 INFO mapred.FileInputFormat: Total input paths to process : 6720
12/03/08 02:01:42 INFO mapred.FileInputFormat: Total input paths to process : 6720
12/03/08 02:01:42 INFO mapred.FileInputFormat: Total input paths to process : 1
12/03/08 02:01:42 INFO mapred.FileInputFormat: Total input paths to process : 1
12/03/08 02:04:49 WARN flow.FlowStep: [sink/393:ad_network_pe...] task completion events identify failed tasks
12/03/08 02:04:49 WARN flow.FlowStep: [sink/393:ad_network_pe...] task completion events count: 0
12/03/08 02:04:49 WARN flow.FlowStep: [sink/393:ad_network_pe...] abandoning step: (1/3) ... 'filename', 'file_offset', 'logline']]"][each/386:ad_network_perf./88114/], predecessor failed: (2/3) ... 'filename', 'file_offset', 'logline']]"][each/376:ad_network_perf./51730/]
12/03/08 02:04:49 INFO flow.FlowStep: [sink/393:ad_network_pe...] stopping: (1/3) ... 'filename', 'file_offset', 'logline']]"][each/386:ad_network_perf./88114/]
12/03/08 02:04:49 WARN flow.FlowStep: [sink/393:ad_network_pe...] abandoning step: (3/3) ...testing/sample_loglines_listified/sample_listified-2012-0201--2012-0228"]"], predecessor failed: (1/3) ... 'filename', 'file_offset', 'logline']]"][each/386:ad_network_perf./88114/]
12/03/08 02:04:49 INFO flow.FlowStep: [sink/393:ad_network_pe...] stopping: (3/3) ...testing/sample_loglines_listified/sample_listified-2012-0201--2012-0228"]"]
12/03/08 02:04:49 WARN flow.Flow: stopping jobs
12/03/08 02:04:49 INFO flow.FlowStep: [sink/393:ad_network_pe...] stopping: (3/3) ...testing/sample_loglines_listified/sample_listified-2012-0201--2012-0228"]"]
12/03/08 02:04:49 INFO flow.FlowStep: [sink/393:ad_network_pe...] stopping: (1/3) ... 'filename', 'file_offset', 'logline']]"][each/386:ad_network_perf./88114/]
12/03/08 02:04:49 INFO flow.FlowStep: [sink/393:ad_network_pe...] stopping: (2/3) ... 'filename', 'file_offset', 'logline']]"][each/376:ad_network_perf./51730/]
12/03/08 02:04:49 WARN flow.Flow: stopped jobs
12/03/08 02:04:49 WARN flow.Flow: shutting down job executor
12/03/08 02:04:49 WARN flow.Flow: shutdown complete
cascading.flow.FlowException: step failed: (2/3) ... 'filename', 'file_offset', 'logline']]"][each/376:ad_network_perf./51730/], with job id: job_201203080139_0001, please see cluster logs for failure messages
    at cascading.flow.FlowStepJob.blockOnJob(FlowStepJob.java:175)
    at cascading.flow.FlowStepJob.start(FlowStepJob.java:140)
    at cascading.flow.FlowStepJob.call(FlowStepJob.java:129)
    at cascading.flow.FlowStepJob.call(FlowStepJob.java:39)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)

Ken Krugler

Mar 9, 2012, 9:58:43 AM
to cascadi...@googlegroups.com
I'd recommend checking the JobTracker log (which will be saved to S3 by EMR).

E.g. it might be running out of memory generating the splits for one single big job.

-- Ken


--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Mahout & Solr




Gabor Szabo

Mar 9, 2012, 11:46:34 AM
to cascading-user
I'd also recommend looking at the JobTracker logs as Ken suggested;
there's a chance you'll see a huge job.xml there.

From the logs it looks like your job is taking 15 minutes to submit.
I've seen GlobHfs fail to submit a job when the number of input files
is large ("large" depends on the Hadoop hardware config and how much
memory it has; 7k as in your case may be enough). The reason is that
GlobHfs hammers the namenode with thousands of requests to expand the
glob into each *individual* path and build a list of inputs, which
takes a long time. The input list then contains all of these paths,
hence the large job.xml, and the JobTracker runs out of memory on it.
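
To give a rough feel for the size difference, here is a minimal Python
sketch. It assumes the inputs reach Hadoop as one comma-separated
string of paths; the bucket name and the 28 x 240 directory layout are
made up so that the count matches the 6720 paths in the log above.

```python
# Hypothetical layout: 28 daily directories with 240 part files each
# (28 * 240 = 6720 paths). Bucket and file names are illustrative only.
def expanded_paths(bucket="s3://example-bucket/sample_loglines", days=28, parts=240):
    return [
        f"{bucket}/sample-2012-02{day:02d}/part-{part:05d}"
        for day in range(1, days + 1)
        for part in range(parts)
    ]


def input_dir_value(paths):
    # Assumption: the job config stores all inputs as one comma-separated string.
    return ",".join(paths)


paths = expanded_paths()
expanded = input_dir_value(paths)
globbed = "s3://example-bucket/sample_loglines/sample-????-????/part-*"

print(len(paths), "paths ->", len(expanded), "characters when expanded")
print("1 glob ->", len(globbed), "characters")
```

The expanded form grows linearly with the number of files, while the
glob stays the same length no matter how many files it matches.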

A solution that has worked for me is to use Hfs with globs. Hadoop
can handle glob patterns in input paths itself, and PyCascading sets
mapred.input.dir.recursive to true in the job config, so you can
specify your source tap like
Hfs(TextDelimited(), 'path/to/bigones/2011/*'). One caveat: make sure
the path doesn't contain the glob characters {}. Although they are
completely legal from Hadoop's point of view, Hfs won't pass the right
glob on to Hadoop. (To explain: Hfs converts the path to a URI first,
and {} become percent-encoded.) I believe Chris fixed this in
Cascading 2, but it may still exist in 1.2. Alternatively, you can
safely use HfsDirect in PyCascading, which subclasses Hfs to fix this.
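
The {} caveat can be illustrated with a small Python sketch. Note that
urllib.parse.quote is only a stand-in here for the URI conversion; it
is not the code Hfs actually runs, and the brace path is a made-up
example.

```python
from urllib.parse import quote


def to_uri_path(path):
    # Leave '/', '*', '?' and ',' untouched; other characters outside the
    # unreserved set (including '{' and '}') get percent-encoded.
    return quote(path, safe="/*?,")


plain_glob = "path/to/bigones/2011/*"
brace_glob = "path/to/bigones/{2011,2012}/*"  # hypothetical path with braces

print(to_uri_path(plain_glob))  # unchanged, still a usable glob
print(to_uri_path(brace_glob))  # braces are encoded, so the glob is broken
```

Once the braces are encoded, the alternation is gone and the pattern
can only match a literal directory name containing "%7B".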

Btw, as a side note, you can see which MR steps failed to run in the
logs or the JobTracker: each/376:ad_network_perf... etc. means that
the step comes from an Each at line 376 in your ad_net... Python file.
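
If you want to pull these out of a long log, a small sketch like the
following works on the labels shown above. The regex and the helper
name are my own, not part of Cascading or PyCascading, and the file
name stays truncated exactly as it is in the log.

```python
import re

# Matches labels such as "each/376:ad_network_perf." inside a step name:
# operation type, source line number, then the (possibly truncated) file name.
STEP_LABEL = re.compile(r"(?P<op>\w+)/(?P<line>\d+):(?P<source>[^/\]]+)")


def parse_step_label(label):
    match = STEP_LABEL.search(label)
    if match is None:
        return None
    return match.group("op"), int(match.group("line")), match.group("source")


print(parse_step_label("[each/376:ad_network_perf./51730/]"))
print(parse_step_label("[each/386:ad_network_perf./88114/]"))
```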

Gabor