ClassNotFoundException: cascading.flow.FlowMapper


Fei Dong

Oct 10, 2011, 2:48:50 AM
to cascading-user
Hi all,

I am a newbie to Cascading. I hacked some code to replace the old
MapReduce API with the new one (for example, using Configuration instead of
JobConf and mapreduce.* instead of mapred.*). The reason for the hack is
that I have to integrate with another Hadoop analysis system which
only supports the new API. I have finished the translation work, but at
run time it reports some errors.
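
Concretely, the changes are of this flavor (a simplified before/after
sketch; the real FlowMapper works with Cascading's own tuple types, so
the key/value types below are just placeholders):

// old API: Mapper is an interface in org.apache.hadoop.mapred
public class FlowMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text>
  {
  public void map( LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter ) throws IOException
    {
    // emit pairs via output.collect( ... )
    }
  }

// new API: Mapper is a class in org.apache.hadoop.mapreduce
public class FlowMapper extends Mapper<LongWritable, Text, Text, Text>
  {
  @Override
  protected void map( LongWritable key, Text value, Context context )
      throws IOException, InterruptedException
    {
    // emit pairs via context.write( ... )
    }
  }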

When I run the loganalysis example from cascading.samples (or any of the
other examples), the map tasks always fail with
java.lang.ClassNotFoundException: cascading.flow.FlowMapper.

I have rewritten FlowMapper so that it extends
org.apache.hadoop.mapreduce.Mapper, and I make sure to set it in
getJobConf() with job.setMapperClass(FlowMapper.class).
I am not sure what is wrong with the example setup; I did not change
the build.xml of loganalysis.
BTW, my modified code passes the BasicPipeTest test case.
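
For reference, the job setup in my hack looks roughly like this (a
simplified sketch; createJob and the surrounding wiring are from my own
port, not stock Cascading, and I may well be missing something here):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

Job createJob( Configuration conf ) throws IOException
  {
  // new-API Job instead of the old JobConf
  Job job = new Job( conf );

  // the rewritten mapper extends org.apache.hadoop.mapreduce.Mapper
  job.setMapperClass( FlowMapper.class );

  // do I also need something like the line below so the child JVMs can
  // find the user classes? The log further down warns "No job jar file set".
  // job.setJarByClass( FlowMapper.class );

  return job;
  }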

Could someone help me figure it out?

job log
##############
Task Logs: 'attempt_201109271720_0038_m_000000_0'

syslog logs
2011-10-10 02:28:40,143 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
2011-10-10 02:28:40,256 WARN org.apache.hadoop.mapred.TaskTracker: Error running child
java.lang.RuntimeException: java.lang.ClassNotFoundException: cascading.flow.FlowMapper
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:809)
    at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:157)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:569)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)
Caused by: java.lang.ClassNotFoundException: cascading.flow.FlowMapper
    at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:247)
    at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:762)
    at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:807)
    ... 4 more
2011-10-10 02:28:40,258 INFO org.apache.hadoop.mapred.TaskRunner: Runnning cleanup for the task

cascading log
###############
dongfei@potato ~/workspace/cascading.samples/loganalysis> hadoop jar ./build/loganalysis.jar data/apache.200.txt output3
11/10/10 02:28:35 INFO flow.MultiMapReducePlanner: using application jar: /home/dongfei/workspace/cascading.samples/loganalysis/./build/loganalysis.jar
11/10/10 02:28:35 INFO flow.MultiMapReducePlanner: using application jar: /home/dongfei/workspace/cascading.samples/loganalysis/./build/loganalysis.jar
11/10/10 02:28:36 INFO cascade.Cascade: Concurrent, Inc - Cascading 1.2.4dev [hadoop-0.19.2+]
11/10/10 02:28:36 INFO cascade.Cascade: [import+tsCount+tmCount] starting
11/10/10 02:28:36 INFO cascade.Cascade: [import+tsCount+tmCount] parallel execution is enabled: true
11/10/10 02:28:36 INFO cascade.Cascade: [import+tsCount+tmCount] starting flows: 2
11/10/10 02:28:36 INFO cascade.Cascade: [import+tsCount+tmCount] allocating threads: 2
11/10/10 02:28:36 INFO cascade.Cascade: [import+tsCount+tmCount] starting flow: import
11/10/10 02:28:36 INFO flow.Flow: [import] atleast one sink does not exist
11/10/10 02:28:36 INFO flow.Flow: [import] starting
11/10/10 02:28:36 INFO flow.Flow: [import] source: Lfs["TextLine[['offset', 'line']->[ALL]]"]["data/apache.200.txt"]"]
11/10/10 02:28:36 INFO flow.Flow: [import] sink: Hfs["SequenceFile[['ip', 'time', 'method', 'event', 'status', 'size']]"]["output3/logs/"]"]
11/10/10 02:28:36 INFO tap.Hfs: forcing job to local mode, via source: Lfs["TextLine[['offset', 'line']->[ALL]]"]["data/apache.200.txt"]"]
11/10/10 02:28:36 INFO flow.Flow: [import] parallel execution is enabled: true
11/10/10 02:28:36 INFO flow.Flow: [import] starting jobs: 1
11/10/10 02:28:36 INFO flow.Flow: [import] allocating threads: 1
11/10/10 02:28:36 INFO flow.FlowStep: [import] starting step: (1/1) ...e[['ip', 'time', 'method', 'event', 'status', 'size']]"]["output3/logs/"]"]
11/10/10 02:28:36 INFO flow.FlowStep: [import] job.getInputFormatClass(): org.apache.hadoop.mapreduce.lib.input.TextInputFormat
11/10/10 02:28:36 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
11/10/10 02:28:36 INFO input.FileInputFormat: Total input paths to process : 1
11/10/10 02:29:11 WARN flow.FlowStep: [import] task completion events identify failed tasks
11/10/10 02:29:11 WARN flow.FlowStep: [import] task completion events count: 6
11/10/10 02:29:11 WARN flow.FlowStep: [import] event = Task Id : attempt_201109271720_0038_m_000002_0, Status : SUCCEEDED
11/10/10 02:29:11 WARN flow.FlowStep: [import] event = Task Id : attempt_201109271720_0038_m_000000_0, Status : FAILED
11/10/10 02:29:11 WARN flow.FlowStep: [import] event = Task Id : attempt_201109271720_0038_m_000000_1, Status : FAILED
11/10/10 02:29:11 WARN flow.FlowStep: [import] event = Task Id : attempt_201109271720_0038_m_000000_2, Status : FAILED
11/10/10 02:29:11 WARN flow.FlowStep: [import] event = Task Id : attempt_201109271720_0038_m_000000_3, Status : TIPFAILED
11/10/10 02:29:11 WARN flow.FlowStep: [import] event = Task Id : attempt_201109271720_0038_m_000001_0, Status : SUCCEEDED
11/10/10 02:29:11 WARN flow.Flow: stopping jobs
11/10/10 02:29:11 INFO flow.FlowStep: [import] stopping: (1/1) ...e[['ip', 'time', 'method', 'event', 'status', 'size']]"]["output3/logs/"]"]
11/10/10 02:29:11 WARN flow.Flow: stopped jobs
11/10/10 02:29:11 WARN flow.Flow: shutting down job executor
11/10/10 02:29:11 WARN flow.Flow: shutdown complete
11/10/10 02:29:11 INFO hadoop.Hadoop18TapUtil: deleting temp path output3/logs/_temporary
11/10/10 02:29:11 WARN cascade.Cascade: [import+tsCount+tmCount] flow failed: import
cascading.flow.FlowException: step failed: (1/1) ...e[['ip', 'time', 'method', 'event', 'status', 'size']]"]["output3/logs/"]"], with job id: null, please see cluster logs for failure messages
    at cascading.flow.FlowStepJob.blockOnJob(FlowStepJob.java:163)
    at cascading.flow.FlowStepJob.start(FlowStepJob.java:124)
    at cascading.flow.FlowStepJob.call(FlowStepJob.java:115)
    at cascading.flow.FlowStepJob.call(FlowStepJob.java:39)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
11/10/10 02:29:11 WARN cascade.Cascade: [import+tsCount+tmCount] stopping flows
11/10/10 02:29:11 INFO cascade.Cascade: [import+tsCount+tmCount] stopping flow: tsCount+tmCount
11/10/10 02:29:11 INFO cascade.Cascade: [import+tsCount+tmCount] stopping flow: import
11/10/10 02:29:11 WARN cascade.Cascade: [import+tsCount+tmCount] stopped flows
11/10/10 02:29:11 WARN cascade.Cascade: [import+tsCount+tmCount] shutting down flow executor
11/10/10 02:29:11 WARN cascade.Cascade: [import+tsCount+tmCount] shutdown complete
Exception in thread "main" cascading.cascade.CascadeException: flow failed: import
    at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:692)
    at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:640)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: cascading.flow.FlowException: step failed: (1/1) ...e[['ip', 'time', 'method', 'event', 'status', 'size']]"]["output3/logs/"]"], with job id: null, please see cluster logs for failure messages
    at cascading.flow.FlowStepJob.blockOnJob(FlowStepJob.java:163)
    at cascading.flow.FlowStepJob.start(FlowStepJob.java:124)
    at cascading.flow.FlowStepJob.call(FlowStepJob.java:115)
    at cascading.flow.FlowStepJob.call(FlowStepJob.java:39)
    ... 5 more

Chris K Wensel

Oct 10, 2011, 9:34:55 AM
to cascadi...@googlegroups.com
I'm unsure why you need the new APIs.

It is quite trivial to wrap the other system in a class, put the Riffle annotations on it, and then pass that object to a ProcessFlow to be executed in a Cascade; see the sketch after the links below.

http://www.cascading.org/1.2/javadoc/cascading/flow/ProcessFlow.html
https://github.com/cwensel/riffle/
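
Something along these lines (an untested sketch from memory; double-check the annotation names and the ProcessFlow constructor against the links above before relying on it):

import java.util.Arrays;
import java.util.Collection;

import cascading.flow.ProcessFlow;
import riffle.process.DependencyIncoming;
import riffle.process.DependencyOutgoing;
import riffle.process.Process;
import riffle.process.ProcessComplete;
import riffle.process.ProcessStop;

@Process
public class OtherSystemProcess
  {
  @ProcessComplete
  public void complete()
    {
    // launch the other analysis system and block until it finishes
    }

  @ProcessStop
  public void stop()
    {
    // ask the other system to stop early
    }

  @DependencyIncoming
  public Collection<String> getIncoming()
    {
    // resources this process reads, so the Cascade can schedule it correctly
    return Arrays.asList( "some/input/path" );
    }

  @DependencyOutgoing
  public Collection<String> getOutgoing()
    {
    // resources this process writes
    return Arrays.asList( "some/output/path" );
    }
  }

// then it can be added to a Cascade like any other Flow
ProcessFlow otherFlow = new ProcessFlow( "other-system", new OtherSystemProcess() );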

The Riffle annotations were conceived by Ted and me to allow Mahout apps to be driven by Cascading.

That is a whole lot easier than upgrading Cascading to use experimental, unstable APIs. I spent a week on that and never got it to work.

chris


--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com

-- Concurrent, Inc. offers mentoring, support for Cascading
