I found that the API is correct for KeyedPipe. The function connected to the pipe can only use streaming semantics, so the code looks like the following:

persistedTweets
    .keyedPipe()
    .compute(new ComputeMatchingTweets())
    .addInput("delete-input", cachedDeleteIDs)
    .pipe()
    .cache();

private class ComputeMatchingTweets
    extends BaseComputeCollectorFunc<Tuple<String, BigInteger>, Tuple<BigInteger, String>> {
  private TSetContext ctx;
  private Set<BigInteger> deleteSet = new TreeSet<>();

  public ComputeMatchingTweets() {
  }

  @Override
  public void prepare(TSetContext context) {
    ctx = context;
    DataPartition a = context.getInput("delete-input");
    DataPartitionConsumer<BigInteger> consumer = a.getConsumer();
    while (consumer.hasNext()) {
      BigInteger deleteTweetID = consumer.next();
      deleteSet.add(deleteTweetID);
    }
    LOG.info("ADDED TUPLES TO DELETE SET: " + deleteSet.size());
  }

  @Override
  public void compute(Tuple<BigInteger, String> input, RecordCollector<Tuple<String, BigInteger>> output) {
    if (deleteSet.contains(input.getKey())) {
      output.collect(new Tuple<>(input.getValue(), input.getKey()));
    }
  }
}
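
In case it helps to check the matching logic outside a running job, here is a minimal plain-Java sketch of the same two steps: load the delete IDs into a set once (as prepare does), then emit each matching record with its key and value swapped (as compute does). It does not use any Twister2 classes. The class name MatchingTweetsSketch, the match helper, and the use of Map.Entry in place of Tuple are all assumptions made for illustration.

```java
import java.math.BigInteger;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-alone class, not part of Twister2.
final class MatchingTweetsSketch {

  // Mirrors prepare + compute: Map.Entry stands in for Tuple (an assumption).
  static List<Map.Entry<String, BigInteger>> match(
      Iterable<BigInteger> deleteIds,
      List<Map.Entry<BigInteger, String>> tweets) {
    // "prepare" step: materialize the delete IDs once for fast lookups.
    Set<BigInteger> deleteSet = new HashSet<>();
    for (BigInteger id : deleteIds) {
      deleteSet.add(id);
    }

    // "compute" step: keep only tweets whose ID is in the delete set,
    // emitting (text, id) instead of (id, text).
    List<Map.Entry<String, BigInteger>> out = new ArrayList<>();
    for (Map.Entry<BigInteger, String> tweet : tweets) {
      if (deleteSet.contains(tweet.getKey())) {
        out.add(new SimpleEntry<>(tweet.getValue(), tweet.getKey()));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<BigInteger, String>> tweets = new ArrayList<>();
    tweets.add(new SimpleEntry<>(BigInteger.valueOf(1), "first"));
    tweets.add(new SimpleEntry<>(BigInteger.valueOf(2), "second"));
    tweets.add(new SimpleEntry<>(BigInteger.valueOf(3), "third"));

    List<BigInteger> deleteIds =
        Arrays.asList(BigInteger.valueOf(2), BigInteger.valueOf(3));

    // Prints the tweets with IDs 2 and 3, each as text=id.
    System.out.println(match(deleteIds, tweets));
  }
}
```

The sketch uses a HashSet since only membership checks are needed; the TreeSet in the code above behaves the same for this purpose.
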
--
Supun Kamburugamuve, PhD
Digital Science Center, Indiana University