How can I have a source/sink Tap which uses a set of unknown/mixed tuple values?

Allen

May 10, 2010, 4:20:59 PM
to cascading-user
I've got a flow defined and working which takes some log files as
input and works through a series of 4 aggregations, but while
debugging I noticed that the planning of the flow wasn't particularly
optimal and the initial log-parsing pipe was getting run multiple
times. So I'd like to optimize this a bit: read and parse the initial
log files only once, then reuse that data for the multiple
aggregations I am doing.

The only thing that makes this a bit tricky is that my log files use
JSON-based log entries which are not all exactly the same; I currently
have 3 types of log entries, each with a slightly different number of
tuple values. In my current flow I use a custom function which parses
each log entry and outputs the tuples using Fields.UNKNOWN, which
works fine if I pass this on to other pipes. However, I'm having a
problem if I try to sink that output into a temporary file and then
use that temp file as the source to the cascading flow which does the
aggregations.

Is it possible to set up a Tap sink which uses a SequenceFile and
doesn't care what the Fields definitions are? And on top of that, can
I then set up a Tap source which uses a SequenceFile as the input and
doesn't require an exact Fields definition for the input tuples?


Ken Krugler

May 10, 2010, 5:10:56 PM
to cascadi...@googlegroups.com
On May 10, 2010, at 1:20pm, Allen wrote:

> I've got a flow defined and working which takes some log files as
> input and works through a series of 4 aggregations, but while
> debugging I noticed that the planning of the flow wasn't particularly
> optimal and the initial log-parsing pipe was getting run multiple
> times.

This would happen if you split your output from the parse operator into multiple pipes.

Cascading has two choices then - it can either re-run the operation to generate tuples for each of the splits, or it can save the output of the operator to a temp file, and use that as the input for each split.

With Cascading 1.1, you can override the isSafe() method in your
operation to return false, which forces Cascading to use the latter
approach (create a temp file).
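
A minimal sketch of what that looks like (assuming the Cascading 1.1
BaseOperation API; the class name and parsing details here are
placeholders, not your actual code):

        import cascading.flow.FlowProcess;
        import cascading.operation.BaseOperation;
        import cascading.operation.Function;
        import cascading.operation.FunctionCall;
        import cascading.tuple.Fields;
        import cascading.tuple.Tuple;

        // Hypothetical JSON-parsing function. Returning false from isSafe()
        // tells the planner it must not re-run this operation per split, so
        // its output gets materialized to a temp file instead.
        public class ParseLogFunction extends BaseOperation implements Function {

            public ParseLogFunction() {
                super(1, Fields.UNKNOWN); // one argument in, unknown fields out
            }

            @Override
            public boolean isSafe() {
                return false; // force the temp-file approach
            }

            public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
                String line = functionCall.getArguments().getString(0);
                functionCall.getOutputCollector().add(parseJsonEntry(line));
            }

            private Tuple parseJsonEntry(String line) {
                // real JSON parsing elided; emits a variable-width Tuple
                return new Tuple(line);
            }
        }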

> So I'd like to optimize this a bit: read and parse the initial log
> files only once, then reuse that data for the multiple aggregations
> I am doing.

> The only thing that makes this a bit tricky is that my log files use
> JSON-based log entries which are not all exactly the same; I
> currently have 3 types of log entries, each with a slightly different
> number of tuple values. In my current flow I use a custom function
> which parses each log entry and outputs the tuples using
> Fields.UNKNOWN, which works fine if I pass this on to other pipes.
> However, I'm having a problem if I try to sink that output into a
> temporary file and then use that temp file as the source to the
> cascading flow which does the aggregations.

> Is it possible to set up a Tap sink which uses a SequenceFile and
> doesn't care what the Fields definitions are?

Sure.

        Tap sinkTap = new Hfs(new SequenceFile(Fields.ALL), outputPath, SinkMode.REPLACE);

> And on top of that, can I then set up a Tap source which uses a
> SequenceFile as the input and doesn't require an exact Fields
> definition for the input tuples?

Yes.

        Tap sourceTap = new Hfs(new SequenceFile(Fields.ALL), inputPath);
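
Putting those together, a sketch of the parse-once pattern (Cascading
1.x API assumed; the paths are placeholders, and ParseLogFunction is
the hypothetical parser from the sketch above):

        import java.util.Properties;
        import cascading.flow.Flow;
        import cascading.flow.FlowConnector;
        import cascading.pipe.Each;
        import cascading.pipe.Pipe;
        import cascading.scheme.SequenceFile;
        import cascading.scheme.TextLine;
        import cascading.tap.Hfs;
        import cascading.tap.SinkMode;
        import cascading.tap.Tap;
        import cascading.tuple.Fields;

        public class ParseOnceFlow {
            public static void main(String[] args) {
                Tap logsTap = new Hfs(new TextLine(), "logs/"); // raw JSON log lines
                Tap parsedTap = new Hfs(new SequenceFile(Fields.ALL), "parsed/", SinkMode.REPLACE);

                Pipe parse = new Each("parse", new ParseLogFunction()); // emits Fields.UNKNOWN

                // run the parse once, materializing the mixed-width tuples
                Flow parseFlow = new FlowConnector(new Properties()).connect(logsTap, parsedTap, parse);
                parseFlow.complete();

                // downstream aggregation flows can then re-read the same file
                // with no fixed field definition
                Tap parsedSource = new Hfs(new SequenceFile(Fields.ALL), "parsed/");
            }
        }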

-- Ken

--------------------------------------------
Ken Krugler
e l a s t i c   w e b   m i n i n g




Allen

May 11, 2010, 1:03:39 PM
to cascading-user
Awesome, I used the isSafe() method and that did the trick. The docs
don't mention that isSafe() can cause data from a pipe to be written
out to a temporary file before being fed into other pipes; I'm
guessing that only happens when a split occurs after the function?

In any case, from a performance perspective, is either of the
solutions you offered preferred? I would expect that if Cascading is
going to use a temporary file, it does so with effectively the same
strategy: writing to a SequenceFile and then reading back from that
same file for any continuing pipes?

Allen

Ken Krugler

May 11, 2010, 3:45:10 PM
to cascadi...@googlegroups.com

On May 11, 2010, at 10:03am, Allen wrote:

> Awesome, I used the isSafe() method and that did the trick. The docs
> don't mention that isSafe() can cause data from a pipe to be written
> out to a temporary file before being fed into other pipes; I'm
> guessing that only happens when a split occurs after the function?

I believe isSafe() is considered any time Cascading has to decide
whether to run an operation multiple times, versus using a temp file.

So "only the case" might not be true, but Chris would know best.

> In any case, from a performance perspective, is either of the
> solutions you offered preferred? I would expect that if Cascading is
> going to use a temporary file, it does so with effectively the same
> strategy: writing to a SequenceFile and then reading back from that
> same file for any continuing pipes?

Performance depends on the time required to run the operation(s)
versus the time required to write intermediate results to a file.

For example, consider the case of a source tap, two operations, and
then a split. In the case where you're re-running the operations,
you'll do this twice:

Read original tuples (2x)
Perform operations (2x)

Now let's say you marked the second operation as unsafe. What you'd
now get:

Read original tuples (1x)
Perform operations (1x)
Write resulting tuples (1x)
Read resulting tuples (2x)

If you assume that the resulting tuples are roughly the same size/type
as the original tuples, so the read times for original & resulting
tuples are equivalent, then what you're comparing (canceling out
common factors) is the time for:

Perform operations (1x)

and

Write tuples (1x)
Read tuples (1x)

Since writing & reading (disk I/O) is so much slower than in-memory
operations, usually it's faster to perform the extra operations one
more time.

Two examples (from Bixo) where this hasn't been true:

- Fetching pages
- Parsing pages
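
Condensed into a back-of-envelope rule (the per-tuple times here are
hypothetical; this just restates the comparison above):

        // Re-running the operation for each split costs one extra pass of the
        // operation; the temp file costs one write plus one read of the
        // intermediate tuples. Fetch/parse-style operations are slow enough
        // that the temp file wins.
        public class SplitCostModel {
            static boolean preferTempFile(double opSecs, double writeSecs, double readSecs) {
                return opSecs > writeSecs + readSecs;
            }
        }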

-- Ken

Ted Dunning

May 11, 2010, 3:48:44 PM
to cascadi...@googlegroups.com
The other place this happens is when your assumption of equal size is violated.

Many map-reduce operations produce output thousands of times smaller than their input. Those are a cinch for the temp-file treatment.



Chris K Wensel

May 11, 2010, 5:26:40 PM
to cascadi...@googlegroups.com
which speaks to adding support for planner hints at some point.

Allen

May 17, 2010, 5:26:16 PM
to cascading-user
Quick follow-up question: do temp files get written to the Hadoop
master node? I'm relatively new to Hadoop, so this may be a stupid
question, but I can't imagine a reason why the temp files would go to
the master rather than use HDFS and get spread out over the cluster.

I ask because I'm getting this error when running one of my Cascading
flows on AWS Elastic MapReduce, and I'm not sure why ...

cascading.flow.FlowException: step failed: (1/9) TempHfs["SequenceFile[[UNKNOWN]]"][adstats/84490/]
        at cascading.flow.FlowStepJob.blockOnJob(FlowStepJob.java:173)
        at cascading.flow.FlowStepJob.start(FlowStepJob.java:138)
        at cascading.flow.FlowStepJob.call(FlowStepJob.java:127)
        at cascading.flow.FlowStepJob.call(FlowStepJob.java:39)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:619)

This thread suggests that the problem may be caused by the master
running out of disk, but I'm not sure how or why that would happen,
especially considering that I'm using EMR and not doing anything
specific on the Hadoop master node:

http://groups.google.com/group/cascading-user/browse_thread/thread/3392813b6bb8cb4f/6d4b1c5ee949903f?lnk=gst&q=TempHfs#6d4b1c5ee949903f

Any help is greatly appreciated.

Cheers,
Allen



Ken Krugler

May 17, 2010, 5:42:02 PM
to cascadi...@googlegroups.com

On May 17, 2010, at 2:26pm, Allen wrote:

> Quick follow-up question: do temp files get written to the Hadoop
> master node?

No - to HDFS.

> I'm relatively new to Hadoop, so this may be a stupid question, but
> I can't imagine a reason why the temp files would go to the master
> rather than use HDFS and get spread out over the cluster.
>
> I ask because I'm getting this error when running one of my Cascading
> flows on AWS Elastic MapReduce, and I'm not sure why ...
>
> cascading.flow.FlowException: step failed: (1/9) TempHfs["SequenceFile[[UNKNOWN]]"][adstats/84490/]
>         at cascading.flow.FlowStepJob.blockOnJob(FlowStepJob.java:173)
>         ...

This error is telling you that the step failed, and that step would
have written a temp file. To find out why, look at earlier log
entries; there you should find the real exception that triggered the
failure.

-- Ken

--------------------------------------------
Ken Krugler
+1 530-210-6378
http://bixolabs.com
e l a s t i c w e b m i n i n g




Allen Gilliland

May 17, 2010, 6:38:19 PM
to cascadi...@googlegroups.com
This may be more of a DFS issue than a Cascading one, but it's hard to
tell. I ran a number of Cascading jobs on EMR today and sadly only 1/8
of them completed successfully. The rest all died at some point or
another with this error ...

MapAttempt TASK_TYPE="MAP" TASKID="task_201005171939_0002_m_000004"
TASK_ATTEMPT_ID="attempt_201005171939_0002_m_000004_0"
TASK_STATUS="FAILED" FINISH_TIME="1274126370056"
HOSTNAME="/default-rack/domU-12-31-39-09-18-E4.compute-1.internal"
ERROR="java.io.IOException: Stream closed.
        at org.apache.hadoop.dfs.DFSClient$DFSOutputStream.isClosed(DFSClient.java:2245)
        at org.apache.hadoop.dfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:2692)
        at org.apache.hadoop.dfs.DFSClient$DFSOutputStream.close(DFSClient.java:2657)
        at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:61)
        at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:81)
        at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:957)
        at org.apache.hadoop.mapred.SequenceFileOutputFormat$1.close(SequenceFileOutputFormat.java:73)
        at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.close(MapTask.java:278)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:240)
        at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2216)
"
I have successfully run this same MR job multiple times on EMR without
any issue before today, so I'm not sure what would have caused this.
Only 2 things changed about the way I ran things today ...

1. I launched multiple jobs at the same time (all 8 at once, in fact).
Since EMR uses the same EC2 security groups for all the launched jobs,
I'm not sure whether that could have caused a conflict.

2. I launched these jobs using the EMR Ruby command line tool instead
of the AWS management console. I can't imagine that would change
anything about the runtime environment of the job, but since there is
no foolproof way of knowing exactly how the management console
launches the jobs, I can't be certain.

I'm tracking this on the AWS forums as well ...

http://developer.amazonwebservices.com/connect/thread.jspa?messageID=179242&#179242

Is anything about this error likely to be caused by Cascading and the
way I am using a temp file in my MR job?

Thanks in advance,
Allen

Chris K Wensel

May 17, 2010, 6:44:20 PM
to cascadi...@googlegroups.com
Hey Allen

What version of Cascading? Where are you reading/writing files to in
the failed tasks? If S3, is your bucket in the same region as your EMR
cluster?

I don't see much pointing to Cascading causing this error; it would
likely be obvious in the stack trace.

ckw
--
Chris K Wensel
ch...@concurrentinc.com

Ken Krugler

May 17, 2010, 7:19:28 PM
to cascadi...@googlegroups.com
Hi Allen,

I'd run into a similar error in the past (the IOException), which
wound up being a misdirection. The real problem was that an operation
had failed badly because of a thrown Error (not an Exception). This
caused the job to terminate abruptly, shutting down files, and then
the job cleanup code would complain.

You'd want to look through the individual error logs carefully for
tasks that just suddenly end without any cleanup code, as a sign of
this particular kind of problem.

-- Ken

Allen Gilliland

May 18, 2010, 7:30:17 PM
to cascadi...@googlegroups.com
Thanks for the advice, Ken. I went back into my code and added some
more try/catch blocks to catch any unexpected exceptions, and that's
where my problem was.

I'm a bit surprised that Hadoop/EMR didn't give me any kind of stack
trace for my uncaught exception in one of the log files. I combed over
all of the ones from my failed EMR runs and never got a clear stack
trace pointing back to the error, but nonetheless I was able to
resolve the issue.
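
For the record, the pattern that finally surfaced the error looks
roughly like this (a sketch only; LOG and parseJsonEntry are
placeholders for my logger and parsing code):

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            try {
                functionCall.getOutputCollector().add(
                    parseJsonEntry(functionCall.getArguments().getString(0)));
            } catch (Throwable t) {
                // log the real cause before the task dies; otherwise an Error
                // only shows up later as the "Stream closed" cleanup failure
                LOG.error("operation failed on: " + functionCall.getArguments(), t);
                throw new RuntimeException(t);
            }
        }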

Cheers,
Allen

