Scalding: running more than one Execution using ExecutionApp in hdfs mode

Lakshmi Gopalan

Jun 25, 2015, 6:13:29 PM
to cascadi...@googlegroups.com
Hi,

I am trying to run the following sequence of actions by setting up Executions:

- Map some timeseries data
- Reduce data into 15-minute aggregations and prepare for the next step.
- Map the 15-minute aggregations
- Reduce into hourly buckets.

In local mode this worked great. Another Hadoop example job from the tutorial also ran fine in my project setup, so the project is wired correctly. However, once I switched to testing the real code in hdfs mode against my local Hadoop, it stopped working, so I am now trying various things.

Can someone please help with finding what I'm doing wrong? 

Environment:

Scalding 0.15.0, Scala 2.11.6
Hadoop test with example project works fine.


Failed approach #1 


Command:

hadoop jar target/scala-2.11/data-aggregator-assembly-0.1-20150625T203134.jar com.org.DataAggregation --hdfs --input "/user/lakshmi/data/input.json" --output "/user/lakshmi/data/output-hadoop.txt"

Did not work. I see the error:

Exception in thread "main" java.lang.NoSuchMethodException: com.org.DataAggregation.main([Ljava.lang.String;)
    at java.lang.Class.getMethod(Class.java:1678)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:215)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)


class DataAggregation(args: Args) extends Job(args) {

  val execution1 = Execution.from(consolidateActivity())
  val execution2 = execution1.flatMap { x =>
    Execution.from(aggregateConsolidatedActivity(x))
  }

  val jobConf = new JobConf
  val u: Unit = execution2.waitFor(Config.hadoopWithDefaults(jobConf), Hdfs(strict = true, jobConf)).get

  def consolidatedActivity() = {..}

  ..

}


-----------


import com.twitter.scalding.Tool
import org.apache.hadoop.util.ToolRunner
import org.apache.hadoop.conf.Configuration

object JobRunner {
  def main(args: Array[String]) {
    ToolRunner.run(new Configuration, new Tool, args)
  }
}



Failed approach #2 

Next I followed the approach in the Scalding wiki page "Calling Scalding from inside your application" (https://github.com/twitter/scalding/wiki/Calling-Scalding-from-inside-your-application) and the ExecutionTutorial.scala example (https://github.com/twitter/scalding/blob/0.15.0/tutorial/execution-tutorial/ExecutionTutorial.scala).

In class DataAggregation I now have:


object LifeActivityAggregation extends ExecutionApp {

  override def job = Execution.getConfig.flatMap { config =>
    val args = config.getArgs

    val execution1 = Execution.from(consolidateActivity(args))
    val execution2 = execution1.flatMap { x =>
      Execution.from(aggregateConsolidatedActivity(x, args))
    }

    execution2.unit (?????)
  }

  ......

  def aggregateConsolidatedActivity(input: TypedPipe[(String, String, String, String, String, String, String, Int, Float, Float)], args: Args) = {
    ...
    .writeExecution(TypedTsv[((String, String, String, String, String, String, String, String, Float, Float, Float, Float, Float, Long))](args("output")))
  }
}



Now I am not sure how to call execution2 in the last line of the 'job' definition and return an Execution[T] instance. Also I am trying to write data in the last line of method aggregateConsolidatedActivity(x, args) using writeExecution so I hope that is correct.

Command:
hadoop jar target/scala-2.11/data-aggregator-assembly-0.1-20150625T203134.jar com.org.DataAggregation --hdfs --input "/user/lakshmi/data/input.json" --output "/user/lakshmi/data/output-hadoop.txt"

Does not work. There is no output at all, so I don't believe the Hadoop job was invoked. In the debugger I do see that the args are passed in correctly and the job execution steps are reached. 

Any ideas what I am missing in this structure?

Thanks in advance.

Lakshmi


Oscar Boykin

Jun 25, 2015, 7:40:56 PM
to cascadi...@googlegroups.com
Ahh... Tricky case.

Execution.from should be used carefully.

What you did here was make an Execution[Execution[Unit]] and then with your .unit, you threw away the inner one (which is what was doing your work).

Drop the Execution.from and you should be okay.

Execution.from is used to lift a value that is not an Execution, into an Execution.

Compare to Option.apply in Scala. You had a method (aggregateConsolidatedActivity) that already returned an Execution[Unit], and you wrapped it again. That is not needed.

I don't know what consolidateActivity was doing, but it probably has the same issue.
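
The double wrapping described above can be sketched without Scalding. This is only an illustration under stated assumptions: Task is a hypothetical stand-in for Execution (it is not Scalding's implementation), Task.from plays the role of Execution.from, and writeStep stands for a method that already returns an Execution[Unit].

```scala
object NestedWrapperSketch {
  // Hypothetical stand-in for Execution: a description of work
  // that does nothing until run() is called.
  final class Task[A](thunk: () => A) {
    def run(): A = thunk()
    def map[B](f: A => B): Task[B] = new Task(() => f(run()))
    def flatMap[B](f: A => Task[B]): Task[B] = new Task(() => f(run()).run())
    def unit: Task[Unit] = map(_ => ())
  }

  object Task {
    // Lifts a plain value into a Task, in the role Execution.from plays.
    def from[A](a: => A): Task[A] = new Task(() => a)
  }

  // Counts how many times the "write" actually ran.
  var writes = 0

  // Already returns a Task, like a method ending in writeExecution.
  def writeStep(x: Int): Task[Unit] = new Task(() => { writes += 1 })

  // Wrapped twice: flatMap unwraps only the outer layer, leaving
  // Task[Task[Unit]], and .unit then drops the inner Task without running it.
  val wrapped: Task[Unit] =
    Task.from(1).flatMap(x => Task.from(writeStep(x))).unit

  // No extra wrapper: flatMap runs the Task that writeStep returns.
  val direct: Task[Unit] =
    Task.from(1).flatMap(x => writeStep(x))
}
```

Calling NestedWrapperSketch.wrapped.run() leaves writes at 0, because only the outer layer is evaluated, while NestedWrapperSketch.direct.run() executes the step and increments it.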

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/aa677f99-ea02-44b3-a298-8cc2b2ebb6a2%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Oscar Boykin :: @posco :: http://twitter.com/posco

Lakshmi

Jun 25, 2015, 8:42:35 PM
to cascadi...@googlegroups.com
Thanks, Oscar!

I made the change you suggested, but I am having trouble compiling this version because for some reason the compiler thinks aggregateConsolidatedActivity() returns Unit instead of Execution[Unit]. Also, the first step, consolidateActivity(), calls mapGroup and then .values on the result. I am unable to call methods like .forceToDiskExecution, .toIterableExecution or .writeExecution on that one either. That is likely the issue.

[error] .... Aggregation.scala:174: type mismatch;
[error]  found   : Unit
[error]  required: com.twitter.scalding.Execution[Unit]
[error]   }
[error]   ^
[warn] one warning found
[error] one error found


For more context, line 174 is the closing brace of aggregateConsolidatedActivity():

line 173:  .writeExecution(TypedTsv[((String, String, String, String, String, String, String, String, Float, Float, Float, Float, Float, Long))](args("output")))
line 174: }

More details:

object Aggregation extends ExecutionApp {

  override def job = Execution.getConfig.flatMap { config =>
    val args = config.getArgs

    val execution1 = Execution.from(consolidateActivity(args))
    val execution2 = execution1.flatMap { x =>
      aggregateConsolidatedActivity(x, args)
    }

    execution2.unit
  }

  def consolidateActivity(args: Args): TypedPipe[(String, String, String, String, String, String, String, Int, Float, Float)] = {
    val activity = TypedPipe.from(TypedJson[LifeActivity](args("input")))

    map.groupBy { .... }

    val mapGroup = groupBy.mapGroup { ... }.values

    mapGroup
  }

  def aggregateConsolidatedActivity(input: TypedPipe[(String, String, String, String, String, String, String, Int, Float, Float)], args: Args): Execution[Unit] = {
    val aggregationPipe = input.flatMap {
      ...
      ...
      .writeExecution(TypedTsv[((String, String, String, String, String, String, String, String, Float, Float, Float, Float, Float, Long))](args("output")))
  }

}

Thanks,
Lakshmi

Oscar Boykin

Jun 25, 2015, 8:51:04 PM
to cascadi...@googlegroups.com
That's because you are returning Unit. You are capturing the val aggregationPipe, but not returning it. You need to either not have the val, or make aggregationPipe the last line in your expression.
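
The same behaviour can be reproduced in plain Scala, independent of Scalding. A small sketch with hypothetical method names: a block whose last statement is a val definition evaluates to Unit, whereas ending the block with the expression (or with the val's name) returns the value.

```scala
object LastExpressionSketch {
  // The block ends with a val definition, so the inferred result type is Unit.
  def endsWithVal(xs: List[Int]) = {
    val doubled = xs.map(_ * 2)
  }

  // Without the val, the expression is the last line and is returned.
  def endsWithExpression(xs: List[Int]): List[Int] = {
    xs.map(_ * 2)
  }

  // Keeping the val also works when its name is the last line.
  def endsWithName(xs: List[Int]): List[Int] = {
    val doubled = xs.map(_ * 2)
    doubled
  }
}
```

Declaring the result type explicitly, as the second and third methods do, is what turns the first form into a compile-time type mismatch rather than a silent Unit.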

PS: I'm happy to help with Scalding, but general Scala debugging is out of scope for what I can provide. If you hit an issue you think is Scalding-related, let me know; otherwise the Scala IRC channel on freenode is probably a good place to ask.



Lakshmi

Jun 25, 2015, 9:57:23 PM
to cascadi...@googlegroups.com
Thanks, that was what I was missing! I appreciate your timely help, and your comment is duly noted. I apologize; I'm brushing up on my Scala syntax as well, and assumed the problem was related to how I was wiring the Scalding job.

I have a related question on the performance of this job. I have been reading the documentation (and various comments online), and see that it is best to call forceToDiskExecution, .writeExecution or .toIterableExecution. But in the case of my first execution (consolidatedActivity()) as you can see I have to directly access "mapGroup.values" which returns a TypedPipe and I don't have a way to make it an execution. Is this a problem, or is that avoided by calling Execution.from(consolidatedActivity())? 

Specifically I am referring to this rule in the tutorial https://github.com/twitter/scalding/wiki/Calling-Scalding-from-inside-your-application:

Some rules
  1. When using Execution NEVER use .write or .toPipe (or call any method that takes an implicit flowDef). Instead use .writeExecution, .toIterableExecution, or .forceToDiskExecution. (see scaladocs). 

Thank you,

Lakshmi

Oscar Boykin

Jun 25, 2015, 10:03:49 PM
to cascadi...@googlegroups.com
That is not a problem. You just want to avoid any method that takes an implicit flowDef and mode when working with Execution (namely .toPipe and .write). It is totally fine (and preferable) to work with TypedPipe/Group/CoGrouped as long as you can. Execution should only be needed to write something out, or to do a loop.


Lakshmi Gopalan

Jun 25, 2015, 10:32:19 PM
to cascadi...@googlegroups.com
That makes perfect sense, thanks.

Lakshmi
