Run scalding jobs on remote hadoop cluster from scala app

649 views
Skip to first unread message

Mike

unread,
Jan 22, 2014, 2:37:43 AM1/22/14
to cascadi...@googlegroups.com
Hi!

I have maven java project and now trying to use some parts in scala (probably migrate to scala in future).

In java I had something like this:

class MyJob extends org.apache.hadoop.mapreduce.Job {
  public MyJob(org.apache.hadoop.conf.Configuration conf){
    super(conf, "Job name");
    setJarByClass(MyJob.class);
    setMapperClass(MapClass.class);
  }
...
}

...

Configuration conf = new Configuration();
conf.set("mapred.job.tracker","hadoop-master:8021")
conf.set("fs.defaultFS","hdfs://hadoop-master:8020")
ControlledJob job = new ControlledJob(new MyJob(conf), null);
JobControl jc = new JobControl("Job control name");
jc.addJob(job);
Thread t = new Thread(jc);
t.start();


And I could run this jar on any server :
java -jar myjar.jar


Everywhere in scalding docs there is described the way to run scalding jobs using command
hadoop -jar ...

But I need to run scalding jobs from scala app using 
val myjob = new MyJob(args)
myjob.run

And I want to run scala app on one server and I have Hadoop cluster on other servers.

So maven will compile some executable jar from my java and scala sources and I run it with java -jsr


Any ideas on how to such thing? How to pass to the Job object org.apache.hadoop.conf.Configuration object? Or there is another way to do what I want?



Konrad Malawski

unread,
Jan 22, 2014, 5:13:52 AM1/22/14
to cascadi...@googlegroups.com
Hello Mike,
this is something I worked on recently.
Basically, since you don't build a jar hadoop doesn't know where to get the files (or even "what files"), the solution is to put all files to the Hadoop's "distributed cache", and load the classpath from there.

I have implemented a Tool which does this for you:

I use it with Scalding to run jobs on the cluster right away from the sbt shell. That's not really any different than running Cascading jobs from a Java app, so look at the impl and use it :-)

The impl currently uses the `-libjars` option, so it's rather "from the outside". It could be reimplemented to use the distributed cache APIs diretly, but that doesn't really impact if it works or not.


Let me know if it works for you, cheers!

-- 
Cheers,
Konrad Malawski


2014/1/22 Mike <mikhail....@gmail.com>

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/46553efb-6ff2-42ab-8fd4-357857c9c87d%40googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Mike

unread,
Jan 23, 2014, 5:50:35 AM1/23/14
to cascadi...@googlegroups.com, konrad....@project13.pl
Hi Konrad!

Thanks for your reply. 

It's not exactly what I need. I want to run jobs not from sbt console but from scala class.

Example use case for scala class:
1) prepare some objects and put them to Distributed Cache
2) run batch of connected jobs 
3) and then send some reports via e-mail

Anyway thanks for the idea, I'll probably try it.

Konrad Malawski

unread,
Jan 23, 2014, 6:57:08 AM1/23/14
to cascadi...@googlegroups.com
Oh actually that's the exact use case I'm addressing.
I said "from sbt", but what I actually meant is "I run a class with a main() from sbt", it prepares some stuff and launches the jobs using this tool, then sends notifications :-)

Take a look :-)

-- 
Cheers,
Konrad Malawski


2014/1/23 Mike <mikhail....@gmail.com>

Oscar Boykin

unread,
Jan 23, 2014, 1:31:33 PM1/23/14
to cascadi...@googlegroups.com, konrad....@project13.pl
You can do this. See the code here:


We've talked about making it easier to weave scalding into frameworks, but that will mostly be better docs.

Summingbird does what you want to do. See here:

The basic approach is this:

You create a FlowDef (mutable state of the job DAG) and Mode (wraps the config). Those should be either made implicit or passed explicitly to any read/write calls.

After you have prepared the FlowDef (scalding is just mutating that state).  Finally call:

mode.newFlowConnector(config).connect(flowDef).complete

(taken from Job.scala, buildFlow/run).

And you will have run this FlowDef run for this Mode.

I've added this to the wiki, please feel free to update:


Hope this helps.




For more options, visit https://groups.google.com/groups/opt_out.



--
Oscar Boykin :: @posco :: http://twitter.com/posco

Mike

unread,
Jan 27, 2014, 6:58:07 AM1/27/14
to cascadi...@googlegroups.com, konrad....@project13.pl
Hi Oscar!

