Read local disk file to filter hdfs pipe data


chen dong

Mar 20, 2013, 3:29:36 AM
to cascadi...@googlegroups.com
Hi,

I want to read a local disk file that contains cookie information, and use those cookies as a whitelist to filter data in HDFS. So I need to use both a FileTap and an Hfs tap. My code is below. I get the error 'org.apache.hadoop.mapred.JobConf cannot be cast to java.util.Properties'.

What should I do?


package com.snda.time;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowDef;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.joiner.LeftJoin;
import cascading.property.AppProps;
import cascading.scheme.local.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.local.FileTap;
import cascading.tuple.Fields;

public class TimeDev {

    /**
     * @param args
     */
    public static void main(String[] args) {
        String localCookieFilePath = args[0];
        String hdfsKu6VideoLogPath = args[1];
        String outputLogPath = args[2];
        String tmpPath = args[3];
       
        // Cascading local-mode scheme and tap for the cookie whitelist
        Tap localCookieTap = new FileTap(new cascading.scheme.local.TextDelimited(new Fields("cookie2"), "\t"), localCookieFilePath);
        // Hadoop scheme and tap for the HDFS log data
        Tap hdfsKu6VideoLogTap = new Hfs(new cascading.scheme.hadoop.TextLine(), hdfsKu6VideoLogPath);
        Tap localOutputTap = new FileTap(new TextLine(), outputLogPath);
       
        Pipe localPipe = new Pipe("localPipe");
        Pipe hdfsPipe = new Pipe("hdfsPipe");
       
        hdfsPipe = new Each(hdfsPipe, new Fields("line"), new GetKu6VideoInfo(new Fields("cookie", "uuid", "logtime", "action", "vid" )), Fields.ALL);
       
        //filter
        Pipe coPipe = new Pipe("coPipe");
        coPipe = new CoGroup(localPipe, new Fields("cookie2"), hdfsPipe, new Fields("cookie"), new LeftJoin() );
       
        coPipe = new Discard(coPipe, new Fields("cookie2"));
       
        //Group+sort
        coPipe = new GroupBy(coPipe, new Fields("cookie", "uuid"), new Fields("logtime"));
       
        Properties properties = new Properties();
        AppProps.setApplicationJarClass(properties, TimeDev.class);
        properties.put("mapred.job.queue.name", "cug_d_sdo_data");
        properties.put("mapred.fairscheduler.pool", "cug_d_sdo_data");
        properties.put("mapreduce.job.complete.cancel.delegation.tokens", "false");
        properties.put("mapred.reduce.tasks", "300");
        properties.put("cascading.tmp.dir", tmpPath);
       
        FlowDef flowDef = new FlowDef();
        flowDef.setName("TimeDev")
        .addSource(localPipe, localCookieTap)
        .addSource(hdfsPipe, hdfsKu6VideoLogTap)
        .addTailSink(coPipe, localOutputTap);
       
        FlowConnector flowConnector = new HadoopFlowConnector(properties);
        Flow flow = flowConnector.connect(flowDef);
       
        flow.complete();
    }

}



error:

13/03/20 15:16:09 INFO util.HadoopUtil: resolving application jar from found main method on: com.snda.time.TimeDev
13/03/20 15:16:09 INFO planner.HadoopPlanner: using application jar: null
13/03/20 15:16:09 INFO property.AppProps: using app.id: 5D89CBB44DD43C7275EA1BE294E9E004
Exception in thread "main" cascading.flow.planner.PlannerException: could not build flow from assembly: [org.apache.hadoop.mapred.JobConf cannot be cast to java.util.Properties]
    at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:533)
    at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:237)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:454)
    at com.snda.time.TimeDev.main(TimeDev.java:66)
Caused by: java.lang.ClassCastException: org.apache.hadoop.mapred.JobConf cannot be cast to java.util.Properties
    at cascading.scheme.local.TextDelimited.sourceConfInit(TextDelimited.java:94)
    at cascading.tap.Tap.sourceConfInit(Tap.java:181)
    at cascading.flow.hadoop.HadoopFlowStep.initFromSources(HadoopFlowStep.java:332)
    at cascading.flow.hadoop.HadoopFlowStep.getInitializedConfig(HadoopFlowStep.java:99)
    at cascading.flow.hadoop.HadoopFlowStep.createFlowStepJob(HadoopFlowStep.java:201)
    at cascading.flow.hadoop.HadoopFlowStep.createFlowStepJob(HadoopFlowStep.java:69)
    at cascading.flow.planner.BaseFlowStep.getFlowStepJob(BaseFlowStep.java:680)
    at cascading.flow.BaseFlow.initializeNewJobsMap(BaseFlow.java:1148)
    at cascading.flow.BaseFlow.initialize(BaseFlow.java:198)
    at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:231)
    ... 2 more

Ken Krugler

Mar 20, 2013, 9:42:57 AM
to cascadi...@googlegroups.com
On Mar 20, 2013, at 12:29am, chen dong wrote:

Hi,

I want to read a local disk file that contains cookie information, and use those cookies as a whitelist to filter data in HDFS. So I need to use both a FileTap and an Hfs tap. My code is below. I get the error 'org.apache.hadoop.mapred.JobConf cannot be cast to java.util.Properties'.

You're trying to use a Cascading local mode Scheme (cascading.scheme.local.TextDelimited) with a Hadoop job.

What should I do?

Easiest is to copy the local file into HDFS, and then use cascading.scheme.hadoop.TextDelimited to parse it.
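In the code above, that amounts to copying the cookie file into HDFS first (e.g. with `hadoop fs -put`) and swapping the local scheme and tap for their Hadoop counterparts. A sketch against the Cascading 2.x API; the HDFS destination path here is illustrative:

```java
// First, from the shell: hadoop fs -put <localCookieFilePath> /tmp/cookies.txt
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.hadoop.Hfs;

// Hadoop-mode scheme + Hfs tap, matching the HadoopFlowConnector:
Tap cookieTap = new Hfs(new TextDelimited(new Fields("cookie2"), "\t"), "/tmp/cookies.txt");
```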

Or you could read the local file into memory (e.g. as a List or Map) if it's not big, and write a custom Function to do the filtering with it.
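The in-memory approach boils down to: load the whitelist into a Set once, then keep only log lines whose cookie field is in the set. A plain-Java sketch of that logic (class and method names are illustrative; in a real flow you would wrap the Set lookup inside a custom Cascading Filter or Function):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class WhitelistFilter {

    // Keep only log lines whose first tab-separated field (the cookie)
    // appears in the in-memory whitelist.
    static List<String> filter(List<String> logLines, Set<String> whitelist) {
        return logLines.stream()
                .filter(line -> whitelist.contains(line.split("\t", 2)[0]))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Set<String> whitelist = new HashSet<>(Arrays.asList("c1", "c3"));
        List<String> logs = Arrays.asList("c1\tview", "c2\tclick", "c3\tplay");
        System.out.println(filter(logs, whitelist));
    }
}
```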

-- Ken


--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr





chen dong

Mar 20, 2013, 10:30:20 PM
to cascadi...@googlegroups.com
Okay, I see. Thanks.
I had thought that Cascading could read directly from local disk in Hadoop mode.

Ken Krugler

Mar 20, 2013, 11:43:56 PM
to cascadi...@googlegroups.com
On Mar 20, 2013, at 7:30pm, chen dong wrote:

Okay, I see. Thanks.
I had thought that Cascading could read directly from local disk in Hadoop mode.

It can, if you use the right TextDelimited (the one in the cascading.scheme.hadoop package).

I've never tried reading from a local file directly into the right side of a HashJoin, but it might avoid the issue of your Hadoop job being forced to run locally, which is what normally happens when you mix a local input file with HDFS (or S3) input files in a workflow.
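For reference, the HashJoin variant would replace the CoGroup in the original code: it streams the large HDFS pipe and accumulates the small whitelist pipe in memory, so the join needs no reduce phase. Untested here, per the caveat above; a sketch against the Cascading 2.x API:

```java
import cascading.pipe.HashJoin;
import cascading.pipe.joiner.InnerJoin;

// Streamed (large) pipe first, accumulated (small) pipe second:
Pipe coPipe = new HashJoin(hdfsPipe, new Fields("cookie"),
                           localPipe, new Fields("cookie2"),
                           new InnerJoin());
```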

-- Ken