// Fixture locations for the median test: input data, flow output, and expected result.
String inputPath = "src/test/resources/median/input/test.txt";
String outputPath = "src/test/resources/median/output/test.txt";
String expectedPath = "src/test/resources/median/expected/test.txt";

FlowConnector connector = new LocalFlowConnector();

// Source: text lines exposed as a single "line" field declared with Integer.class (SinkMode.KEEP).
Fields lineField = new Fields("line", Integer.class);
Tap source = new FileTap(new TextLine(lineField), inputPath, SinkMode.KEEP);

// Sink: default TextLine scheme, opened with SinkMode.REPLACE.
Tap sink = new FileTap(new TextLine(), outputPath, SinkMode.REPLACE);

// Head pipe; it feeds both the "counts" branch and the left side of the CoGroup below.
Pipe data = new Pipe("data");

// "counts" branch: group on Fields.NONE, then apply Count and keep only its results.
Pipe count = new Pipe("counts", data);
count = new GroupBy(count, Fields.NONE);
count = new Every(count, new Count(), Fields.RESULTS);

// Join the raw data with the count branch; BufferJoin leaves the actual join to the Median buffer.
// NOTE(review): the Cascading javadoc for BufferCall.getJoinerClosure() appears to say the closure
// is null unless the CoGroup's declared fields are Fields.NONE -- confirm whether declaring
// ("line", "count") here is what makes Median.operate fail with a NullPointerException.
Fields declaredFields = new Fields("line", "count");
Pipe join = new CoGroup(data, Fields.NONE, count, Fields.NONE, declaredFields, new BufferJoin());
join = new Every(join, new Median(), Fields.RESULTS);

// Bind the head pipe to the source and the tail pipe to the sink, then run the flow to completion.
FlowDef flowDef = FlowDef.flowDef()
    .addSource(data, source)
    .addTailSink(join, sink);
connector.connect(flowDef).complete();
/**
 * Buffer entry point: computes a value from the joined streams and emits it as one tuple.
 *
 * <p>The value comes from {@code calculate(join)} and is passed through
 * {@code canonical.canonical(...)} before being added to the output collector; both
 * {@code calculate} and {@code canonical} are defined elsewhere in this class.
 *
 * @param flowProcess the current flow process (not used by this method)
 * @param bufferCall  supplies the {@link JoinerClosure} and the output collector
 * @throws OperationException if no joiner closure is available, or if the closure does not
 *                            expose exactly two joined streams
 */
@Override
public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
    JoinerClosure join = bufferCall.getJoinerClosure();

    // Fail with a descriptive error instead of a bare NullPointerException.
    // NOTE(review): per the Cascading javadoc the closure is presumably only provided when the
    // upstream CoGroup uses BufferJoin with Fields.NONE as declared fields -- confirm.
    if (join == null) {
        throw new OperationException(
            "no JoinerClosure available; this buffer must follow a CoGroup using BufferJoin");
    }

    if (join.size() != 2) {
        throw new OperationException("join size must be 2");
    }

    bufferCall.getOutputCollector().add(new Tuple(canonical.canonical(calculate(join))));
}
cascading.pipe.OperatorException: [data*counts][com.mycompany.cascading.operation.MedianTest.testMedian(MedianTest.java:50)] operator Every failed executing operation: Median[decl:'median']
    at cascading.flow.stream.BufferEveryWindow.receive(BufferEveryWindow.java:133)
    at cascading.flow.stream.BufferEveryWindow.receive(BufferEveryWindow.java:41)
    at cascading.flow.stream.MemoryCoGroupGate.push(MemoryCoGroupGate.java:144)
    at cascading.flow.stream.MemoryCoGroupGate.complete(MemoryCoGroupGate.java:123)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
    at cascading.flow.stream.Duct.complete(Duct.java:81)
    at cascading.flow.local.stream.LocalGroupByGate.complete(LocalGroupByGate.java:122)
    at cascading.flow.stream.Fork.complete(Fork.java:60)
    at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
    at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at com.mycompany.cascading.operation.Median.operate(Median.java:42)
    at cascading.flow.stream.BufferEveryWindow.receive(BufferEveryWindow.java:125)
    ... 16 more
Pipe join = new CoGroup(data, Fields.NONE, count, Fields.NONE, new BufferJoin());
join = new Every(join, new Median(), Fields.RESULTS);
--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/6555ce16-47a9-4879-b6bb-0f6b9d228199%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
You received this message because you are subscribed to a topic in the Google Groups "cascading-user" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/cascading-user/K74Tgj0gQoE/unsubscribe.
To unsubscribe from this group and all its topics, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/F266B99B-8F1F-4EEA-9352-D476B1AC3473%40wensel.net.