Cogroup two pipes after aggregator

tom kern

Jun 25, 2012, 12:50:03 PM
to cascadi...@googlegroups.com
Hi,

I am trying to join two pipes, which works without problems.
My subassembly looks like this:

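public CountPages(Pipe modules, Pipe pages) {
  // group the modules by website id and apply the Parser operation to each group
  modules = new GroupBy(modules, ModuleSchema.WEBSITE_ID);
  modules = new Every(modules, new Parser());
  // inner-join the aggregated modules with the (unaggregated) pages on website id
  pages = new CoGroup(modules, ModuleSchema.WEBSITE_ID, pages, PagesSchema.WEBSITE_ID, new InnerJoin());
  // register the joined pipe as the tail of this SubAssembly
  setTails(pages);
}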

Everything works fine: I get only the fields from Parser plus all the fields from Pages.
If I also add an aggregator to the pages pipe, I get an error:

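public CountPages(Pipe modules, Pipe pages) {
  // group the modules by website id and apply the Parser operation to each group
  modules = new GroupBy(modules, ModuleSchema.WEBSITE_ID);
  modules = new Every(modules, new Parser());
  // new: group the pages by website id and count the pages per website
  pages = new GroupBy(pages, PagesSchema.WEBSITE_ID);
  pages = new Every(pages, new Count());
  // inner-join both aggregated pipes on website id
  pages = new CoGroup(modules, ModuleSchema.WEBSITE_ID, pages, PagesSchema.WEBSITE_ID, new InnerJoin());
  setTails(pages);
}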

Error: Caused by: cascading.tuple.TupleException: position value is too large: 14, positions in field: 9
ModuleSchema has a total of 11 fields, PagesSchema has 21.

What am I doing wrong?

Thanks in advance,
Thomas
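The field arithmetic in the post above can be modeled in a few lines. The following is a hypothetical, stdlib-only Java sketch: the class name `FieldLayoutSketch` and the field names `module_website_id` and `parsed` are made up. It is not Cascading code and says nothing about the cause of the planner error. It assumes these Cascading defaults: an Every after a GroupBy emits the grouping fields plus the operation's declared results, `Count` declares a single `count` field, and a CoGroup without declared fields emits the left pipe's fields followed by the right pipe's. Under those assumptions, the pages side shrinks from its 21 schema fields to two once Count runs, so the joined layout is much narrower than the raw schemas.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of how field lists change through
// GroupBy -> Every -> CoGroup. This is NOT Cascading code; it only mirrors
// the defaults assumed in the lead-in.
public class FieldLayoutSketch {

    // Every after a GroupBy (assumed default outgoing selector):
    // the grouping fields followed by the fields the operation declares.
    static List<String> every(List<String> groupFields, List<String> declaredResults) {
        List<String> out = new ArrayList<>(groupFields);
        out.addAll(declaredResults);
        return Collections.unmodifiableList(out);
    }

    // CoGroup without declared fields: all left fields, then all right fields.
    static List<String> coGroup(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>(left);
        out.addAll(right);
        return Collections.unmodifiableList(out);
    }

    // Look up a field by zero-based position; the message loosely echoes
    // the wording of the TupleException in the thread.
    static String selectPos(List<String> fields, int pos) {
        if (pos < 0) {
            throw new IllegalArgumentException("negative positions are not modeled here");
        }
        if (pos >= fields.size()) {
            throw new IllegalArgumentException(
                "position value is too large: " + pos + ", positions in field: " + fields.size());
        }
        return fields.get(pos);
    }

    public static void main(String[] args) {
        // Pages side after GroupBy(website id) + Every(Count): Count declares "count".
        List<String> pages = every(List.of("website_id"), List.of("count"));
        // Modules side: "module_website_id" and "parsed" are made-up names.
        List<String> modules = every(List.of("module_website_id"), List.of("parsed"));

        List<String> joined = coGroup(modules, pages);
        System.out.println(joined); // prints [module_website_id, parsed, website_id, count]

        try {
            selectPos(joined, 14);
        } catch (IllegalArgumentException e) {
            // prints: position value is too large: 14, positions in field: 4
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints the four joined field names and then the out-of-range message for position 14.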

Chris K Wensel

Jun 25, 2012, 1:28:09 PM
to cascadi...@googlegroups.com

If you call Flow#writeDOT(), you can see the planned fields. The same is true for PlannerException#writeDOT().

You can also insert Debug filters before and after your pipes to see what's really happening when running in Hadoop or Cascading local mode.

ckw

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To view this discussion on the web visit https://groups.google.com/d/msg/cascading-user/-/DSvUbzLN5U4J.
To post to this group, send email to cascadi...@googlegroups.com.
To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.


tom kern

Jun 25, 2012, 3:53:40 PM
to cascadi...@googlegroups.com
Thanks for the quick reply.

I can't call writeDOT on the Flow, because building the Flow throws the same exception.

I tried the Debug filter like this, right before the join (the last statement before I CoGroup the two pipes):
        pages = new Each(pages, new Debug(true));

This has two side effects:
a) I see all my columns and rows and can verify that everything is in order.
b) It works: the join goes through without a hitch and I get the output I want.

I haven't changed the code at all, just inserted the Each/Debug. If I remove it, it fails again.

tom kern

Jun 25, 2012, 3:58:11 PM
to cascadi...@googlegroups.com
Actually, I have to add the Each/Debug to both pipes to make it work.

Chris K Wensel

Jun 25, 2012, 4:57:30 PM
to cascadi...@googlegroups.com
If you can send me a gist of this issue, patterned after a similar Cascading test, that would be very helpful.


ckw


tom kern

Jun 26, 2012, 4:31:55 AM
to cascadi...@googlegroups.com
Your wish is my command:
https://gist.github.com/451fc6a0c41f946dbdbf
I hope that is what you wanted.

It results in:
cascading.flow.planner.PlannerException: could not build flow from assembly: [position value is too large: 11, positions in field: 5]
    at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:502)
    at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:84)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:454)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:445)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:421)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:270)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:252)
    at ExtractLinksTest.testCoGroup(ExtractLinksTest.java:42)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:30)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at cascading.test.PlatformRunner.runChild(PlatformRunner.java:164)
    at cascading.test.PlatformRunner.runChild(PlatformRunner.java:44)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:76)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:195)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Caused by: cascading.tuple.TupleException: position value is too large: 11, positions in field: 5
    at cascading.tuple.Fields.translatePos(Fields.java:784)
    at cascading.tuple.Fields.translatePos(Fields.java:772)
    at cascading.tuple.Fields.select(Fields.java:880)
    at cascading.pipe.Splice.createJoinFields(Splice.java:1009)
    at cascading.pipe.Splice.outgoingScopeFor(Splice.java:988)
    at cascading.flow.planner.ElementGraph.resolveFields(ElementGraph.java:589)
    at cascading.flow.planner.ElementGraph.resolveFields(ElementGraph.java:571)
    at cascading.flow.local.planner.LocalPlanner.buildFlow(LocalPlanner.java:71)
    ... 42 more

Chris K Wensel

Jun 26, 2012, 10:10:21 AM
to cascadi...@googlegroups.com
Sweet!

And this fails against 2.0, right?

I will hopefully get to it today.

ckw


Chris K Wensel

Jun 26, 2012, 10:21:34 AM
to cascadi...@googlegroups.com
By "and this fails against 2.0 right?" I meant: are you on 2.0 final, not one of the WIPs?

tom kern

Jun 26, 2012, 12:21:26 PM
to cascadi...@googlegroups.com
Yes, it fails against both 2.0.0 and 2.0.1.

Appreciated,

Thomas

Chris K Wensel

Jun 26, 2012, 3:57:07 PM
to cascadi...@googlegroups.com
OK, I'm able to reproduce it. Good news: it's a local-mode bug, not a Hadoop-mode one.

I hope to have a WIP with the fix out late tonight or tomorrow.

ckw


Chris K Wensel

Jun 26, 2012, 8:01:04 PM
to cascadi...@googlegroups.com
Looks like I've got a fix; it should show up in wip-320.

Thanks for finding this.

Cheers,
chris 

tom kern

Jun 27, 2012, 3:38:38 AM
to cascadi...@googlegroups.com
Just tried it with wip-321 and the exception is gone.

Thanks for the quick fix,
Thomas

Chris K Wensel

Jun 27, 2012, 11:57:08 AM
to cascadi...@googlegroups.com
Great news. Thanks for letting me know.

chris
