Filtering out the data


Ben

Mar 28, 2012, 8:01:36 PM
to cascading-user
Hi, Guys,

The input pipe is as follows:

ID_1 ID_2 sum_1 sum_2
AB 1 46000 6000
AB 1 30000 5000
AB 1 25000 7000
AB 1 20000 6000
AB 1 10000 4500
AB 1 10000 4000
AB 1 9000 4000
AB 2 56000 5000
AB 2 33000 4000
AB 2 22000 4000
AB 2 20000 3000
AB 2 12000 2500
AB 2 12000 2000
AB 2 8000 1000
AC 1 75000 4000
AC 1 62000 3500
AC 1 50000 2000
AC 1 50000 1300
AC 1 34000 1500
AC 1 26000 1200
AC 1 19000 1100
AC 3 64000 5500
AC 3 47000 3400
AC 3 33000 2900
AC 3 25000 2300
AC 3 19000 1900
AC 3 12000 1900
AC 3 11000 1200

How can I produce an output pipe like the following, keeping the top 3
rows in terms of sum_1 for each (ID_1, ID_2) group?

ID_1 ID_2 sum_1 sum_2
AB 1 46000 6000
AB 1 30000 5000
AB 1 25000 7000
AB 2 56000 5000
AB 2 33000 4000
AB 2 22000 4000
AC 1 75000 4000
AC 1 62000 3500
AC 1 50000 2000
AC 3 64000 5500
AC 3 47000 3400
AC 3 33000 2900

Thank you for your help.

Ben


Chris K Wensel

Mar 29, 2012, 1:07:26 AM
to cascadi...@googlegroups.com
The short answer is a GroupBy on ID_1 and ID_2 with a secondary sort on sum_1 (descending), followed by a custom Buffer/Aggregator that re-emits the first N items it sees (where N==3).

See how the First aggregator works for an idea. I keep forgetting to create a TopN-like Aggregator; I'll push it up the list.
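A minimal plain-Java sketch of that approach, run against the sample data from the first message. It does not use the Cascading API: the class TopNPerGroup and its methods are hypothetical stand-ins that sort by (ID_1, ID_2), then by sum_1 descending, and keep the first N rows of each group, which is what the GroupBy with a secondary sort plus a first-N Buffer is meant to do.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TopNPerGroup {
    // One row of the pipe: grouping keys ID_1, ID_2 and the values sum_1, sum_2.
    static final class Row {
        final String id1;
        final int id2;
        final long sum1;
        final long sum2;

        Row(String id1, int id2, long sum1, long sum2) {
            this.id1 = id1;
            this.id2 = id2;
            this.sum1 = sum1;
            this.sum2 = sum2;
        }

        @Override
        public String toString() {
            return id1 + " " + id2 + " " + sum1 + " " + sum2;
        }
    }

    // Parses whitespace-separated lines laid out as "ID_1 ID_2 sum_1 sum_2".
    static List<Row> parse(String... lines) {
        List<Row> rows = new ArrayList<>();
        for (String line : lines) {
            String[] f = line.trim().split("\\s+");
            rows.add(new Row(f[0], Integer.parseInt(f[1]), Long.parseLong(f[2]), Long.parseLong(f[3])));
        }
        return rows;
    }

    // Stands in for GroupBy(ID_1, ID_2) with a descending secondary sort on sum_1,
    // followed by a buffer that re-emits only the first n rows of each group.
    static List<Row> topNPerGroup(List<Row> input, int n) {
        List<Row> sorted = new ArrayList<>(input);
        // List.sort is stable, so rows tied on sum_1 keep their input order.
        sorted.sort(Comparator.<Row, String>comparing(r -> r.id1)
                .thenComparingInt(r -> r.id2)
                .thenComparing(Comparator.<Row>comparingLong(r -> r.sum1).reversed()));
        List<Row> out = new ArrayList<>();
        Row prev = null;
        int emitted = 0;
        for (Row r : sorted) {
            boolean newGroup = prev == null || !prev.id1.equals(r.id1) || prev.id2 != r.id2;
            if (newGroup) {
                emitted = 0; // reset the counter at each group boundary
            }
            if (emitted < n) {
                out.add(r);
                emitted++;
            }
            prev = r;
        }
        return out;
    }

    // The input pipe from the first message.
    static List<Row> sample() {
        return parse(
            "AB 1 46000 6000", "AB 1 30000 5000", "AB 1 25000 7000", "AB 1 20000 6000",
            "AB 1 10000 4500", "AB 1 10000 4000", "AB 1 9000 4000",
            "AB 2 56000 5000", "AB 2 33000 4000", "AB 2 22000 4000", "AB 2 20000 3000",
            "AB 2 12000 2500", "AB 2 12000 2000", "AB 2 8000 1000",
            "AC 1 75000 4000", "AC 1 62000 3500", "AC 1 50000 2000", "AC 1 50000 1300",
            "AC 1 34000 1500", "AC 1 26000 1200", "AC 1 19000 1100",
            "AC 3 64000 5500", "AC 3 47000 3400", "AC 3 33000 2900", "AC 3 25000 2300",
            "AC 3 19000 1900", "AC 3 12000 1900", "AC 3 11000 1200");
    }

    public static void main(String[] args) {
        topNPerGroup(sample(), 3).forEach(System.out::println);
    }
}
```

Running main prints the twelve rows of the expected output. The two AC 1 rows tied at 50000 come out in input order only because List.sort is stable; a distributed sort may not make that promise, so ties could resolve differently in a real flow.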

ckw


--
Chris K Wensel
ch...@concurrentinc.com
http://concurrentinc.com

Joris Bontje

Mar 29, 2012, 1:37:52 AM
to cascadi...@googlegroups.com
Where the TopN buffer would be something like this:

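import java.util.Iterator;
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;

// Re-emits at most maxResults argument tuples per group, in GroupBy sort order.
public class TopN extends BaseOperation implements Buffer {
    private long maxResults;

    public TopN(long maxResults) {
        super(Fields.ARGS); // declare the same fields as the incoming arguments
        this.maxResults = maxResults;
    }

    public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
        long count = 0;
        Iterator<TupleEntry> arguments = bufferCall.getArgumentsIterator();
        // stop when the group runs out or maxResults tuples have been emitted
        while (arguments.hasNext() && count < maxResults) {
            bufferCall.getOutputCollector().add(arguments.next());
            count++;
        }
    }
}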

Ben

Mar 30, 2012, 12:22:04 PM
to cascading-user
Thank you for your info. However, it does not seem to work for me.

Ben

Ken Krugler

Mar 30, 2012, 12:31:17 PM
to cascadi...@googlegroups.com

On Mar 30, 2012, at 9:22am, Ben wrote:

> Thank you for your info. However, it does not seem to work for me.

If you share at least that section of your code, and explain what didn't work, then it's going to be easier to help :)

-- Ken
--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Mahout & Solr




Ben

Mar 30, 2012, 1:48:04 PM
to cascading-user
The code looks like this:

Fields keyFields = new Fields("ID_1", "ID_2");
Fields dataFields = new Fields("sum_trans", "sum_acts");
Fields resultFields = keyFields.append(dataFields);
// secondary sort on sum_trans, descending
Fields sortFields = new Fields("sum_trans");
sortFields.setComparator("sum_trans", Collections.reverseOrder());
Pipe bAssembly = new GroupBy("new_pipe", bAssembly_orig, keyFields, sortFields);
// keep at most 10 tuples per (ID_1, ID_2) group
Buffer<?> topN = new TopN(10);
bAssembly = new Every(bAssembly, resultFields, topN, Fields.RESULTS);
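Collections.reverseOrder() makes the secondary sort on sum_trans descending by reversing the values' natural ordering, so the order is numeric only if the field actually holds numbers. Whether it does depends on how the source tap reads sum_trans, which the thread doesn't show, so this is something to check rather than a diagnosis. A plain-Java sketch of the two cases (ReverseOrderCheck and descending are hypothetical names, not Cascading code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ReverseOrderCheck {
    // Sorts a copy of the values with Collections.reverseOrder(), i.e. descending natural order.
    static <T extends Comparable<? super T>> List<T> descending(List<T> values) {
        List<T> copy = new ArrayList<>(values);
        copy.sort(Collections.reverseOrder());
        return copy;
    }

    public static void main(String[] args) {
        // Numeric values: the largest sum comes first, as a top-N buffer expects.
        System.out.println(descending(Arrays.asList(12000L, 56000L, 8000L, 33000L)));
        // The same values held as text compare character by character,
        // so "8000" sorts ahead of "56000".
        System.out.println(descending(Arrays.asList("12000", "56000", "8000", "33000")));
    }
}
```

The first line prints [56000, 33000, 12000, 8000]; the second prints [8000, 56000, 33000, 12000]. If sum_trans arrived as text, the buffer would keep rows in that lexicographic order rather than the largest sums.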

My TopN is essentially the one Joris provided.

The final bAssembly is the same as bAssembly_orig: no rows are filtered out.
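One thing worth checking, based only on the sample data from the first message: every (ID_1, ID_2) group there has seven rows, so TopN(10) would re-emit all of them and the output would match the input exactly. If the real groups are similarly small, N has to be smaller than the group size (3, as in the original question) before any filtering shows up. A plain-Java sketch that mirrors the loop in Joris's TopN (TopNLoopCheck and firstN are hypothetical names, not Cascading API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class TopNLoopCheck {
    // Same loop shape as the TopN buffer: copy items from the group's iterator
    // until it runs dry or maxResults items have been emitted.
    static <T> List<T> firstN(Iterator<T> group, long maxResults) {
        List<T> emitted = new ArrayList<>();
        long count = 0;
        while (group.hasNext() && count < maxResults) {
            emitted.add(group.next());
            count++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        // sum_1 values of the AB/1 group from the first message, already sorted descending.
        List<Long> abOne = Arrays.asList(46000L, 30000L, 25000L, 20000L, 10000L, 10000L, 9000L);
        System.out.println(firstN(abOne.iterator(), 10).size()); // 7: the whole group survives
        System.out.println(firstN(abOne.iterator(), 3).size());  // 3
    }
}
```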

Thanks,

Ben

Ken Krugler

Mar 30, 2012, 2:39:26 PM
to cascadi...@googlegroups.com
Hi Ben,

Thanks, though you didn't explain what didn't work.

What output did you get, versus what did you expect?

-- Ken


Ben

Mar 30, 2012, 2:51:06 PM
to cascading-user
Hi, Ken,

The input and output are shown in my first message. My input is
a pipe, and the output should be a sub-pipe containing the top N rows
in terms of sum_1 for each group (ID_1 and ID_2).

Thanks,

Ben

Chris K Wensel

Mar 30, 2012, 3:03:09 PM
to cascadi...@googlegroups.com

Putting a new Each(...., new Debug(true)) before the GroupBy and after the Every may be enlightening (be aware that doing this on the cluster will only put the debug output on the cluster).

Or write a test and run it under a debugger in your IDE, so you can see the debug output locally and step through your custom aggregator.

ckw


Ken Krugler

Mar 30, 2012, 8:54:54 PM
to cascadi...@googlegroups.com
Hi Ben,

On Mar 30, 2012, at 11:51am, Ben wrote:

> Hi, Ken,
>
> The input and output are shown in my first message. My input is
> a pipe, and the output should be a sub-pipe containing the top N rows
> in terms of sum_1 for each group (ID_1 and ID_2).

You'd said "However, it does not seem to work for me", which is sufficiently vague that it's hard to provide much help.

So I was asking about the actual output you got from the workflow (data), versus what you expected.

Regards,

-- Ken

