BufferJoin produces null JoinerClosure in following Buffer


Jordan Halterman (kuujo)

Apr 27, 2015, 2:14:34 PM
to cascadi...@googlegroups.com
Hi,

I'm becoming a big fan of Cascading and an advocate for it at my company. Great work!

But it's only been a few short weeks and I'm still learning the API. I'm trying to build a Buffer for a BufferJoin that calculates the median of a set of values. Tentatively, this is done by joining a LHS pipe of values with a RHS pipe containing a single count of all values. However, I've attempted basically every way I could find to configure this flow, to no avail. When I finally got the flow to run with the following code, the JoinerClosure passed in to the Median buffer was null. Here's the flow code:

String inputPath = "src/test/resources/median/input/test.txt";
String outputPath = "src/test/resources/median/output/test.txt";
String expectedPath = "src/test/resources/median/expected/test.txt";

FlowConnector connector = new LocalFlowConnector();

Tap source = new FileTap(new TextLine(new Fields("line", Integer.class)), inputPath, SinkMode.KEEP);
Tap sink = new FileTap(new TextLine(), outputPath, SinkMode.REPLACE);

Pipe data = new Pipe("data");

Pipe count = new Pipe("counts", data);
count = new GroupBy(count, Fields.NONE);
count = new Every(count, new Count(), Fields.RESULTS);

Pipe join = new CoGroup(data, Fields.NONE, count, Fields.NONE, new Fields("line", "count"), new BufferJoin());
join = new Every(join, new Median(), Fields.RESULTS);

FlowDef flowDef = FlowDef.flowDef()
  .addSource(data, source)
  .addTailSink(join, sink);

connector.connect(flowDef).complete();

The Median class contains the following operate method:


@Override
public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
  JoinerClosure join = bufferCall.getJoinerClosure();
  if (join.size() != 2)
    throw new OperationException("join size must be 2");

  bufferCall.getOutputCollector().add(new Tuple(canonical.canonical(calculate(join))));
}

When join.size() is called, the following exception is thrown:

cascading.pipe.OperatorException: [data*counts][com.mycompany.cascading.operation.MedianTest.testMedian(MedianTest.java:50)] operator Every failed executing operation: Median[decl:'median']
at cascading.flow.stream.BufferEveryWindow.receive(BufferEveryWindow.java:133)
at cascading.flow.stream.BufferEveryWindow.receive(BufferEveryWindow.java:41)
at cascading.flow.stream.MemoryCoGroupGate.push(MemoryCoGroupGate.java:144)
at cascading.flow.stream.MemoryCoGroupGate.complete(MemoryCoGroupGate.java:123)
at cascading.flow.stream.Duct.complete(Duct.java:81)
at cascading.flow.stream.Duct.complete(Duct.java:81)
at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
at cascading.flow.stream.Duct.complete(Duct.java:81)
at cascading.flow.local.stream.LocalGroupByGate.complete(LocalGroupByGate.java:122)
at cascading.flow.stream.Fork.complete(Fork.java:60)
at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at com.mycompany.cascading.operation.Median.operate(Median.java:42)
at cascading.flow.stream.BufferEveryWindow.receive(BufferEveryWindow.java:125)
... 16 more

I'm certain that I'm doing something noobish and there's an easy fix for this. I've tried several different patterns of arguments to both the CoGroup constructor and the Every constructor for the BufferJoin, but to no avail. All other argument combinations fail to even plan into runnable flows. I suspect I'm looking in completely the wrong direction, though.

Jordan

Jordan Halterman (kuujo)

Apr 27, 2015, 2:34:14 PM
to cascadi...@googlegroups.com
Ahh... I seem to have gotten it to work after all:

Pipe join = new CoGroup(data, Fields.NONE, count, Fields.NONE, new BufferJoin());
join = new Every(join, new Median(), Fields.RESULTS);

However, I'm not entirely sure what the issue was. Can someone elaborate on why passing Fields in to the declaredFields parameter of CoGroup causes that NPE? Are the restrictions for BufferJoin simply strict like that?
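
For reference, the median step described in the first message (a stream of values plus a single total count) can be sketched independently of Cascading. This is only a minimal sketch: the class name `MedianSketch` and the method `median` are hypothetical, it is not the `calculate` method from the Median buffer above (which isn't shown), and it assumes the values arrive in ascending order, which the posted flow may not guarantee on its own.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MedianSketch {
  // Hypothetical helper: walks an ascending stream of values and stops at the
  // middle, using a count that was computed separately (the RHS of the join).
  static double median(Iterator<Integer> sortedValues, long count) {
    if (count <= 0)
      throw new IllegalArgumentException("count must be positive");

    long upper = count / 2;                            // zero-based index of the upper middle value
    long lower = (count % 2 == 0) ? upper - 1 : upper; // equals upper when count is odd

    double lowerValue = 0;
    for (long i = 0; i <= upper; i++) {
      if (!sortedValues.hasNext())
        throw new IllegalStateException("fewer values than count");

      int value = sortedValues.next();
      if (i == lower)
        lowerValue = value;
      if (i == upper)
        return (lowerValue + value) / 2.0;             // average of the one or two middle values
    }
    throw new IllegalStateException("unreachable");
  }

  public static void main(String[] args) {
    List<Integer> odd = Arrays.asList(1, 3, 5, 7, 9);
    List<Integer> even = Arrays.asList(2, 4, 6, 8);
    System.out.println(median(odd.iterator(), odd.size()));   // prints 5.0
    System.out.println(median(even.iterator(), even.size())); // prints 5.0
  }
}
```

Knowing the count up front means only half of the values have to be read, and none have to be held in memory, which is the reason for joining the values against a single count in the first place.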

Chris K Wensel

Apr 28, 2015, 3:19:37 PM
to cascadi...@googlegroups.com
It probably shouldn't. A test case against the 2.7 WIP would be great.

ckw

-- 
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/6555ce16-47a9-4879-b6bb-0f6b9d228199%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Jordan Halterman

Apr 28, 2015, 6:19:03 PM
to cascadi...@googlegroups.com
I'll give it a shot :-)
