Hi,
I want to read a file on the local disk that contains cookie information; I use those cookies as a whitelist to filter data stored in HDFS.
So I need to use both a FileTap and an Hfs tap. My code is below. I get the error 'org.apache.hadoop.mapred.JobConf cannot be cast to java.util.Properties'.
What should I do?
package com.snda.time;
import java.util.Properties;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.joiner.LeftJoin;
import cascading.property.AppProps;
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;
public class TimeDev {
/**
* @param args
*/
public static void main(String[] args) {
String localCookieFilePath = args[0];
String hdfsKu6VideoLogPath = args[1];
String outputLogPath = args[2];
String tmpPath = args[3];
Tap localCookieTap = new FileTap(new cascading.scheme.local.TextDelimited(new Fields("cookie2"), "\t"), localCookieFilePath);
Tap hdfsKu6VideoLogTap = new Hfs(new cascading.scheme.hadoop.TextLine(), hdfsKu6VideoLogPath);
Tap localOutputTap = new FileTap(new TextLine(), outputLogPath);
Pipe localPipe = new Pipe("localPipe");
Pipe hdfsPipe = new Pipe("hdfsPipe");
hdfsPipe = new Each(hdfsPipe, new Fields("line"), new GetKu6VideoInfo(new Fields("cookie", "uuid", "logtime", "action", "vid" )), Fields.ALL);
//filter
Pipe coPipe = new Pipe("coPipe");
coPipe = new CoGroup(localPipe, new Fields("cookie2"), hdfsPipe, new Fields("cookie"), new LeftJoin() );
coPipe = new Discard(coPipe, new Fields("cookie2"));
//Group+sort
coPipe = new GroupBy(coPipe, new Fields("cookie", "uuid"), new Fields("logtime"));
Properties properties = new Properties();
AppProps.setApplicationJarClass(properties, TimeDev.class);
properties.put("
mapred.job.queue.name", "cug_d_sdo_data");
properties.put("mapred.fairscheduler.pool", "cug_d_sdo_data");
properties.put("mapreduce.job.complete.cancel.delegation.tokens", "false");
properties.put("mapred.reduce.tasks", "300");
properties.put("cascading.tmp.dir", tmpPath);
FlowDef flowDef = new FlowDef();
flowDef.setName("TimeDev")
.addSource(localPipe, localCookieTap)
.addSource(hdfsPipe, hdfsKu6VideoLogTap)
.addTailSink(coPipe, localOutputTap);
FlowConnector flowConnector = new HadoopFlowConnector(properties);
Flow flow = flowConnector.connect(flowDef);
flow.complete();
}
}
error:
13/03/20 15:16:09 INFO util.HadoopUtil: resolving application jar from found main method on: com.snda.time.TimeDev
13/03/20 15:16:09 INFO planner.HadoopPlanner: using application jar: null
13/03/20 15:16:09 INFO property.AppProps: using app.id: 5D89CBB44DD43C7275EA1BE294E9E004
Exception in thread "main" cascading.flow.planner.PlannerException: could not build flow from assembly: [org.apache.hadoop.mapred.JobConf cannot be cast to java.util.Properties]
at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:533)
at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:237)
at cascading.flow.FlowConnector.connect(FlowConnector.java:454)
at com.snda.time.TimeDev.main(TimeDev.java:66)
Caused by: java.lang.ClassCastException: org.apache.hadoop.mapred.JobConf cannot be cast to java.util.Properties
at cascading.scheme.local.TextDelimited.sourceConfInit(TextDelimited.java:94)
at cascading.tap.Tap.sourceConfInit(Tap.java:181)
at cascading.flow.hadoop.HadoopFlowStep.initFromSources(HadoopFlowStep.java:332)
at cascading.flow.hadoop.HadoopFlowStep.getInitializedConfig(HadoopFlowStep.java:99)
at cascading.flow.hadoop.HadoopFlowStep.createFlowStepJob(HadoopFlowStep.java:201)
at cascading.flow.hadoop.HadoopFlowStep.createFlowStepJob(HadoopFlowStep.java:69)
at cascading.flow.planner.BaseFlowStep.getFlowStepJob(BaseFlowStep.java:680)
at cascading.flow.BaseFlow.initializeNewJobsMap(BaseFlow.java:1148)
at cascading.flow.BaseFlow.initialize(BaseFlow.java:198)
at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:231)
... 2 more