Converting simple SQL to Cascading operations

18 visninger
Gå til det første ulæste opslag

HIMANSHU VERMA

ulæst,
13. okt. 2016, 11.26.0913.10.2016
til cascading-user
Hi Everyone,
I have a following situation:

Tuple/Table     A with fields X,Y,Z
Tuple/Table     B with fields X,Y,Z

I need to do follow cascading operations which would produce results similar to :

Select * from B

UNION 

(Select * from A where A.X NOT IN (SELECT distinct(B.X) from B))

Thanks in advance

Regards,
Himanshu

Ken Krugler

ulæst,
13. okt. 2016, 14.12.5313.10.2016
til cascadi...@googlegroups.com
On Oct 13, 2016, at 8:26am, HIMANSHU VERMA <superbh...@gmail.com> wrote:

Hi Everyone,
I have a following situation:

Tuple/Table     A with fields X,Y,Z
Tuple/Table     B with fields X,Y,Z

I need to do follow cascading operations which would produce results similar to :

Select * from B

UNION 

(Select * from A where A.X NOT IN (SELECT distinct(B.X) from B))

You could try Lingual, of course.

But to translate into a Cascading Flow…

You want to have every record from B, and every record from A where A.X doesn’t occur anywhere in B.X

A union is like a Merge of two Cascading Pipes, so that part is easy.

A distinct is like a Unique, so no problem there.

Where it gets a bit tricky is the A.X NOT IN part, as Cascading doesn’t have a NotInnerJoin. You could write your own custom joiner, but the easier solution is to do an OuterJoin of A with the unique B using field X, and then filter out any records where the unique B field X isn’t null (and thus it exists).

Here’s some test code that does what you want…

    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Test
    public void test() throws Exception {
        File dir = new File("build/test/UnionDistinctTest/test/");
        dir.mkdirs();

        

        // Create some records in table A. One of these is also in B ("bob") so it gets removed, the
        // other two should remain.
        Tap tapA = new FileTap(new TextDelimited(new Fields("X", "Y")), "build/test/UnionDistinctTest/test/inA", SinkMode.REPLACE);
        TupleEntryCollector write = tapA.openForWrite(new LocalFlowProcess());
        write.add(new Tuple("bob", 1));
        write.add(new Tuple("tom", 3));
        write.add(new Tuple("mark", 5));
        write.close();

        

        // Create some records in table B
        Tap tapB = new FileTap(new TextDelimited(new Fields("X", "Y")), "build/test/UnionDistinctTest/test/inB", SinkMode.REPLACE);
        write = tapB.openForWrite(new LocalFlowProcess());
        write.add(new Tuple("bob", 1));
        write.add(new Tuple("bob", 2));
        write.add(new Tuple("john", 3));
        write.add(new Tuple("carol", 5));
        write.close();

        

        Pipe pipeA = new Pipe("pipe for table A");
        Pipe pipeB = new Pipe("pipe from table B");

        

        // Create a pipe with only unique values of field X. We rename it to avoid conflicts with the same
        // field in pipeA during the CoGroup, and discard field Y since we don't need it.
        Pipe uniqueB = new Unique(pipeB, new Fields("X"));
        uniqueB = new Rename(uniqueB, new Fields("X"), new Fields("B_X"));
        uniqueB = new Discard(uniqueB, new Fields("Y"));

        

        Pipe inAnotB = new CoGroup( pipeA, new Fields("X"),
                                    uniqueB, new Fields("B_X"),
                                    null,
                                    new OuterJoin());

        

        // At this point inAnotB will have tuples with B_X set to null for cases where "X" in A doesn't have
        // a corresponding value in B. So those are the ones we want...
        inAnotB = new Each(inAnotB, new Fields("B_X"), new FilterNotNull());
        inAnotB = new Discard(inAnotB, new Fields("B_X"));

        

        // Finally do the union of all records in table B with the subset from table A.
        Pipe result = new Merge(pipeB, inAnotB);

        

        Tap sinkTap = new FileTap(new TextDelimited(), "build/test/UnionDistinctTest/test/out", SinkMode.REPLACE);
        FlowDef flowDef = new FlowDef()
            .addSource(pipeA, tapA)
            .addSource(pipeB, tapB)
            .addTailSink(result, sinkTap);

        

        Flow flow = new LocalFlowConnector().connect(flowDef);
        flow.complete();
    }


— Ken


--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



HIMANSHU VERMA

ulæst,
13. okt. 2016, 14.47.3713.10.2016
til cascading-user
You are awesome Ken. This is exactly what I was looking for.

Thanks a lot

Regards,
Himanshu

HIMANSHU VERMA

ulæst,
13. okt. 2016, 16.05.3313.10.2016
til cascading-user
After some testing and playing around with your test method. I figured out that using LeftJoin() in the CoGroup will look more clean.

        Pipe inAnotB = new CoGroup( pipeA, new Fields("X"),
                                   uniqueB, new Fields("B_X"),
                                   new LeftJoin());


Thanks
Himanshu


On Thursday, 13 October 2016 13:12:53 UTC-5, kkrugler wrote:
Svar alle
Svar til forfatter
Videresend
0 nye opslag