PartitionedFileSet & Spark Scala Program

Sharanya Santhanam

Aug 3, 2016, 8:31:20 PM
to CDAP User
Similar to the SportsResults example, I want to create an application with a Spark program that does an aggregation over a specific partition of a file set. The partition to use should be provided at runtime.

The documentation has examples for MapReduce programs. Can you please share an example of how I can use PartitionedFileSets with a Spark Scala program?
How do I make the Spark program use the partition?

Thanks,
Sharanya 

Rohit Sinha

Aug 3, 2016, 8:52:36 PM
to CDAP User
Hello Sharanya,

You can have a look at the ResponseCounterProgram Scala Spark program, which uses TimePartitionedFileSets. Using PartitionedFileSets is very similar: you specify the partition key, as done in the example you are already referring to.


You can access runtime arguments in a CDAP Spark program (which in your case would carry the partition). Here is an example:

Thanks.
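[Editor's note: a minimal sketch of the approach discussed above, assuming the CDAP 3.x Scala Spark API (`SparkMain`, `SparkExecutionContext`) and the `PartitionedFileSetArguments` helper. The dataset name `results`, the partition field `league`, and the runtime-argument key `league` are hypothetical, not from the thread.]

```scala
import co.cask.cdap.api.dataset.lib.{PartitionFilter, PartitionedFileSetArguments}
import co.cask.cdap.api.spark.{SparkExecutionContext, SparkMain}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.spark.SparkContext

import scala.collection.JavaConverters._

class PartitionWordCount extends SparkMain {

  override def run(implicit sec: SparkExecutionContext): Unit = {
    val sc = new SparkContext()

    // The partition to process is passed as a runtime argument,
    // e.g. start the program with {"league": "nfl"}.
    val league = sec.getRuntimeArguments.get("league")

    // Build dataset arguments that restrict the input to that partition.
    val inputArgs = new java.util.HashMap[String, String]()
    PartitionedFileSetArguments.setInputPartitionFilter(
      inputArgs,
      PartitionFilter.builder().addValueCondition("league", league).build())

    // Read only the selected partition of the PartitionedFileSet;
    // fromDataset comes from SparkMain's implicit context functions.
    val input = sc.fromDataset[LongWritable, Text]("results", inputArgs.asScala.toMap)

    val counts = input.values
      .flatMap(_.toString.split("""\s+"""))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
    // ... then write counts out to another dataset
  }
}
```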

Rohit Sinha

Aug 18, 2016, 1:21:55 AM
to cdap...@googlegroups.com, Sharanya Santhanam
Hello Sharanya, 
I wanted to follow up on this question and check whether you got it resolved. It is very similar to the other question you asked here:

Please let us know if you need any further information, and we will be happy to help.

Thanks. 

On Thu, Aug 4, 2016 at 2:38 PM, Sharanya Santhanam <santhana...@gmail.com> wrote:
Hey Rohit, 

I have a simple application that does a word count using Spark Scala on a PartitionedFileSet. I have reused the SportResults UploadService to create a partitioned dataset.
I'm having trouble saving the RDD into the fileset dataset. I am currently running CDAP 3.4.2 on a Hadoop 2.7 cluster with a custom Spark 1.5.2 installation.
I don't see any other errors in the Master logs. Can you please help out?


2016-08-04 21:26:13,404 - ERROR [Driver:o.a.s.d.y.ApplicationMaster@96] - User class threw exception: co.cask.tephra.TransactionFailureException: Exception raised in transactional execution. Cause: Output directory not set.
co.cask.tephra.TransactionFailureException: Exception raised in transactional execution. Cause: Output directory not set.
	at co.cask.cdap.data2.transaction.Transactions.asTransactionFailure(Transactions.java:87) ~[co.cask.cdap.cdap-data-fabric-3.4.2.jar:na]
	at co.cask.cdap.data2.transaction.Transactions.asTransactionFailure(Transactions.java:73) ~[co.cask.cdap.cdap-data-fabric-3.4.2.jar:na]
	at co.cask.cdap.app.runtime.spark.SparkTransactional.execute(SparkTransactional.java:215) ~[co.cask.cdap.cdap-spark-core-3.4.2.jar:na]
	at co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext.saveAsDataset(DefaultSparkExecutionContext.scala:202) ~[co.cask.cdap.cdap-spark-core-3.4.2.jar:na]
	at co.cask.cdap.api.spark.SparkMain$SparkProgramRDDFunctions.saveAsDataset(SparkMain.scala:121) ~[co.cask.cdap.cdap-api-spark-3.4.2.jar:na]
	at co.cask.cdap.scala.WordCountProgram.run(WordCountProgram.scala:65) ~[program.expanded.jar/:na]
	at co.cask.cdap.app.runtime.spark.SparkMainWrapper$.main(SparkMainWrapper.scala:92) ~[co.cask.cdap.cdap-spark-core-3.4.2.jar:na]
	at co.cask.cdap.app.runtime.spark.SparkMainWrapper.main(SparkMainWrapper.scala) ~[co.cask.cdap.cdap-spark-core-3.4.2.jar:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_60]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_60]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_60]
	at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_60]
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525) ~[spark-assembly-1.5.2-******]
Caused by: org.apache.hadoop.mapred.InvalidJobConfException: Output directory not set.
	at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:138) ~[hadoop-mapreduce-client-core-2.7.2-sfdc-1.0.1.jar:2.7.2-sfdc-1.0.1]
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1011) ~[spark-assembly-1.5.2-*****]
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:998) ~[spark-assembly-1.5.2-***]
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1.apply(PairRDDFunctions.scala:998) ~[spark-assembly-1.5.2-***]
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) ~[spark-assembly-1.5.2-*****]
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) ~[spark-assembly-1.5.2-****]
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:310) ~[spark-assembly-1.5.2-****]
	at org.apache.spark.rdd.PairRDDFunctions.saveAsNewAPIHadoopDataset(PairRDDFunctions.scala:998) ~[spark-assembly-1.5.2-****]
	at co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext$$anon$2.run(DefaultSparkExecutionContext.scala:224) ~[co.cask.cdap.cdap-spark-core-3.4.2.jar:na]
	at co.cask.cdap.app.runtime.spark.SparkTransactional.execute(SparkTransactional.java:197) ~[co.cask.cdap.cdap-spark-core-3.4.2.jar:na]
	... 10 common frames omitted
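
[Editor's note: the `Output directory not set` failure above typically means no output `PartitionKey` was supplied, so the `PartitionedFileSet` cannot map the write to a partition directory and the underlying `FileOutputFormat` sees no output path. A hedged sketch of the likely fix, assuming the `PartitionedFileSetArguments` helper from the CDAP API; the dataset name `wordCounts` and partition field `league` are hypothetical.]

```scala
import co.cask.cdap.api.dataset.lib.{PartitionKey, PartitionedFileSetArguments}

import scala.collection.JavaConverters._

// Attach the target partition key to the dataset arguments before saving;
// without it the save fails with "Output directory not set".
val outputArgs = new java.util.HashMap[String, String]()
PartitionedFileSetArguments.setOutputPartitionKey(
  outputArgs,
  PartitionKey.builder().addStringField("league", "nfl").build())

// counts: the (word, count) RDD produced by the word count;
// saveAsDataset comes from SparkMain's implicit SparkProgramRDDFunctions.
counts.saveAsDataset("wordCounts", outputArgs.asScala.toMap)
```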




--
Rohit Sinha
Software Engineer
rsi...@cask.co
+1 612 735 5213

Sharanya Santhanam

Aug 18, 2016, 2:45:37 PM
to CDAP User
Thanks, Rohit, for pointing me to the example! I have the hang of it now.

