Custom Comparator doesn't seem to work with LeftJoin


Chris Schneider

Jun 1, 2018, 3:16:31 PM
to Cascading Users
Hi Gang,

I’d like to apply a custom Comparator to my key fields that will also return 0 for some special cases as I LeftJoin two Pipes in a CoGroup. 

Note: In order to ensure that Hadoop partitions and shuffles my input Tuples properly, my Comparator is also implementing Hasher, and is returning 0 for all values of these key fields. (Happily, the real CoGroup in my application has some other key fields which don’t need this special Comparator, so not every Tuple ends up in the same partition, etc. The minimal test case I’ve attached just uses a single key field, however.)

At least with Cascading 2.7.1, I end up with one of two failure modes (and which one I get depends on some essentially random factor):

1) Despite it being a LeftJoin, sometimes no Tuple comes out of the CoGroup with fields matching a given LHS Tuple.

2) Despite the fact that my Comparator would return 0 for the key values on the LHS and RHS (in either order), I end up with a miss, and the RHS fields in the output Tuple are all null.

I don’t think the behavior of my Hasher.hashCode override has anything to do with it, but I imagine any workaround will have to work with that in place.

Any help would be greatly appreciated!

- Chris

package com.scaleunlimited.cascading;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.junit.Test;

import cascading.CascadingTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowProcess;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.LeftJoin;
import cascading.scheme.Scheme;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Hasher;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;

import com.scaleunlimited.cascading.local.LocalPlatform;

@SuppressWarnings("serial")
public class JoinFailureTest extends CascadingTestCase {
    public static final SingleValueOptimizationComparator
        SINGLE_VALUE_OPTIMIZATION_COMPARATOR = 
            new SingleValueOptimizationComparator();

    private static final String WORKING_PATH_NAME = "build/test/JoinFailureTest/";

    @Test
    public void testComparator() throws Throwable {
        assertEquals(0, SINGLE_VALUE_OPTIMIZATION_COMPARATOR.compare("all", "us&all"));
        assertEquals(0, SINGLE_VALUE_OPTIMIZATION_COMPARATOR.compare("us&all", "all"));

        assertEquals(0, SINGLE_VALUE_OPTIMIZATION_COMPARATOR.compare("us", "us&all"));
        assertEquals(0, SINGLE_VALUE_OPTIMIZATION_COMPARATOR.compare("us&all", "us"));
    }
    
    @Test
    @SuppressWarnings("rawtypes")
    public void testJoinFailure() throws Throwable {
        File workingDir = new File(WORKING_PATH_NAME + "testJoinFailure");
        File lhsDir = new File(workingDir, "lhs");
        File rhsDir = new File(workingDir, "rhs");
        File outputDir = new File(workingDir, "output");
        BasePlatform platform = new LocalPlatform(JoinFailureTest.class);
        
        // LHS has one Tuple with us and one with all
        Fields lhsFields = new Fields("lhs-country", "lhs-index", "lhs-expected-hit-count");
        List<Tuple> lhsTuples = new ArrayList<Tuple>();
        lhsTuples.add(new Tuple("us", "lhs-1", "1"));
        lhsTuples.add(new Tuple("all", "lhs-2", "1"));
        writeTuples(lhsDir, lhsFields, lhsTuples, platform);
        
        // RHS us&all Tuple should match both
        Fields rhsFields = new Fields("rhs-country", "rhs-index");
        List<Tuple> rhsTuples = new ArrayList<Tuple>();
        rhsTuples.add(new Tuple("us&all", "rhs-1"));
        writeTuples(rhsDir, rhsFields, rhsTuples, platform);
        
        // Create and run a flow that CoGroups two pipes with a LeftJoin,
        // using the Comparator to group the keys on both sides.
        Map<String, Tap> sourceTaps = new HashMap<String, Tap>();
        BasePath workingPath = platform.makePath(workingDir.getAbsolutePath());
        
        BasePath lhsPath = platform.makePath(workingPath, "lhs");
        Tap lhsSource = platform.makeTap(   platform.makeBinaryScheme(lhsFields),
                                            lhsPath);
        Pipe lhsPipe = new Pipe("lhs Tuples");
        sourceTaps.put(lhsPipe.getName(), lhsSource);
        
        BasePath rhsPath = platform.makePath(workingPath, "rhs");
        Tap rhsSource = platform.makeTap(   platform.makeBinaryScheme(rhsFields),
                                            rhsPath);
        Pipe rhsPipe = new Pipe("rhs Tuples");
        sourceTaps.put(rhsPipe.getName(), rhsSource);
        
        Fields lhsKeyFields = new Fields("lhs-country");
        lhsKeyFields.setComparator( "lhs-country", 
                                    SINGLE_VALUE_OPTIMIZATION_COMPARATOR);
        Fields rhsKeyFields = new Fields("rhs-country");
        rhsKeyFields.setComparator( "rhs-country", 
                                    SINGLE_VALUE_OPTIMIZATION_COMPARATOR);
        Pipe outputPipe = 
            new CoGroup("output Tuples", 
                        lhsPipe, lhsKeyFields,
                        rhsPipe, rhsKeyFields,
                        new LeftJoin());
        outputPipe = new Each(  outputPipe, 
                                new TupleLogger(outputPipe.getName(), true));
        
        Fields outputFields = lhsFields.append(rhsFields);
        BasePath outputPath = platform.makePath(workingPath, "output");
        Tap outputSink = platform.makeTap(  platform.makeBinaryScheme(outputFields),
                                            outputPath,
                                            SinkMode.REPLACE);
                        
        FlowConnector flowConnector = platform.makeFlowConnector();
        Flow flow = flowConnector.connect(  "Left-joining lhs with rhs",
                                            sourceTaps, 
                                            outputSink,
                                            outputPipe);
        FlowRunner.run(flow);
        
        // Verify that every LHS Tuple makes it through at least once, and that
        // we get the expected number of hits (or just a single miss).
        Map<Integer, Integer> lhsExpectedHitCountMap = 
            new HashMap<Integer, Integer>();
        for (int lhsTupleIndex = 0; lhsTupleIndex < lhsTuples.size(); lhsTupleIndex++) {
            TupleEntry lhsTupleEntry = 
                new TupleEntry(lhsFields, lhsTuples.get(lhsTupleIndex));
            String expectedHitCountString = 
                lhsTupleEntry.getString("lhs-expected-hit-count");
            lhsExpectedHitCountMap.put( lhsTupleIndex, 
                                        Integer.parseInt(expectedHitCountString));
        }
        List<Tuple> outputTuples = readTuples(  outputDir, 
                                                outputFields,
                                                platform);
        
        // Note: Sometimes it fails here, depending on various unrelated
        // changes to source code (e.g., which package unit test lives in):
        assertTrue( "at least one LHS input Tuple got stripped out (LeftJoin)",
                    outputTuples.size() >= lhsTuples.size());
        
        for (Tuple outputTuple : outputTuples) {
            TupleEntry outputTupleEntry = 
                new TupleEntry(outputFields, outputTuple);
            int lhsTupleIndex = 0;
            for (Tuple lhsTuple : lhsTuples) {
                TupleEntry lhsTupleEntry = 
                    new TupleEntry(lhsFields, lhsTuple);
                boolean isLhsMatch = true;
                for (Comparable lhsField : lhsFields) {
                    if (!(lhsTupleEntry.getString(lhsField)
                            .equals(outputTupleEntry.getString(lhsField)))) {
                        isLhsMatch = false;
                        break;
                    }
                }
                if (isLhsMatch) {
                    break;
                }
                lhsTupleIndex++;
            }
            assertTrue( (   "Output Tuple doesn't match any LHS Tuple: "
                        +   outputTupleEntry), 
                        lhsTupleIndex < lhsTuples.size());
            int hitCount = lhsExpectedHitCountMap.get(lhsTupleIndex);
            if (hitCount > 0) {
                if (hitCount == 1) {
                    hitCount = 0;
                }
                lhsExpectedHitCountMap.put(lhsTupleIndex, hitCount - 1);
                for (Comparable rhsField : rhsFields) {

                    // Note: Sometimes it fails here, depending on various
                    // unrelated changes to source code (e.g., which package
                    // unit test lives in):
                    assertNotNull(  (   "RHS fields of output Tuple should be non-null (since it should have been a hit): "
                                    +   outputTupleEntry), 
                                    outputTupleEntry.getString(rhsField));
                }
            } else {
                TupleEntry lhsTupleEntry = 
                    new TupleEntry(lhsFields, lhsTuples.get(lhsTupleIndex));
                assertEquals(   (   "Too many hits for LHS Tuple: "
                                +   lhsTupleEntry),
                                0, hitCount);
                lhsExpectedHitCountMap.put(lhsTupleIndex, hitCount - 1);
                for (Comparable rhsField : rhsFields) {
                    assertNull( (   "RHS fields of output Tuple should all be null (since it should have been a miss): "
                                +   outputTupleEntry), 
                                outputTupleEntry.getString(rhsField));
                }
            }
        }
        for (int lhsTupleIndex = 0; lhsTupleIndex < lhsTuples.size(); lhsTupleIndex++) {
            TupleEntry lhsTupleEntry = 
                new TupleEntry(lhsFields, lhsTuples.get(lhsTupleIndex));
            int expectedHitCount = 
                Integer.parseInt(lhsTupleEntry.getString("lhs-expected-hit-count"));
            String message = 
                (   (expectedHitCount > 0) ?
                    String.format(  "Expected exactly %d hits for LHS Tuple: %s",
                                    expectedHitCount,
                                    lhsTupleEntry)
                :   String.format(  "Expected exactly 1 miss output Tuple for LHS Tuple: %s",
                                    lhsTupleEntry));
            assertEquals(   message, 
                            -1, 
                            (int)lhsExpectedHitCountMap.get(lhsTupleIndex));
        }
    }
    
    public static class SingleValueOptimizationComparator
        implements Comparator<String>, Hasher<String>, Serializable {
    
        private SingleValueOptimizationComparator() {
            super();
        }
    
        @Override
        public int compare(String thisValue, String thatValue) {
            
            if (thisValue == null) {
                if (thatValue == null) {
                    return 0;
                }
                return -1;
            } else if (thatValue == null) {
                return 1;
            }
            
            if (thisValue.equals(thatValue)) {
                return 0;
            }
            
            // us and all each equal us&all
            if (thisValue.equals("us&all")) {
                if  (   thatValue.equals("us")
                    ||  thatValue.equals("all")) {
                    return 0;
                }
            }
            if (thatValue.equals("us&all")) {
                if  (   thisValue.equals("us")
                    ||  thisValue.equals("all")) {
                    return 0;
                }
            }
            
            return thisValue.compareTo(thatValue);
        }
    
        @Override
        public int hashCode(String value) {
            
            // The transitive property of hash code equality forces virtually
            // all possible country values to share the same hash code
            // (whenever the Tuples in the Pipe have already been optimized to
            // combine the otherwise identical single "specific" Tuple with its
            // "all" aggregate).  For example:
            // 1) us must equal us&all.
            // 2) all must equal us&all.
            // 3) Therefore, us must equal all.
            // 4) Similarly, us must equal gb and every other specific country.
            // Therefore, we just hash every value to 0.  
            // Note that this means such fields can't participate in partitioning
            // the Tuples for the reduce phase.
            // Happily, the grouping key typically includes non-optimized fields
            // (e.g., advertiser) so all Tuples aren't sent to the same reducer.
            return 0;
        }
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static void writeTuples( File targetDir,
                                    Fields outputFields,
                                    List<Tuple> tuples,
                                    BasePlatform testPlatform)
        throws Exception {
        
        BasePath targetPath = testPlatform.makePath(targetDir.toString());
        Tap landingDataTap = 
            testPlatform.makeTap(   testPlatform.makeBinaryScheme(outputFields), 
                                    targetPath, 
                                    SinkMode.REPLACE);
        TupleEntryCollector tupleWriter = null;
        try {
            tupleWriter = 
                landingDataTap.openForWrite(testPlatform.makeFlowProcess());
            for (Tuple tuple : tuples) {
                tupleWriter.add(tuple);
            }
        } finally {
            if (tupleWriter != null) {
                tupleWriter.close();
            }
        }
    }
    
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static List<Tuple> readTuples(   File sourceDir,
                                            Fields inputFields,
                                            BasePlatform testPlatform) 
        throws Exception {
        
        BasePath sourcePath = testPlatform.makePath(sourceDir.toString());
        Scheme sourceScheme = testPlatform.makeBinaryScheme(inputFields);
        Tap sourceTap = testPlatform.makeTap(sourceScheme, sourcePath);
        FlowProcess flowProcess = testPlatform.makeFlowProcess();
        Iterator<TupleEntry> iter = sourceTap.openForRead(flowProcess);
        List<Tuple> result = new ArrayList<Tuple>();
        while (iter.hasNext()) {
            result.add(iter.next().getTuple());
        }
        return result;
    }
}


-----------------------------------------
Chris Schneider
http://www.scaleunlimited.com
custom big data solutions
-----------------------------------------

Chris K Wensel

Jun 3, 2018, 11:45:39 PM
to cascadi...@googlegroups.com
If you can make this a pull request, I can test this much quicker.

ckw

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at https://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/0ED60255-9437-4379-86FD-D7246C41A6C8%40scaleunlimited.com.
For more options, visit https://groups.google.com/d/optout.


Chris Schneider

Jun 4, 2018, 6:50:36 PM
to Cascading Users
Hi Chris,


Hope this demonstrates the problem within the Cascading context, since I can’t figure out how to get gradle to run just this one unit test.

FYI,

- Schmed



Chris K Wensel

Jun 4, 2018, 9:50:41 PM
to cascadi...@googlegroups.com

Chris K Wensel

Jun 4, 2018, 11:02:27 PM
to cascadi...@googlegroups.com
I commented on the pr, but reproduced here:

I do recognize that the nature of CoGroup implies the actual values for lhs and rhs will remain stable and pass through when the keys are not actually interchangeable. but this is the first time it has been brought up as an issue in 10 years.

the issue here is that MR, Tez, and local modes presume if the comparators state the values are equivalent, the instance values are then interchangeable (which isn't the expectation here). 

custom comparators exist to allow for custom/third-party POJO values to be used as keys and/or allow for custom secondary sorting of values.

bucketing keys into ranges at this level hasn’t really been on the radar since values can be generated from functions allowing for them to be used as join keys. 
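One way to read the suggestion above is to expand a combined key into plain keys before the join, so the join itself only needs exact equality. The following is a minimal plain-Java sketch of that idea, not Cascading code. The class name, the `expandKey` helper, and the assumption that combined keys are "&"-separated and appear only on the RHS are all hypothetical, chosen to match the test data in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExpandedKeyJoinDemo {

    // Hypothetical helper: "us&all" becomes ["us", "all"]; "us" stays ["us"].
    static List<String> expandKey(String key) {
        return Arrays.asList(key.split("&"));
    }

    // Rows are {key, value} pairs. RHS keys are expanded first, then the
    // left join matches on ordinary String equality.
    static List<String> leftJoin(String[][] lhs, String[][] rhs) {
        Map<String, List<String>> rhsByKey = new HashMap<>();
        for (String[] row : rhs) {
            for (String part : expandKey(row[0])) {
                rhsByKey.computeIfAbsent(part, k -> new ArrayList<>()).add(row[1]);
            }
        }

        List<String> output = new ArrayList<>();
        for (String[] row : lhs) {
            List<String> matches =
                rhsByKey.getOrDefault(row[0], Collections.<String>emptyList());
            if (matches.isEmpty()) {
                output.add(row[1] + "+null");   // miss: LHS still comes through
            }
            for (String match : matches) {
                output.add(row[1] + "+" + match);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        String[][] lhs = { {"us", "lhs-1"}, {"all", "lhs-2"}, {"gb", "lhs-3"} };
        String[][] rhs = { {"us&all", "rhs-1"} };
        // prints [lhs-1+rhs-1, lhs-2+rhs-1, lhs-3+null]
        System.out.println(leftJoin(lhs, rhs));
    }
}
```

In a real flow the expansion step would presumably be a function applied to the RHS pipe ahead of the CoGroup, at the cost of duplicating each combined Tuple once per key.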

it is optimal, in terms of performance, memory, and code simplicity, to assume the keys are interchangeable.

the alternative case of keeping key instance values stable would require a pluggable re-write of the underlying CoGrouping mechanism for each platform.

that said, you might try using a BufferJoin + Buffer to implement the join. this should provide more insights on how the join is fabricated.

ckw



Ken Krugler

Jun 6, 2018, 6:28:18 PM
to cascadi...@googlegroups.com
Hi Chris,

I dug into this a bit more, using the test case from Schmed.

The root problem is that the comparator he’s using is non-transitive w.r.t. equality.

So you can have “A” == “B”, and “A” == “C”, but “B” != “C”.

In local mode, with how the MemoryCoGroupGate builds the TreeSet of unique keys and also HashMaps of key=>tuple for each pipe, depending on the order of keys you get different (wrong in different ways) results.
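The order dependence is easy to see with a bare java.util.TreeSet, outside of Cascading. The sketch below reuses the equality rules from the comparator in the test case (null handling omitted); the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

public class NonTransitiveComparatorDemo {

    // "us" and "all" each compare equal to "us&all", but not to each other.
    static final Comparator<String> COUNTRY_COMPARATOR = (a, b) -> {
        if (a.equals(b)) {
            return 0;
        }
        if (a.equals("us&all") && (b.equals("us") || b.equals("all"))) {
            return 0;
        }
        if (b.equals("us&all") && (a.equals("us") || a.equals("all"))) {
            return 0;
        }
        return a.compareTo(b);
    };

    // Adds the keys in arrival order and returns the ones the TreeSet kept.
    static List<String> uniqueKeys(String... keysInArrivalOrder) {
        TreeSet<String> keys = new TreeSet<>(COUNTRY_COMPARATOR);
        for (String key : keysInArrivalOrder) {
            keys.add(key);
        }
        return new ArrayList<>(keys);
    }

    public static void main(String[] args) {
        // "us&all" arrives first and absorbs both other keys.
        System.out.println(uniqueKeys("us&all", "us", "all")); // prints [us&all]
        // "us" and "all" arrive first; "us&all" is then dropped as a duplicate of "us".
        System.out.println(uniqueKeys("us", "all", "us&all")); // prints [all, us]
    }
}
```

The same three keys produce either one group or two, depending only on which one reaches the set first.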

In any case, an update to the comparator documentation about this requirement would be helpful. Currently there’s a short blurb at http://docs.cascading.org/cascading/3.2/userguide/ch19-extending-cascading.html that talks about comparators, and there’s the Javadoc for Fields.setComparator, but neither feels exactly right.

— Ken




--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

Chris K Wensel

Jun 6, 2018, 6:46:08 PM
to cascadi...@googlegroups.com
sure, happy to refresh the docs on that point, and call out my earlier point that the joins aren’t necessarily stable as well.

but note the MemoryCoGroupGate keeps a tree of unique keys per the Comparator, but the Map of Key/Values does not, if memory serves. that also needs to be _fixed_ or there will be missing tuples.
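A simplified model of that mismatch, in plain Java: a TreeSet of keys that honors the Comparator, next to per-side HashMaps that use String.equals. This is only a sketch of the structure described in this thread, not the actual MemoryCoGroupGate code, and every name in it is hypothetical. With the test data from the original post it reproduces both reported failure modes, depending on arrival order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class KeyTreeVersusMapDemo {

    // Same equality rules as the comparator in the test case (nulls omitted).
    static final Comparator<String> COUNTRY_COMPARATOR = (a, b) -> {
        if (a.equals(b)) return 0;
        if (a.equals("us&all") && (b.equals("us") || b.equals("all"))) return 0;
        if (b.equals("us&all") && (a.equals("us") || a.equals("all"))) return 0;
        return a.compareTo(b);
    };

    // Rows are {key, value}. Keys go into the shared tree (Comparator-based);
    // values go into that side's map (equals/hashCode-based).
    static void load(String[][] rows, TreeSet<String> keys,
                     Map<String, List<String>> values) {
        for (String[] row : rows) {
            keys.add(row[0]);
            values.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
    }

    static List<String> leftJoin(String[][] lhs, String[][] rhs,
                                 boolean rhsArrivesFirst) {
        TreeSet<String> keys = new TreeSet<>(COUNTRY_COMPARATOR);
        Map<String, List<String>> lhsValues = new HashMap<>();
        Map<String, List<String>> rhsValues = new HashMap<>();
        if (rhsArrivesFirst) {
            load(rhs, keys, rhsValues);
            load(lhs, keys, lhsValues);
        } else {
            load(lhs, keys, lhsValues);
            load(rhs, keys, rhsValues);
        }

        List<String> output = new ArrayList<>();
        for (String key : keys) {
            // Lookups use the surviving key instance, so "equal" keys that
            // were dropped from the tree are never found in the maps.
            List<String> left =
                lhsValues.getOrDefault(key, Collections.<String>emptyList());
            List<String> right =
                rhsValues.getOrDefault(key, Collections.<String>emptyList());
            for (String l : left) {
                if (right.isEmpty()) output.add(l + "+null");
                for (String r : right) output.add(l + "+" + r);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        String[][] lhs = { {"us", "lhs-1"}, {"all", "lhs-2"} };
        String[][] rhs = { {"us&all", "rhs-1"} };
        System.out.println(leftJoin(lhs, rhs, true));  // prints []
        System.out.println(leftJoin(lhs, rhs, false)); // prints [lhs-2+null, lhs-1+null]
    }
}
```

The empty result corresponds to failure mode 1 (LHS Tuples missing from a LeftJoin) and the second result to failure mode 2 (hits reported as misses with null RHS fields).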

I believe you will still have problems in MR/Tez, because of the (false, in this case) assumption that the keys are interchangeable.

this will be a lot of work to clear up, and if cleared up, should only be enabled via a flag. it should also be benchmarked; the fix may not impose a noticeable performance penalty.

ckw

