I’d like to apply a custom Comparator to my key fields — one that also returns 0 for certain non-identical value pairs — when I LeftJoin two Pipes in a CoGroup.
Note: In order to ensure that Hadoop partitions and shuffles my input Tuples properly, my Comparator is also implementing Hasher, and is returning 0 for all values of these key fields. (Happily, the real CoGroup in my application has some other key fields which don’t need this special Comparator, so not every Tuple ends up in the same partition, etc. The minimal test case I’ve attached just uses a single key field, however.)
Two problems occur: 1) Despite it being a LeftJoin, sometimes no Tuple comes out of the CoGroup with fields matching a given LHS Tuple.
2) Despite the fact that my Comparator would return 0 for the key values on the LHS and RHS (in either order), I end up with a miss, and the RHS fields in the output Tuple are all null.
I don’t think the behavior of my Hasher.hashCode override has anything to do with it, but I imagine any workaround will have to work with that in place.
package com.scaleunlimited.cascading;
import java.io.Closeable;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.junit.Test;

import cascading.CascadingTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowProcess;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.LeftJoin;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Hasher;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;

import com.scaleunlimited.cascading.local.LocalPlatform;
@SuppressWarnings("serial")
public class JoinFailureTest extends CascadingTestCase {
public static final SingleValueOptimizationComparator
SINGLE_VALUE_OPTIMIZATION_COMPARATOR =
new SingleValueOptimizationComparator();
private static final String WORKING_PATH_NAME = "build/test/JoinFailureTest/";
@Test
public void testComparator() throws Throwable {
assertEquals(0, SINGLE_VALUE_OPTIMIZATION_COMPARATOR.compare("all", "us&all"));
assertEquals(0, SINGLE_VALUE_OPTIMIZATION_COMPARATOR.compare("us&all", "all"));
assertEquals(0, SINGLE_VALUE_OPTIMIZATION_COMPARATOR.compare("us", "us&all"));
assertEquals(0, SINGLE_VALUE_OPTIMIZATION_COMPARATOR.compare("us&all", "us"));
}
@Test
@SuppressWarnings("rawtypes")
public void testJoinFailure() throws Throwable {
File workingDir = new File(WORKING_PATH_NAME + "testJoinFailure");
File lhsDir = new File(workingDir, "lhs");
File rhsDir = new File(workingDir, "rhs");
File outputDir = new File(workingDir, "output");
BasePlatform platform = new LocalPlatform(JoinFailureTest.class);
// LHS has one Tuple with us and one with all
Fields lhsFields = new Fields("lhs-country", "lhs-index", "lhs-expected-hit-count");
List<Tuple> lhsTuples = new ArrayList<Tuple>();
lhsTuples.add(new Tuple("us", "lhs-1", "1"));
lhsTuples.add(new Tuple("all", "lhs-2", "1"));
writeTuples(lhsDir, lhsFields, lhsTuples, platform);
// RHS us&all Tuple should match both
Fields rhsFields = new Fields("rhs-country", "rhs-index");
List<Tuple> rhsTuples = new ArrayList<Tuple>();
rhsTuples.add(new Tuple("us&all", "rhs-1"));
writeTuples(rhsDir, rhsFields, rhsTuples, platform);
// Create and run a flow that CoGroups two pipes with a LeftJoin,
// using the Comparator to group the keys on both sides.
Map<String, Tap> sourceTaps = new HashMap<String, Tap>();
BasePath workingPath = platform.makePath(workingDir.getAbsolutePath());
BasePath lhsPath = platform.makePath(workingPath, "lhs");
Tap lhsSource = platform.makeTap( platform.makeBinaryScheme(lhsFields),
lhsPath);
Pipe lhsPipe = new Pipe("lhs Tuples");
sourceTaps.put(lhsPipe.getName(), lhsSource);
BasePath rhsPath = platform.makePath(workingPath, "rhs");
Tap rhsSource = platform.makeTap( platform.makeBinaryScheme(rhsFields),
rhsPath);
Pipe rhsPipe = new Pipe("rhs Tuples");
sourceTaps.put(rhsPipe.getName(), rhsSource);
Fields lhsKeyFields = new Fields("lhs-country");
lhsKeyFields.setComparator( "lhs-country",
SINGLE_VALUE_OPTIMIZATION_COMPARATOR);
Fields rhsKeyFields = new Fields("rhs-country");
rhsKeyFields.setComparator( "rhs-country",
SINGLE_VALUE_OPTIMIZATION_COMPARATOR);
Pipe outputPipe =
new CoGroup("output Tuples",
lhsPipe, lhsKeyFields,
rhsPipe, rhsKeyFields,
new LeftJoin());
outputPipe = new Each( outputPipe,
new TupleLogger(outputPipe.getName(), true));
Fields outputFields = lhsFields.append(rhsFields);
BasePath outputPath = platform.makePath(workingPath, "output");
Tap outputSink = platform.makeTap( platform.makeBinaryScheme(outputFields),
outputPath,
SinkMode.REPLACE);
FlowConnector flowConnector = platform.makeFlowConnector();
Flow flow = flowConnector.connect( "Left-joining lhs with rhs",
sourceTaps,
outputSink,
outputPipe);
FlowRunner.run(flow);
// Verify that every LHS Tuple makes it through at least once, and that
// we get the expected number of hits (or just a single miss).
Map<Integer, Integer> lhsExpectedHitCountMap =
new HashMap<Integer, Integer>();
for (int lhsTupleIndex = 0; lhsTupleIndex < lhsTuples.size(); lhsTupleIndex++) {
TupleEntry lhsTupleEntry =
new TupleEntry(lhsFields, lhsTuples.get(lhsTupleIndex));
String expectedHitCountString =
lhsTupleEntry.getString("lhs-expected-hit-count");
lhsExpectedHitCountMap.put( lhsTupleIndex,
Integer.parseInt(expectedHitCountString));
}
List<Tuple> outputTuples = readTuples( outputDir,
outputFields,
platform);
// Note: Sometimes it fails here, depending on various unrelated
// changes to source code (e.g., which package unit test lives in):
assertTrue( "at least one LHS input Tuple got stripped out (LeftJoin)",
outputTuples.size() >= lhsTuples.size());
for (Tuple outputTuple : outputTuples) {
TupleEntry outputTupleEntry =
new TupleEntry(outputFields, outputTuple);
int lhsTupleIndex = 0;
for (Tuple lhsTuple : lhsTuples) {
TupleEntry lhsTupleEntry =
new TupleEntry(lhsFields, lhsTuple);
boolean isLhsMatch = true;
for (Comparable lhsField : lhsFields) {
if (!(lhsTupleEntry.getString(lhsField)
.equals(outputTupleEntry.getString(lhsField)))) {
isLhsMatch = false;
break;
}
}
if (isLhsMatch) {
break;
}
lhsTupleIndex++;
}
assertTrue( ( "Output Tuple doesn't match any LHS Tuple: "
+ outputTupleEntry),
lhsTupleIndex < lhsTuples.size());
int hitCount = lhsExpectedHitCountMap.get(lhsTupleIndex);
if (hitCount > 0) {
if (hitCount == 1) {
hitCount = 0;
}
lhsExpectedHitCountMap.put(lhsTupleIndex, hitCount - 1);
for (Comparable rhsField : rhsFields) {
// Note: Sometimes it fails here, depending on various
// unrelated changes to source code (e.g., which package
// unit test lives in):
assertNotNull( ( "RHS fields of output Tuple should be non-null (since it should have been a hit): "
+ outputTupleEntry),
outputTupleEntry.getString(rhsField));
}
} else {
TupleEntry lhsTupleEntry =
new TupleEntry(lhsFields, lhsTuples.get(lhsTupleIndex));
assertEquals( ( "Too many hits for LHS Tuple: "
+ lhsTupleEntry),
0, hitCount);
lhsExpectedHitCountMap.put(lhsTupleIndex, hitCount - 1);
for (Comparable rhsField : rhsFields) {
assertNull( ( "RHS fields of output Tuple should all be null (since it should have been a miss): "
+ outputTupleEntry),
outputTupleEntry.getString(rhsField));
}
}
}
for (int lhsTupleIndex = 0; lhsTupleIndex < lhsTuples.size(); lhsTupleIndex++) {
TupleEntry lhsTupleEntry =
new TupleEntry(lhsFields, lhsTuples.get(lhsTupleIndex));
int expectedHitCount =
Integer.parseInt(lhsTupleEntry.getString("lhs-expected-hit-count"));
String message =
( (expectedHitCount == 0) ?
String.format( "Expected exactly %d hits for LHS Tuple: %s",
expectedHitCount,
lhsTupleEntry)
: String.format( "Expected exactly 1 miss output Tuple for LHS Tuple: %s",
lhsTupleEntry));
assertEquals( message,
-1,
(int)lhsExpectedHitCountMap.get(lhsTupleIndex));
}
}
public static class SingleValueOptimizationComparator
implements Comparator<String>, Hasher<String>, Serializable {
private SingleValueOptimizationComparator() {
super();
}
@Override
public int compare(String thisValue, String thatValue) {
if (thisValue == null) {
if (thatValue == null) {
return 0;
}
return -1;
} else if (thatValue == null) {
return 1;
}
if (thisValue.equals(thatValue)) {
return 0;
}
// us and all each equal us&all
if (thisValue.equals("us&all")) {
if ( thatValue.equals("us")
|| thatValue.equals("all")) {
return 0;
}
}
if (thatValue.equals("us&all")) {
if ( thisValue.equals("us")
|| thisValue.equals("all")) {
return 0;
}
}
return thisValue.compareTo(thatValue);
}
@Override
public int hashCode(String value) {
// The transitive property of hash code equality forces virtually
// all possible country values to share the same hash code
// (whenever the Tuples in the Pipe have already been optimized to
// combine the otherwise identical single "specific" Tuple with its
// "all" aggregate). For example:
// 1) us must equal us&all.
// 2) all must equal us&all.
// 3) Therefore, us must equal all.
// 4) Similarly, us must equal gb and every other specific country.
// Therefore, we just hash every value to 0.
// Note that this means such fields can't participate in partitioning
// the Tuples for the reduce phase.
// Happily, the grouping key typically includes non-optimized fields
// (e.g., advertiser) so all Tuples aren't sent to the same reducer.
return 0;
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public static void writeTuples( File targetDir,
Fields outputFields,
List<Tuple> tuples,
BasePlatform testPlatform)
throws Exception {
BasePath targetPath = testPlatform.makePath(targetDir.toString());
Tap landingDataTap =
testPlatform.makeTap( testPlatform.makeBinaryScheme(outputFields),
targetPath,
SinkMode.REPLACE);
TupleEntryCollector tupleWriter = null;
try {
tupleWriter =
landingDataTap.openForWrite(testPlatform.makeFlowProcess());
for (Tuple tuple : tuples) {
tupleWriter.add(tuple);
}
} finally {
if (tupleWriter != null) {
tupleWriter.close();
}
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public static List<Tuple> readTuples( File sourceDir,
Fields inputFields,
BasePlatform testPlatform)
throws Exception {
BasePath sourcePath = testPlatform.makePath(sourceDir.toString());
Scheme sourceScheme = testPlatform.makeBinaryScheme(inputFields);
Tap sourceTap = testPlatform.makeTap(sourceScheme, sourcePath);
FlowProcess flowProcess = testPlatform.makeFlowProcess();
Iterator<TupleEntry> iter = sourceTap.openForRead(flowProcess);
List<Tuple> result = new ArrayList<Tuple>();
while (iter.hasNext()) {
result.add(iter.next().getTuple());
}
return result;
}
}