Extracting Min and Max values from a column


Abhishek Srivastava

Sep 2, 2015, 19:15:03
To: cascading-user
I have an input file which looks like this

id,name,amount
1, test1, 10
2, test2, 15
3, test3, 20
4, test1, 5
5, test2, 35
6, test3, 0
7, test1, 42
8, test2, 12
9, test3, 1
10, test1, 0
11, test2, 33
12, test3, 22

Here id is just a sequence.

I want the output file to look like

id, name, minamount, maxamount
1, test1, 0, 42
2, test2, 12, 35
3, test3, 0, 22

Here also, id is just a sequence (an ever-increasing integer for every record).

In order to perform this transformation I have written this code.

Here is how I think the problem can be solved (correct me if I am wrong, or if there is a better way):

1. Read the input file with a tap.
2. Create one pipe with a GroupBy on name, sorted on amount.
3. Create a second pipe with a GroupBy on name, reverse-sorted on amount.
4. Take the first record from pipe 1.
5. Take the first record from pipe 2.
6. Join the two pipes.
7. Remove the duplicate name field (name2).
8. Write the output pipe to the output tap.

This is my code

Class[] inputTypes = new Class[] { int.class, String.class, int.class };
Class[] outputTypes = new Class[] { String.class, int.class, int.class };
Hfs inputTap = new Hfs(new TextDelimited(new Fields("id", "name", "amount"), true, false, ",", inputTypes), inputFilePath);

// create a new pipe
Pipe pipe = new Pipe("pipe");

// group by name, sort by amount (ascending)
Pipe minPipe = new GroupBy("minbranch", pipe, new Fields("name"), new Fields("amount"));

// group again, but this time sort amount in reverse order
Fields sortFields = new Fields("amount");
sortFields.setComparator("amount", Collections.reverseOrder());
Pipe maxPipe = new GroupBy("maxbranch", pipe, new Fields("name"), sortFields);

// extract the minimum values based on the default (ascending) sort
minPipe = new Every(minPipe, new First());

// extract the maximum values based on the reverse sort
maxPipe = new Every(maxPipe, new First());

// now join the two pipes on name
Fields common = new Fields("name");
Pipe outputPipe = new CoGroup(minPipe, common, maxPipe, common, new Fields("name", "minamount", "name2", "maxamount"));

// define the output fields
Fields outFields = new Fields("name", "maxamount", "minamount");
outputPipe = new Each(outputPipe, new Identity(), outFields);

Scheme outScheme = new TextDelimited(outFields, false, true, ",", outputTypes);
Hfs outputTap = new Hfs(outScheme, outputFilePath);

// create the flow
FlowDef flowDef = FlowDef.flowDef()
    .addSource(pipe, inputTap)
    .addTailSink(outputPipe, outputTap);
Flow flow = new Hadoop2MR1FlowConnector(p).connect(flowDef); // p: job Properties defined elsewhere
flow.complete();



I am getting errors when I execute it, but I will post the error message later. Do this code and this approach look correct?

It would have been nice if I could find the min and max in a single operation rather than doing the GroupBy twice.

My understanding is that Every can take only one aggregation at a time, so I need two Every pipes and thus two GroupBys.

I may be wrong...

Also, once the grouping has been done, how do I generate the new id sequence?

Abhishek Srivastava

Sep 5, 2015, 11:17:01
To: cascading-user
Bump. I would appreciate any help. Also, let me know if the question is not clear and I will try to rephrase it.

Ken Krugler

Sep 5, 2015, 12:50:37
To: cascadi...@googlegroups.com
Calculating both min and max in a single reduce phase is pretty easy.

MinBy and MaxBy are AggregateBy operations, so they can be combined in a single AggregateBy,

e.g.

pipe = new AggregateBy(pipe, new Fields("name"), new MinBy(…), new MaxBy(…));

There's a separate issue with the part where you say "I want the output file to look like …" and "id is just a sequence".

Each reduce task outputs a separate part file, so in the case where you have two reduce tasks, what should be the IDs in each file?

If it's OK for both files to have ids 1, 2, 3, 4…, or to run with a single reduce task, then that's pretty easy.

If you want a globally unique (across all part files) id, and you don't want to run with a single reduce task, then it gets more complicated.
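For the simple per-part-file case, one option (not from this thread, just a sketch) is a small stateful Function that numbers tuples as it sees them; the SequenceNumber class name and the "id" field below are illustrative:

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

// Emits an ever-increasing long for each tuple this task sees.
// The counter is per task, so ids are only unique within a part file.
public class SequenceNumber extends BaseOperation<long[]> implements Function<long[]> {

  public SequenceNumber(Fields fieldDeclaration) {
    super(fieldDeclaration); // e.g. new Fields("id")
  }

  @Override
  public void prepare(FlowProcess flowProcess, OperationCall<long[]> operationCall) {
    operationCall.setContext(new long[] { 0L }); // per-task counter
  }

  @Override
  public void operate(FlowProcess flowProcess, FunctionCall<long[]> functionCall) {
    long[] counter = functionCall.getContext();
    functionCall.getOutputCollector().add(new Tuple(++counter[0]));
  }
}

Applied with an Each and Fields.ALL as the output selector, it appends the "id" field to every tuple:

pipe = new Each(pipe, new SequenceNumber(new Fields("id")), Fields.ALL);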

-- Ken
 



Abhishek Srivastava

Sep 10, 2015, 15:02:30
To: cascading-user
Thanks Ken! The approach you suggested works perfectly and is a very elegant solution.


pipe = new AggregateBy(
    pipe,
    new Fields("name"),
    new MinBy(new Fields("amount"), new Fields("minamount")),
    new MaxBy(new Fields("amount"), new Fields("maxamount"))
);
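For completeness, a minimal sketch of how that assembly could be wired back into the taps from the first post; inputTap, outputFilePath and the Properties object p are assumed to be the ones defined there:

Pipe head = new Pipe("pipe");

Pipe result = new AggregateBy(
    head,
    new Fields("name"),
    new MinBy(new Fields("amount"), new Fields("minamount")),
    new MaxBy(new Fields("amount"), new Fields("maxamount"))
);

Fields outFields = new Fields("name", "minamount", "maxamount");
Scheme outScheme = new TextDelimited(outFields, false, true, ",",
    new Class[] { String.class, int.class, int.class });
Hfs outputTap = new Hfs(outScheme, outputFilePath);

FlowDef flowDef = FlowDef.flowDef()
    .addSource(head, inputTap)       // head pipe bound to the source tap
    .addTailSink(result, outputTap); // aggregated assembly bound to the sink

Flow flow = new Hadoop2MR1FlowConnector(p).connect(flowDef);
flow.complete();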


But just for the sake of learning... what was wrong with my code above?

So is it valid to take an input, perform two operations in two separate pipes, and then join the results back?

Ken Krugler

Sep 10, 2015, 17:50:30
To: cascadi...@googlegroups.com


On September 10, 2015, Abhishek Srivastava wrote:
But just for the sake of learning... what was wrong with my code above?

So is it valid to take an input, perform two operations in two separate pipes, and then join the results back?

That should work. Without the error message it's hard to know, but this looks wrong to me:

Fields outFields = new Fields("name", "maxamount", "minamount");
outputPipe = new Each(outputPipe, new Identity(), outFields);

Here you're trying to drop the "name2" field (from the RHS pipe) that you used in the join.

So best is to use Discard, e.g.

outputPipe = new Discard(outputPipe, new Fields("name2"));

Or if you're using Identity, then you want to pass in the fields that you'd like to retain, e.g.

outputPipe = new Each(outputPipe, outFields, new Identity());

So here you're using outFields as the argument selector, which then limits what gets passed to the Identity() function.

You were passing these as the Each operation's output selector, which should (almost?) always be one of the logical Fields constants, e.g. Fields.RESULTS, Fields.SWAP, etc.
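To make that concrete, here is the same Each with the default output selector written out explicitly (Fields.RESULTS is what Each uses when no output selector is given):

// outFields as the argument selector: only those fields reach Identity.
// Fields.RESULTS as the output selector: keep only what Identity returns,
// which drops "name2" as a side effect.
outputPipe = new Each(outputPipe, outFields, new Identity(), Fields.RESULTS);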

 -- Ken

