CoGroup with OuterJoin issue


shree

Aug 28, 2015, 12:06:08 PM
to cascading-user
Hi All,

I am trying to CoGroup two pipes that have different structures and one common field.

Pipe1 --> "id", "firstname", "lastname"
Pipe2 --> "id", "firstname", "birthyear"

Expected output structure --> "id", "firstname", "lastname", "birthyear"

When I do the CoGroup (after renaming the pipe2 fields), the output structure I get is --> "id", "firstname", "lastname", "id1", "firstname1", "birthyear"

I would like to get output like --> "id", "firstname", "lastname", "birthyear"

Example:
Pipe1 input data:

123121234,firstname1,lastname1
019764534,firstname2,lastname2


Pipe2 input data:

3333,firstname3,2015
4444,firstname4,2067



I am getting the following output with OuterJoin:

['123121234', 'firstname1', 'lastname1', null, null, null]
['019764534', 'firstname2', 'lastname2', null, null, null]
[null, null, null, '3333', 'firstname3', '2015']
[null, null, null, '4444', 'firstname4', '2067']


How can I get output like the following?

123121234,firstname1,lastname1,null
019764534,firstname2,lastname2,null
3333,firstname3,null,2015
4444,firstname4,null,2067

Thank You


Ken Krugler

Aug 28, 2015, 4:59:04 PM
to cascadi...@googlegroups.com
You say that there's one common field, but aren't both "id" and "firstname" the same fields across both pipes?

E.g., what would you expect as output if you had (1, "Ken", "Krugler") in pipe1, and (1, "Kenneth", "2015") in pipe2?

-- Ken


From: shree

Sent: August 28, 2015 9:06:08am PDT

To: cascading-user

Subject: CoGroup with OuterJoin issue



--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





shree

Aug 31, 2015, 9:16:32 AM
to cascading-user
Hi Ken,

I would expect the output below if you had (1, "Ken", "Krugler") in pipe1, and (1, "Ken", "2015") and (2, "Kenneth", "2016") in pipe2:

(1, "Ken", "Krugler", "2015")
(2, "Kenneth", "", "2016")


Thank You
Srikanth M

Ken Krugler

Aug 31, 2015, 9:34:31 AM
to cascadi...@googlegroups.com
Hi Srikanth,

Note that in my example, ID is 1 in both of my pipes, so I'm asking whether the join is on just ID, or on ID + first name.

And if you're joining by ID alone, how would you handle different first name values?

-- Ken




--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/4cb70c18-244b-4823-b7de-2ca260e13d19%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.






shree

Aug 31, 2015, 9:59:01 AM
to cascading-user
Hi Ken,

I am looking for an outer join where the ids may be the same or different, but I need to add the values from pipe2. Suppose I have (1, "Ken", "Krugler") and (3, "Tom", "Krugler") in pipe1, and (1, "Ken", "2015") and (2, "Kenneth", "2016") in pipe2.
If I do a CoGroup with OuterJoin, I get a total of 6 output fields.

So here I am expecting the output below, with four columns: id, firstname, lastname, doy.

(1, "Ken", "Krugler", "2015")
(3, "Tom", "Krugler", null)
(2, "Kenneth", null, "2016")

Ken Krugler

Aug 31, 2015, 12:51:58 PM
to cascadi...@googlegroups.com
One more time (note IDs are the same)...

What would you expect as output if you had (1, "Ken", "Krugler") in pipe1, and (1, "Kenneth", "2015") in pipe2?

Thanks,

-- Ken



shree

Aug 31, 2015, 1:35:34 PM
to cascading-user
I don't have different firstname values for the same ID, as in your recent comment. If I did have that situation, the result would be (1, "Ken", "Krugler", "2015").

But suppose you have (1, "Ken", "Krugler") and (3, "Tom", "something") in pipe1, and (1, "Ken", "2015") and (2, "Kenneth", "2016") in pipe2.
If I do a CoGroup with OuterJoin, I get a total of 6 columns in the output, and my common field is "ID":

(1, "Ken", "Krugler", 1, "Ken", "2015")
(null, null, null, 2, "Kenneth", "2016")
(3, "Tom", "something", null, null, null)


So here I am expecting the output below, with four columns: id, firstname, lastname, doy. Wherever there is no value for a column, it should be populated with null.

(1, "Ken", "Krugler", "2015")
(2, "Kenneth", null, "2016")
(3, "Tom", "something", null)



Ken Krugler

Aug 31, 2015, 2:04:34 PM
to cascadi...@googlegroups.com
If firstname is always fixed for a given ID, then it shouldn't be in one of the two pipes, as it has no value.

So you'd use Discard() to get rid of it, if you couldn't prune that out in advance.

Which means something like this:

pipe2 = new Discard(pipe2, new Fields("firstname"));
pipe2 = new Rename(pipe2, new Fields("id"), new Fields("id2"));

Pipe merged = new CoGroup("merged", pipe1, new Fields("id"), pipe2, new Fields("id2"), null, new OuterJoin());
merged = new Discard(merged, new Fields("id2"));

This should leave you with fields "id", "firstname", "lastname", "birthdate", with null values where the join doesn't have that data.

-- Ken
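
To make the null padding of an outer join concrete, here is a minimal plain-Java sketch with no Cascading dependency. The class and method names (OuterJoinSketch, outerJoin) are hypothetical, and the sketch assumes each id appears at most once per side. It keeps both key columns so the padding is visible: a row that exists only on the right side has null in every left-side column, including the left key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

final class OuterJoinSketch {
    // Hypothetical helper: full outer join of left rows [id, firstname, lastname]
    // with right rows [id2, birthdate], matched on the first column of each row.
    // Returns rows of [id, firstname, lastname, id2, birthdate], padded with null.
    static List<List<String>> outerJoin(List<List<String>> left, List<List<String>> right) {
        Map<String, List<String>> leftById = new TreeMap<>();
        for (List<String> row : left) {
            leftById.put(row.get(0), row);
        }
        Map<String, List<String>> rightById = new TreeMap<>();
        for (List<String> row : right) {
            rightById.put(row.get(0), row);
        }

        // Union of the keys from both sides, in sorted order.
        TreeSet<String> ids = new TreeSet<>(leftById.keySet());
        ids.addAll(rightById.keySet());

        List<List<String>> joined = new ArrayList<>();
        for (String id : ids) {
            // A missing side contributes a row of nulls of the right width.
            List<String> l = leftById.getOrDefault(id, Arrays.<String>asList(null, null, null));
            List<String> r = rightById.getOrDefault(id, Arrays.<String>asList(null, null));
            List<String> row = new ArrayList<>(l);
            row.addAll(r);
            joined.add(row);
        }
        return joined;
    }

    public static void main(String[] args) {
        List<List<String>> pipe1 = Arrays.asList(
            Arrays.asList("1", "Ken", "Krugler"),
            Arrays.asList("3", "Tom", "something"));
        List<List<String>> pipe2 = Arrays.asList(
            Arrays.asList("1", "2015"),
            Arrays.asList("2", "2016"));
        for (List<String> row : outerJoin(pipe1, pipe2)) {
            System.out.println(row);
        }
    }
}
```

In this sketch the row for id 2 comes out as [null, null, null, 2, 2016], so dropping the right-hand key column alone would leave that row without an id.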



shree

Aug 31, 2015, 4:16:28 PM
to cascading-user
Thank you for your reply, Ken, but in this case we are losing the pipe2 firstname values (i.e. Kenneth) with the first Discard, and I need those values in the final output.

Ken's statement --> pipe2 = new Discard(pipe2, new Fields("firstname"));   // Here we lose Kenneth from pipe2, but I need it in the output



I am getting output like below with CoGroup:

(1, "Ken", "Krugler", 1, "Ken", "2015")
(null, null, null, 2, "Kenneth", "2016")
(3, "Tom", "something", null, null, null)

I need to get output like below:

(1, "Ken", "Krugler", "2015")
(2, "Kenneth", null, "2016")
(3, "Tom", "something", null)

Ken Krugler

Aug 31, 2015, 5:26:28 PM
to cascadi...@googlegroups.com


From: shree

Sent: August 31, 2015 1:16:28pm PDT

To: cascading-user

Subject: Re: CoGroup with OuterJoin issue


Thank you for your reply, Ken, but in this case we are losing the pipe2 firstname values (i.e. Kenneth) with the first Discard, and I need those values in the final output.

Just to be explicit - you only need the firstname field from pipe2 if this is associated with an ID that doesn't exist in pipe1, correct?

So in that case you'd also rename firstname in pipe2 to something like "firstname2":

pipe2 = new Rename(pipe2, new Fields("id", "firstname"), new Fields("id2", "firstname2"));

and probably rename firstname in pipe1:

pipe1 = new Rename(pipe1, new Fields("firstname"), new Fields("firstname1"));

Then do the CoGroup as per my earlier email:

Pipe merged = new CoGroup("merged", pipe1, new Fields("id"), pipe2, new Fields("id2"), null, new OuterJoin());

and then use ExpressionFunction to merge "firstname1" and "firstname2" into a "firstname" field, e.g.

merged = new Each(merged, new Fields("firstname1", "firstname2"), new ExpressionFunction(new Fields("firstname"), "$0 != null ? $0 : $1", String.class), Fields.SWAP);

And then do the Discard to get rid of the "id2" field:

merged = new Discard(merged, new Fields("id2"));

-- Ken
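
The expression "$0 != null ? $0 : $1" used above is a two-argument coalesce: take the first value unless it is null, otherwise take the second. A minimal plain-Java sketch of that logic follows; the names CoalesceSketch and firstNonNull are hypothetical and not part of Cascading, and the sample rows stand in for the ("firstname1", "firstname2") pairs that an outer join would produce.

```java
import java.util.Arrays;
import java.util.List;

final class CoalesceSketch {
    // Same logic as the expression "$0 != null ? $0 : $1":
    // prefer the first argument, fall back to the second.
    static String firstNonNull(String first, String second) {
        return first != null ? first : second;
    }

    public static void main(String[] args) {
        // Each array is a hypothetical [firstname1, firstname2] pair after the join.
        List<String[]> rows = Arrays.asList(
            new String[] {"Ken", "Ken"},      // id present in both pipes
            new String[] {null, "Kenneth"},   // id present only in pipe2
            new String[] {"Tom", null});      // id present only in pipe1
        for (String[] row : rows) {
            System.out.println(firstNonNull(row[0], row[1]));
        }
    }
}
```

If both sides are non-null but differ, this keeps the first (pipe1) value, which matches the preference stated earlier in the thread.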


shree

Aug 31, 2015, 6:02:21 PM
to cascading-user
Thank you, Ken, but here I need both firstname and DateOfYear from pipe2, so my output should be like below. But with expression filter it is adding new field at the end with given new Field value in expression.

(1, "Ken", "Krugler", "2015")
(2, "Kenneth", null, "2016")
(3, "Tom", "Krugler", null)



Ken Krugler

Aug 31, 2015, 6:48:08 PM
to cascadi...@googlegroups.com


From: shree

Sent: August 31, 2015 3:02:20pm PDT

To: cascading-user

Subject: Re: CoGroup with OuterJoin issue


Thank you, Ken, but here I need both firstname and DateOfYear from pipe2, so my output should be like below. But with expression filter it is adding new field at the end with given new Field value in expression.

I'm not sure what you mean by "adding new field at the end with given new Field value in expression".

You shouldn't be relying on absolute positions, but rather field names when determining the position of values.

If you must have things in a specific order after an operation might have re-ordered them, then use the Identity() function, e.g.

        merged = new Each(merged, new Fields("id", "firstname", "lastname", "birthdate"), new Identity());

But just to wrap this up, here's code that I think generates what you want…

        // Source data for pipe1: (id, firstname, lastname)
        Tap tap1 = new InMemoryTap(new Fields("id", "firstname", "lastname"));
        TupleEntryCollector writer1 = tap1.openForWrite(new LocalFlowProcess());
        writer1.add(new Tuple("1", "Ken", "Krugler"));
        writer1.add(new Tuple("3", "Tom", "Something"));
        writer1.close();

        // Source data for pipe2: (id, firstname, birthdate)
        Tap tap2 = new InMemoryTap(new Fields("id", "firstname", "birthdate"));
        TupleEntryCollector writer2 = tap2.openForWrite(new LocalFlowProcess());
        writer2.add(new Tuple("1", "Ken", "2015"));
        writer2.add(new Tuple("2", "Kenneth", "2016"));
        writer2.close();

        // Rename the shared fields on both sides so that names don't collide after the join
        Pipe pipe1 = new Pipe("pipe1");
        pipe1 = new Rename(pipe1, new Fields("id", "firstname"), new Fields("id1", "firstname1"));

        Pipe pipe2 = new Pipe("pipe2");
        pipe2 = new Rename(pipe2, new Fields("id", "firstname"), new Fields("id2", "firstname2"));

        Pipe merged = new CoGroup("merged", pipe1, new Fields("id1"), pipe2, new Fields("id2"), null, new OuterJoin());

        // Merge firstname1/firstname2 into a single "firstname" field, preferring the pipe1 value
        Function function1 = new ExpressionFunction(new Fields("firstname"), "$0 != null ? $0 : $1", String.class);
        merged = new Each(merged, new Fields("firstname1", "firstname2"), function1, Fields.SWAP);

        // Merge id1/id2 into a single "id" field the same way, then put the fields in the desired order
        Function function2 = new ExpressionFunction(new Fields("id"), "$0 != null ? $0 : $1", String.class);
        merged = new Each(merged, new Fields("id1", "id2"), function2, Fields.SWAP);
        merged = new Each(merged, new Fields("id", "firstname", "lastname", "birthdate"), new Identity());

        // Print field names and tuples as they flow through
        merged = new Each(merged, new Debug(true));

        FlowDef flowDef = new FlowDef()
            .addSource(pipe1, tap1)
            .addSource(pipe2, tap2)
            .addTailSink(merged, new NullSinkTap());

        new LocalFlowConnector().connect(flowDef).complete();

The output is:

['id', 'firstname', 'lastname', 'birthdate']
['1', 'Ken', 'Krugler', '2015']
['2', 'Kenneth', null, '2016']
['3', 'Tom', 'Something', null]

Note that InMemoryTap and NullSinkTap are classes in cascading.utils.

-- Ken
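
The advice above about relying on field names rather than positions can be illustrated without Cascading. The following is a minimal plain-Java sketch, not Cascading's Identity implementation: the class SelectByNameSketch and its select method are hypothetical, and the shuffled field order in main is only an assumed example of what a row might look like after earlier operations have moved fields around.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SelectByNameSketch {
    // Returns the row's values in the order of the requested field names,
    // regardless of where those fields sit in the incoming row.
    static List<String> select(Map<String, String> row, List<String> fieldNames) {
        List<String> values = new ArrayList<>();
        for (String name : fieldNames) {
            if (!row.containsKey(name)) {
                throw new IllegalArgumentException("Unknown field: " + name);
            }
            values.add(row.get(name)); // may be null for outer-join gaps
        }
        return values;
    }

    public static void main(String[] args) {
        // A row whose fields arrive in some shuffled order (assumed for illustration).
        Map<String, String> row = new LinkedHashMap<>();
        row.put("lastname", null);
        row.put("birthdate", "2016");
        row.put("firstname", "Kenneth");
        row.put("id", "2");
        System.out.println(select(row, Arrays.asList("id", "firstname", "lastname", "birthdate")));
    }
}
```

Selecting by name keeps downstream code correct even if an earlier step changes the physical order of the fields.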

shree

Sep 1, 2015, 9:45:39 AM
to cascading-user
Thank you very much, Ken. It works as expected with Identity.