forcing job to local mode, via source/sink: TempHfs


Guanpi Lai

Jul 31, 2015, 11:16:25 PM
to cascading-user
Hello all,

I am new to Hadoop, Cascading, Scalding, etc. I plan to move our current ETL scripts to Scalding, but I hit the issue described in the subject on my first try: I cannot make Scalding jobs run in HDFS/cluster mode.

I know this issue has been discussed before, e.g. https://groups.google.com/forum/#!topic/cascading-user/1P5PLePdDv0 and https://groups.google.com/forum/#!topic/cascading-user/WaHc5MaIlIs
but none of the solutions mentioned there solved my issue.

Here is what I have:
hadoop 2.7.1
scalding 0.15.0

I tried the scalding example from https://github.com/snowplow/scalding-example-project. I ran it using the following command line:

hadoop jar target/scala-2.10/scalding-example-project-0.0.5.jar com.snowplowanalytics.hadoop.scalding.WordCountJob --hdfs --input input/pg2600.txt --output output

It worked very well. The job was submitted to the cluster (1 name node and 2 data nodes) and finished successfully. I then made some changes to the WordCountJob (the added groupAll step below):
TextLine( args("input") )
  .flatMap('line -> 'word) { line: String => tokenize(line) }
  .groupBy('word) { _.size }
  .groupAll { _.sortBy('size).reverse }
  .write( Tsv( args("output") ) )

Then I got the following output when running the same command:
15/07/31 20:10:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/31 20:10:31 INFO util.HadoopUtil: resolving application jar from found main method on: com.snowplowanalytics.hadoop.scalding.JobRunner$
15/07/31 20:10:31 INFO planner.HadoopPlanner: using application jar: /home/hdfs/projects/scalding-example-project/target/scala-2.10/scalding-example-project-0.0.5.jar
15/07/31 20:10:31 INFO property.AppProps: using app.id: 52F5E2B6507E4249B042C4D2C8B265BD
15/07/31 20:10:31 INFO Configuration.deprecation: mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
15/07/31 20:10:31 INFO hadoop.Hfs: forcing job to local mode, via sink: TempHfs["SequenceFile[['word', 'size', '__groupAll__']]"][9042714110/com.twitter.scalding.Text/]
15/07/31 20:10:31 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
15/07/31 20:10:31 INFO hadoop.Hfs: forcing job to local mode, via source: TempHfs["SequenceFile[['word', 'size', '__groupAll__']]"][9042714110/com.twitter.scalding.Text/]
15/07/31 20:10:31 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
15/07/31 20:10:31 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
15/07/31 20:10:31 INFO util.Version: Concurrent, Inc - Cascading 2.6.1
15/07/31 20:10:31 INFO flow.Flow: [com.snowplowanalytics....] starting
15/07/31 20:10:31 INFO flow.Flow: [com.snowplowanalytics....]  source: Hfs["TextLine[['offset', 'line']->[ALL]]"]["input/pg2600.txt"]
15/07/31 20:10:31 INFO flow.Flow: [com.snowplowanalytics....]  sink: Hfs["TextDelimited[['word', 'size']]"]["output"]
15/07/31 20:10:31 INFO flow.Flow: [com.snowplowanalytics....]  parallel execution is enabled: true
15/07/31 20:10:31 INFO flow.Flow: [com.snowplowanalytics....]  starting jobs: 2
15/07/31 20:10:31 INFO flow.Flow: [com.snowplowanalytics....]  allocating threads: 2


I also tried the sample from the scalding wiki, https://gist.github.com/krishnanraman/4696053, and ran this command line:
hadoop jar target/scala-2.10/scalding-example-project-0.0.5.jar com.snowplowanalytics.hadoop.scalding.PopulationStats --hdfs --input /user/hdfs/input --output /user/hdfs/output

The output was as follows:
15/07/31 18:38:14 INFO property.AppProps: using app.id: E7069EF39F8F4476B610F602A425BCFF
15/07/31 18:38:14 INFO Configuration.deprecation: mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
15/07/31 18:38:14 INFO hadoop.Hfs: forcing job to local mode, via sink: TempHfs["SequenceFile[['fips', 'population', 'year']]"][4974443441/com.snowplowanalytics.had/]
15/07/31 18:38:14 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
15/07/31 18:38:14 INFO hadoop.Hfs: forcing job to local mode, via source: TempHfs["SequenceFile[['fips', 'population', 'year']]"][4974443441/com.snowplowanalytics.had/]
15/07/31 18:38:14 INFO hadoop.Hfs: forcing job to local mode, via source: TempHfs["SequenceFile[['fips', 'growth']]"][4045933129/_pipe_1/]

I am pretty sure all of the input and output paths are on HDFS, not on the local file system. You can see that even these simple Scalding jobs cannot run on the cluster. It seems that whenever a job needs to write temporary files, Hadoop is forced into local mode.
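To double-check that the paths really are on HDFS, a listing like the following (using the same paths as in my commands above) should show them:

hadoop fs -ls input/pg2600.txt
hadoop fs -ls /user/hdfs/input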

Any suggestions? Please help! Thank you.



Oscar Boykin

Aug 1, 2015, 3:33:59 PM
to cascadi...@googlegroups.com
I've never seen this before. That said, when you groupAll, you have no parallelism on the reducer, so it is not likely to be much (or any) slower to run locally.

It looks like the message comes from here:

I wonder what your tmpdir is set to. Perhaps you have a tmpdir that is not set to use HDFS?
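One quick way to check (just a sketch, assuming the client-side configuration is what the job picks up) is to ask Hadoop for the effective values:

hdfs getconf -confKey hadoop.tmp.dir
hdfs getconf -confKey fs.defaultFS

If the first comes back as a file:/ path (or the second is not an hdfs:// URI), that would line up with the temp taps forcing local mode.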




--
Oscar Boykin :: @posco :: http://twitter.com/posco

Guanpi Lai

Aug 2, 2015, 1:41:55 PM
to cascading-user
Thank you, Oscar. Your answer was very helpful. I followed a tutorial online to set up the Hadoop cluster, and the root cause was that I had the tmpdir (hadoop.tmp.dir) set to a local folder.

So after I changed core-site.xml as follows and created the /tmp folder on HDFS, the problem was solved.

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/tmp</value>
        <!--value>file:/usr/share/hadoop/tmp</value>-->
        <description>A base for other temporary directories.</description>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://master:9000</value>
    </property>
</configuration>
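In case it helps anyone else, creating and checking the HDFS temp directory was roughly the following (run as the hdfs user; the world-writable sticky-bit mode is just my choice, not required):

hdfs dfs -mkdir -p /tmp
hdfs dfs -chmod 1777 /tmp
hdfs dfs -ls /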