KeyedPipe


Supun Kamburugamuve

Jul 27, 2020, 1:07:26 PM
to Twister2, Ahmet Uyar
Hi Ahmet,

I checked, and the API is correct for KeyedPipe. A function connected to the pipe can only use streaming semantics, so the code looks like the following:

// calculate matched tweetID-date pairs
CachedTSet<Tuple<String, BigInteger>> cachedMatchedTweets =
    persistedTweets
        .keyedPipe()
        .compute(new ComputeMatchingTweets())
        .addInput("delete-input", cachedDeleteIDs)
        .pipe()
        .cache();

private class ComputeMatchingTweets
    extends BaseComputeCollectorFunc<Tuple<String, BigInteger>, Tuple<BigInteger, String>> {

  private TSetContext ctx;
  private Set<BigInteger> deleteSet = new TreeSet<>();

  public ComputeMatchingTweets() {
  }

  @Override
  public void prepare(TSetContext context) {
    ctx = context;

    // load the IDs to delete from the side input added with addInput("delete-input", ...)
    DataPartition a = context.getInput("delete-input");
    DataPartitionConsumer<BigInteger> consumer = a.getConsumer();
    while (consumer.hasNext()) {
      BigInteger deleteTweetID = consumer.next();
      deleteSet.add(deleteTweetID);
    }
    LOG.info("ADDED TUPLES TO DELETE SET: " + deleteSet.size());
  }

  // called once per tuple coming out of the keyed pipe (no iterator here)
  @Override
  public void compute(Tuple<BigInteger, String> input,
                      RecordCollector<Tuple<String, BigInteger>> output) {
    if (deleteSet.contains(input.getKey())) {
      // emit the matching tweet with key and value swapped
      output.collect(new Tuple<>(input.getValue(), input.getKey()));
    }
  }
}

--
Supun Kamburugamuve, PhD
Digital Science Center, Indiana University
Member, Apache Software Foundation; http://www.apache.org
E-mail: supun@apache.org;  Mobile: +1 812 219 2563


Ahmet Uyar

Jul 27, 2020, 3:34:28 PM
to Supun Kamburugamuve, Twister2
Hi Supun,

Thanks for that. It is working now with pipe. 

So keyedPipe and keyedDirect work with different data types.
keyedDirect expects an iterator, but keyedPipe expects a tuple.
I thought both would work with the same data type.
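
To make the difference concrete, here is a minimal sketch in plain Java. The two interfaces, PerRecordFunc and IteratorFunc, are hypothetical stand-ins and not Twister2 types; they only mirror the two shapes described above (one tuple per call versus one iterator per call), and the real Twister2 signatures may differ.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

public class ComputeShapes {

  // Hypothetical: shape of a function placed after a pipe-style link.
  interface PerRecordFunc<I, O> {
    void compute(I input, Consumer<O> output);
  }

  // Hypothetical: shape of a function placed after a direct-style link.
  interface IteratorFunc<I, O> {
    void compute(Iterator<I> input, Consumer<O> output);
  }

  // The framework would call the function once per record.
  static List<BigInteger> runPerRecord(List<BigInteger> ids, Set<BigInteger> deleteSet) {
    List<BigInteger> out = new ArrayList<>();
    PerRecordFunc<BigInteger, BigInteger> f = (id, collector) -> {
      if (deleteSet.contains(id)) {
        collector.accept(id);
      }
    };
    for (BigInteger id : ids) {
      f.compute(id, out::add);
    }
    return out;
  }

  // The framework would call the function once, handing over all records.
  static List<BigInteger> runIterator(List<BigInteger> ids, Set<BigInteger> deleteSet) {
    List<BigInteger> out = new ArrayList<>();
    IteratorFunc<BigInteger, BigInteger> f = (it, collector) -> {
      while (it.hasNext()) {
        BigInteger id = it.next();
        if (deleteSet.contains(id)) {
          collector.accept(id);
        }
      }
    };
    f.compute(ids.iterator(), out::add);
    return out;
  }

  public static void main(String[] args) {
    List<BigInteger> ids = Arrays.asList(
        BigInteger.valueOf(1), BigInteger.valueOf(2), BigInteger.valueOf(3));
    Set<BigInteger> deleteSet = new HashSet<>(Arrays.asList(BigInteger.valueOf(2)));
    System.out.println(runPerRecord(ids, deleteSet));
    System.out.println(runIterator(ids, deleteSet));
  }
}
```

Both variants select the same records; only the way the input reaches the function differs.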

Ahmet

Supun Kamburugamuve

Jul 27, 2020, 11:50:21 PM
to Ahmet Uyar, Twister2
Hi Ahmet,

Pipe has in-memory streaming semantics, so the function that follows it cannot take an iterator.

task1 -> pipe -> task2

For pipe, we assume task1 and task2 are in memory (most of the time). task1 can produce multiple tuples, and these are streamed to task2.
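
As a rough illustration of that hand-off, the sketch below models task1 -> pipe -> task2 in plain Java. It does not use the Twister2 API; the class name PipeSketch and the use of java.util.function.Consumer as the "pipe" are assumptions made only for the example. The point is that each tuple reaches task2 as soon as task1 emits it, so there is never a complete collection for an iterator to walk over.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class PipeSketch {

  // Runs the toy pipeline and returns the order in which things happened.
  static List<String> run() {
    List<String> events = new ArrayList<>();

    // task2: handles exactly one record per call.
    Consumer<Integer> task2 = record -> events.add("task2 got " + record);

    // pipe: forwards each record immediately, with no buffering.
    Consumer<Integer> pipe = record -> task2.accept(record);

    // task1: produces several records and pushes each one into the pipe.
    for (int i = 1; i <= 3; i++) {
      events.add("task1 emits " + i);
      pipe.accept(i);
    }
    return events;
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```

The recorded events interleave (emit, receive, emit, receive, and so on), which is why a per-tuple function signature fits a pipe.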

Best,
Supun..

