Here is a much more verbose version in Java, but one that avoids generating all possible combinations of words:
/**
 * Cascading SubAssembly that emits, for each document, every pairing of a word from {@code listA}
 * with a word from {@code listB} where both words occur in that document's text.
 *
 * <p>Rather than generating all word combinations per document and filtering afterwards, it first
 * reduces each document to its unique words, keeps only the words in each list, and then inner-joins
 * the two filtered streams on the document id.
 *
 * <p>Output fields: {@code <docIdFieldname>, "listAword", "listBword"}.
 */
public class WordCooccurrence extends SubAssembly {

    /**
     * Filter that keeps only tuples whose single argument (a word) is contained in a fixed set.
     */
    private static class SetFilter extends BaseOperation<NullContext> implements Filter<NullContext> {

        // Private HashSet copy: always Serializable (Cascading operations may be serialized, e.g. to
        // ship them to cluster tasks, and a caller's set such as Map.keySet() may not be), and immune
        // to later changes the caller makes to its own set.
        private final Set<String> _validWords;

        public SetFilter(Set<String> validWords) {
            // Copying also fails fast with a NullPointerException on a null set.
            _validWords = new java.util.HashSet<String>(validWords);
        }

        @SuppressWarnings("rawtypes")
        @Override
        public boolean isRemove(FlowProcess flowProcess, FilterCall<NullContext> filterCall) {
            // The Each applying this filter selects exactly one argument field: the word.
            return !_validWords.contains(filterCall.getArguments().getTuple().getString(0));
        }
    }

    /**
     * Builds the co-occurrence assembly.
     *
     * @param documentsPipe    incoming pipe; must contain {@code docIdFieldname} and {@code docTextFieldname}
     * @param docIdFieldname   name of the document id field (also used for the output id field)
     * @param docTextFieldname name of the document text field; split on whitespace into words
     * @param listA            words reported in the {@code "listAword"} output field
     * @param listB            words reported in the {@code "listBword"} output field
     */
    public WordCooccurrence(Pipe documentsPipe, String docIdFieldname, String docTextFieldname, Set<String> listA, Set<String> listB) {
        Fields wordField = new Fields("listAword");
        Fields idAndWord = new Fields(docIdFieldname, "listAword");

        // Filter out everything but the id and text fields.
        Pipe words = new Each(documentsPipe, new Fields(docIdFieldname, docTextFieldname), new Identity());

        // Split the text on whitespace. Fields.SWAP replaces the text argument with the generated word,
        // so each output tuple is (docId, listAword).
        // Normally you'd do something better here for parsing, and you'd want to normalize text.
        words = new Each(words, new Fields(docTextFieldname),
                new RegexSplitGenerator(wordField, "\\s+"), Fields.SWAP);

        // We only care about unique terms per document; duplicates would multiply the join output.
        words = new Unique(words, idAndWord);

        // Branch the stream, and in each branch keep only the words that appear in that branch's set.
        Pipe listAPipe = new Pipe("list A words", words);
        listAPipe = new Each(listAPipe, wordField, new SetFilter(listA));
        Pipe listBPipe = new Pipe("list B words", words);
        listBPipe = new Each(listBPipe, wordField, new SetFilter(listB));

        // Rename the right side to avoid field name collisions in the CoGroup.
        listBPipe = new Rename(listBPipe, idAndWord, new Fields("listBdocID", "listBword"));

        // Inner join on the document id to get every listA/listB word pair for a given document.
        Pipe resultsPipe = new CoGroup(listAPipe, new Fields(docIdFieldname),
                listBPipe, new Fields("listBdocID"),
                null,
                new InnerJoin());

        // Drop the redundant listBdocID field.
        resultsPipe = new Each(resultsPipe, new Fields(docIdFieldname, "listAword", "listBword"), new Identity());
        setTails(resultsPipe);
    }
}
And here is a snippet of code that tests it:
public void test() throws Exception {
Tap sourceTap = new InMemoryTap(new Fields("docid", "text"));
TupleEntryCollector writer = sourceTap.openForWrite(new LocalFlowProcess());
writer.add(new Tuple("D1", "The quick brown fox jumped over the lazy dog"));
writer.add(new Tuple("D2", "The boy ran to the market"));
writer.add(new Tuple("D3", "The girl walked over to the cat"));
writer.close();
Set<String> listA = new HashSet<String>();
Collections.addAll(listA, "fox chicken hen dog cat".split(" "));
Set<String> listB = new HashSet<String>();
Collections.addAll(listB, "ran jumped walked crawled".split(" "));
Pipe p = new Pipe("cooccurrence test");
p = new WordCooccurrence(p, "docid", "text", listA, listB);
p = new Each(p, new Debug(true));
FlowDef fd = new FlowDef();
fd.addSource(p, sourceTap);
fd.addTailSink(p, new NullSinkTap());
new LocalFlowConnector().connect(fd).complete();
-- Ken