distributed cache access in Cascading?


Chris Curtin

May 27, 2009, 11:22:20 AM
to cascading-user
Hi,

How would I access a distributed cache file in Cascading? In
particular, inside a Buffer- and/or Aggregator-derived class?

I see examples in 'straight' Hadoop, but I'm not sure which methods to
override in Cascading to do the initial load into my classes for
access later.

Thanks,

Chris

Chris K Wensel

May 27, 2009, 12:06:01 PM
to cascadi...@googlegroups.com
Hey Chris

I've never done this, but I expect that if you add the proper Hadoop
property to the Properties object you hand to the FlowConnector, every
subsequent job will have the file pushed out.

From there you can just open the files directly, like any other
Hadoop app.

ckw
--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com

Chris Curtin

May 27, 2009, 12:38:18 PM
to cascading-user
Hi Chris,

A little more detail: here is the example code I'm looking at:
http://developer.yahoo.com/hadoop/tutorial/module5.html#auxdata

They access the cache in the Mapper's configure() method, which I think
means it only gets called once per mapper. I need the data in a Buffer
or Aggregator, but the same load logic should apply. In Cascading, if I
need the data in a Buffer or Aggregator, where do I load it so I don't
load the file for each grouping?

Thanks,

Chris

Chris K Wensel

May 27, 2009, 12:57:56 PM
to cascadi...@googlegroups.com
Use the #prepare() method. It's available on all the Operations. It is
called once during the Mapper/Reducer #configure() call.

Use #cleanup() to undo anything #prepare() did, if necessary.
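
A minimal sketch of that shape on a Buffer; the lookupTable field and its load step are placeholders, not Cascading API:

    public class MyBuffer extends BaseOperation implements Buffer {
        // built once per Mapper/Reducer task in prepare(), not once per grouping
        private transient Map<String, String> lookupTable;

        @Override
        public void prepare( FlowProcess flowProcess, OperationCall operationCall ) {
            lookupTable = new HashMap<String, String>();
            // load the side file into lookupTable here
        }

        @Override
        public void cleanup( FlowProcess flowProcess, OperationCall operationCall ) {
            lookupTable = null; // release whatever prepare() allocated
        }

        public void operate( FlowProcess flowProcess, BufferCall bufferCall ) {
            // every grouping handled here sees the already-loaded lookupTable
        }
    }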

ckw

Chris Curtin

Jun 1, 2009, 3:50:30 PM
to cascading-user
Hi Chris,

Both of the DistributedCache methods needed to put/get a file from the
cache require a JobConf, and I don't know where to get one.

My scenario is:
- build a set of Cascading flows to create aggregate files, basically
taking 10MM+ rows down to 7 rows (calculating the median on a data set
by day)
- put the 7 rows into the distributed cache
- run a separate set of flows on different source data, but use the
rows in the distributed cache in my Buffer-derived class

I can see that the 'read' from the cache goes into the #prepare
method, but I'm not sure of:
1. where to push the file so the second set of Cascading flows sees
it?
2. how to get the JobConf that both methods need? Reading the Hadoop
docs, it looks like the cache is only active during the lifetime of the
JobConf.

Thanks,

Chris

Chris K Wensel

Jun 1, 2009, 7:07:37 PM
to cascadi...@googlegroups.com
Hey Chris

FlowProcess encapsulates the underlying properties/configuration
object, so:
http://www.cascading.org/javadoc/cascading/flow/FlowProcess.html

flowProcess.getProperty( somekey )

will get the property value from the JobConf.

if you want the whole jobConf instance, cast flowProcess to
HadoopFlowProcess.

http://www.cascading.org/javadoc/cascading/flow/hadoop/HadoopFlowProcess.html

if you need to pass a JobConf into your flow, see
http://www.cascading.org/javadoc/cascading/flow/MultiMapReducePlanner.html#setJobConf(java.util.Map,%20JobConf)

this stuffs it into the Properties object you use to instantiate a
FlowConnector.

beyond that, i've never used the DistributedCache, so don't know the
ins/outs of it.
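
a minimal sketch of the cast, assuming the flow runs on the Hadoop planner:

    @Override
    public void prepare( FlowProcess flowProcess, OperationCall operationCall ) {
        // only valid when the flow was planned for Hadoop
        JobConf jobConf = ( (HadoopFlowProcess) flowProcess ).getJobConf();
        String value = jobConf.get( "some.key" ); // whatever key you stuffed into the conf
    }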

ckw

Manish

Jun 2, 2009, 1:12:33 PM
to cascading-user
To push the files into the distributed cache, you need to create a
JobConf and set the file on it. Then use the JobConf to create
the FlowConnector.

Example:
JobConf jobConf = new JobConf(...);
DistributedCache.addCacheFile(uriOfFileToPush, jobConf);
Properties properties = new Properties();
properties.put("cascading.hadoop.jobconf", jobConf);
FlowConnector f = new FlowConnector(properties);
f.connect(...);

To get the file back, call flowProcess.getProperty( "mapred.cache.files" )
in the prepare method.
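
A rough sketch of that read side (the value is the comma-separated URI list Hadoop builds from addCacheFile):

    @Override
    public void prepare( FlowProcess flowProcess, OperationCall operationCall ) {
        // e.g. "/user/hdfs/test.csv#test.csv" for each file added in the driver
        String cacheFiles = (String) flowProcess.getProperty( "mapred.cache.files" );
        for( String uri : cacheFiles.split( "," ) )
            System.out.println( "cached: " + uri ); // locate and open the one you want
    }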


Chris Curtin

Jun 2, 2009, 5:18:36 PM
to cascading-user
Thanks for the information Manish.

I found something interesting when looking at the implementation of
DistributedCache: it doesn't really do anything with HDFS; it only
uses the JobConf to record, as properties, the files you want to access
inside a Hadoop job.

So if you push the file to HDFS, you can then open it in the #prepare
method using straight HDFS calls (Path, etc.). That means the JobConf
isn't needed in the Cascading world, because I can easily
pass the file name in Java when creating my Operation-derived classes.

I need to try this on my cluster instead of my desktop, but I don't
see why this way of accessing files wouldn't work.
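
In sketch form, assuming the HDFS path is handed to the Operation's constructor as a String and the cluster config is visible on the task classpath (the tab-separated layout is just an example):

    private final String hdfsPath;              // set when constructing the Operation
    private transient Map<String, String> rows; // loaded once per task

    @Override
    public void prepare( FlowProcess flowProcess, OperationCall operationCall ) {
        rows = new HashMap<String, String>();
        try {
            FileSystem fs = FileSystem.get( new Configuration() );
            BufferedReader reader = new BufferedReader(
                new InputStreamReader( fs.open( new Path( hdfsPath ) ) ) );
            String line;
            while( ( line = reader.readLine() ) != null ) {
                String[] fields = line.split( "\t", 2 );
                if( fields.length == 2 )
                    rows.put( fields[ 0 ], fields[ 1 ] );
            }
            reader.close();
        } catch( IOException exception ) {
            throw new RuntimeException( "could not read " + hdfsPath, exception );
        }
    }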

Thanks,

Chris

Manish

Jun 3, 2009, 8:51:47 AM
to cascading-user
Chris,

Sounds like a good solution -- the distributed cache is more useful
when you are writing Hadoop code directly (where you don't
instantiate the Mapper and Reducer yourself).
Just remember that the Path class is not serializable (use a transient
member or a String).
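
i.e. something along these lines (field names are placeholders):

    private final String cachePathName;   // a String serializes with the Operation
    private transient Path cachePath;     // rebuilt per task, never serialized

    @Override
    public void prepare( FlowProcess flowProcess, OperationCall operationCall ) {
        cachePath = new Path( cachePathName );
    }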

Kabindra Shrestha

Feb 26, 2013, 4:07:05 AM
to cascadi...@googlegroups.com, manish...@gmail.com
I'm new to Cascading and Hadoop. I'm trying to add a cache file using this code, but without success.

In the loader:

// Job conf property

            JobConf jobConf = new JobConf();
            try {
                DistributedCache.addCacheFile(new URI("/user/hdfs/test.csv#test.csv"), jobConf);
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }

// ... ... ... //

            Properties prop = prepareProperties(this.getClass());
            prop.put("cascading.hadoop.jobconf", jobConf);
            Flow flow =  new HadoopFlowConnector(prop).connect(flowdef);
            flow.complete();

In the prepare method:

 @Override
        public void prepare(FlowProcess flowProcess, OperationCall operationCall) {
            // comma-separated list of cached files, e.g. "/user/hdfs/test.csv#test.csv"
            String cacheFiles = (String) flowProcess.getProperty("mapred.cache.files");
            super.prepare(flowProcess, operationCall);

            Path f = null;
            try {
                HadoopFlowProcess hfp = (HadoopFlowProcess) flowProcess;
                f = getFileFor(hfp, "testDiagMaster.csv"); // helper that resolves the cached file to a local Path
            } catch (IOException e) {
                throw new RuntimeException("Error getting files from distributedCache for " + "testDiagMaster.csv", e);
            }

            try {
                FileSystem fs = FileSystem.getLocal(new Configuration());
                InputStream in = fs.open(f, 1000000);
                InputStreamReader inr = new InputStreamReader(in);
                BufferedReader r = new BufferedReader(inr);
                String line;
                while ((line = r.readLine()) != null) {
                    String[] flds = line.split(";", 2);
                    if (flds.length >= 2) {
                        String key = flds[0];
                        //theMap.put(key, flds[1]);
                    }
                }
                r.close();
            } catch (IOException e) {
                throw new RuntimeException("Error reading file " + f + " from distributedCache", e);
            }

            System.out.printf("<<HERE>>%n");
            //System.out.printf("MemoryJoin.prepare() for %s(mappings=%d, path=%s)%n", name, theMap.size(), f.toString());
        }

Can you provide me with working sample code?

Felix

Aug 8, 2013, 1:27:24 PM
to cascadi...@googlegroups.com, manish...@gmail.com
I am trying to explore the distributed cache as well, and I think I got it working.
I added the files in the driver as:
       properties.put("mapred.cache.files", "pathOnHdfsToLoad#symlink_name");
       properties.put("mapred.create.symlink", "yes");

After that, in my filter/function code, I just did this:
LocalFileSystem local = FileSystem.getLocal(new Configuration()); // task-local file system
File f = local.pathToFile(new Path("symlink_name"));
and the code seems to be working.

Hope that works for you.

John Lavoie

Aug 8, 2013, 5:38:59 PM
to cascadi...@googlegroups.com, manish...@gmail.com
When I try this, I get the following exception. Any thoughts? It works just fine if I use the DistributedCache in a plain M/R job. And this job works if I remove the two properties you listed for the distributed cache.
 
13/08/08 16:21:48 INFO util.HadoopUtil: resolving application jar from found main method on: com.uhg.optum.ap.hadoop.cascadingtest.CascadingInput2
13/08/08 16:21:48 INFO planner.HadoopPlanner: using application jar: /mapr/datalake/optuminsight/ap_data/temp/john/cascading-test-0.0.1-SNAPSHOT.jar
13/08/08 16:21:48 INFO property.AppProps: using app.id: 99446B0842C7C55BC8AF507474A74E32
13/08/08 16:21:49 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/08/08 16:21:49 INFO security.JniBasedUnixGroupsMapping: Using JniBasedUnixGroupsMapping for Group resolution
13/08/08 16:21:49 INFO util.Version: Concurrent, Inc - Cascading 2.1.4
13/08/08 16:21:49 INFO flow.Flow: [] starting
13/08/08 16:21:49 INFO flow.Flow: []  source: GlobHfs[/datalake/optuminsight/ap_data/in/symmetry-input]
13/08/08 16:21:49 INFO flow.Flow: []  sink: Hfs["SequenceFile[['etgOutput']]"]["/datalake/optuminsight/ap_data/temp/john/cascading-test3"]
13/08/08 16:21:49 INFO flow.Flow: []  parallel execution is enabled: true
13/08/08 16:21:49 INFO flow.Flow: []  starting jobs: 1
13/08/08 16:21:49 INFO flow.Flow: []  allocating threads: 1
13/08/08 16:21:49 INFO flow.FlowStep: [] starting step: (1/1) ...temp/john/cascading-test3
13/08/08 16:21:49 INFO fs.JobTrackerWatcher: Current running JobTracker is: lablmapr2.uhclab.lab/10.112.255.17:9001
13/08/08 16:21:49 INFO mapred.JobClient: Cleaning up the staging area maprfs:/var/mapr/cluster/mapred/jobTracker/staging/jlavoi3/.staging/job_201308011057_0361
13/08/08 16:21:49 INFO flow.Flow: [] stopping all jobs
13/08/08 16:21:49 INFO flow.FlowStep: [] stopping: (1/1) ...temp/john/cascading-test3
13/08/08 16:21:49 INFO flow.Flow: [] stopped all jobs
Exception in thread "main" cascading.flow.FlowException: unhandled exception
        at cascading.flow.BaseFlow.complete(BaseFlow.java:825)
        at com.uhg.optum.ap.hadoop.cascadingtest.CascadingInput2.main(CascadingInput2.java:100)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:197)
Caused by: java.io.IOException: Cluster datalake has no entry in /opt/mapr//conf/mapr-clusters.conf
        at com.mapr.fs.ClusterConf.getClusterByPath(ClusterConf.java:382)
        at com.mapr.fs.MapRFileSystem.lookupClient(MapRFileSystem.java:257)
        at com.mapr.fs.MapRFileSystem.getMapRFileStatus(MapRFileSystem.java:684)
        at com.mapr.fs.MapRFileSystem.getFileStatus(MapRFileSystem.java:710)
        at org.apache.hadoop.filecache.DistributedCache.getFileStatus(DistributedCache.java:185)
        at org.apache.hadoop.filecache.TrackerDistributedCacheManager.determineTimestamps(TrackerDistributedCacheManager.java:846)
        at org.apache.hadoop.mapred.JobClient.copyAndConfigureFiles(JobClient.java:798)
        at org.apache.hadoop.mapred.JobClient.copyAndConfigureFiles(JobClient.java:689)
        at org.apache.hadoop.mapred.JobClient.access$300(JobClient.java:173)
        at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:902)
        at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:885)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
        at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:885)
        at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:859)
        at cascading.flow.hadoop.planner.HadoopFlowStepJob.internalNonBlockingStart(HadoopFlowStepJob.java:104)
        at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:191)
        at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:145)
        at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:120)
        at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:42)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

John Lavoie

Aug 8, 2013, 9:32:50 PM
to cascadi...@googlegroups.com
Apparently that is the error you see when giving it an OS path instead of an HDFS path. Got it working now!