Bug in Average?


Yifan

Jan 23, 2012, 2:19:12 PM
to cascading-user
Hi,

I ran into something weird in a job.

pipe = new GroupBy(pipe, new Fields(KEY));
pipe = new Every(pipe, new Fields(VALUE), new Average());

The code crashed in the reduce stage with:

Caused by: java.lang.NumberFormatException: For input string: "pv2:u100001275"
at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1222)
at java.lang.Double.parseDouble(Double.java:510)
at cascading.tuple.Tuples.toDouble(Tuples.java:177)
at cascading.tuple.Tuple.getDouble(Tuple.java:265)
at cascading.tuple.TupleEntry.getDouble(TupleEntry.java:355)
at cascading.operation.aggregator.Average.aggregate(Average.java:88)
at cascading.pipe.Every$EveryAggregatorHandler.operate(Every.java:426)

It seems to be averaging over the KEY column instead of the VALUE
column. Any idea why?

Yifan

Jan 23, 2012, 2:56:04 PM
to cascading-user
In particular:

Here is what I see in Cascading:

public void aggregate( FlowProcess flowProcess, AggregatorCall<Context> aggregatorCall )
  {
  Context context = aggregatorCall.getContext();
  TupleEntry arguments = aggregatorCall.getArguments();

  context.sum += arguments.getDouble( 0 );
  context.count += 1d;
  }

Does

new Every(pipe, new Fields(VALUE), new Average());

correctly narrow the tuple to the argument fields, so that

arguments.getDouble(0)

finds that column?

It does not seem to be working.

Chris K Wensel

Jan 23, 2012, 4:13:29 PM
to cascadi...@googlegroups.com

You might add a Debug to the pipe assembly to prove to yourself that what's in the value column is what you think it is.

Just replace the Every with an Each(pipe, new Debug());

ckw

> --
> You received this message because you are subscribed to the Google Groups "cascading-user" group.
> To post to this group, send email to cascadi...@googlegroups.com.
> To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
> For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.
>

--
Chris K Wensel
ch...@concurrentinc.com
http://concurrentinc.com

Nishant Kelkar

Oct 2, 2013, 6:29:54 PM
to cascadi...@googlegroups.com
This bug is well reported. I ran new Each(pipe, new Debug()) as Chris suggested, and I correctly see the key/value pairs as they should be. When I try

new Every(grouped_pipe, new Average(new Fields("value_to_avg_on")))

I get a "java.lang.NumberFormatException" saying that a string cannot be converted into a number. My key is a String and my value is an Integer, btw.

Is there any fix for this bug? I am using Cascading 1.2. Any and all help is most appreciated!

Best Regards,
Nishant Kelkar

Chris K Wensel

Oct 4, 2013, 11:47:06 AM
to cascadi...@googlegroups.com
Sorry, this bug is not well reported as I do not have a test case including data. I've spent the morning attempting to cause a failure regardless.

The best approach to filing a well reported bug is outlined here

fwiw, the current stable release is Cascading 2.1.6 (you said you are on 1.2, which is over 18 months old).

But if you can test against 2.2 that would be very helpful.

ckw


Chris K Wensel

Oct 4, 2013, 11:53:04 AM
to cascadi...@googlegroups.com
fwiw, here is one of the tests I created

Nishant Kelkar

Oct 7, 2013, 5:39:09 PM
to cascadi...@googlegroups.com
Hi Chris,

I'm extremely sorry for the typo in my comment above regarding the version of Cascading I was using; I meant to say that I am using 2.1.5.
Really sorry about that.

However, I still think there might be something wrong with the Aggregator functions. The lines of code in my tf-idf project (based on cascading.learn) that throw
errors are:

        Pipe grp_red_pipe = new GroupBy(new Retain(new Pipe("redPipe", red_pipe), new Fields("docid", "term_count")), new Fields("docid"));
        Pipe  maxtf_pipe = new Every(grp_red_pipe, new Max(new Fields("max")));
       
The error log I get is as follows:

cascading.flow.FlowException: local step failed
    at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:208)
    at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:145)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:120)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:42)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:662)
Caused by: cascading.pipe.OperatorException: [redPipe][fr.xebia.cascading.learn.level5.FreestyleJobs.computeTfIdf(FreestyleJobs.java:84)] operator Every failed executing operation: Max[decl:'max'][args:1]
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:136)
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:39)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:49)
    at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
    at cascading.flow.local.stream.LocalGroupByGate.complete(LocalGroupByGate.java:105)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:268)
    at cascading.flow.stream.Fork.complete(Fork.java:60)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:268)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.local.stream.LocalGroupByGate.complete(LocalGroupByGate.java:114)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:268)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:268)
    at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
    ... 5 more
Caused by: java.lang.NumberFormatException: For input string: "0#intro"
    at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1222)
    at java.lang.Double.parseDouble(Double.java:510)
    at cascading.tuple.Tuples.toDouble(Tuples.java:196)
    at cascading.tuple.Tuple.getDouble(Tuple.java:254)
    at cascading.tuple.TupleEntry.getDouble(TupleEntry.java:439)
    at cascading.operation.aggregator.ExtremaBase.aggregate(ExtremaBase.java:120)
    at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:128)
    ... 24 more

However, when I change the above two lines of code to

        Pipe grp_red_pipe = new GroupBy(new Retain(new Pipe("redPipe", red_pipe), new Fields("docid", "term_count")), new Fields("docid"));
        Pipe  maxtf_pipe = new Every(grp_red_pipe, new Fields("term_count"), new Max(new Fields("max")));

Then my code compiles and the test passes.

This means that Every is not recognizing that I have grouped by "docid", which should imply that I want to aggregate over the remaining field not in the grouping fields (which is "term_count").
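To illustrate the symptom outside Cascading, here is a plain-Java sketch (no Cascading classes; the tuple values are made up) of how an aggregator that reads argument position 0 behaves when handed the whole tuple versus only the value column:

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (not Cascading itself) of why an aggregator that
// coerces argument position 0 to a double fails when the grouping key is
// included in its arguments.
public class ArgumentSelectorSketch {
    // Mirrors arguments.getDouble(0): coerce argument position 0 to a double.
    static double arg0AsDouble(List<?> arguments) {
        return Double.parseDouble(String.valueOf(arguments.get(0)));
    }

    public static void main(String[] args) {
        List<?> tuple = Arrays.asList("0#intro", 42); // [docid, term_count]

        // Whole tuple passed: position 0 is the grouping key, a string.
        try {
            arg0AsDouble(tuple);
        } catch (NumberFormatException e) {
            System.out.println("whole tuple -> " + e.getMessage());
        }

        // Narrowed to the value column first: position 0 is the number.
        System.out.println("value only -> " + arg0AsDouble(tuple.subList(1, 2)));
    }
}
```

This only reproduces the symptom; whether Every defaults to passing all fields to the aggregator in a given Cascading version is exactly the question in this thread.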

It is highly possible that I have misunderstood some constructor, and it's totally my fault that I was getting the above error.

I'd really appreciate it if you could take a look at my source code for the tf-idf challenge in this project. The function I wrote is pasted below:


    public static FlowDef computeTfIdf(Tap<?, ?, ?> source, Tap<?, ?, ?> sink) {
        Pipe input_pipe = new Pipe("sourcePipe");
        Pipe pipe = new Each(input_pipe, new Fields("id", "content"),
                             new CustomWordFinderFunction(new Fields("docid", "term", "indicator")), Fields.SWAP); // nkelkar --comment get term freq

        Pipe red_pipe = new SumBy(pipe, new Fields("docid", "term"), new Fields("indicator"), new Fields("term_count"), int.class);

        Pipe grp_red_pipe = new GroupBy(new Retain(new Pipe("redPipe", red_pipe), new Fields("docid", "term_count")), new Fields("docid"));
        Pipe  maxtf_pipe = new Every(grp_red_pipe, new Fields("term_count"), new Max(new Fields("max"))); //(new Fields("term_count")));
        //Pipe maxtf_pipe = new Each(grp_red_pipe, new Debug());
        // nkelkar --comment now we have to rename join fields, CoGroup on those fields and retain "docid", "term", "term_count", "max" fields
        Pipe first_pipe = new Rename(red_pipe, new Fields("docid", "term", "term_count"), new Fields("docid1", "term", "term_count"));
        Pipe second_pipe = new Rename(maxtf_pipe, new Fields("docid", "max"), new Fields("docid2", "max"));

        //System.out.println("first_pipe name: " + first_pipe.getName());
        //System.out.println("second_pipe name: " + second_pipe.getName());

        Pipe cogroup_pipe = new CoGroup(first_pipe, new Fields("docid1"), second_pipe, new Fields("docid2"));
        Pipe intermediate_pipe = new Retain(cogroup_pipe, new Fields("docid1", "term", "term_count", "max"));

        // nkelkar --comment calculate TF
        Pipe tf_pipe = new Each(new Rename(intermediate_pipe, new Fields("docid1"), new Fields("docid")),
                          new CustomTfCalculatorFunction<Object>(new Fields("docid", "term1", "tf")));


        // nkelkar --comment now, find IDF by CountBy on grouping field "term" and counting field "docid" on red_pipe
        //                   Then, pass this pipe to a function and calculate IDF for each term with final fields {"term", "IDF"}
        //                   Then, do a join of tf_pipe and idf_pipe, and write a custom function to multiply TF-IDF and throw out {docid, term, TF-IDF}
        Pipe red_pipe2 = new Rename(new CountBy(red_pipe, new Fields("term"), new Fields("docid")), new Fields("docid"), new Fields("doc_count"));

        Pipe idf_pipe = new Each(red_pipe2, new CustomIdfCalculatorFunction<Object>(new Fields("term", "idf")));

        Pipe tfidf_pipe = new Each(new CoGroup(tf_pipe, new Fields("term1"), idf_pipe, new Fields("term")),
                          new CustomTfIdfCalculatorFunction<Object>(new Fields("docId", "tfidf", "word")));

        Fields groupFields = new Fields("docId", "tfidf", "word");

        groupFields.setComparator("docId", Collections.reverseOrder());
        groupFields.setComparator("tfidf", Collections.reverseOrder());
        groupFields.setComparator("word", Collections.reverseOrder());

        Pipe final_tfidf = new GroupBy(tfidf_pipe, groupFields);

        //Pipe test = new Each(new CoGroup(tf_pipe, new Fields("term1"), idf_pipe, new Fields("term")), new Debug());
        return FlowDef.flowDef()//
                .addSource(input_pipe, source) //
                .addTail(final_tfidf)//
                .addSink(final_tfidf, sink);
    }




Thanks and all help is appreciated!

Best,
Nishant

Nishant Kelkar

Oct 7, 2013, 5:46:33 PM
to cascadi...@googlegroups.com
Oh, btw, I used the Max() function in my example above, but the error is the same if I use Average() instead. Just thought
that I should add a small note since this thread is about a "Bug in Average".

Andre Kelpe

Oct 7, 2013, 5:49:44 PM
to cascadi...@googlegroups.com
Please be aware that many parts of Java are locale-aware. You are
most likely parsing/printing/formatting a number in a different
format than the one your system's locale is set to. Can you do an export

export LANG=en_US.UTF-8

and rerun your program?

This looks a lot like such a problem: Caused by: java.lang.NumberFormatException: For input string: "0#intro"
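To make the locale point concrete, here is a minimal self-contained JDK sketch (nothing Cascading-specific; the sample string "1,5" is made up) showing the same text parsing to different numbers depending on the locale's decimal separator:

```java
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.Locale;

// Minimal sketch of the locale pitfall: the same string is a different
// number, or not a number at all, depending on which decimal separator
// the locale expects.
public class LocaleParseSketch {
    static double parseWith(Locale locale, String s) {
        try {
            return NumberFormat.getInstance(locale).parse(s).doubleValue();
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable: " + s, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseWith(Locale.GERMANY, "1,5")); // comma is the decimal separator
        System.out.println(parseWith(Locale.US, "1,5"));      // comma is a grouping separator

        // Double.parseDouble is locale-independent and rejects "1,5" outright.
        try {
            Double.parseDouble("1,5");
        } catch (NumberFormatException e) {
            System.out.println("parseDouble failed: " + e.getMessage());
        }
    }
}
```

That said, no locale parses "0#intro" as a number, so the argument-selector question discussed earlier in the thread is also worth ruling out with a Debug.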

- André
André Kelpe
an...@concurrentinc.com
http://concurrentinc.com

Nishant Kelkar

Oct 7, 2013, 6:14:30 PM
to cascadi...@googlegroups.com
Hi Andre,

I am running my tests in IntelliJ CE on Windows. I also checked the Control Panel language settings; the language there is English (US).

Any other hints/pointers?

Thanks for your concern!

Best Regards,
Nishant Kelkar