I am a newbie to the Cascading API.
I wrote the following sample, which does a self-join using HashJoin.
String infile = args[0];
String outfile = args[1];
Properties properties = new Properties();
AppProps.setApplicationJarClass(properties, MyFlow8.class);
AppProps.setApplicationName(properties,"myflow8");
FlowConnector flowConnector = new Hadoop2MR1FlowConnector();
Fields sourceFields = new Fields("key", "value", "count");
Tap sourceTap = new Hfs(new TextDelimited(sourceFields), infile);
Tap sinkTapEvery = new Hfs(new TextDelimited(), outfile + "_every");
FlowDef flowDef = new FlowDef();
Pipe pipe = new Pipe("everypipe");
Pipe hashJoin = new HashJoin(pipe, new Fields("value"), 1, new Fields("key1", "value1", "count1", "key2", "value2", "count2"));
flowDef.addSource(pipe, sourceTap);
flowDef.addTailSink(hashJoin, sinkTapEvery);
flowDef.setAssertionLevel(AssertionLevel.STRICT);
flowDef.setDebugLevel(DebugLevel.VERBOSE);
Flow flow = flowConnector.connect(flowDef);
flow.complete();
With an input of
try this 1
try this 2
try this 5
try this 6
try this 7
try this 8
I get output from the HashJoin split into 2 files.
File 1
try this 1 try this 1
try this 1 try this 2
try this 1 try this 5
try this 1 try this 6
try this 2 try this 1
try this 2 try this 2
try this 2 try this 5
try this 2 try this 6
try this 5 try this 1
try this 5 try this 2
try this 5 try this 5
try this 5 try this 6
try this 6 try this 1
try this 6 try this 2
try this 6 try this 5
try this 6 try this 6
and File 2
try this 7 try this 7
try this 7 try this 8
try this 8 try this 7
try this 8 try this 8
I expected every input row to join with every other row, since all rows share the same value. Instead, the input seems to be partitioned (4 tuples in the first partition, 2 tuples in the second), and the join is limited to rows within each partition, giving 16 + 4 = 20 joined tuples.
If the input is fed through 2 separate pipes and the separate pipes are joined, the join comes out as expected, i.e. all 6 tuples are joined to generate 36 joined tuples.
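To make the two counts concrete, here is a minimal plain-Java sketch. It is not Cascading code, and the class SelfJoinCounts and its helpers rows and join are made up purely for illustration. It assumes the input was split into rows 1, 2, 5, 6 and rows 7, 8, as the two output files suggest, and compares a join done within each partition against a join over the whole input.

```java
import java.util.ArrayList;
import java.util.List;

public class SelfJoinCounts {

    // Hypothetical helper: builds rows of the form (key="try", value="this", count=c).
    static List<String[]> rows(int... counts) {
        List<String[]> rows = new ArrayList<>();
        for (int c : counts) {
            rows.add(new String[] {"try", "this", String.valueOf(c)});
        }
        return rows;
    }

    // Hypothetical helper: naive inner join on the "value" field (index 1).
    static List<String> join(List<String[]> lhs, List<String[]> rhs) {
        List<String> out = new ArrayList<>();
        for (String[] l : lhs) {
            for (String[] r : rhs) {
                if (l[1].equals(r[1])) {
                    out.add(String.join(" ", l) + " " + String.join(" ", r));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> all = rows(1, 2, 5, 6, 7, 8);
        // Assumed partitioning, inferred from the two output files.
        List<String[]> part1 = rows(1, 2, 5, 6);
        List<String[]> part2 = rows(7, 8);

        // What I observe: each partition is only joined with itself.
        int observed = join(part1, part1).size() + join(part2, part2).size();
        // What I expect: the whole input joined with the whole input.
        int expected = join(all, all).size();

        System.out.println("observed (per-partition joins): " + observed); // 20
        System.out.println("expected (full self-join): " + expected);      // 36
    }
}
```

The per-partition total of 20 matches the 16 + 4 lines in my two output files, while the full self-join gives the 36 tuples I get from the two-pipe version.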
Is this the expected behavior of HashJoin?