Chris Curtin
Jul 7, 2010, 10:42:51 AM
to cascading-user
Hi,
I'm trying to write a CSV file through a TemplateTap using a TextDelimited scheme. Sinking the same scheme to a regular (non-template) tap works as expected, and swapping TextDelimited for TextLine also works with the TemplateTap, so the problem seems specific to combining a TextDelimited scheme with a TemplateTap.
Does anyone have an example of using a TextDelimited scheme with a TemplateTap?
I'm using Cascading 1.1.1 and Hadoop 0.20.2.
Code:
Tap inputTap = new Hfs(new TextLine(), "/testing_input/templatetest.csv");

Fields groupBy = new Fields(FieldNames.MAILING_ID_NAME, FieldNames.REPORT_ID_NAME);

Pipe delivPipe = new Each("offline_deliv", new Fields("line"), new DeliverabilityParser());
delivPipe = new GroupBy(delivPipe, groupBy);

TextDelimited scheme = new TextDelimited(new Fields(
    FieldNames.MAILING_ID_NAME, FieldNames.REPORT_ID_NAME,
    FieldNames.JOB_ID_NAME, FieldNames.RECIPIENT_ID_NAME,
    FieldNames.ORGANIZATION_ID2_NAME, FieldNames.LIST_ID_NAME,
    FieldNames.SENT_TS_NAME, FieldNames.LAST_MODIFIED_NAME,
    FieldNames.ACTIVITY_STATUS_NAME, FieldNames.BOUNCE_STATUS_NAME,
    FieldNames.TOP_DOMAIN_NAME, FieldNames.SUPPRESSED_NAME,
    FieldNames.EMAIL_NAME), ",");
scheme.setNumSinkParts(1);

Hfs outputTap = new Hfs(scheme, "/testing/");
// Hfs outputTap = new Hfs(new TextLine(1), "/testing/"); // Works?

Tap byMidTap = new TemplateTap(outputTap, "%s-%s", SinkMode.REPLACE);

Flow flow = m_flowConnector.connect(inputTap, byMidTap, delivPipe);
Cascade job = m_cascadeConnector.connect(flow);
job.complete();
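A side note on the "%s-%s" template: assuming TemplateTap expands its path template java.util.Formatter-style over the values of each sink tuple (an assumption here, not something the post confirms), the template consumes only the first two values, because String.format ignores surplus arguments. If the tuple is in the order declared in the TextDelimited fields, those are the MAILING_ID and REPORT_ID values, so the first record in the error log below would be routed to a path like /testing/1-99. The stdlib-only sketch models just that expansion step; TemplatePathDemo and expand are hypothetical names, not Cascading API.

```java
import java.util.Arrays;
import java.util.List;

// HYPOTHETICAL illustration, not Cascading source: assumes the template tap
// formats its path template with java.util.Formatter over the tuple's values.
final class TemplatePathDemo {

    static String expand(String parentPath, String template, List<String> tupleValues) {
        // String.format ignores arguments beyond those the template consumes,
        // so "%s-%s" uses only the first two tuple values.
        String child = String.format(template, tupleValues.toArray());
        return parentPath.endsWith("/") ? parentPath + child : parentPath + "/" + child;
    }

    public static void main(String[] args) {
        // The 13 values of the failing tuple from the error log.
        List<String> tuple = Arrays.asList(
            "1", "99", "99", "900000", "500", "1", "56", "67", "0", "1", "a", "0", "1");
        System.out.println(expand("/testing/", "%s-%s", tuple)); // prints /testing/1-99
    }
}
```

Under that assumption, every tuple in a GroupBy group on MAILING_ID/REPORT_ID would share one output path.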
Error:
2010-07-07 10:35:53,565 INFO mapred.TaskRunner (Task.java:sendDone(737)) - Task 'attempt_local_0001_m_000000_0' done.
2010-07-07 10:35:53,597 INFO mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(276)) -
2010-07-07 10:35:53,612 INFO mapred.Merger (Merger.java:merge(390)) - Merging 1 sorted segments
2010-07-07 10:35:53,628 INFO mapred.Merger (Merger.java:merge(473)) - Down to the last merge-pass, with 1 segments left of total size: 476954 bytes
2010-07-07 10:35:53,628 INFO mapred.LocalJobRunner (LocalJobRunner.java:statusUpdate(276)) -
2010-07-07 10:35:53,675 WARN mapred.LocalJobRunner (LocalJobRunner.java:run(256)) - job_local_0001
cascading.flow.FlowException: internal error: ['1', '99', '99', '900000', '500', '1', '56', '67', '0', '1', 'a', '0', '1']
    at cascading.flow.stack.SinkReducerStackElement.operateSink(SinkReducerStackElement.java:146)
    at cascading.flow.stack.SinkReducerStackElement.operateSink(SinkReducerStackElement.java:99)
    at cascading.flow.stack.SinkReducerStackElement.collect(SinkReducerStackElement.java:69)
    at cascading.flow.stack.GroupReducerStackElement.operateGroup(GroupReducerStackElement.java:74)
    at cascading.flow.stack.GroupReducerStackElement.collect(GroupReducerStackElement.java:58)
    at cascading.flow.stack.FlowReducerStack.reduce(FlowReducerStack.java:169)
    at cascading.flow.FlowReducer.reduce(FlowReducer.java:75)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:463)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:411)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to cascading.tuple.Tuple
    at cascading.tap.TemplateTap$TemplateCollector.collect(TemplateTap.java:172)
    at cascading.scheme.TextDelimited.sink(TextDelimited.java:557)
    at cascading.tap.TemplateTap$TemplateScheme.sink(TemplateTap.java:256)
    at cascading.tap.Tap.sink(Tap.java:251)
    at cascading.flow.stack.SinkReducerStackElement.operateSink(SinkReducerStackElement.java:124)
    ... 9 more
2010-07-07 10:35:56,800 WARN flow.FlowStep (FlowStep.java:logWarn(577)) - [offline_deliv] completion events count: 0
2010-07-07 10:35:56,847 WARN flow.Flow (Flow.java:internalStopAllJobs(1020)) - stopping jobs
2010-07-07 10:35:56,847 INFO flow.FlowStep (FlowStep.java:logInfo(572)) - [offline_deliv] stopping: (1/1) ...US_NAME', 'TOP_DOMAIN', 'SUPPRESSED', 'EMAIL']]"]["/testing/"]"]"]["%s-%s"]
2010-07-07 10:35:56,847 WARN flow.Flow (Flow.java:internalStopAllJobs(1036)) - stopped jobs
2010-07-07 10:35:56,847 WARN flow.Flow (Flow.java:handleExecutorShutdown(1045)) - shutting down job executor
2010-07-07 10:35:56,847 WARN flow.Flow (Flow.java:handleExecutorShutdown(1056)) - shutdown complete
2010-07-07 10:35:56,862 WARN cascade.Cascade (Cascade.java:logWarn(398)) - [offline_deliv] flow failed: offline_deliv
cascading.flow.FlowException: step failed: (1/1) ...US_NAME', 'TOP_DOMAIN', 'SUPPRESSED', 'EMAIL']]"]["/testing/"]"]"]["%s-%s"]
    at cascading.flow.FlowStepJob.blockOnJob(FlowStepJob.java:173)
    at cascading.flow.FlowStepJob.start(FlowStepJob.java:138)
    at cascading.flow.FlowStepJob.call(FlowStepJob.java:127)
    at cascading.flow.FlowStepJob.call(FlowStepJob.java:39)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)
2010-07-07 10:35:56,862 WARN cascade.Cascade (Cascade.java:logWarn(398)) - [offline_deliv] stopping flows
2010-07-07 10:35:56,862 INFO cascade.Cascade (Cascade.java:logInfo(388)) - [offline_deliv] stopping flow: offline_deliv
2010-07-07 10:35:56,862 WARN cascade.Cascade (Cascade.java:logWarn(398)) - [offline_deliv] stopped flows
2010-07-07 10:35:56,862 WARN cascade.Cascade (Cascade.java:logWarn(398)) - [offline_deliv] shutting down flow executor
Exception in thread "main" cascading.cascade.CascadeException: flow failed: offline_deliv
2010-07-07 10:35:56,862 WARN cascade.Cascade (Cascade.java:logWarn(398)) - [offline_deliv] shutdown complete
    at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:508)
    at cascading.cascade.Cascade$CascadeJob.call(Cascade.java:449)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
    at java.lang.Thread.run(Thread.java:619)
Caused by: cascading.flow.FlowException: step failed: (1/1) ...US_NAME', 'TOP_DOMAIN', 'SUPPRESSED', 'EMAIL']]"]["/testing/"]"]"]["%s-%s"]
    at cascading.flow.FlowStepJob.blockOnJob(FlowStepJob.java:173)
    at cascading.flow.FlowStepJob.start(FlowStepJob.java:138)
    at cascading.flow.FlowStepJob.call(FlowStepJob.java:127)
    at cascading.flow.FlowStepJob.call(FlowStepJob.java:39)
    ... 5 more
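Reading the "Caused by: java.lang.ClassCastException" frames in the first trace: TextDelimited.sink hands TemplateTap$TemplateCollector.collect a java.lang.String, and the collector then casts that value to cascading.tuple.Tuple. The stdlib-only sketch below is a simplified, hypothetical model of that mismatch, not Cascading's source. Collector, TupleExpectingCollector, delimitedSink and tuplePassingSink are illustrative names, a List stands in for Tuple, and the idea that TextLine works because it passes the tuple object itself through is an assumption, not something the trace shows.

```java
import java.util.Arrays;
import java.util.List;

// HYPOTHETICAL model of the failure in the trace; none of these names are
// Cascading classes. A List<Object> stands in for cascading.tuple.Tuple.
final class SinkMismatchDemo {

    // Stand-in for the collect(key, value) call a scheme makes on its collector.
    interface Collector {
        void collect(Object key, Object value);
    }

    // Models a template collector that casts the value to a tuple so it can
    // build an output path from the tuple's values.
    static final class TupleExpectingCollector implements Collector {
        String lastPath;

        @Override
        public void collect(Object key, Object value) {
            @SuppressWarnings("unchecked")
            List<Object> tuple = (List<Object>) value; // ClassCastException if value is a String
            lastPath = String.format("%s-%s", tuple.toArray());
        }
    }

    // Models a delimited scheme that joins the fields into one CSV line and
    // passes that String on, which is the shape the trace shows failing.
    static void delimitedSink(List<String> tuple, Collector out) {
        out.collect(null, String.join(",", tuple));
    }

    // ASSUMPTION: models a scheme that passes the tuple object itself through,
    // consistent with TextLine working under TemplateTap.
    static void tuplePassingSink(List<String> tuple, Collector out) {
        out.collect(null, tuple);
    }

    public static void main(String[] args) {
        List<String> tuple = Arrays.asList("1", "99", "99");
        TupleExpectingCollector collector = new TupleExpectingCollector();

        tuplePassingSink(tuple, collector);
        System.out.println("tuple value  -> " + collector.lastPath); // prints 1-99

        try {
            delimitedSink(tuple, collector);
        } catch (ClassCastException e) {
            System.out.println("String value -> ClassCastException");
        }
    }
}
```

If this model matches what happens, the failure sits in how the two classes hand values to each other rather than in the field data itself.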
Thanks,
Chris