Class[] inputTypes = new Class[] {int.class, String.class, int.class};
Class[] outputTypes = new Class[] {String.class, int.class, int.class};
Hfs inputTap = new Hfs(new TextDelimited(new Fields("id", "name", "amount"), true, false, ",", inputTypes), inputFilePath);
// create a new pipe
Pipe pipe = new Pipe("pipe");
// group by name, sort by name
Pipe minPipe = new GroupBy("minbranch", pipe, new Fields("name"), new Fields("amount"));
// now group again, but this time in reverse order
Fields sortFields = new Fields("amount");
sortFields.setComparator("amount", Collections.reverseOrder());
Pipe maxPipe = new GroupBy("maxbranch", pipe, new Fields("name"), sortFields);
// Extract the minimum values based on the default sort
minPipe = new Every(minPipe, new First());
// Extract maximum values based on reverse sort
maxPipe = new Every(maxPipe, new First());
// now lets join the two pipes
Fields common = new Fields("name");
Pipe outputPipe = new CoGroup(minPipe, common, maxPipe, common, new Fields("name", "minamount", "name2", "maxamount"));
// define output fields
Fields outFields = new Fields("name", "maxamount", "minamount");
outputPipe = new Each(outputPipe, new Identity(),outFields);
Scheme outScheme = new TextDelimited(outFields, false, true, ",", outputTypes);
Hfs outputTap = new Hfs(outScheme, outputFilePath);
// create the follow
FlowDef flowDef = FlowDef.flowDef()
.addSource(pipe, inputTap)
.addTailSink(outputPipe, outputTap);
Flow flow = new Hadoop2MR1FlowConnector(p).connect(flowDef);flow.complete();
From: Abhishek Srivastava
Sent: September 5, 2015 8:17:00am PDT
To: cascading-user
Subject: Re: Extracting Min and Max values from a column
--
pipe = new AggregateBy(
pipe,
new Fields("name"),
new MinBy(new Fields("amount"), new Fields("minamount")),
new MaxBy(new Fields("amount"), new Fields("maxamount"))
);
From: Abhishek Srivastava
Sent: September 10, 2015 12:02:30pm PDT
To: cascading-user
Subject: Re: Extracting Min and Max values from a column
Thanks Ken! the approach which you suggest works perfectly and is a very elegant solution.pipe = new AggregateBy(
pipe,
new Fields("name"),
new MinBy(new Fields("amount"), new Fields("minamount")),
new MaxBy(new Fields("amount"), new Fields("maxamount"))
);
But just for sake of learning.... what was wrong with my code above?
so if I take an input and perform 2 operations in two separate pipes and then join the results back.
Fields outFields = new Fields("name", "maxamount", "minamount");
outputPipe = new Each(outputPipe, new Identity(),outFields);
--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/0b5ea400-e481-4de1-a8e0-27b698fb397b%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.