Join / CoGroup examples?


Stuart Sierra

Jun 26, 2008, 2:52:09 PM
to cascading-user
Hello all,

Does anyone have code examples of doing joins with CoGroup in
Cascading? I've gotten this far:

Pipe pipe1 = new Pipe("pipe1");
pipe1 = new GroupBy(pipe1, new Fields("join_field"));

Pipe pipe2 = new Pipe("pipe2");
pipe2 = new GroupBy(pipe2, new Fields("join_field"));

Pipe grouping = new Pipe("grouping pipe");
grouping = new CoGroup(pipe1, new Fields("join_field"), pipe2, new Fields("join_field"), new Fields("data1", "data2"));

Tap source1 = new Hfs(new SequenceFile(new Fields("join_field", "data1")), sourcePath1);
Tap source2 = new Hfs(new SequenceFile(new Fields("join_field", "data2")), sourcePath2);
Map<String, Tap> sources = Cascades.tapsMap(new String[]{"pipe1", "pipe2"}, Tap.taps(source1, source2));

Tap sink = new Hfs(new TextLine(new Fields("data1", "data2")), outputPath);

Flow flow = new FlowConnector().connect(sources, sink, grouping);

But I'm still getting errors in the final reduce step:
"cascading.tuple.TupleException: field declaration" ... "does not
match tuple".

Thanks,
-Stuart

Chris K Wensel

Jun 26, 2008, 5:30:12 PM
to cascadi...@googlegroups.com
CoGroup is a little weird. the final Fields declaration declares all
the column names in case the two input streams have names that collide
("join_field").

so it might need to be
"join_field1", "join_field2", "data1", "data2"

but it's weird you didn't get a different error on that.
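
To make the point about the final Fields declaration concrete, here is a minimal plain-Java sketch of a join. It is a model only, not Cascading code: the names CoGroupSketch, innerJoin and declarationFits and the sample rows are hypothetical. It assumes, as the reply above suggests, that the joined tuple carries every value from both sides, including both copies of the join key, so a two-name declaration cannot match a four-value tuple. The four names are listed in value order for this model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of an inner join; this is NOT Cascading code.
class CoGroupSketch {

    // Joins rows whose first value (the join key) matches and
    // concatenates the full left row with the full right row.
    static List<List<String>> innerJoin(List<List<String>> left, List<List<String>> right) {
        List<List<String>> joined = new ArrayList<>();
        for (List<String> l : left) {
            for (List<String> r : right) {
                if (l.get(0).equals(r.get(0))) {
                    List<String> row = new ArrayList<>(l);
                    row.addAll(r); // the key from the right side is kept too
                    joined.add(row);
                }
            }
        }
        return joined;
    }

    // Models the kind of check behind "field declaration ... does not match
    // tuple": the number of declared names must equal the number of values.
    static boolean declarationFits(List<String> declaredNames, List<String> row) {
        return declaredNames.size() == row.size();
    }

    public static void main(String[] args) {
        // Hypothetical sample data: (join_field, data1) and (join_field, data2).
        List<List<String>> left = Arrays.asList(
            Arrays.asList("k1", "a"), Arrays.asList("k2", "b"));
        List<List<String>> right = Arrays.asList(
            Arrays.asList("k1", "x"), Arrays.asList("k3", "y"));

        List<String> row = innerJoin(left, right).get(0);
        System.out.println(row);

        // Two names for four values: does not fit.
        System.out.println(declarationFits(Arrays.asList("data1", "data2"), row));

        // Four unique names, one per value: fits.
        System.out.println(declarationFits(
            Arrays.asList("join_field1", "data1", "join_field2", "data2"), row));
    }
}
```

In this model only the key "k1" appears on both sides, so the join yields a single four-value row; the two-name declaration is rejected and the four-name one is accepted.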

also, you don't need to sort before the join, it will happen during
the join for you (unless you left some stuff out).

maybe you can send me directly the exception and some relevant code
and I might be able to spot something.

ckw

--
Chris K Wensel
ch...@wensel.net
http://chris.wensel.net/
http://www.cascading.org/


Chris K Wensel

Jun 26, 2008, 5:44:00 PM
to cascadi...@googlegroups.com
and to actually answer your question. here is some code from the unit
tests..
FieldedPipesTest.testCoGroupAfterEvery
http://code.google.com/p/cascading/source/browse/trunk/cascading/src/test/cascading/FieldedPipesTest.java

Tap sourceLower = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileLower );
Tap sourceUpper = new Hfs( new TextLine( new Fields( "offset", "line" ) ), inputFileUpper );

Map sources = new HashMap();

sources.put( "lower", sourceLower );
sources.put( "upper", sourceUpper );

Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

// using null pos so all fields are written
Tap sink = new Hfs( new TextLine(), outputPath + "/complex/cogroup/", true );

Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
pipeLower = new GroupBy( pipeLower, new Fields( "num" ) );
pipeLower = new Every( pipeLower, new Fields( "char" ), new First(), Fields.ALL );

Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
pipeUpper = new GroupBy( pipeUpper, new Fields( "num" ) );
pipeUpper = new Every( pipeUpper, new Fields( "char" ), new First(), Fields.ALL );

Pipe splice = new CoGroup( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );

Stuart Sierra

Jun 27, 2008, 11:48:42 AM
to cascading-user
Thanks, Chris. I seem to have it working now. I think my confusion
stemmed from the dual use of Fields -- I wasn't sure when I was
writing a declaration and when I was writing a selector. For the benefit
of others, here's what I've learned. I'm going to write this out very
explicitly, so please correct me if I'm wrong:

1. The Fields argument to a Scheme, Function, or Aggregator is a
declaration only; it names tuple entries but does not modify them.

2. The *first* Fields argument to an Each or Every is a selector that
modifies tuples before they are passed to the function/filter/
aggregator.

3. The *second* Fields argument to an Each or Every is a selector that
modifies tuples just before they go to the next Pipe. It may include
fields that were not passed to the function/filter/aggregator.

4. The first two Fields arguments to a CoGroup tell which fields to
group by, but do not modify the tuples.

5. The *final* Fields argument to a CoGroup is a declaration only. By
default, CoGroup outputs all the fields from all the input tuples.
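
The selector/declaration distinction in points 1 to 3 can be sketched in plain Java. This is a model under stated assumptions, not Cascading's implementation: FieldsSketch, select and declare are hypothetical names, a tuple is modeled as an ordered map from field name to value, and the sample line "1 a" is made up.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "selector" versus "declaration"; NOT Cascading code.
class FieldsSketch {

    // A selector picks existing entries by name, so it changes what flows on.
    static Map<String, String> select(Map<String, String> tuple, List<String> selector) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String name : selector) {
            if (!tuple.containsKey(name)) {
                throw new IllegalArgumentException("unknown field: " + name);
            }
            out.put(name, tuple.get(name));
        }
        return out;
    }

    // A declaration only attaches names to values, position by position.
    static Map<String, String> declare(List<String> names, List<String> values) {
        if (names.size() != values.size()) {
            throw new IllegalArgumentException("field declaration does not match tuple");
        }
        Map<String, String> out = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            out.put(names.get(i), values.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // Point 1: a scheme declares names for the raw values (hypothetical data).
        Map<String, String> incoming =
            declare(Arrays.asList("offset", "line"), Arrays.asList("0", "1 a"));

        // Point 2: the first selector narrows what the function sees.
        Map<String, String> arguments = select(incoming, Arrays.asList("line"));

        // Point 1 again: the function declares names for its own results.
        List<String> parts = Arrays.asList(arguments.get("line").split(" "));
        Map<String, String> results = declare(Arrays.asList("num", "char"), parts);

        // Point 3: the second selector picks from incoming plus results,
        // so it can name "offset" even though the function never saw it.
        Map<String, String> combined = new LinkedHashMap<>(incoming);
        combined.putAll(results);
        System.out.println(select(combined, Arrays.asList("offset", "num", "char")));
    }
}
```

The model keeps the two roles in separate methods on purpose: declare never drops or reorders values, while select is the only step that changes which values continue downstream.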

-Stuart


On Jun 26, 5:44 pm, Chris K Wensel <ch...@wensel.net> wrote:
> and to actually answer your question. here is some code from the unit
> tests..
> FieldedPipesTest.testCoGroupAfterEvery
> http://code.google.com/p/cascading/source/browse/trunk/cascading/src/...

Chris K Wensel

Jun 27, 2008, 8:39:07 PM
to cascadi...@googlegroups.com
I think this is exactly right. Thanks for posting this.

I'll review the field names in the JavaDoc to make sure they are
consistent. I'm sorry if they were confusing.
http://www.cascading.org/javadoc/

chris
