need help with the approach


markpiller

Aug 14, 2009, 11:36:27 AM
to cascading-user
Hi Chris,

I need your advice on how to structure a flow here. Here's what I
have:

Each row contains the following fields:

timestamp response_time

I have code that calculates Count() for groups of records, by second and
by minute, and it works well. The code looks like this:

Pipe tsPipe = new Each( "arrival rate", new Fields( "time" ), new DateParser( "yyyy/MM/dd:HH:mm:ss" ) );
Pipe tsCountPipe = new Pipe( "tsCount", tsPipe );
tsCountPipe = new GroupBy( tsCountPipe, new Fields( "ts" ) );
tsCountPipe = new Every( tsCountPipe, Fields.GROUP, new Count() );
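For context on the code above: Cascading's DateParser turns the "time" string into a long timestamp in milliseconds, declared as "ts" when no field name is given, so grouping on "ts" with this second-resolution pattern yields per-second groups. The plain-Java sketch below does not use Cascading; it only illustrates per-second versus per-minute bucketing of the same kind of timestamps. The class and method names are hypothetical, and it assumes the timestamps are in UTC.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;

public class TimestampBuckets {
    // Same pattern as the DateParser in the thread.
    private static final String PATTERN = "yyyy/MM/dd:HH:mm:ss";

    // Parses a timestamp string into epoch milliseconds (UTC assumed).
    public static long parseMillis(String time) {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return format.parse(time).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable timestamp: " + time, e);
        }
    }

    // Counts records per bucket: 1000 ms buckets give per-second counts,
    // 60000 ms buckets give per-minute counts. Keys are bucket start times, sorted.
    public static Map<Long, Long> countPerBucket(String[] times, long bucketMillis) {
        Map<Long, Long> counts = new TreeMap<>();
        for (String time : times) {
            long ts = parseMillis(time);
            long bucketStart = ts - Math.floorMod(ts, bucketMillis);
            counts.merge(bucketStart, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] times = {"2009/08/14:11:36:27", "2009/08/14:11:36:27", "2009/08/14:11:36:59"};
        System.out.println(countPerBucket(times, 1000L).values());  // [2, 1]
        System.out.println(countPerBucket(times, 60000L).values()); // [3]
    }
}
```

The sample timestamps are made up for illustration; the per-minute grouping here is just one way to get coarser groups and is not necessarily how the original poster did it.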

What is not clear to me is how to add another field that calculates
the Average() of response_time for the same grouping.

Thanks,
Mark

markpiller

Aug 14, 2009, 11:46:35 AM
to cascading-user
This is what I tried (and it does not work):

Pipe tsPipe = new Each( "arrival rate", new Fields( "time", "resptime" ), new DateParser( new Fields( "time" ), "yyyy/MM/dd:HH:mm:ss" ) );

// name the per second assembly and split on tsPipe
Pipe tsCountPipe = new Pipe( "tsCount", tsPipe );
tsCountPipe = new GroupBy( tsCountPipe, new Fields( "ts" ) );
Pipe transactionCount = new Every( tsCountPipe, Fields.GROUP, new Count() );
Pipe averageRespTime = new Every( tsCountPipe, Fields.GROUP, new Average( new Fields( "resptime" ) ) );

Pipe mergedPipe = new CoGroup( transactionCount, averageRespTime );

The idea was to reuse tsCountPipe, since it is already grouped by
"ts", and calculate the Average of "resptime". This fails with the
following exception:

java.lang.IllegalArgumentException: each input pipe branch must be uniquely named
at cascading.pipe.Group.addGroupFields(Group.java:648)
at cascading.pipe.Group.<init>(Group.java:223)
at cascading.pipe.Group.<init>(Group.java:161)
at cascading.pipe.Group.<init>(Group.java:150)
at cascading.pipe.CoGroup.<init>(CoGroup.java:112)
at apppuncher.Main.main(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)

Please help, I am kinda lost...

Thanks,
Mark

Chris K Wensel

Aug 14, 2009, 11:49:25 AM
to cascadi...@googlegroups.com
There are two ways.

First, just add an Average aggregator after the Count.

Or, add a Sum aggregator after the Count, then add an ExpressionFunction
after that to divide the sum by the count.

You may find the first simpler:

tsCountPipe = new Every( tsCountPipe, Fields.GROUP, new Count() );
tsCountPipe = new Every( tsCountPipe, new Fields("response_time"), new Average() );

ckw
--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com
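The second option works because a per-group average is the per-group sum divided by the per-group count. The sketch below is plain Java, not Cascading: a hypothetical Stats holder accumulates the count and sum for each group in one pass (the role of the chained Count and Sum aggregators), and average() does the division (the role the ExpressionFunction would play). The input values are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

public class CountSumAverage {
    // Per-group running values, as chained Count and Sum aggregators would produce.
    public static final class Stats {
        public long count;
        public double sum;

        // The "divide sum by count" step the ExpressionFunction would perform.
        public double average() {
            return sum / count;
        }
    }

    // One pass over (ts, respTime) pairs; each group's count and sum are updated together.
    public static Map<Long, Stats> aggregate(long[] ts, double[] respTime) {
        Map<Long, Stats> groups = new TreeMap<>();
        for (int i = 0; i < ts.length; i++) {
            Stats s = groups.computeIfAbsent(ts[i], k -> new Stats());
            s.count++;
            s.sum += respTime[i];
        }
        return groups;
    }

    public static void main(String[] args) {
        long[] ts = {1000L, 1000L, 2000L};
        double[] respTime = {10.0, 30.0, 5.0};
        for (Map.Entry<Long, Stats> e : aggregate(ts, respTime).entrySet()) {
            Stats s = e.getValue();
            // Prints "1000 count=2 sum=40.0 avg=20.0", then "2000 count=1 sum=5.0 avg=5.0".
            System.out.println(e.getKey() + " count=" + s.count + " sum=" + s.sum + " avg=" + s.average());
        }
    }
}
```

Either route gives the same number; the Average aggregator simply folds the division into a single step.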

Chris K Wensel

Aug 14, 2009, 11:54:18 AM
to cascadi...@googlegroups.com
I'm working today on improving the docs on how chained aggregators
work.

In short, just chain them; you don't need to make a branch for each one.

The results of an Each are new tuples in the stream.
The results of an Every are new aggregate values appended to the same tuple.
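To make the Each/Every distinction concrete, here is a plain-Java sketch that uses no Cascading classes (all names are hypothetical and the input values are made up). each() emits one new tuple per incoming record, truncating an assumed millisecond timestamp to the second; everyCountThenAverage() emits one tuple per group that starts as the grouping value and grows by one value per chained aggregator, so no separate branch or CoGroup is needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class EachVersusEvery {
    // Each-style step: every incoming [millis, resptime] record yields a NEW tuple
    // [ts, resptime], where ts is the timestamp truncated to the second.
    public static List<long[]> each(List<long[]> records) {
        List<long[]> out = new ArrayList<>();
        for (long[] r : records) {
            out.add(new long[] {r[0] - Math.floorMod(r[0], 1000L), r[1]});
        }
        return out;
    }

    // Every-style steps: one output tuple per GROUP. It starts as the grouping value,
    // and each chained aggregator appends one more value to that same tuple.
    public static List<List<Object>> everyCountThenAverage(List<long[]> tuples) {
        Map<Long, List<Long>> groups = new TreeMap<>();
        for (long[] t : tuples) {
            groups.computeIfAbsent(t[0], k -> new ArrayList<>()).add(t[1]);
        }
        List<List<Object>> out = new ArrayList<>();
        for (Map.Entry<Long, List<Long>> g : groups.entrySet()) {
            List<Object> tuple = new ArrayList<>();
            tuple.add(g.getKey());                  // grouping value: [ts]
            tuple.add((long) g.getValue().size());  // after Count:    [ts, count]
            double sum = 0;
            for (long v : g.getValue()) {
                sum += v;
            }
            tuple.add(sum / g.getValue().size());   // after Average:  [ts, count, average]
            out.add(tuple);
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> records = new ArrayList<>();
        records.add(new long[] {1_250_264_187_100L, 10});
        records.add(new long[] {1_250_264_187_900L, 30});
        records.add(new long[] {1_250_264_188_000L, 5});
        // Prints [[1250264187000, 2, 20.0], [1250264188000, 1, 5.0]]
        System.out.println(everyCountThenAverage(each(records)));
    }
}
```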

markpiller

Aug 14, 2009, 11:57:48 AM
to cascading-user
Thanks, Chris.

With the following suggested approach:

tsCountPipe = new Every( tsCountPipe, Fields.GROUP, new Count() );
tsCountPipe = new Every( tsCountPipe, new Fields("response_time"), new Average() );

will it calculate the average for each of the created groups? (That is
what I actually need.)

Thanks,
Mark

Chris K Wensel

Aug 14, 2009, 12:01:26 PM
to cascadi...@googlegroups.com
Yes, that's exactly what it does.

For every grouping, the set of Aggregators immediately following the
CoGroup or GroupBy is fired, and their results are appended to the
current grouping values. You can chain as many Aggregators as you
want; they will all fire in the same reducer.
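The single-reducer behaviour described above can be sketched in plain Java. The Aggregator interface below is hypothetical: it only loosely mirrors the start/aggregate/complete lifecycle that Cascading aggregators follow and does not reproduce Cascading's actual method signatures. For each group, every aggregator in the chain is started, sees each value in a single pass, and then appends its result after the grouping value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChainedAggregators {
    // Hypothetical, simplified aggregator lifecycle (not Cascading's real interface).
    public interface Aggregator {
        void start();
        void aggregate(double value);
        double complete();
    }

    public static final class Count implements Aggregator {
        private long n;
        public void start() { n = 0; }
        public void aggregate(double value) { n++; }
        public double complete() { return n; }
    }

    public static final class Average implements Aggregator {
        private long n;
        private double sum;
        public void start() { n = 0; sum = 0; }
        public void aggregate(double value) { n++; sum += value; }
        public double complete() { return sum / n; }
    }

    // Runs every aggregator in the chain over ONE pass of a single group's values
    // and appends each result to the grouping value, in chain order.
    public static List<Object> reduceGroup(Object groupKey, List<Double> values, List<Aggregator> chain) {
        for (Aggregator a : chain) a.start();
        for (double v : values) {
            for (Aggregator a : chain) a.aggregate(v);
        }
        List<Object> result = new ArrayList<>();
        result.add(groupKey);
        for (Aggregator a : chain) result.add(a.complete());
        return result;
    }

    public static void main(String[] args) {
        List<Aggregator> chain = Arrays.asList(new Count(), new Average());
        System.out.println(reduceGroup("11:36:27", Arrays.asList(10.0, 30.0), chain)); // [11:36:27, 2.0, 20.0]
        System.out.println(reduceGroup("11:36:59", Arrays.asList(5.0), chain));        // [11:36:59, 1.0, 5.0]
    }
}
```

In the real pipe, Count is given Fields.GROUP and Average the "resptime" field; in this simplification both see the same values, which makes no difference to Count.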

ckw

markpiller

Aug 14, 2009, 12:06:04 PM
to cascading-user
This is the code now:

Pipe tsPipe = new Each( "arrival rate", new Fields( "time", "resptime" ), new DateParser( new Fields( "time" ), "yyyy/MM/dd:HH:mm:ss" ) );
Pipe tsCountPipe = new Pipe( "tsCount", tsPipe );
tsCountPipe = new GroupBy( tsCountPipe, new Fields( "ts" ) );
tsCountPipe = new Every( tsCountPipe, Fields.GROUP, new Count() );
tsCountPipe = new Every( tsCountPipe, new Fields("resptime"), new Average() );

I am not sure I am doing this the right way: I have included both
"time" and "resptime" in the Fields declaration when I create
tsPipe.

The problem I run into now is:

09/08/14 11:05:06 INFO flow.MultiMapReducePlanner: using application jar: /C:/cygwin/usr/local/hadoop/apppuncher.jar
cascading.flow.PlannerException: could not build flow from assembly: [[tsCount][apppuncher.Main.main(Unknown Source)] could not resolve grouping selector in: GroupBy(tsCount)[by:[{1}:'ts']]]
at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:226)
at cascading.flow.FlowConnector.connect(FlowConnector.java:452)
at cascading.flow.FlowConnector.connect(FlowConnector.java:434)
at cascading.flow.FlowConnector.connect(FlowConnector.java:406)
at cascading.flow.FlowConnector.connect(FlowConnector.java:373)
at apppuncher.Main.main(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:165)
at org.apache.hadoop.mapred.JobShell.run(JobShell.java:54)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:79)
at org.apache.hadoop.mapred.JobShell.main(JobShell.java:68)
Caused by: cascading.pipe.OperatorException: [tsCount][apppuncher.Main.main(Unknown Source)] could not resolve grouping selector in: GroupBy(tsCount)[by:[{1}:'ts']]
at cascading.pipe.Group.resolveGroupingSelectors(Group.java:879)
at cascading.pipe.Group.outgoingScopeFor(Group.java:847)
at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:498)
at cascading.flow.ElementGraph.resolveFields(ElementGraph.java:480)
at cascading.flow.MultiMapReducePlanner.buildFlow(MultiMapReducePlanner.java:187)
... 14 more
Caused by: cascading.tuple.TupleException: field not found: 'ts', available fields: ['time']
at cascading.tuple.Fields.indexOf(Fields.java:693)
at cascading.tuple.Fields.select(Fields.java:746)
at cascading.pipe.Group.resolveSelectorsAgainstIncoming(Group.java:903)
at cascading.pipe.Group.resolveGroupingSelectors(Group.java:860)
... 18 more

Chris K Wensel

Aug 14, 2009, 12:12:20 PM
to cascadi...@googlegroups.com

Try this:

Pipe tsPipe = new Each( "arrival rate", new Fields( "time" ), new DateParser( new Fields( "ts" ), "yyyy/MM/dd:HH:mm:ss" ), Fields.ALL );
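Why Fields.ALL matters here: an Each's default output selector is Fields.RESULTS, so only the fields the function declares survive. In the failing version, DateParser declared "time", which is why the GroupBy saw only ['time'] and could not find "ts" ("resptime" would have been dropped as well). The plain-Java sketch below models that selection with a map standing in for a tuple; the Selector enum and the each() helper are hypothetical and are not Cascading API.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TimeZone;

public class OutputSelectorDemo {
    // Hypothetical stand-ins for the two output selectors discussed in the thread.
    public enum Selector { RESULTS, ALL }

    // Parses the argument field, stores the result under the declared field name,
    // and builds the outgoing "tuple" according to the output selector.
    public static Map<String, Object> each(Map<String, Object> incoming, String argument,
                                           String declared, Selector selector) {
        SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd:HH:mm:ss");
        format.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: UTC timestamps
        long millis;
        try {
            millis = format.parse((String) incoming.get(argument)).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException(e);
        }
        Map<String, Object> out = new LinkedHashMap<>();
        if (selector == Selector.ALL) {
            out.putAll(incoming); // keep the incoming fields ("time", "resptime")
        }
        out.put(declared, millis); // add the function's declared result
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("time", "2009/08/14:11:36:27");
        record.put("resptime", 120L);
        // RESULTS-style: only the declared result survives, so "resptime" is gone.
        System.out.println(each(record, "time", "ts", Selector.RESULTS).keySet());   // [ts]
        // ALL-style: incoming fields plus the new "ts" field.
        System.out.println(each(record, "time", "ts", Selector.ALL).keySet());       // [time, resptime, ts]
        // Declaring the result as "time" (the failing attempt) leaves no "ts" to group on.
        System.out.println(each(record, "time", "time", Selector.RESULTS).keySet()); // [time]
    }
}
```

The sample record is made up. The sketch deliberately avoids declaring a result with the same name as an incoming field under ALL, since this map-based model would silently overwrite it rather than report a conflict.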

markpiller

Aug 14, 2009, 12:21:29 PM
to cascading-user
That "Fields.ALL" is what I was missing. It works great now! Thank
you very much.

Mark

balajee venkatesh

Dec 2, 2015, 2:37:23 AM
to cascading-user
Thanks a lot, sir :-) This approach worked for me as well. I was facing a similar scenario where I had to compute 'Count()' and 'Sum()' together, and chaining them worked well.