Titan 1.1 with Spark 1.6.0 Import issues


Cristian COLA

Feb 9, 2016, 4:41:57 AM
to Aurelius
Hi,

I'm trying to import a CSV file into Titan using SparkGraphComputer.

I have a Spark master node and one Spark slave node for the moment (planning to add more workers, but for testing this should be OK).

I have the import script and I've changed the following properties:

#spark.master=local[4]
spark.master=spark://192.168.0.24:7077
#spark.executor.memory=4g
spark.executor.memory=8g

Then I run:
graph = GraphFactory.open("conf/hadoop-graph/hadoop-script.properties")
blvp = BulkLoaderVertexProgram.build().writeGraph("conf/titan-hbase-es.properties").create(graph)
graph.compute(SparkGraphComputer).program(blvp).submit().get()
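
For reference, a hadoop-script.properties of this kind is roughly the following shape. This is only a sketch: the script and input locations are illustrative rather than the exact values used here, and key names can differ slightly between TinkerPop versions, so compare against the file shipped with your Titan distribution.

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
# Groovy script that parses one line of the input file into a vertex (illustrative path)
gremlin.hadoop.scriptInputFormat.script=data/script-input.groovy
# the CSV file to load (illustrative path)
gremlin.hadoop.inputLocation=data/my-graph.csv
gremlin.hadoop.outputLocation=output
gremlin.hadoop.jarsInDistributedCache=true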

After the job starts I have the following exception:
11:03:30 ERROR org.apache.spark.util.SparkUncaughtExceptionHandler  - Uncaught exception in thread Thread[appclient-registration-retry-thread,5,main]
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6b11c39f rejected from java.util.concurrent.ThreadPoolExecutor@6bbee6e4[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]
 at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
 at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
 at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
 at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
 at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1.apply(AppClient.scala:96)
 at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1.apply(AppClient.scala:95)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at org.apache.spark.deploy.client.AppClient$ClientEndpoint.tryRegisterAllMasters(AppClient.scala:95)
 at org.apache.spark.deploy.client.AppClient$ClientEndpoint.org$apache$spark$deploy$client$AppClient$ClientEndpoint$$registerWithMaster(AppClient.scala:121)
 at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2$$anonfun$run$1.apply$mcV$sp(AppClient.scala:132)
 at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1119)
 at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2.run(AppClient.scala:124)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

With the local[4] setting it works fine.

Thanks
Cristian

Daniel Kuppitz

Feb 9, 2016, 8:29:03 AM
to aureliu...@googlegroups.com
Can you see the job in Spark's Web UI? If not, then your Spark setup is messed up. However, if the job makes it to the UI, inspect the logs there; they usually give more details.

Cheers,
Daniel



Cristian COLA

Feb 11, 2016, 5:06:20 AM
to Aurelius
I've changed the Spark version to 1.5.1 and now I can see the job in Spark's Web UI.

After the job started I got this exception:
ERROR org.apache.spark.scheduler.TaskSetManager  - Task 0 in stage 1.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 192.168.0.11): java.lang.RuntimeException: GraphFactory could not instantiate this Graph implementation [class com.thinkaurelius.titan.core.TitanFactory]
 at org.apache.tinkerpop.gremlin.structure.util.GraphFactory.open(GraphFactory.java:82)
 at org.apache.tinkerpop.gremlin.structure.util.GraphFactory.open(GraphFactory.java:70)
 at org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram.workerIterationStart(BulkLoaderVertexProgram.java:171)
 at org.apache.tinkerpop.gremlin.spark.process.computer.SparkExecutor.lambda$executeVertexProgramIteration$3bd94f1a$1(SparkExecutor.java:84)
 at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:189)
 at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:189)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
 at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
 at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:88)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)
 at org.apache.tinkerpop.gremlin.structure.util.GraphFactory.open(GraphFactory.java:78)
 ... 20 more
Caused by: com.thinkaurelius.titan.core.TitanException: Could not open global configuration
 at com.thinkaurelius.titan.diskstorage.Backend.getStandaloneGlobalConfiguration(Backend.java:451)
 at com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.<init>(GraphDatabaseConfiguration.java:1322)
 at com.thinkaurelius.titan.core.TitanFactory.open(TitanFactory.java:94)
 at com.thinkaurelius.titan.core.TitanFactory.open(TitanFactory.java:74)
 ... 25 more
Caused by: com.thinkaurelius.titan.diskstorage.TemporaryBackendException: Temporary failure in storage backend
 at com.thinkaurelius.titan.diskstorage.hbase.HBaseStoreManager.ensureTableExists(HBaseStoreManager.java:759)
 at com.thinkaurelius.titan.diskstorage.hbase.HBaseStoreManager.ensureColumnFamilyExists(HBaseStoreManager.java:831)
 at com.thinkaurelius.titan.diskstorage.hbase.HBaseStoreManager.openDatabase(HBaseStoreManager.java:456)
 at com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager.openDatabase(KeyColumnValueStoreManager.java:29)
 at com.thinkaurelius.titan.diskstorage.Backend.getStandaloneGlobalConfiguration(Backend.java:449)
 ... 28 more
Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.IllegalAccessError: com/google/protobuf/HBaseZeroCopyByteString
 at org.apache.hadoop.hbase.client.RpcRetryingCaller.translateException(RpcRetryingCaller.java:210)
 at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:121)
 at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)
 at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:264)
 at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:169)
 at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:164)
 at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:107)
 at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:736)
 at org.apache.hadoop.hbase.catalog.MetaReader.fullScan(MetaReader.java:539)
 at org.apache.hadoop.hbase.catalog.MetaReader.tableExists(MetaReader.java:310)
 at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:279)
 at org.apache.hadoop.hbase.client.HBaseAdmin.tableExists(HBaseAdmin.java:293)
 at com.thinkaurelius.titan.diskstorage.hbase.HBaseAdmin0_98.tableExists(HBaseAdmin0_98.java:93)
 at com.thinkaurelius.titan.diskstorage.hbase.HBaseStoreManager.ensureTableExists(HBaseStoreManager.java:753)
 ... 32 more
Caused by: java.lang.IllegalAccessError: com/google/protobuf/HBaseZeroCopyByteString
 at org.apache.hadoop.hbase.protobuf.RequestConverter.buildRegionSpecifier(RequestConverter.java:930)
 at org.apache.hadoop.hbase.protobuf.RequestConverter.buildScanRequest(RequestConverter.java:434)
 at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:297)
 at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:157)
 at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:57)
 at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
 ... 44 more

Thanks,
Cristian

Daniel Kuppitz

Feb 11, 2016, 9:45:03 AM
to aureliu...@googlegroups.com
Hmm, I've never seen that before, but to be honest I also don't spend much time looking at HBase errors.

Anybody else? Jason?


Cristian COLA

Feb 16, 2016, 7:57:17 AM
to Aurelius
I've added the Titan jar files to SPARK_CLASSPATH in the conf/spark-env.sh file, and I have the same behaviour.
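
Roughly this pattern in conf/spark-env.sh (the path and jar names below are illustrative, not my exact entries):

TITANLIB=/opt/titan-1.1.0-hadoop2/lib
SPARK_CLASSPATH=$TITANLIB/titan-core-1.1.0.jar:$TITANLIB/titan-hbase-1.1.0.jar:$SPARK_CLASSPATH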

The Titan and HBase configurations are OK: I can create the schema, vertices, and edges. The Spark cluster also looks OK; I've run some small examples and they work.

Any tips would be much appreciated 

Thanks
Cristian

David

Feb 16, 2016, 10:43:26 AM
to Aurelius
Cristian,

Have you run the documented bulk load example in your environment, but using your master/slave Spark cluster, and made sure that works?
If you have not, I recommend starting with the basic example and, once that works, moving on to CSV files and ES.

Here is a working hbase example from my env:

-------
Here is the titan-hbase.properties:

gremlin.graph=com.thinkaurelius.titan.core.TitanFactory
storage.backend=hbase

# zookeeper server addresses comma separated
storage.hostname=10.100.100.10:2181
storage.hbase.table=titan
storage.hbase.ext.zookeeper.znode.parent=/hbase-unsecure

---------
Here is the hadoop-load.properties:

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.inputLocation=./data/grateful-dead.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true

spark.master=spark://yyc.x25.cloudy.com:7077
spark.executor.memory=8g
spark.serializer=org.apache.spark.serializer.KryoSerializer

----
(full disclosure  - I am using Spark 1.5.2 with a later version of TinkerPop/Titan...but edited this in place to look like what you might see)
Here is the Spark classpath set in the spark-env.sh file under spark-1.5.1/conf:

SPARK_CLASSPATH=$HBASECONF:$TITANLIB/jersey-server-1.9.jar:$TITANLIB/titan-core-1.1.0-SNAPSHOT.jar:$TITANLIB/gremlin-console-3.1.1-incubating.jar:$TITANLIB/gremlin-core-3.1.1-incubating.jar:$TITANLIB/gremlin-driver-3.1.1-incubating.jar:$TITANLIB/gremlin-groovy-3.1.1-incubating.jar:$TITANLIB/gremlin-server-3.1.1-incubating.jar:$TITANLIB/gremlin-shaded-3.1.1-incubating.jar:$TITANLIB/hadoop-gremlin-3.1.1-incubating.jar:$TITANLIB/spark-gremlin-3.1.1-incubating.jar:$TITANLIB/gremlin-shaded-3.1.1-incubating.jar:$TITANLIB/javatuples-1.2.jar:$TITANLIB/titan-hbase-1.1.0-SNAPSHOT.jar:$TITANLIB/htrace-core-3.1.0-incubating.jar:$TITANLIB/tinkergraph-gremlin-3.1.1-incubating.jar:$TITANLIB/reflections-0.9.9-RC1.jar:$TITANLIB/hppc-0.7.1.jar:$TITANLIB/high-scale-lib-1.1.2.jar:$TITANLIB/titan-hadoop-1.1.0-SNAPSHOT.jar:$TITANLIB/hbase-server-1.1.1.jar:$MEGADIR/*

MEGADIR in this case contains the hbase and hdfs jar file contents.
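
Populating such a directory can look roughly like this (all paths illustrative, adjust to your install):

mkdir -p /opt/megadir
cp /opt/hbase/lib/*.jar /opt/megadir/                    # hbase jars
cp /opt/hadoop/share/hadoop/hdfs/*.jar /opt/megadir/     # hdfs jars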

I would not put the entire Titan lib directory into the Spark classpath.

-----
Make sure HADOOP_GREMLIN_LIBS is set correctly for your env:
HADOOP_GREMLIN_LIBS=/home/graphie/titan-1.1.0-SNAPSHOT-hadoop2/lib
You may also need to include the ext dir for the graph computer in your HGL path (I do not because my build takes care of that for me).
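
If you do need it, appending the ext dir looks something like the line below; the spark-gremlin path is illustrative, so check where your graph computer plugin jars actually live:

HADOOP_GREMLIN_LIBS=/home/graphie/titan-1.1.0-SNAPSHOT-hadoop2/lib:/home/graphie/titan-1.1.0-SNAPSHOT-hadoop2/ext/spark-gremlin/lib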

----

Run the basic example:

[graphie@hb2 titan-1.1.0-SNAPSHOT-hadoop2]$ bin/gremlin.sh

                \,,,/
               (o o)
-----oOOo-(3)-oOOo-----
plugin activated: aurelius.titan
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
plugin activated: tinkerpop.hadoop
plugin activated: tinkerpop.spark
plugin activated: tinkerpop.tinkergraph
gremlin> :load data/grateful-dead-titan-schema.groovy
==>true
==>true
gremlin> graph = TitanFactory.open('conf/titan-hbase.properties')
hbaseVersion is: 1.1.1
==>standardtitangraph[hbase:[10.110.100.10:2181]]
gremlin> defineGratefulDeadSchema(graph)
==>null
gremlin> graph.close()
==>null
gremlin> hdfs.copyFromLocal('data/grateful-dead.kryo','data/grateful-dead.kryo')
==>null
gremlin> graph = GraphFactory.open('conf/hadoop/hadoop-load.properties')
==>hadoopgraph[gryoinputformat->nulloutputformat]
gremlin> blvp = BulkLoaderVertexProgram.build().writeGraph('conf/titan-hbase.properties').create(graph)
==>BulkLoaderVertexProgram[bulkLoader=IncrementalBulkLoader,vertexIdProperty=bulkLoader.vertex.id,userSuppliedIds=false,keepOriginalIds=true,batchSize=0]
gremlin> graph.compute(SparkGraphComputer).program(blvp).submit().get()
09:18:27 WARN  org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer  - class org.apache.hadoop.mapreduce.lib.output.NullOutputFormat does not implement PersistResultGraphAware and thus, persistence options are unknown -- assuming all options are possible
09:18:29 WARN  org.apache.spark.metrics.MetricsSystem  - Using default name DAGScheduler for source because spark.app.id is not set.
==>result[hadoopgraph[gryoinputformat->nulloutputformat],memory[size:0]]
gremlin> g=graph.traversal()
==>graphtraversalsource[hadoopgraph[gryoinputformat->nulloutputformat], standard]
gremlin> g.V().count()
==>808
gremlin>

----
If something does not work, the first place to go is under Spark: look at the "logs" directory (or wherever you configured logs to go)
and at the "work" directory, and find the output for your specific run.  You will often see ClassNotFoundException errors in there
that help you adjust your spark-env.sh file.  Restart Spark. Rinse. Repeat.
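
On a standalone worker, something like this usually surfaces the missing classes (the app id below is illustrative):

cd $SPARK_HOME/work/app-20160219112232-0000/0
grep -iE 'ClassNotFoundException|NoClassDefFoundError' stderr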

Cristian COLA

Feb 17, 2016, 11:40:01 AM
to Aurelius
David,

Thanks for the nice tutorial. I've created all the configurations except for installing spark-gremlin as an extension in the ext directory.

I don't know if that is the issue, but I'm getting the same exception as before with this example.

Cristian

David

Feb 18, 2016, 3:38:34 PM
to Aurelius
This part of your stack trace:


Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.IllegalAccessError: com/google/protobuf/HBaseZeroCopyByteString

suggests there is a classpath problem or conflict of some kind. 

Double check that your HBase installation's hbase/lib directory is in the classpath of Spark.
Notice in the example I pasted that I used something called MEGADIR, where I put all of these jars and the hdfs jars to include them in the classpath.

Double check the hbase-protocol jar file specifically - try coding it explicitly into the Spark classpath to see if that changes the error
you are getting (see the sketch below). If it does, then your hbase/lib probably isn't in your path. I have my hbase/lib jars included at the end of my classpath and that works
for me, but you can also experiment with the order of your classpath if you are sure the jars are already in it.
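
Concretely, that could look like this in spark-env.sh; the path and version below are illustrative, so match them to your hbase install:

SPARK_CLASSPATH=/opt/hbase/lib/hbase-protocol-0.98.4-hadoop2.jar:$SPARK_CLASSPATH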

Be sure not to dump the entire Titan lib directory into your classpath.

Cristian COLA

Feb 19, 2016, 4:40:06 AM
to Aurelius
Hi David,

I've double checked the hbase-protocol jar and coded it explicitly into the Spark classpath. Now I don't get the exception anymore, but the Spark job hangs.

I've checked the Spark Web UI, and on the job detail page I have 3 stages:

Completed:
  Stage 0 - mapToPair at InputFormatRDD.java:46 - submitted 2016/02/19 11:22:32 - duration 16 s - tasks succeeded/total 1/1 - shuffle write 249.0 KB

Active:
  Stage 1 - flatMapToPair at SparkExecutor.java:109 - submitted 2016/02/19 11:22:48 - duration 14 min - tasks succeeded/total 0/1 - shuffle read 249.0 KB

Pending:
  Stage 2 - foreachPartition at SparkExecutor.java:134 - submitted Unknown - duration Unknown - tasks succeeded/total 0/1
It seems to hang on the flatMapToPair stage.

Thanks
Cristian

Cristian COLA

Feb 19, 2016, 10:27:09 AM
to Aurelius
After letting the Spark job run for about 50 minutes I got an exception :)

org.apache.hadoop.hbase.client.RpcRetryingCaller@7963a6bd, org.apache.hadoop.hbase.ipc.RpcClient$FailedServerException: This server is in the failed servers list: titan-test/192.168.0.24:16201

On the HBase server, looking at /etc/hosts I noticed that the hostname was mapped to 127.0.1.1, so I changed it to the machine's local IP.

I checked that HBase was listening on IP:16201 and not on 127.0.1.1:16201 (use netstat -anp | grep LISTEN).
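
The change was essentially this, using the hostname/IP from the exception above:

# /etc/hosts on the hbase server - before
127.0.1.1     titan-test
# after
192.168.0.24  titan-test

# verify the region server is bound to the real IP, not 127.0.1.1
netstat -anp | grep LISTEN | grep 16201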

I restarted the Spark job and now it's working.

Thanks guys for all the useful suggestions

Cristian