/**
 * Builds the result "all of B, plus the rows of A whose X value does not occur in B" and
 * verifies the contents of the sink.
 *
 * <p>Table A holds (bob,1), (tom,3), (mark,5); table B holds (bob,1), (bob,2), (john,3),
 * (carol,5). "bob" is present in B, so the A row for "bob" is dropped and only "tom" and
 * "mark" are carried over from A. The merged output is therefore the four rows of B plus
 * those two rows of A.
 *
 * @throws Exception if writing the fixtures, running the flow, or reading the output fails
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void test() throws Exception {
    final String baseDir = "build/test/UnionDistinctTest/test/";
    File dir = new File(baseDir);
    dir.mkdirs();

    // Create some records in table A. One of these is also in B ("bob") so it gets removed,
    // the other two should remain.
    Tap tapA = new FileTap(new TextDelimited(new Fields("X", "Y")), baseDir + "inA", SinkMode.REPLACE);
    TupleEntryCollector write = tapA.openForWrite(new LocalFlowProcess());
    try {
        write.add(new Tuple("bob", 1));
        write.add(new Tuple("tom", 3));
        write.add(new Tuple("mark", 5));
    } finally {
        // Always release the collector, even when adding a tuple fails.
        write.close();
    }

    // Create some records in table B.
    Tap tapB = new FileTap(new TextDelimited(new Fields("X", "Y")), baseDir + "inB", SinkMode.REPLACE);
    write = tapB.openForWrite(new LocalFlowProcess());
    try {
        write.add(new Tuple("bob", 1));
        write.add(new Tuple("bob", 2));
        write.add(new Tuple("john", 3));
        write.add(new Tuple("carol", 5));
    } finally {
        write.close();
    }

    Pipe pipeA = new Pipe("pipe for table A");
    Pipe pipeB = new Pipe("pipe from table B");

    // Create a pipe with only unique values of field X. We rename it to avoid conflicts with
    // the same field in pipeA during the CoGroup, and discard field Y since we don't need it.
    Pipe uniqueB = new Unique(pipeB, new Fields("X"));
    uniqueB = new Rename(uniqueB, new Fields("X"), new Fields("B_X"));
    uniqueB = new Discard(uniqueB, new Fields("Y"));

    Pipe inAnotB = new CoGroup(pipeA, new Fields("X"),
                               uniqueB, new Fields("B_X"),
                               null,
                               new OuterJoin());

    // At this point inAnotB will have tuples with B_X set to null for cases where "X" in A
    // doesn't have a corresponding value in B. Those are the ones we want: FilterNotNull
    // removes every tuple whose B_X is non-null, leaving only the unmatched rows of A.
    inAnotB = new Each(inAnotB, new Fields("B_X"), new FilterNotNull());
    inAnotB = new Discard(inAnotB, new Fields("B_X"));

    // Finally do the union of all records in table B with the subset from table A.
    Pipe result = new Merge(pipeB, inAnotB);

    Tap sinkTap = new FileTap(new TextDelimited(), baseDir + "out", SinkMode.REPLACE);
    FlowDef flowDef = new FlowDef()
        .addSource(pipeA, tapA)
        .addSource(pipeB, tapB)
        .addTailSink(result, sinkTap);

    Flow flow = new LocalFlowConnector().connect(flowDef);
    flow.complete();

    // Verify the sink contents. The merge gives no ordering guarantee, so both sides are
    // sorted before comparing. The sink uses TextDelimited's default TAB delimiter.
    java.util.List<String> expected = new java.util.ArrayList<String>(java.util.Arrays.asList(
        "bob\t1", "bob\t2", "john\t3", "carol\t5", "tom\t3", "mark\t5"));
    java.util.List<String> actual = new java.util.ArrayList<String>(
        java.nio.file.Files.readAllLines(
            java.nio.file.Paths.get(baseDir + "out"),
            java.nio.charset.StandardCharsets.UTF_8));
    java.util.Collections.sort(expected);
    java.util.Collections.sort(actual);
    if (!expected.equals(actual)) {
        throw new AssertionError("Unexpected flow output: expected " + expected + " but was " + actual);
    }
}