New tuple stream from two input sources


Eitetsu Jo

Jul 20, 2010, 11:05:14 PM
to cascading-user
Hi, this is my first post.

I want to do the following processing.

input source1: A, B, C, ...
input source2: x, y, z, ...
I want to make a new tuple stream from the two sources above, like this:
{ A x }, { A y }, { A z }, ..., { B x }, { B y }, { B z }, ...,
{ C x }, { C y }, { C z }, ...

To do this, I tried allocating pseudo IDs, so the sources become
{ A 1 } { A 2 } { A 3 } ... { B 1 } { B 2 } { B 3 } ...
{ 1 x } { 2 y } { 3 z } ...
and then joining them with a CoGroup on the pseudo IDs.
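
Roughly, the wiring I have in mind looks like this (just a sketch; the pipe
and field names are placeholders, not my actual code):

// classes from cascading.pipe.*, cascading.tuple.Fields, cascading.operation.Identity
Pipe lhs = new Pipe("source1");   // tuples: (item, pseudoId), e.g. (A 1), (A 2), ...
Pipe rhs = new Pipe("source2");   // tuples: (pseudoId, value), e.g. (1 x), (2 y), ...

// join the two streams on the pseudo ID; the declared fields rename the
// duplicated key so CoGroup does not complain about colliding field names
Pipe joined = new CoGroup(lhs, new Fields("pseudoId"),
                          rhs, new Fields("pseudoId"),
                          new Fields("item", "lhsId", "rhsId", "value"));

// keep only the (item, value) pairs
joined = new Each(joined, new Fields("item", "value"), new Identity());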

Is this approach a good fit for Cascading?

The result contains only { B 1 } { B 2 } { B 3 } ... (not { A 1 }
{ A 2 } ... or { C 1 } { C 2 } ...), and the flow fails with a FlowException.
I have no idea how to solve this. Could you give me any tips?

10/07/21 11:58:33 INFO util.Util: resolving application jar from found
main method on: jo.som.Test
10/07/21 11:58:33 INFO flow.MultiMapReducePlanner: using application
jar: /home/hadoop/LeukemiaSOM.jar
10/07/21 11:58:34 INFO cascade.Cascade: Concurrent, Inc - Cascading
1.1.1 [hadoop-0.19.2+]
10/07/21 11:58:34 INFO flow.Flow: [SOM] starting
10/07/21 11:58:34 INFO flow.Flow: [SOM] source:
Hfs["TextDelimited[['pmid', 'vector']]"]["/user/hadoop/
tarticle.txt"]"]
10/07/21 11:58:34 INFO flow.Flow: [SOM] sink:
Hfs["TextLine[['offset', 'line']->[ALL]]"]["/user/hadoop/tarticle1"]"]
10/07/21 11:58:34 INFO flow.Flow: [SOM] parallel execution is
enabled: true
10/07/21 11:58:34 INFO flow.Flow: [SOM] starting jobs: 1
10/07/21 11:58:34 INFO flow.Flow: [SOM] allocating threads: 1
10/07/21 11:58:34 INFO flow.FlowStep: [SOM] starting step: (1/1)
Hfs["TextLine[['offset', 'line']->[ALL]]"]["/user/hadoop/tarticle1"]"]
10/07/21 11:58:39 WARN mapred.JobClient: Use GenericOptionsParser for
parsing the arguments. Applications should implement Tool for the
same.
10/07/21 11:58:40 INFO mapred.FileInputFormat: Total input paths to
process : 1
10/07/21 11:59:15 WARN flow.FlowStep: [SOM] completion events count: 7
10/07/21 11:59:15 WARN flow.FlowStep: [SOM] event = Task Id :
attempt_201007211157_0001_m_000003_0, Status : SUCCEEDED
10/07/21 11:59:15 WARN flow.FlowStep: [SOM] event = Task Id :
attempt_201007211157_0001_m_000000_0, Status : SUCCEEDED
10/07/21 11:59:15 WARN flow.FlowStep: [SOM] event = Task Id :
attempt_201007211157_0001_m_000001_0, Status : FAILED
10/07/21 11:59:15 WARN flow.FlowStep: [SOM] event = Task Id :
attempt_201007211157_0001_m_000001_1, Status : FAILED
10/07/21 11:59:15 WARN flow.FlowStep: [SOM] event = Task Id :
attempt_201007211157_0001_m_000001_2, Status : FAILED
10/07/21 11:59:15 WARN flow.FlowStep: [SOM] event = Task Id :
attempt_201007211157_0001_m_000001_3, Status : TIPFAILED
10/07/21 11:59:15 WARN flow.FlowStep: [SOM] event = Task Id :
attempt_201007211157_0001_m_000002_0, Status : SUCCEEDED
10/07/21 11:59:15 WARN flow.Flow: stopping jobs
10/07/21 11:59:15 INFO flow.FlowStep: [SOM] stopping: (1/1)
Hfs["TextLine[['offset', 'line']->[ALL]]"]["/user/hadoop/tarticle1"]"]
10/07/21 11:59:15 WARN flow.Flow: stopped jobs
10/07/21 11:59:15 WARN flow.Flow: shutting down job executor
10/07/21 11:59:15 WARN flow.Flow: shutdown complete
10/07/21 11:59:15 INFO hadoop.Hadoop18TapUtil: deleting temp path /
user/hadoop/tarticle1/_temporary
Exception in thread "main" cascading.flow.FlowException: step failed:
(1/1) Hfs["TextLine[['offset', 'line']->[ALL]]"]["/user/hadoop/
tarticle1"]"]
at cascading.flow.FlowStepJob.blockOnJob(FlowStepJob.java:173)
at cascading.flow.FlowStepJob.start(FlowStepJob.java:138)
at cascading.flow.FlowStepJob.call(FlowStepJob.java:127)
at cascading.flow.FlowStepJob.call(FlowStepJob.java:39)
at java.util.concurrent.FutureTask
$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor
$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor
$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:619)

Chris K Wensel

Jul 20, 2010, 11:30:26 PM
to cascadi...@googlegroups.com
Cascading works great for that.

You need to look at your cluster task logs to see what failed for you.

cheers,
chris


--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com

Eitetsu Jo

Jul 21, 2010, 1:58:11 AM
to cascading-user
Thanks for your reply.

I got the following log from the job tracker:

2010-07-21 11:59:01,344 INFO org.apache.hadoop.mapred.TaskInProgress:
Error from attempt_201007211157_0001_m_000001_2:
cascading.flow.stack.StackException: cascading.tap.TapException: did
not parse correct number of values from input data, expected: 2, got:
1:

I found that this happened because of an error in my dataset.
Sorry for posting about such a trivial matter.

But another problem remains: the first line of the dataset isn't read.

I wrote the code like this:

Tap source1 = new Hfs(
    new TextDelimited(new Fields("pmid", "vector"), true, "\t"),
    new Path("/user/hadoop/tarticle.txt").toUri().toString());

Do you have any idea about this?

Chris K Wensel

Jul 21, 2010, 2:04:45 AM
to cascadi...@googlegroups.com
You set skipHeader == true in the TextDelimited constructor.

This tells TextDelimited that the first line is a header and should be skipped.
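
If that first line is real data rather than a header, pass false instead.
Roughly (a quick sketch based on your snippet):

Tap source1 = new Hfs(
    new TextDelimited(new Fields("pmid", "vector"), false, "\t"),  // false: no header line to skip
    "/user/hadoop/tarticle.txt");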

cheers,
chris

Eitetsu Jo

Jul 21, 2010, 2:45:53 AM
to cascading-user
Thanks, Chris.