Hello,
what I am trying to do is extract some data from a JanusGraph DB (with a Cassandra storage backend) using a Gremlin OLAP traversal (TinkerPop 3.4.3) with the SparkGraphComputer, persist the result to a Spark RDD, convert it to a Dataset, and run some Spark calculations on the extracted data. I am using Java. To be a little more precise, I am using a TraversalVertexProgram as the VertexProgram, and I am able to get an RDD as output, but it contains just the input graph data, which I saw here
and the corresponding issue here
is a known bug.
Nonetheless, there is a workaround described in the issue,
but so far I have not been able to implement a working Java application that solves my problem.
To test the RDD persistence on a small example I used the toy graph "tinkerpop-modern" and the properties file "hadoop-gryo.properties", which I slightly modified so that I have:
####################################hadoop-gryo.properties##################
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.jarsInDistributedCache=true
gremlin.spark.persistContext=true
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD
gremlin.hadoop.inputLocation=/path_to_janusgraph/janusgraph-0.4.0-hadoop2/data/tinkerpop-modern.kryo
gremlin.hadoop.outputLocation=output
spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator
############################################################################
My Java code looks like this:
##############################Java code#####################################
//open the example graph via the Hadoop/Spark properties file
Graph graph = GraphFactory.open("/path_to_janusgraph/janusgraph-0.4.0-hadoop2/conf/hadoop-graph/hadoop-gryo.properties");

//run a TraversalVertexProgram with the SparkGraphComputer
graph.compute(SparkGraphComputer.class)
        .program(TraversalVertexProgram.build()
                .traversal(
                        //some example traversal
                        graph.traversal().V().has("age", P.gt(30)).asAdmin())
                .create(graph))
        .persist(GraphComputer.Persist.VERTEX_PROPERTIES)
        .submit()
        .get();

//get the persisted RDD
RDD rdd = Spark.getRDD("output/~g");

//example calculation, which shows that just the input graph was persisted
System.out.println("RDD size: " + rdd.count());
#############################################################################
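For context, what I would ultimately like to do with the persisted RDD is roughly the following conversion to a Dataset. This is only a sketch of my intent, not working code: I am assuming the RDD elements are `Tuple2<Object, VertexWritable>` pairs keyed by vertex id, and I only extract the "age" property from the example traversal; the class and method names around that extraction are my own.

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import scala.Tuple2;

public class RddToDataset {

    @SuppressWarnings("unchecked")
    public static Dataset<Row> toDataset(final SparkSession spark) {
        //assumption: the persisted graph RDD is keyed by vertex id with a VertexWritable value
        final JavaRDD<Tuple2<Object, VertexWritable>> vertices =
                ((RDD<Tuple2<Object, VertexWritable>>) Spark.getRDD("output/~g")).toJavaRDD();

        //keep only vertices that actually carry an "age" property, since the
        //persisted RDD currently contains the whole input graph, including
        //vertices without that property
        final JavaRDD<Row> rows = vertices
                .filter(t -> t._2().get().property("age").isPresent())
                .map(t -> {
                    final Vertex v = t._2().get();
                    return RowFactory.create(v.id().toString(), v.<Integer>value("age"));
                });

        final StructType schema = new StructType()
                .add("id", DataTypes.StringType)
                .add("age", DataTypes.IntegerType);
        return spark.createDataFrame(rows, schema);
    }
}
```

If the traversal result (instead of the whole graph) were persisted, the filter step would of course be unnecessary.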
Whenever I tried to use the CloneVertexProgram workaround, I just got a copy of the input graph RDD, not the traversal result.
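For completeness, this is roughly how I invoked the workaround (my own reconstruction of what I tried; the exact chain and the chosen Persist option may differ slightly from the issue's description):

```java
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.clone.CloneVertexProgram;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

public class CloneWorkaround {

    public static void main(final String[] args) throws Exception {
        final Graph graph = GraphFactory.open(
                "/path_to_janusgraph/janusgraph-0.4.0-hadoop2/conf/hadoop-graph/hadoop-gryo.properties");

        //copy the graph to the output location via CloneVertexProgram,
        //which should make the output RDD available under "output/~g"
        graph.compute(SparkGraphComputer.class)
                .program(CloneVertexProgram.build().create(graph))
                .persist(GraphComputer.Persist.EDGES)
                .submit()
                .get();
    }
}
```

With this I again see only the input graph data in the persisted RDD.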
Thank you for your help!