Hello all,
I am new to Hadoop, Cascading, Scalding, etc. I plan to move our current ETL scripts to Scalding, but on my first try I hit the issue described in the subject: I cannot make Scalding jobs run in HDFS/cluster mode.
I know this issue has been discussed before, e.g.
https://groups.google.com/forum/#!topic/cascading-user/1P5PLePdDv0 and
https://groups.google.com/forum/#!topic/cascading-user/WaHc5MaIlIs
but none of the solutions mentioned there solves my issue.
Here is what I have:
hadoop 2.7.1
scalding 0.15.0
I tried the scalding example from
https://github.com/snowplow/scalding-example-project. I ran it using the following command line:
hadoop jar target/scala-2.10/scalding-example-project-0.0.5.jar com.snowplowanalytics.hadoop.scalding.WordCountJob --hdfs --input input/pg2600.txt --output output
It worked very well. The job was submitted to the cluster (1 name node and 2 data nodes) and finished successfully. I then made some changes to the WordCountJob:
TextLine( args("input") )
  .flatMap('line -> 'word) { line : String => tokenize(line) }
  .groupBy('word) { _.size }
  .groupAll { _.sortBy('size).reverse }
  .write( Tsv( args("output") ) )
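For completeness, `tokenize` is the helper from the example project; I did not change it. As far as I recall it is just the following (reproduced from memory, so treat it as approximate):

```scala
// Split a line of text into words: lowercase everything, strip
// punctuation, then split on runs of whitespace.
def tokenize(text: String): Array[String] =
  text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
```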
Then I got the following output when running the same command:
15/07/31 20:10:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/31 20:10:31 INFO util.HadoopUtil: resolving application jar from found main method on: com.snowplowanalytics.hadoop.scalding.JobRunner$
15/07/31 20:10:31 INFO planner.HadoopPlanner: using application jar: /home/hdfs/projects/scalding-example-project/target/scala-2.10/scalding-example-project-0.0.5.jar
15/07/31 20:10:31 INFO property.AppProps: using app.id: 52F5E2B6507E4249B042C4D2C8B265BD
15/07/31 20:10:31 INFO Configuration.deprecation: mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
15/07/31 20:10:31 INFO hadoop.Hfs: forcing job to local mode, via sink: TempHfs["SequenceFile[['word', 'size', '__groupAll__']]"][9042714110/com.twitter.scalding.Text/]
15/07/31 20:10:31 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
15/07/31 20:10:31 INFO hadoop.Hfs: forcing job to local mode, via source: TempHfs["SequenceFile[['word', 'size', '__groupAll__']]"][9042714110/com.twitter.scalding.Text/]
15/07/31 20:10:31 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
15/07/31 20:10:31 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
15/07/31 20:10:31 INFO util.Version: Concurrent, Inc - Cascading 2.6.1
15/07/31 20:10:31 INFO flow.Flow: [com.snowplowanalytics....] starting
15/07/31 20:10:31 INFO flow.Flow: [com.snowplowanalytics....] source: Hfs["TextLine[['offset', 'line']->[ALL]]"]["input/pg2600.txt"]
15/07/31 20:10:31 INFO flow.Flow: [com.snowplowanalytics....] sink: Hfs["TextDelimited[['word', 'size']]"]["output"]
15/07/31 20:10:31 INFO flow.Flow: [com.snowplowanalytics....] parallel execution is enabled: true
15/07/31 20:10:31 INFO flow.Flow: [com.snowplowanalytics....] starting jobs: 2
15/07/31 20:10:31 INFO flow.Flow: [com.snowplowanalytics....] allocating threads: 2
I also tried the sample from the Scalding wiki:
https://gist.github.com/krishnanraman/4696053, and ran the command line:
hadoop jar target/scala-2.10/scalding-example-project-0.0.5.jar com.snowplowanalytics.hadoop.scalding.PopulationStats --hdfs --input /user/hdfs/input --output /user/hdfs/output
The output was the following:
15/07/31 18:38:14 INFO property.AppProps: using app.id: E7069EF39F8F4476B610F602A425BCFF
15/07/31 18:38:14 INFO Configuration.deprecation: mapred.used.genericoptionsparser is deprecated. Instead, use mapreduce.client.genericoptionsparser.used
15/07/31 18:38:14 INFO hadoop.Hfs: forcing job to local mode, via sink: TempHfs["SequenceFile[['fips', 'population', 'year']]"][4974443441/com.snowplowanalytics.had/]
15/07/31 18:38:14 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
15/07/31 18:38:14 INFO hadoop.Hfs: forcing job to local mode, via source: TempHfs["SequenceFile[['fips', 'population', 'year']]"][4974443441/com.snowplowanalytics.had/]
15/07/31 18:38:14 INFO hadoop.Hfs: forcing job to local mode, via source: TempHfs["SequenceFile[['fips', 'growth']]"][4045933129/_pipe_1/]
I am pretty sure all inputs and outputs are in HDFS, not the local file system. As you can see, these simple Scalding jobs will not run on the cluster. It seems that whenever a job needs to write temporary files, Hadoop is forced into local mode.
Any suggestions? Please help! Thank you.