at cascading.tuple.Tuples.toDouble(Tuples.java:196)
at cascading.tuple.Tuple.getDouble(Tuple.java:254)
at cascading.tuple.TupleEntry.getDouble(TupleEntry.java:439)
at cascading.operation.aggregator.ExtremaBase.aggregate(ExtremaBase.java:120)
at cascading.flow.stream.AggregatorEveryStage.receive(AggregatorEveryStage.java:128)
... 24 more
However, when I change the above two lines of code to
Pipe grp_red_pipe = new GroupBy(new Retain(new Pipe("redPipe", red_pipe), new Fields("docid", "term_count")), new Fields("docid"));
Pipe maxtf_pipe = new Every(grp_red_pipe, new Fields("term_count"), new Max(new Fields("max")));
Then my code compiles and the test passes.
This suggests that Every is not able to recognize that, because I have grouped by "docid",
I want to aggregate over the remaining field,
i.e. the one that is not among the grouping fields (which is "term_count").
It is highly possible that I have misunderstood some constructor, and it's totally my fault that I was getting the above error.
I'd really appreciate it if you could take a look at my source code for the tf-idf challenge in
this project. The function I wrote is pasted below:
/**
 * Builds the Cascading flow definition for the tf-idf computation.
 *
 * <p>The assembly has three stages:
 * <ol>
 *   <li>per-(docid, term) counts, plus the maximum term count per docid, joined and fed to
 *       {@code CustomTfCalculatorFunction};</li>
 *   <li>a per-term count over the (docid, term) rows, fed to
 *       {@code CustomIdfCalculatorFunction};</li>
 *   <li>a join of both results on the term, fed to {@code CustomTfIdfCalculatorFunction},
 *       followed by a final grouping with reversed comparators on every output field.</li>
 * </ol>
 *
 * @param source tap bound to the head pipe named "sourcePipe"; the first operation selects
 *               the fields "id" and "content" from it
 * @param sink   tap bound to the final grouped pipe
 * @return a flow definition with one source, one tail and one sink
 */
public static FlowDef computeTfIdf(Tap<?, ?, ?> source, Tap<?, ?, ?> sink) {
    Pipe sourcePipe = new Pipe("sourcePipe");

    // Term extraction: the custom function is declared to emit (docid, term, indicator).
    Pipe termPipe = new Each(
            sourcePipe,
            new Fields("id", "content"),
            new CustomWordFinderFunction(new Fields("docid", "term", "indicator")),
            Fields.SWAP);

    // Term frequency: sum "indicator" per (docid, term) into "term_count".
    Pipe termCountPipe = new SumBy(
            termPipe,
            new Fields("docid", "term"),
            new Fields("indicator"),
            new Fields("term_count"),
            int.class);

    // Maximum term count per document. Only "term_count" is handed to Max as its argument.
    Pipe namedCountPipe = new Pipe("redPipe", termCountPipe);
    Pipe docCountPipe = new Retain(namedCountPipe, new Fields("docid", "term_count"));
    Pipe countsByDocPipe = new GroupBy(docCountPipe, new Fields("docid"));
    Pipe maxCountPipe = new Every(
            countsByDocPipe,
            new Fields("term_count"),
            new Max(new Fields("max")));

    // Give the join keys distinct names on each side, join on them, then keep only
    // the columns required for the TF calculation.
    Pipe joinLhs = new Rename(
            termCountPipe,
            new Fields("docid", "term", "term_count"),
            new Fields("docid1", "term", "term_count"));
    Pipe joinRhs = new Rename(
            maxCountPipe,
            new Fields("docid", "max"),
            new Fields("docid2", "max"));
    Pipe countsWithMaxPipe = new CoGroup(
            joinLhs, new Fields("docid1"),
            joinRhs, new Fields("docid2"));
    Pipe tfInputPipe = new Retain(
            countsWithMaxPipe,
            new Fields("docid1", "term", "term_count", "max"));

    // TF: restore the "docid" name and apply the TF function, declared to emit (docid, term1, tf).
    Pipe tfInputRenamed = new Rename(tfInputPipe, new Fields("docid1"), new Fields("docid"));
    Pipe tfPipe = new Each(
            tfInputRenamed,
            new CustomTfCalculatorFunction<Object>(new Fields("docid", "term1", "tf")));

    // IDF: count the (docid, term) rows per term. CountBy writes the count into a field
    // named "docid", which is immediately renamed to "doc_count"; the IDF function is
    // declared to emit (term, idf).
    Pipe perTermCountPipe = new CountBy(termCountPipe, new Fields("term"), new Fields("docid"));
    Pipe docCountPerTermPipe = new Rename(
            perTermCountPipe,
            new Fields("docid"),
            new Fields("doc_count"));
    Pipe idfPipe = new Each(
            docCountPerTermPipe,
            new CustomIdfCalculatorFunction<Object>(new Fields("term", "idf")));

    // TF-IDF: join TF and IDF on the term and apply the function declared to emit
    // (docId, tfidf, word).
    Pipe tfJoinedWithIdf = new CoGroup(
            tfPipe, new Fields("term1"),
            idfPipe, new Fields("term"));
    Pipe tfIdfPipe = new Each(
            tfJoinedWithIdf,
            new CustomTfIdfCalculatorFunction<Object>(new Fields("docId", "tfidf", "word")));

    // Final grouping on all three output fields, each compared in reverse order.
    Fields sortFields = new Fields("docId", "tfidf", "word");
    sortFields.setComparator("docId", Collections.reverseOrder());
    sortFields.setComparator("tfidf", Collections.reverseOrder());
    sortFields.setComparator("word", Collections.reverseOrder());
    Pipe sortedTfIdfPipe = new GroupBy(tfIdfPipe, sortFields);

    FlowDef flowDef = FlowDef.flowDef();
    flowDef = flowDef.addSource(sourcePipe, source);
    flowDef = flowDef.addTail(sortedTfIdfPipe);
    return flowDef.addSink(sortedTfIdfPipe, sink);
}
Thanks and all help is appreciated!
Best,
Nishant