Not able to create dataset in spark using the local cdap installation


Naresh Dulam

Jun 22, 2017, 4:35:33 PM
to CDAP User
Hi All,

I have data available at the path below and I am trying to implement a simple Spark app that reads and writes data using the CDAP Spark API. I am new to the CDAP Spark API and am building a POC with Spark on CDAP. Below are the error message and the code. Please point out what is missing in my code.



The input file is available at this location:

/cdap/data/namespaces/testspace/data/file.txt


Below is the code:


public class AppSpark extends AbstractSpark {

  @Override
  protected void configure() {
    // Input FileSet: files are expected under the base path "cdap"
    // (relative to <cdap>/data/namespaces/<namespace>/data/).
    createDataset("gold", FileSet.class,
                  FileSetProperties.builder()
                      .setBasePath("cdap")
                      .setInputFormat(TextInputFormat.class)
                      .setOutputFormat(TextOutputFormat.class)
                      .build());

    // Output FileSet under the base path "result"; TextOutputFormat.SEPERATOR is
    // Hadoop's constant for the output key/value separator, set here to a comma.
    createDataset("out", FileSet.class,
                  FileSetProperties.builder()
                      .setOutputProperty(TextOutputFormat.SEPERATOR, ",")
                      .setBasePath("result")
                      .setInputFormat(TextInputFormat.class)
                      .setOutputFormat(TextOutputFormat.class)
                      .build());

    setName(AppSpark.class.getName());
    setDescription("spark program to access HDFS data");
    setMainClass(ScalaSparkHDFS.class);
    setDriverResources(new Resources(1024));
    setExecutorResources(new Resources(1024, 2));
  }
}




class ScalaSparkHDFS extends SparkMain {

  override def run(implicit sec: SparkExecutionContext): Unit = {
    val sc = new SparkContext
    val lines: RDD[(LongWritable, Text)] = sc.fromDataset("gold")
    val data = lines.map(line => (line._1, line._2))
    println(data.count())
    data.saveAsDataset("nd2629", "out")
  }
}


public class App extends AbstractApplication {

  @Override
  public void configure() {
    addSpark(new AppSpark());
  }
}



Error message thrown:



SNAPSHOT.spark.sparkhdfs.AppSpark, runId=23ede9f3-5789-11e7-b613-acde48001122}
2017-06-22 15:27:02,870 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-3:a.r.RemoteActorRefProvider$RemotingTerminator@74] - Shutting down remote daemon.
2017-06-22 15:27:02,875 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-3:a.r.RemoteActorRefProvider$RemotingTerminator@74] - Remote daemon shut down; proceeding with flushing remote transports.
2017-06-22 15:27:02,892 - ERROR [SparkRunnersparkhdfs.AppSpark:c.c.c.i.a.r.ProgramControllerServiceAdapter@93] - Program terminated with exception
java.util.concurrent.ExecutionException: java.io.IOException: No input paths specified in job
	at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:294) ~[com.google.guava.guava-13.0.1.jar:na]
	at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:281) ~[com.google.guava.guava-13.0.1.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[com.google.guava.guava-13.0.1.jar:na]
	at co.cask.cdap.app.runtime.spark.SparkRuntimeService.run(SparkRuntimeService.java:277) ~[co.cask.cdap.cdap-spark-core-4.1.1.jar:na]
	at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) ~[com.google.guava.guava-13.0.1.jar:na]
	at co.cask.cdap.app.runtime.spark.SparkRuntimeService$2$1.run(SparkRuntimeService.java:335) [co.cask.cdap.cdap-spark-core-4.1.1.jar:na]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
Caused by: java.io.IOException: No input paths specified in job
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231) ~[org.apache.hadoop.hadoop-mapreduce-client-core-2.3.0.jar:na]
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340) ~[org.apache.hadoop.hadoop-mapreduce-client-core-2.3.0.jar:na]
	at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) ~[na:na]
	at scala.Option.getOrElse(Option.scala:120) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) ~[na:na]
	at co.cask.cdap.app.runtime.spark.DatasetRDD.getPartitions(DatasetRDD.scala:83) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) ~[na:na]
	at scala.Option.getOrElse(Option.scala:120) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) ~[na:na]
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) ~[na:na]
	at scala.Option.getOrElse(Option.scala:120) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) ~[na:na]
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) ~[na:na]
	at org.apache.spark.rdd.RDD.count(RDD.scala:1157) ~[na:na]
	at sparkhdfs.ScalaSparkHDFS.run(ScalaSparkHDFS.scala:22) ~[na:na]
	at co.cask.cdap.app.runtime.spark.SparkMainWrapper$.main(SparkMainWrapper.scala:100) ~[na:na]
	at co.cask.cdap.app.runtime.spark.SparkMainWrapper.main(SparkMainWrapper.scala) ~[na:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[na:na]
	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.submit(AbstractSparkSubmitter.java:168) ~[na:na]
	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.access$000(AbstractSparkSubmitter.java:55) ~[na:na]
	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter$5.run(AbstractSparkSubmitter.java:112) ~[na:na]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_121]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_121]
	... 1 common frames omitted
2017-06-22 15:27:02,893 - DEBUG [pcontroller-program:nd2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark-23ede9f3-5789-11e7-b613-acde48001122:c.c.c.a.r.AbstractProgramRuntimeService@433] - Removing RuntimeInfo: Spark sparkhdfs.AppSpark 23ede9f3-5789-11e7-b613-acde48001122
2017-06-22 15:27:02,893 - DEBUG [pcontroller-program:nd2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark-23ede9f3-5789-11e7-b613-acde48001122:c.c.c.a.r.AbstractProgramRuntimeService@436] - RuntimeInfo removed: RuntimeInfo{programId=program:nd2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark, twillRunId=null}
2017-06-22 15:27:02,906 - DEBUG [SparkRunnersparkhdfs.AppSpark:c.g.c.b.Throwables@33] - Uncaught exception in thread Thread[SparkRunnersparkhdfs.AppSpark,5,appfabric-executor-thread]
java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.io.IOException: No input paths specified in job
	at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[com.google.guava.guava-13.0.1.jar:na]
	at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:69) ~[com.google.guava.guava-13.0.1.jar:na]
	at co.cask.cdap.app.runtime.spark.SparkRuntimeService$2$1.run(SparkRuntimeService.java:335) ~[na:na]
	at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: No input paths specified in job
	at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:294) ~[com.google.guava.guava-13.0.1.jar:na]
	at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:281) ~[com.google.guava.guava-13.0.1.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[com.google.guava.guava-13.0.1.jar:na]
	at co.cask.cdap.app.runtime.spark.SparkRuntimeService.run(SparkRuntimeService.java:277) ~[na:na]
	at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) ~[com.google.guava.guava-13.0.1.jar:na]
	... 2 common frames omitted
Caused by: java.io.IOException: No input paths specified in job
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231) ~[org.apache.hadoop.hadoop-mapreduce-client-core-2.3.0.jar:na]
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340) ~[org.apache.hadoop.hadoop-mapreduce-client-core-2.3.0.jar:na]
	at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) ~[na:na]
	at scala.Option.getOrElse(Option.scala:120) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) ~[na:na]
	at co.cask.cdap.app.runtime.spark.DatasetRDD.getPartitions(DatasetRDD.scala:83) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) ~[na:na]
	at scala.Option.getOrElse(Option.scala:120) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) ~[na:na]
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) ~[na:na]
	at scala.Option.getOrElse(Option.scala:120) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) ~[na:na]
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) ~[na:na]
	at org.apache.spark.rdd.RDD.count(RDD.scala:1157) ~[na:na]
	at sparkhdfs.ScalaSparkHDFS.run(ScalaSparkHDFS.scala:22) ~[na:na]
	at co.cask.cdap.app.runtime.spark.SparkMainWrapper$.main(SparkMainWrapper.scala:100) ~[na:na]
	at co.cask.cdap.app.runtime.spark.SparkMainWrapper.main(SparkMainWrapper.scala) ~[na:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[na:na]
	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.submit(AbstractSparkSubmitter.java:168) ~[na:na]
	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.access$000(AbstractSparkSubmitter.java:55) ~[na:na]
	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter$5.run(AbstractSparkSubmitter.java:112) ~[na:na]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_121]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_121]
	... 1 common frames omitted
2017-06-22 15:27:02,915 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-3:a.r.RemoteActorRefProvider$RemotingTerminator@74] - Remoting shut down.

ali

Jun 22, 2017, 8:21:13 PM
to CDAP User
Hi Naresh.

The error "No input paths specified in job" indicates that the job ran without any input files.
The input file that you want to use as input to the MapReduce job is actually at /cdap/data/namespaces/testspace/data/file.txt, which is outside of the gold FileSet's path.
If you move your file.txt into the following directory, it should work:
/cdap/namespaces/testspace/data/gold/cdap

Generally, if you set up the input file via the FileSet APIs, that is the directory where the file would have been written.

Regards,

Ali Anwar
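
A minimal sketch of what Ali describes here, for illustration only: writing a file through the FileSet API places it under the dataset's base path. The worker class, lambda, and file contents are assumptions; only the "gold" dataset name and "file.txt" come from the thread.

import java.io.PrintWriter;

import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.api.worker.AbstractWorker;
import org.apache.twill.filesystem.Location;

// Hypothetical worker that writes "file.txt" into the "gold" FileSet, i.e. under
// its base path ("cdap") inside the namespace's data directory.
public class GoldUploadWorker extends AbstractWorker {

  @Override
  public void run() {
    try {
      getContext().execute(context -> {
        FileSet gold = context.getDataset("gold");
        // The location is resolved relative to the FileSet's base path.
        Location location = gold.getLocation("file.txt");
        try (PrintWriter writer = new PrintWriter(location.getOutputStream(), true)) {
          writer.println("hello,cdap");
        }
      });
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}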

Naresh Dulam

Jun 23, 2017, 10:54:52 AM
to CDAP User

Hi Ali,

I moved the file to /<localcdapinstallfolder>/data/namespaces/testspace/data/gold/cdap/file.txt, but I am still getting the same error.

My understanding of the input-path configuration in the line of code below is that the dataset name is "gold" and the input file path is "/<localcdapinstallfolder>/data/namespaces/testspace/data/cdap" (the base path is "cdap"):

createDataset("gold",  FileSet.class,FileSetProperties.builder().setBasePath("cdap").setInputFormat(TextInputFormat.class).setOutputFormat(TextOutputFormat.class).build());


Also, it would be better if the error message showed which input path CDAP was looking for when it fails, like we get in normal MapReduce jobs.


Thank you,
Naresh

Naresh Dulam

Jun 23, 2017, 11:55:35 AM
to CDAP User
I also placed the file in the following locations:

/<cdaplocalinstall path>/data/namespaces/testspace/file1.txt
/<cdaplocalinstall path>/data/namespaces/testspace/data/gold/file1.txt 
/<cdaplocalinstall path>/data/namespaces/testspace/data/gold/cdap/file.txt
/<cdaplocalinstall path>/data/namespaces/testspace/cdap/file1.txt
/<cdaplocalinstall path>/data/namespaces/testspace/cdap/gold/file1.txt

I then ran the application, but it still throws the "No input paths specified" exception.

Ali Anwar

Jun 23, 2017, 4:20:29 PM
to cdap...@googlegroups.com
Hi Naresh.

Could you please attach the full logs of the failing program run?

Thanks,

Ali Anwar


Naresh Dulam

Jun 23, 2017, 4:21:45 PM
to CDAP User
Please find the logs from the CDAP UI below:

2017-06-23 15:17:11,616 - DEBUG [spark-submitter-sparkhdfs.AppSpark-f052ddd1-5850-11e7-af09-acde48001122:c.c.c.a.r.s.s.AbstractSparkSubmitter@162] - Calling SparkSubmit for program:nd2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark f052ddd1-5850-11e7-af09-acde48001122: [--master, local[2], --class, co.cask.cdap.app.runtime.spark.SparkMainWrapper, --conf, spark.app.name=sparkhdfs.AppSpark, --conf, spark.executor.memory=512m, --conf, spark.driver.memory=512m, --conf, spark.local.dir=/Users/nd2629/cdap/data/tmp/1498249031179-0, --conf, spark.driver.cores=1, --conf, spark.ui.port=0, --conf, spark.executor.cores=1, --conf, spark.metrics.conf=/Users/nd2629/cdap/data/tmp/1498249031179-0/metrics1320711439257084548.properties, --conf, spark.cdap.localized.resources=[], /Users/nd2629/cdap/data/tmp/1498249031179-0/emptyJob.jar, --userClass=sparkhdfs.ScalaSparkHDFS]
2017-06-23 15:17:11,780 - INFO  [NettyHttpService STARTING:c.c.h.NettyHttpService@257] - Starting sparkhdfs.AppSpark-spark-tx http service on address localhost/127.0.0.1:0...
2017-06-23 15:17:11,782 - INFO  [NettyHttpService STARTING:c.c.h.NettyHttpService@262] - Started sparkhdfs.AppSpark-spark-tx http service on address /127.0.0.1:61260
2017-06-23 15:17:11,788 - DEBUG [spark-submitter-sparkhdfs.AppSpark-f052ddd1-5850-11e7-af09-acde48001122:s.ScalaSparkHDFS@21] - ***********11**********
2017-06-23 15:17:11,827 - WARN  [spark-submitter-sparkhdfs.AppSpark-f052ddd1-5850-11e7-af09-acde48001122:o.a.s.SparkConf@70] - In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
2017-06-23 15:17:15,463 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-4:a.e.s.Slf4jLogger@80] - Slf4jLogger started
2017-06-23 15:17:15,600 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-2:Remoting@74] - Starting remoting
2017-06-23 15:17:15,708 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-2:Remoting@74] - Remoting started; listening on addresses :[akka.tcp://sparkDriver...@10.20.13.80:61339]
2017-06-23 15:17:16,490 - DEBUG [spark-submitter-sparkhdfs.AppSpark-f052ddd1-5850-11e7-af09-acde48001122:c.c.c.a.r.s.SparkMetricsSink@51] - Using SparkMetricsSink for reporting metrics: {class=co.cask.cdap.app.runtime.spark.SparkMetricsSink}
2017-06-23 15:17:16,605 - DEBUG [spark-submitter-sparkhdfs.AppSpark-f052ddd1-5850-11e7-af09-acde48001122:s.ScalaSparkHDFS@23] - ***********2**********
2017-06-23 15:17:16,671 - DEBUG [spark-submitter-sparkhdfs.AppSpark-f052ddd1-5850-11e7-af09-acde48001122:s.ScalaSparkHDFS@25] - ***********3**********
2017-06-23 15:17:17,002 - DEBUG [spark-submitter-sparkhdfs.AppSpark-f052ddd1-5850-11e7-af09-acde48001122:s.ScalaSparkHDFS@28] - ***********4**********
2017-06-23 15:17:17,015 - DEBUG [spark-submitter-sparkhdfs.AppSpark-f052ddd1-5850-11e7-af09-acde48001122:c.c.c.d.m.w.BasicLineageWriter@63] - Writing access for run program_run:nd2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark.f052ddd1-5850-11e7-af09-acde48001122, dataset dataset:nd2629.inputdataset, accessType READ, component null, accessTime = 1498249037015
2017-06-23 15:17:17,658 - DEBUG [spark-submitter-sparkhdfs.AppSpark-f052ddd1-5850-11e7-af09-acde48001122:c.c.c.a.r.s.SparkRuntimeEnv$@247] - Shutting down Server and ThreadPool used by Spark org.apache.spark.SparkContext@6ea552db
2017-06-23 15:17:17,667 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-3:a.r.RemoteActorRefProvider$RemotingTerminator@74] - Shutting down remote daemon.
2017-06-23 15:17:17,671 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-3:a.r.RemoteActorRefProvider$RemotingTerminator@74] - Remote daemon shut down; proceeding with flushing remote transports.
2017-06-23 15:17:17,703 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-3:a.r.RemoteActorRefProvider$RemotingTerminator@74] - Remoting shut down.
2017-06-23 15:17:17,711 - DEBUG [spark-submitter-sparkhdfs.AppSpark-f052ddd1-5850-11e7-af09-acde48001122:c.c.c.a.r.s.SparkRuntimeEnv$@247] - Shutting down Server and ThreadPool used by Spark org.apache.spark.SparkContext@6ea552db
2017-06-23 15:17:17,714 - INFO  [NettyHttpService STOPPING:c.c.h.NettyHttpService@274] - Stopping sparkhdfs.AppSpark-spark-tx http service on address localhost/127.0.0.1:61260...
2017-06-23 15:17:17,715 - INFO  [NettyHttpService STOPPING:c.c.h.NettyHttpService@288] - Stopped sparkhdfs.AppSpark-spark-tx http service on address localhost/127.0.0.1:61260
2017-06-23 15:17:17,721 - DEBUG [SparkRunnersparkhdfs.AppSpark:c.c.c.a.r.s.SparkRuntimeService@717] - Running Spark shutdown hook org.apache.spark.util.SparkShutdownHookManager$$anon$2@294b96f9
2017-06-23 15:17:17,733 - DEBUG [SparkRunnersparkhdfs.AppSpark:c.c.c.a.r.s.SparkRuntimeService@752] - BeanIntrospector.ctorParamNamesCache has been invalidated.
2017-06-23 15:17:17,734 - DEBUG [SparkRunnersparkhdfs.AppSpark:c.c.c.a.r.s.SparkRuntimeService@307] - Spark program completed: SparkRuntimeContext{id=program:nd2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark, runId=f052ddd1-5850-11e7-af09-acde48001122}
2017-06-23 15:17:17,753 - ERROR [SparkRunnersparkhdfs.AppSpark:c.c.c.i.a.r.ProgramControllerServiceAdapter@93] - Program terminated with exception
java.util.concurrent.ExecutionException: java.io.IOException: No input paths specified in job
	at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:294) ~[com.google.guava.guava-13.0.1.jar:na]
	at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:281) ~[com.google.guava.guava-13.0.1.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[com.google.guava.guava-13.0.1.jar:na]
	at co.cask.cdap.app.runtime.spark.SparkRuntimeService.run(SparkRuntimeService.java:277) ~[co.cask.cdap.cdap-spark-core-4.1.1.jar:na]
	at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) ~[com.google.guava.guava-13.0.1.jar:na]
	at co.cask.cdap.app.runtime.spark.SparkRuntimeService$2$1.run(SparkRuntimeService.java:335) [co.cask.cdap.cdap-spark-core-4.1.1.jar:na]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
Caused by: java.io.IOException: No input paths specified in job
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231) ~[org.apache.hadoop.hadoop-mapreduce-client-core-2.3.0.jar:na]
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340) ~[org.apache.hadoop.hadoop-mapreduce-client-core-2.3.0.jar:na]
	at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) ~[na:na]
	at scala.Option.getOrElse(Option.scala:120) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) ~[na:na]
	at co.cask.cdap.app.runtime.spark.DatasetRDD.getPartitions(DatasetRDD.scala:83) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) ~[na:na]
	at scala.Option.getOrElse(Option.scala:120) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) ~[na:na]
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) ~[na:na]
	at scala.Option.getOrElse(Option.scala:120) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) ~[na:na]
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) ~[na:na]
	at org.apache.spark.rdd.RDD.count(RDD.scala:1157) ~[na:na]
	at sparkhdfs.ScalaSparkHDFS.run(ScalaSparkHDFS.scala:29) ~[na:na]
	at co.cask.cdap.app.runtime.spark.SparkMainWrapper$.main(SparkMainWrapper.scala:100) ~[na:na]
	at co.cask.cdap.app.runtime.spark.SparkMainWrapper.main(SparkMainWrapper.scala) ~[na:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[na:na]
	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.submit(AbstractSparkSubmitter.java:168) ~[na:na]
	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.access$000(AbstractSparkSubmitter.java:55) ~[na:na]
	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter$5.run(AbstractSparkSubmitter.java:112) ~[na:na]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_121]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_121]
	... 1 common frames omitted
2017-06-23 15:17:17,754 - DEBUG [pcontroller-program:nd2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark-f052ddd1-5850-11e7-af09-acde48001122:c.c.c.a.r.AbstractProgramRuntimeService@433] - Removing RuntimeInfo: Spark sparkhdfs.AppSpark f052ddd1-5850-11e7-af09-acde48001122
2017-06-23 15:17:17,754 - DEBUG [pcontroller-program:nd2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark-f052ddd1-5850-11e7-af09-acde48001122:c.c.c.a.r.AbstractProgramRuntimeService@436] - RuntimeInfo removed: RuntimeInfo{programId=program:nd2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark, twillRunId=null}
2017-06-23 15:17:17,765 - DEBUG [SparkRunnersparkhdfs.AppSpark:c.g.c.b.Throwables@33] - Uncaught exception in thread Thread[SparkRunnersparkhdfs.AppSpark,5,appfabric-executor-thread]
java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.io.IOException: No input paths specified in job
	at com.google.common.base.Throwables.propagate(Throwables.java:160) ~[com.google.guava.guava-13.0.1.jar:na]
	at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:69) ~[com.google.guava.guava-13.0.1.jar:na]
	at co.cask.cdap.app.runtime.spark.SparkRuntimeService$2$1.run(SparkRuntimeService.java:335) ~[na:na]
	at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: No input paths specified in job
	at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:294) ~[com.google.guava.guava-13.0.1.jar:na]
	at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:281) ~[com.google.guava.guava-13.0.1.jar:na]
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[com.google.guava.guava-13.0.1.jar:na]
	at co.cask.cdap.app.runtime.spark.SparkRuntimeService.run(SparkRuntimeService.java:277) ~[na:na]
	at com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52) ~[com.google.guava.guava-13.0.1.jar:na]
	... 2 common frames omitted
Caused by: java.io.IOException: No input paths specified in job
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231) ~[org.apache.hadoop.hadoop-mapreduce-client-core-2.3.0.jar:na]
	at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:340) ~[org.apache.hadoop.hadoop-mapreduce-client-core-2.3.0.jar:na]
	at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) ~[na:na]
	at scala.Option.getOrElse(Option.scala:120) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) ~[na:na]
	at co.cask.cdap.app.runtime.spark.DatasetRDD.getPartitions(DatasetRDD.scala:83) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) ~[na:na]
	at scala.Option.getOrElse(Option.scala:120) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) ~[na:na]
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239) ~[na:na]
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237) ~[na:na]
	at scala.Option.getOrElse(Option.scala:120) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:237) ~[na:na]
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) ~[na:na]
	at org.apache.spark.rdd.RDD.count(RDD.scala:1157) ~[na:na]
	at sparkhdfs.ScalaSparkHDFS.run(ScalaSparkHDFS.scala:29) ~[na:na]
	at co.cask.cdap.app.runtime.spark.SparkMainWrapper$.main(SparkMainWrapper.scala:100) ~[na:na]
	at co.cask.cdap.app.runtime.spark.SparkMainWrapper.main(SparkMainWrapper.scala) ~[na:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) ~[na:na]
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) ~[na:na]
	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.submit(AbstractSparkSubmitter.java:168) ~[na:na]
	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.access$000(AbstractSparkSubmitter.java:55) ~[na:na]
	at co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter$5.run(AbstractSparkSubmitter.java:112) ~[na:na]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_121]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_121]
	... 1 common frames omitted

Ali Anwar

Jun 23, 2017, 4:37:51 PM
to cdap...@googlegroups.com
From the logs, it seems like the program is running in namespace "nd2629", whereas you've put the files in a dataset for namespace "testspace".

The main issue that you're encountering is that when using a FileSet as input, you have to use runtime arguments of the dataset to specify the relative path within that FileSet to read. For example, this test case passes the arguments for the dataset via arguments to the program: FileSetWordCountTest. Something along the following three lines should work for you:

Map<String, String> inputArgs = Maps.newHashMap();
FileSetArguments.setInputPaths(inputArgs, "file.txt");
sc.fromDataset("gold", inputArgs)

I definitely agree that the error message could be improved. I filed CDAP-11979 for that.

Regards,

Ali Anwar
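
For illustration only, a minimal sketch of how those three lines might sit inside a Spark program written against the CDAP Java Spark API. The class name is an assumption; the "gold" dataset and "file.txt" path come from this thread.

import java.util.HashMap;
import java.util.Map;

import co.cask.cdap.api.dataset.lib.FileSetArguments;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical Spark main class that reads a single file from the "gold" FileSet.
public class JavaSparkGoldReader implements JavaSparkMain {

  @Override
  public void run(JavaSparkExecutionContext sec) throws Exception {
    // Creating the Java Spark context initializes Spark for this run.
    JavaSparkContext jsc = new JavaSparkContext();

    Map<String, String> inputArgs = new HashMap<>();
    // Path relative to the "gold" FileSet's base path ("cdap").
    FileSetArguments.setInputPaths(inputArgs, "file.txt");

    JavaPairRDD<LongWritable, Text> lines = sec.fromDataset("gold", inputArgs);
    System.out.println("line count = " + lines.count());
  }
}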


Kunjan Mehta

Jun 23, 2017, 4:52:06 PM
to cdap...@googlegroups.com
I want to unsubscribe from this group. Thank you.

Regards,
Kunjan.



Naresh Dulam

Jun 24, 2017, 2:47:48 PM
to CDAP User
Hi Ali,

Thanks, Ali. We are now able to read after including:
Map<String, String> inputArgs = Maps.newHashMap();
FileSetArguments.setInputPaths(inputArgs, "file1.txt");

However, it is not feasible to specify an individual file each time; the requirement is to read all files in a directory. I was going through the APIs and don't see anything to read all the files in a
base directory. Do we have any API to read the files from the base path?



Thank you,
Naresh



Ali Anwar

Jun 24, 2017, 3:41:59 PM
to cdap...@googlegroups.com
Hi Naresh.

Within the FileSet's base path, you could write directories instead of individual files. In that case, you could then specify the directory's relative path, and that would read all of the files in that directory.
For reading all files in the FileSet's base path, I filed https://issues.cask.co/browse/CDAP-11986.

Regards,

Ali Anwar


Ali Anwar

Jun 24, 2017, 5:25:30 PM
to cdap...@googlegroups.com
Hi Naresh.

To follow up on my previous post: as Andreas suggests in the JIRA, you can try setting "/" as the input path to pick up all files in the FileSet.

Let me know how it goes.

Regards,

Ali Anwar

Naresh Dulam

Jun 24, 2017, 6:03:07 PM
to CDAP User
Hi Ali,

CDAP is now able to read the files in a directory.
Now I understand how it behaves when we set the base path, as below:

createDataset("inputdataset", .... ,.setBasePath("cdap"), .. );
createDataset("out",... , setBasePath("result"), ......);
FileSetArguments.setInputPaths(inputArgs, "input");
JavaPairRDD<LongWritable,Text> data = jse.fromDataset("inputdataset",inputArgs);

CDAP looks for the files in the following location: /<cdap>/namespaces/<namespacename>/data/cdap/input/

I feel this is missing from the docs, and a more descriptive error message would show what was missing when we encounter an error.

I am attaching sample code for reference, in case someone has difficulty writing their first Spark program using the CDAP API.

Thanks for Quick response.




Thank you,
Naresh
CDAP SPARK PROGRAM to Read files in HDFS.docx
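
For reference (and separate from the attached document, which is not reproduced here), a minimal end-to-end sketch consistent with the snippets above: it reads every file under the "input" directory of the "inputdataset" FileSet and writes the lines to the "out" FileSet under "output". The class name is an assumption, and the exact saveAsDataset overload should be verified against your CDAP version.

import java.util.HashMap;
import java.util.Map;

import co.cask.cdap.api.dataset.lib.FileSetArguments;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical Spark main class: reads a directory from one FileSet, writes to another.
public class JavaSparkFileSetCopy implements JavaSparkMain {

  @Override
  public void run(JavaSparkExecutionContext sec) throws Exception {
    JavaSparkContext jsc = new JavaSparkContext();

    Map<String, String> inputArgs = new HashMap<>();
    Map<String, String> outputArgs = new HashMap<>();

    // Directories relative to the FileSets' base paths ("cdap" and "result").
    FileSetArguments.setInputPaths(inputArgs, "input");
    FileSetArguments.setOutputPath(outputArgs, "output");

    JavaPairRDD<LongWritable, Text> lines = sec.fromDataset("inputdataset", inputArgs);
    System.out.println("line count = " + lines.count());

    // Assumed JavaSparkExecutionContext method; check the exact signature for your CDAP release.
    sec.saveAsDataset(lines, "out", outputArgs);
  }
}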

Ali Anwar

Jun 24, 2017, 6:44:44 PM
to cdap...@googlegroups.com
Hi Naresh.

Thanks for sharing the code. I filed a JIRA earlier for the improved error message.
You could also try FileSetArguments.setInputPath(inputArgs, "/"), to see if that will enable it to read all files in the base path of the FileSet dataset.

Regards,

Ali Anwar


Naresh Dulam

Jun 26, 2017, 12:37:23 PM
to CDAP User
Hi Ali,

I am not able to read using inputArgs "/".

Attached is error message.

Directory structure is

<cdap>/namespace/<nameofnamespace>/data/cdap/user/file.txt
<cdap>/namespace/<nameofnamespace>/data/cdap/transaction/txns.txt


Another question:

How can we create two different RDDs from the two different paths shown in the directory structure above?
I used the code below, but it is not able to read data from the transaction directory.


FileSetArguments.setInputPaths(inputArgs, "user");
FileSetArguments.setOutputPath(outputArgs,"output");
JavaPairRDD<LongWritable,Text> userRdd = jse.fromDataset("inputdataset",inputArgs);
FileSetArguments.setInputPaths(inputArgs, "transaction");
JavaPairRDD<LongWritable,Text> txRdd = jse.fromDataset("inputdataset",inputArgs);


Below is the error message thrown:

2017-06-26 11:34:50,090 - DEBUG [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.a.r.s.s.AbstractSparkSubmitter@162] - Calling SparkSubmit for program:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark 5f9d439b-5a8d-11e7-a413-acde48001122: [--master, local[2], --class, co.cask.cdap.app.runtime.spark.SparkMainWrapper, --conf, spark.app.name=sparkhdfs.AppSpark, --conf, spark.executor.memory=512m, --conf, spark.driver.memory=512m, --conf, spark.local.dir=/Users/nd2629/cdap/data/tmp/1498494890059-0, --conf, spark.driver.cores=1, --conf, spark.ui.port=0, --conf, spark.executor.cores=1, --conf, spark.metrics.conf=/Users/nd2629/cdap/data/tmp/1498494890059-0/metrics8183355214313707732.properties, --conf, spark.cdap.localized.resources=[], /Users/nd2629/cdap/data/tmp/1498494890059-0/emptyJob.jar, --userClass=sparkhdfs.JAVASparkHDFS]
2017-06-26 11:34:50,619 - INFO  [NettyHttpService STARTING:c.c.h.NettyHttpService@257] - Starting sparkhdfs.AppSpark-spark-tx http service on address localhost/127.0.0.1:0...
2017-06-26 11:34:50,620 - INFO  [NettyHttpService STARTING:c.c.h.NettyHttpService@262] - Started sparkhdfs.AppSpark-spark-tx http service on address /127.0.0.1:60599
2017-06-26 11:34:50,675 - WARN  [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:o.a.s.SparkConf@70] - In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
2017-06-26 11:34:51,752 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-2:a.e.s.Slf4jLogger@80] - Slf4jLogger started
2017-06-26 11:34:52,927 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-4:Remoting@74] - Starting remoting
2017-06-26 11:34:53,437 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-4:Remoting@74] - Remoting started; listening on addresses :[akka.tcp://sparkDriver...@10.100.38.105:60657]
2017-06-26 11:34:54,631 - DEBUG [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.a.r.s.SparkMetricsSink@51] - Using SparkMetricsSink for reporting metrics: {class=co.cask.cdap.app.runtime.spark.SparkMetricsSink}
2017-06-26 11:34:58,699 - DEBUG [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.d.m.w.BasicLineageWriter@63] - Writing access for run program_run:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark.5f9d439b-5a8d-11e7-a413-acde48001122, dataset dataset:etl2629.inputdataset, accessType READ, component null, accessTime = 1498494898699
2017-06-26 11:34:59,406 - DEBUG [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.d.m.w.BasicLineageWriter@63] - Writing access for run program_run:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark.5f9d439b-5a8d-11e7-a413-acde48001122, dataset dataset:etl2629.out, accessType WRITE, component null, accessTime = 1498494899406
2017-06-26 11:34:59,681 - DEBUG [SparkListenerBus:c.c.c.a.r.s.DefaultSparkExecutionContext@106] - Spark program=program:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark, runId=5f9d439b-5a8d-11e7-a413-acde48001122, jobId=0 starts with auto-commit=false on transaction Transaction{readPointer: 1498494898683000000, transactionId: 1498494898692000000, writePointer: 1498494898692000000, invalids: [1498357848719000000, 1498363599519000000, 1498402426936000000, 1498402526771000000, 1498404408367000000, 1498404439066000000, 1498404971830000000, 1498491200958000000], inProgress: [1498494190406000000], firstShortInProgress: 9223372036854775807, type: LONG, checkpointWritePointers: [], visibilityLevel: SNAPSHOT}
2017-06-26 11:34:59,683 - DEBUG [SparkListenerBus:c.c.c.a.r.s.SparkTransactionService@140] - Spark job started: JobTransaction{jobId=0, stageIds=[0, 1], transaction=Transaction{readPointer: 1498494898683000000, transactionId: 1498494898692000000, writePointer: 1498494898692000000, invalids: [1498357848719000000, 1498363599519000000, 1498402426936000000, 1498402526771000000, 1498404408367000000, 1498404439066000000, 1498404971830000000, 1498491200958000000], inProgress: [1498494190406000000], firstShortInProgress: 9223372036854775807, type: LONG, checkpointWritePointers: [], visibilityLevel: SNAPSHOT}}
2017-06-26 11:35:03,472 - DEBUG [Executor task launch worker-0:s.JAVASparkHDFS@55] - user line is 1234,naresh,Dallas,8037167180
2017-06-26 11:35:03,472 - DEBUG [Executor task launch worker-1:s.JAVASparkHDFS@55] - user line is 1234,naresh,Dallas,8037167180
2017-06-26 11:35:03,485 - DEBUG [Executor task launch worker-0:s.JAVASparkHDFS@55] - user line is 2343,Aparna,Hyderabad,9441652089
2017-06-26 11:35:03,486 - DEBUG [Executor task launch worker-0:s.JAVASparkHDFS@55] - user line is 1233,Chitti,Karimnangar,1234567
2017-06-26 11:35:03,486 - DEBUG [Executor task launch worker-0:s.JAVASparkHDFS@55] - user line is 3434,Kelly,Dallas,1233
2017-06-26 11:35:03,488 - DEBUG [Executor task launch worker-1:s.JAVASparkHDFS@55] - user line is 2343,Aparna,Hyderabad,9441652089
2017-06-26 11:35:03,489 - DEBUG [Executor task launch worker-1:s.JAVASparkHDFS@55] - user line is 1233,Chitti,Karimnangar,1234567
2017-06-26 11:35:03,489 - DEBUG [Executor task launch worker-1:s.JAVASparkHDFS@55] - user line is 3434,Kelly,Dallas,1233
2017-06-26 11:35:13,618 - ERROR [driver-heartbeater:o.a.s.u.Utils@95] - Uncaught exception in thread driver-heartbeater
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at sun.reflect.GeneratedMethodAccessor196.invoke(Unknown Source) ~[na:na]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) ~[na:1.8.0_121]
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122) ~[na:1.8.0_121]
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) ~[na:1.8.0_121]
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) ~[na:1.8.0_121]
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) ~[na:1.8.0_121]
	at org.apache.spark.util.Utils$.deserialize(Utils.scala:92) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:437) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:427) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at scala.Option.foreach(Option.scala:236) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:427) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:425) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at scala.collection.Iterator$class.foreach(Iterator.scala:727) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[org.scala-lang.scala-library-2.10.4.jar:na]
	at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:425) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:470) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:470) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:470) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:470) [co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_121]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_121]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_121]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) ~[na:1.8.0_121]
	at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) ~[na:1.8.0_121]
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237) ~[na:1.8.0_121]
	at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552) ~[na:1.8.0_121]
	at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204) ~[co.cask.cdap.spark-assembly-1.6.1.jar:na]
	... 32 common frames omitted
2017-06-26 11:35:41,535 - DEBUG [SparkListenerBus:c.c.c.a.r.s.SparkTransactionService@174] - Spark job ended: JobTransaction{jobId=0, stageIds=[0, 1], transaction=Transaction{readPointer: 1498494898683000000, transactionId: 1498494898692000000, writePointer: 1498494898692000000, invalids: [1498357848719000000, 1498363599519000000, 1498402426936000000, 1498402526771000000, 1498404408367000000, 1498404439066000000, 1498404971830000000, 1498491200958000000], inProgress: [1498494190406000000], firstShortInProgress: 9223372036854775807, type: LONG, checkpointWritePointers: [], visibilityLevel: SNAPSHOT}}
2017-06-26 11:35:42,132 - DEBUG [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.a.r.s.SparkRuntimeEnv$@247] - Shutting down Server and ThreadPool used by Spark org.apache.spark.SparkContext@2fbb79cd
2017-06-26 11:35:42,139 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-3:a.r.RemoteActorRefProvider$RemotingTerminator@74] - Shutting down remote daemon.
2017-06-26 11:35:42,143 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-3:a.r.RemoteActorRefProvider$RemotingTerminator@74] - Remote daemon shut down; proceeding with flushing remote transports.
2017-06-26 11:35:42,175 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-3:a.r.RemoteActorRefProvider$RemotingTerminator@74] - Remoting shut down.
2017-06-26 11:35:42,185 - DEBUG [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.a.r.s.SparkRuntimeEnv$@247] - Shutting down Server and ThreadPool used by Spark org.apache.spark.SparkContext@2fbb79cd
2017-06-26 11:35:42,187 - INFO  [NettyHttpService STOPPING:c.c.h.NettyHttpService@274] - Stopping sparkhdfs.AppSpark-spark-tx http service on address localhost/127.0.0.1:60599...
2017-06-26 11:35:42,188 - INFO  [NettyHttpService STOPPING:c.c.h.NettyHttpService@288] - Stopped sparkhdfs.AppSpark-spark-tx http service on address localhost/127.0.0.1:60599
2017-06-26 11:35:42,189 - DEBUG [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.a.r.s.s.AbstractSparkSubmitter@169] - SparkSubmit returned for program:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark 5f9d439b-5a8d-11e7-a413-acde48001122
2017-06-26 11:35:42,193 - DEBUG [SparkRunnersparkhdfs.AppSpark:c.c.c.a.r.s.SparkRuntimeService@717] - Running Spark shutdown hook org.apache.spark.util.SparkShutdownHookManager$$anon$2@7de41c59
2017-06-26 11:35:42,203 - DEBUG [SparkRunnersparkhdfs.AppSpark:c.c.c.a.r.s.SparkRuntimeService@752] - BeanIntrospector.ctorParamNamesCache has been invalidated.
2017-06-26 11:35:42,203 - DEBUG [SparkRunnersparkhdfs.AppSpark:c.c.c.a.r.s.SparkRuntimeService@307] - Spark program completed: SparkRuntimeContext{id=program:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark, runId=5f9d439b-5a8d-11e7-a413-acde48001122}
2017-06-26 11:35:42,215 - DEBUG [pcontroller-program:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.a.r.AbstractProgramRuntimeService@433] - Removing RuntimeInfo: Spark sparkhdfs.AppSpark 5f9d439b-5a8d-11e7-a413-acde48001122
2017-06-26 11:35:42,215 - DEBUG [pcontroller-program:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.a.r.AbstractProgramRuntimeService@436] - RuntimeInfo removed: RuntimeInfo{programId=program:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark, twillRunId=null}









Naresh Dulam

Jun 26, 2017, 12:56:39 PM
to CDAP User
I even tried the code below, which also throws the same issue:


Map<String, String> userinputArgs = new HashMap<>();
Map<String, String> txinputArgs = new HashMap<>();
Map<String, String> outputArgs = new HashMap<>();

FileSetArguments.setInputPaths(userinputArgs,"user");
FileSetArguments.setInputPath(txinputArgs,"transaction");

FileSetArguments.setOutputPath(outputArgs,"output");
JavaPairRDD<LongWritable,Text> userRdd = jse.fromDataset("inputdataset",userinputArgs);

JavaPairRDD<LongWritable,Text> txRdd = jse.fromDataset("inputdataset",txinputArgs);










On Monday, June 26, 2017 at 11:37:23 AM UTC-5, Naresh Dulam wrote:
Hi Ali,

Not able to read using inputArgs "/". 

Attached is error message.

Directory structure is

<cdap>/namespace/<nameofnamespace>/data/cdap/user/file.txt
<cdap>/namespace/<nameofnamespace>/data/cdap/transaction/txns.txt


Another Question?

How can we create two different rdd's from two different paths as shown above directory structure?
I used below below code, but its not able to read data from transactions directory.


FileSetArguments.setInputPaths(inputArgs, "user");
FileSetArguments.setOutputPath(outputArgs,"output");
JavaPairRDD<LongWritable,Text> userRdd = jse.fromDataset("inputdataset",inputArgs);
FileSetArguments.setInputPaths(inputArgs, "transaction");
JavaPairRDD<LongWritable,Text> txRdd = jse.fromDataset("inputdataset",inputArgs);


below is erro message thrown

2017-06-26 11:34:50,090 - DEBUG [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.a.r.s.s.AbstractSparkSubmitter@162] - Calling SparkSubmit for program:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark 5f9d439b-5a8d-11e7-a413-acde48001122: [--master, local[2], --class, co.cask.cdap.app.runtime.spark.SparkMainWrapper, --conf, spark.app.name=sparkhdfs.AppSpark, --conf, spark.executor.memory=512m, --conf, spark.driver.memory=512m, --conf, spark.local.dir=/Users/nd2629/cdap/data/tmp/1498494890059-0, --conf, spark.driver.cores=1, --conf, spark.ui.port=0, --conf, spark.executor.cores=1, --conf, spark.metrics.conf=/Users/nd2629/cdap/data/tmp/1498494890059-0/metrics8183355214313707732.properties, --conf, spark.cdap.localized.resources=[], /Users/nd2629/cdap/data/tmp/1498494890059-0/emptyJob.jar, --userClass=sparkhdfs.JAVASparkHDFS]
2017-06-26 11:34:50,619 - INFO  [NettyHttpService STARTING:c.c.h.NettyHttpService@257] - Starting sparkhdfs.AppSpark-spark-tx http service on address localhost/127.0.0.1:0...
2017-06-26 11:34:50,620 - INFO  [NettyHttpService STARTING:c.c.h.NettyHttpService@262] - Started sparkhdfs.AppSpark-spark-tx http service on address /127.0.0.1:60599
2017-06-26 11:34:50,675 - WARN  [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:o.a.s.SparkConf@70] - In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
2017-06-26 11:34:51,752 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-2:a.e.s.Slf4jLogger@80] - Slf4jLogger started
2017-06-26 11:34:52,927 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-4:Remoting@74] - Starting remoting
2017-06-26 11:34:53,437 - INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-4:Remoting@74] - Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSys...@10.100.38.105:60657]
2017-06-26 11:34:54,631 - DEBUG [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.a.r.s.SparkMetricsSink@51] - Using SparkMetricsSink for reporting metrics: {class=co.cask.cdap.app.runtime.spark.SparkMetricsSink}
2017-06-26 11:34:58,699 - DEBUG [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.d.m.w.BasicLineageWriter@63] - Writing access for run program_run:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark.5f9d439b-5a8d-11e7-a413-acde48001122, dataset dataset:etl2629.inputdataset, accessType READ, component null, accessTime = 1498494898699
2017-06-26 11:34:59,406 - DEBUG [spark-submitter-sparkhdfs.AppSpark-5f9d439b-5a8d-11e7-a413-acde48001122:c.c.c.d.m.w.BasicLineageWriter@63] - Writing access for run program_run:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark.5f9d439b-5a8d-11e7-a413-acde48001122, dataset dataset:etl2629.out, accessType WRITE, component null, accessTime = 1498494899406
2017-06-26 11:34:59,681 - DEBUG [SparkListenerBus:c.c.c.a.r.s.DefaultSparkExecutionContext@106] - Spark program=program:etl2629.sparkhdfs.-SNAPSHOT.spark.sparkhdfs.AppSpark, runId=5f9d439b-5a8d-11e7-a413-acde48001122, jobId=0 starts with auto-commit=false on transaction Transaction{readPointer: 1498494898683000000, transactionId: 1498494898692000000, writePointer: 1498494898692000000, invalids: [1498357848719000000, 1498363599519000000, 1498402426936000000, 1498402526771000000, 1498404408367000000, 1498404439066000000, 1498404971830000000, 1498491200958000000], inProgress: [1498494190406000000], firstShortInProgress: 9223372036854775807, type: LONG, checkpointWritePointers: [], visibilityLevel: SNAPSHOT}
2017-06-26 11:34:59,683 - DEBUG [SparkListenerBus:c.c.c.a.r.s.SparkTransactionService@140] - Spark job started: JobTransaction{jobId=0, stageIds=[0, 1], transaction=Transaction{readPointer: 1498494898683000000, transactionId: 1498494898692000000, writePointer: 1498494898692000000, invalids: [1498357848719000000, 1498363599519000000, 1498402426936000000, 1498402526771000000, 1498404408367000000, 1498404439066000000, 1498404971830000000, 1498491200958000000], inProgress: [1498494190406000000], firstShortInProgress: 9223372036854775807, type: LONG, checkpointWritePointers: [], visibilityLevel: SNAPSHOT}}
2017-06-26 11:35:03,472 - DEBUG [Executor task launch worker-0:s.JAVASparkHDFS@55] - user line is 1234,naresh,Dallas,8037167180
...

Ali Anwar

unread,
Jun 26, 2017, 2:03:01 PM6/26/17
to cdap...@googlegroups.com
Hi Naresh.

Could you clarify: when you say "which throws the same issue", are you referring to the logs in your previous post? I assumed those logs were for the setInputPath("/") issue, since you said "The error message is attached" and there was no attachment.

Regards,

Ali Anwar


Naresh Dulam

unread,
Jun 26, 2017, 2:22:49 PM6/26/17
to CDAP User
Sorry, I missed attaching the document. Let me summarize the issues I am facing now.


Issue 1:
The issue with the input path argument "/".
The attached document contains the code and error message.


Issue 2:

Directory structure is
<cdap>/namespace/<nameofnamespace>/data/cdap/user/file.txt
<cdap>/namespace/<nameofnamespace>/data/cdap/transaction/txns.txt

I used the two code snippets below, both of which throw the exception pasted below.

Code snippet 1:

// The same inputArgs map is reused for both dataset reads
FileSetArguments.setInputPaths(inputArgs, "user");
JavaPairRDD<LongWritable, Text> userRdd = jse.fromDataset("inputdataset", inputArgs);

FileSetArguments.setInputPaths(inputArgs, "transaction");
JavaPairRDD<LongWritable, Text> txRdd = jse.fromDataset("inputdataset", inputArgs);

Code snippet 2:

// Separate runtime-argument maps, one per dataset access
Map<String, String> userinputArgs = new HashMap<>();
Map<String, String> txinputArgs = new HashMap<>();
Map<String, String> outputArgs = new HashMap<>();

FileSetArguments.setInputPaths(userinputArgs, "user");
FileSetArguments.setInputPath(txinputArgs, "transaction");
JavaPairRDD<LongWritable, Text> userRdd = jse.fromDataset("inputdataset", userinputArgs);
JavaPairRDD<LongWritable, Text> txRdd = jse.fromDataset("inputdataset", txinputArgs);



Error message: the same output quoted in the previous post.









...

Ali Anwar

unread,
Jun 27, 2017, 1:29:37 PM6/27/17
to cdap...@googlegroups.com
For your first issue, I have filed a JIRA; in the meantime, you can point the input path at a subdirectory within the FileSet's base path instead of "/".
Regarding your second issue, the code looks correct and the error message does not look related, so I am surprised it is happening. I will try to reproduce it.
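
To make that workaround concrete, here is a minimal sketch (assumptions: "inputdataset" is the FileSet you already use and jse is the JavaSparkExecutionContext from your snippets); it points the read at an existing subdirectory under the FileSet's base path rather than at "/":

import java.util.HashMap;
import java.util.Map;

import co.cask.cdap.api.dataset.lib.FileSetArguments;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;

Map<String, String> inputArgs = new HashMap<>();
// Read from the "user" subdirectory under the FileSet's base path,
// instead of from the base path itself ("/").
FileSetArguments.setInputPath(inputArgs, "user");
JavaPairRDD<LongWritable, Text> userRdd = jse.fromDataset("inputdataset", inputArgs);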

Regards,

Ali Anwar


Naresh Dulam

unread,
Jun 28, 2017, 4:54:23 PM6/28/17
to CDAP User
Hi Ali,

What do you mean by the statement below?

"For your first issue, I have filed a JIRA; in the meantime, you can point the input path at a subdirectory within the FileSet's base path instead of "/"."

I set FileSetArguments.setInputPaths(inputArgs, "/"); that is my issue 1, and I attached the logs in my previous post.

Any update on the second issue?



Thank you,
Naresh
...

Ali Anwar

unread,
Jun 28, 2017, 9:22:57 PM6/28/17
to cdap...@googlegroups.com
Hey Naresh.

Note that I still don't see an attachment in your previous post. Did you miss attaching it again?
Though it is not strictly necessary, attaching your project as a zip file would definitely speed up the resolution, since I could reproduce the issue much sooner.

Regards,

Ali Anwar


Naresh Dulam

unread,
Jun 29, 2017, 11:38:22 AM6/29/17
to CDAP User
Attached document.


...
Program code.docx

Naresh Dulam

unread,
Jun 29, 2017, 11:39:37 AM6/29/17
to CDAP User

Please find the attached document, which explains the code and the issue.

Ali Anwar

unread,
Jul 7, 2017, 3:58:12 PM7/7/17
to cdap...@googlegroups.com
Hey Naresh.

FileInputFormat's default value for the configuration parameter "mapreduce.input.fileinputformat.input.dir.recursive" is false.
Can you set this to true for your program? That should fix the NullPointerException.
You can use FileInputFormat.INPUT_DIR_RECURSIVE as the key.
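
If it helps, here is a minimal sketch of where that property could go (assumption: FileSetProperties.Builder exposes setInputProperty for passing configuration to the input format; the base path "cdap" is only illustrative, matching the data/cdap/... layout you described):

import co.cask.cdap.api.dataset.DatasetProperties;
import co.cask.cdap.api.dataset.lib.FileSetProperties;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// FileSet properties whose input format is allowed to recurse into
// subdirectories such as user/ and transaction/ under the base path.
DatasetProperties fileSetProps = FileSetProperties.builder()
    .setBasePath("cdap")
    .setInputFormat(TextInputFormat.class)
    .setInputProperty(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
    .setOutputFormat(TextOutputFormat.class)
    .build();

Use these properties wherever the "inputdataset" FileSet is created in your application.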

Regards,

Ali Anwar