Thanks for your reply. I tried but got nothing ((

Do you have any working example?

I have 
class WordCountJob(args: Args) extends Job(args) {
  TextLine("test/input/file.txt").flatMap('line -> 'word) {
    line: String => tokenize(line)
  }.groupBy('word) {
    _.size
  }.write(Tsv("test/output/file.csv"))

  def tokenize(text: String): Array[String] = {
    text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
  }
}

and I have :
object Runner extends App {
  val hadoopConfiguration: Configuration = new Configuration
  hadoopConfiguration.set("mapred.job.tracker","hadoop-master:8021")
  hadoopConfiguration.set("fs.defaultFS","hdfs://hadoop-master:8020")

  val flowDef = FlowDef.flowDef()
  val a: Args = new Args(Map())
  val job: WordCountJob = new WordCountJob(a)
  val mode = Mode(a, hadoopConfiguration)
  val flow = job.buildFlow(mode)
  flow.complete()
}

Running this code I have an error:
java.lang.RuntimeException: [ERROR] Mode must be one of --local or --hdfs, you provided 'Local(false)'

What exactly must I do to run this job from inside my code?

Oscar Boykin

unread,
Jan 27, 2014, 12:56:29 PM1/27/14
to cascadi...@googlegroups.com, Konrad Malawski
On Mon, Jan 27, 2014 at 3:58 AM, Mike <mikhail....@gmail.com> wrote:
Hi Oscar!

Thanks for your reply. I tried but got nothing ((

Just for completeness, can you be more precise? There was no output and no job? There was some failure (looks like the case below). Please be as precise as possible.
 

Do you have any working example?

I have only the summingbird code I liked to and scalding itself.
 

I have 
class WordCountJob(args: Args) extends Job(args) {
  TextLine("test/input/file.txt").flatMap('line -> 'word) {
    line: String => tokenize(line)
  }.groupBy('word) {
    _.size
  }.write(Tsv("test/output/file.csv"))

  def tokenize(text: String): Array[String] = {
    text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+")
  }
}

and I have :
object Runner extends App {
  val hadoopConfiguration: Configuration = new Configuration
  hadoopConfiguration.set("mapred.job.tracker","hadoop-master:8021")
  hadoopConfiguration.set("fs.defaultFS","hdfs://hadoop-master:8020")


This flowDef is unneeded and confuses things a bit. job.flowDef is the FlowDef for the job (each job has exactly one, and after the constructor it is populated).
 
val hdfsMode = Hdfs(true, new Configuration) // make a new HDFS mode
// the word count job takes two args: "input" and "output":
// you need to set them

val args = Mode.putMode(hdfsMode, Args("--input in.txt --output counts.tsv")) // in scalding 0.9.0, otherwise see below
// the mode needs to be paired with the args in scalding 0.9.0, prior to that you need:
//   Mode.mode = hdfsMode to set a global static var.
// Now create the job after the mode is set up properly.
val job: WordCountJob = new WordCountJob(args)
val flow = job.buildFlow // before scalding 0.9.0: 
// val flow = job.buildFlow(Mode.mode)
flow.complete

Let me know how this works.


For more options, visit https://groups.google.com/groups/opt_out.

Mike

unread,
Jan 28, 2014, 1:33:58 AM1/28/14
to cascadi...@googlegroups.com, Konrad Malawski
Oscar, 

you are my savior :-)

Thank you very much for your help! 

Now it works! And I am able to continue studying of scalding ;-)
Message has been deleted

Ken

unread,
Mar 25, 2016, 3:25:44 AM3/25/16
to cascading-user, konrad....@project13.pl
Hi all,

I have been trying to get the same example code to work, with little success so far.
Hope someone can see what I've been missing here.

The code I have now is :

import com.twitter.scalding._
import com.twitter.scalding
import com.twitter.scalding.Tool
import org.apache.hadoop
import org.apache.hadoop.conf.Configuration
import java.io._
import scala.util.{Failure, Success}


class WordCountJob(args: Args) extends Job(args) {

 
TextLine(args("input"))
   
.read
   
.flatMap('line -> 'word) { line: String => line.split("\\s+") }
   
.groupBy('word) { _.size }
    .write(Tsv(args("output")))
}

object WordCountJobRunner extends App {
    val hadoopConfiguration = new Configuration
    hadoopConfiguration.set("mapreduce.framework.name", "ArchiBuntu-E5550:8021")   //("mapred.job.tracker","ArchiBuntu-E5550:8021")
    hadoopConfiguration.set("fs.defaultFS","hdfs://ArchiBuntu-E5550:8020")
   
    val hdfsMode = Hdfs(strict = true, hadoopConfiguration)
    val arguments = Mode.putMode(hdfsMode, Args("--input /hdfs/in/in.txt --output /hdfs/out/counts.tsv"))

 
    // Now create the job after the mode is set up properly.
    val job: WordCountJob = new WordCountJob(arguments)
    val flow = job.buildFlow
    flow.complete()
}


The ArchiBuntu-E5550 is my local machine.
I have a CDH5 cluster running, all nodes are active

When I run the code as a scala app, this is the error:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/ken/.ivy2/cache/ch.qos.logback/logback-classic/jars/logback-classic-1.1.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/ken/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
08:14:38.020 [main] INFO  o.a.h.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
08:14:38.419 [main] INFO  o.a.h.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
08:14:38.425 [main] INFO  c.flow.hadoop.util.HadoopUtil - resolving application jar from found main method on: scala.App$class
Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.util.ClassUtil.findContainingJar(ClassUtil.java:43)
    at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:533)
    at cascading.flow.hadoop.planner.HadoopPlanner.initialize(HadoopPlanner.java:221)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:457)
    at com.twitter.scalding.ExecutionContext$class.buildFlow(ExecutionContext.scala:47)
    at com.twitter.scalding.ExecutionContext$$anon$1.buildFlow(ExecutionContext.scala:94)
    at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:230)
    at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:230)
    at scala.util.Success.flatMap(Try.scala:231)
    at com.twitter.scalding.Job.buildFlow(Job.scala:230)
    at WordCountJobRunner$.delayedEndpoint$WordCountJobRunner$1(WordCountJob.scala:27)
    at WordCountJobRunner$delayedInit$body.apply(WordCountJob.scala:17)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at WordCountJobRunner$.main(WordCountJob.scala:17)
    at WordCountJobRunner.main(WordCountJob.scala)

I think I've tried alle possible examples of this "wordcount on hadoop via scalding" I can google.
Reply all
Reply to author
Forward
0 new messages