Calling a SparkGraphComputer from within Spark

2,145 views
Skip to first unread message

robk...@gmail.com

unread,
Mar 17, 2017, 7:50:31 AM3/17/17
to JanusGraph users list
Hi,

I have a Spark-based program, which writes vertices and edges to a JanusGraph cluster (Cassandra backend).  Once the write is complete, I would like to execute an OLAP traversal for all vertices, using a SparkGraphComputer.

However, in all the examples I can find, Spark is being called externally from the Groovy console, using a config file with the Spark cluster's address.  I would like to keep the process coordination all in my Spark program, is there a way to achieve this?  Or should my first Spark program be triggering a second Spark program in the same cluster?

Thanks for any help,
Rob

HadoopMarc

unread,
Mar 17, 2017, 5:31:24 PM3/17/17
to JanusGraph users list
Hi Rob,

Documentation is not abundant on this, I agree. So I read through the spark-gremlin source code and saw that SparkGraphComputer can reuse the SparkContext using the gremlin.spark.persistStorageLevel property. If you set this property and get your SparkContext with one of gremlin-spark's static Spark.create() methods, I would expect that your own jobs and the SparkGraphComputer jobs run within the same SparkContext.

HTH,    Marc


Op vrijdag 17 maart 2017 12:50:31 UTC+1 schreef Rob Keevil:

robk...@gmail.com

unread,
Mar 19, 2017, 6:05:14 AM3/19/17
to JanusGraph users list
Thanks (again!) for the quick response.  I've got Gremlin to use the existing SparkContext successfully.

I'm trying to test this, however I'm also finding triggering the traversal from Scala difficult. I would have thought the Scala equivalent of the current Groovy example (gremlin> g = graph.traversal().withComputer(SparkGraphComputer)) would be:
val g = graph.traversal().withComputer(classOf[SparkGraphComputer])
println
(g.V().count().next())

However JanusGraphBlueprintsGraph is doing an explicit check that only the FulgoraGraphComputer can be used in this way (line 129).  

withComputer also has a method signature of Computer (i.e Object not Class),  but instantiating a SparkGraphComputer needs a HadoopGraph, not a JanusGraph.

Once I get this up and running ill submit a full code example for others to use in future.

Thanks,
Rob



HadoopMarc

unread,
Mar 19, 2017, 6:47:30 AM3/19/17
to JanusGraph users list
Hi Rob,

Yes, you are diving in, I read. JanusGraph data are available as HadoopGraph using the HBaseInputFormat and CassandraInputFormat classes. You can find examples on the old Titan forum, e.g.

https://groups.google.com/forum/#!searchin/aureliusgraphs/read-cassandra%7Csort:relevance/aureliusgraphs/CJnT05-m_cQ/z6JcjKUxCgAJ

Cheers,    Marc

Op zondag 19 maart 2017 11:05:14 UTC+1 schreef Rob Keevil:

robk...@gmail.com

unread,
Mar 19, 2017, 12:39:54 PM3/19/17
to JanusGraph users list
Hi Marc,

Managed to get this working.  The final code ended up being simple, but the configuration properties you need to set has changed across different versions of JanusGraph/Tinkerpop and finding the right combination was very difficult.  Also strangely I needed to use a GraphFactory, not a JanusGraphFactory.

For Tinkerpop 3.3 with JanusGraph 1.0-SNAPSHOT (tinkerpop 3.3 patched in via pom), and Cassandra 2.2.9, the following code works for me within my existing SparkSession:

val gremlinSpark = Spark.create(spark.sparkContext)
val sparkComputerConnection
= GraphFactory.open(getSparkConfig)
val traversalSource
=
sparkComputerConnection.traversal().withComputer(classOf[SparkGraphComputer])
println(graphTraversalSource.V().count().next())

sparkComputerConnection.close()

def getSparkConfig: BaseConfiguration = {
 
val conf = new BaseConfiguration()
  conf
.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph")
  conf.setProperty("gremlin.hadoop.graphInputFormat", "org.janusgraph.hadoop.formats.cassandra.CassandraInputFormat")
  //    ####################################
  //    # Cassandra Cluster Config         #
  //    ####################################
  conf.setProperty("janusgraphmr.ioformat.conf.storage.backend", "cassandrathrift")
  conf
.setProperty("storage.backend", "cassandra")
  conf
.setProperty("storage.hostname", "127.0.0.1")
  conf
.setProperty("cassandra.input.partitioner.class", "org.apache.cassandra.dht.Murmur3Partitioner")
  //    ####################################
  //    # Spark Configuration              #
  //    ####################################
  conf.setProperty("spark.master", "local[4]")
  conf
.setProperty("gremlin.spark.persistContext", "true")
  conf
.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer")

  conf
}




robk...@gmail.com

unread,
Mar 19, 2017, 2:49:41 PM3/19/17
to JanusGraph users list
Last battle before I think this is all done, I need to extract the output without collecting results to the driver and exploding the memory there.

Gremlin has a page at http://tinkerpop.apache.org/docs/current/reference/#interacting-with-spark on how to retrieve the result as a persisted RDD  However, their calculation uses a vertex program, which can name the step using memoryKey('clusterCount').  A regular traversal doesn't seem to have this option, and Spark logs that it removes the RDD after the traversal.  Do you know of any way to access this RDD?

(I've set the required gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD property).

HadoopMarc

unread,
Mar 20, 2017, 3:49:16 PM3/20/17
to JanusGraph users list
Hi Rob,

It sound like your battling skills are OK!  I have never used the PersistedOutputRDD option myself, but if your are stuck you could also try the 
org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat class and the output location. This just writes the query output to hdfs and at least keeps you going.

Btw, I assumed you did not miss the
graph.configuration().setProperty('gremlin.spark.persistContext',true)
part of the reference section you linked to. Did you also try the option with the PersistedOutputRdd from the gremlin console or only from your scala program?

Cheers,    Marc

Op zondag 19 maart 2017 19:49:41 UTC+1 schreef Rob Keevil:

Jun(Terry) Yang

unread,
Mar 22, 2017, 7:08:13 AM3/22/17
to JanusGraph users list
Hi Rob,

I went through the tinkerpop code, just PageRankMapReduce, ClusterCountMapReduce, ClusterPopulationMapReduce has memoryKey function. 
(I found the description "We still recommend users call persist on the resulting RDD if they plan to reuse it." in spark doc http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence)
Not sure if this is the design.

After running PeerPressureVertexProgram sample(http://tinkerpop.apache.org/docs/current/reference/#interacting-with-spark) I saw 2 RDDs, and the result of the sample is 2(integer)
gremlin> spark.ls()
==>output/clusterCount [Memory Deserialized 1x Replicated]
==>output/~g [Memory Deserialized 1x Replicated]
gremlin> spark.head('output', 'clusterCount', PersistedInputRDD)
==>2
 
Then I tried the read these RDDs with gremlin.hadoop.graphReader=PersistedInputRDD.class.getCanonicalName():
a).I failed to read "output/clusterCount" with excretion: java.lang.ClassCastException: java.lang.Integer cannot be cast to org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable 
    The integer value should be read at this case, but the graph structure can't accept it, so I guess some spark program may access this persistence RDD.
b).And successful with "output/~g"
gremlin> graph2 = GraphFactory.open('conf/hadoop-graph/hadoop-gryo.properties')
==>hadoopgraph[gryoinputformat->gryooutputformat]
gremlin> graph2.configuration().setProperty('gremlin.hadoop.graphReader', PersistedInputRDD.class.getCanonicalName())
==>null
gremlin> graph2.configuration().setProperty('gremlin.hadoop.inputLocation', 'output/~g')
==>null
gremlin> 
gremlin> g2.V().valueMap()
==>[gremlin.peerPressureVertexProgram.cluster:[1], name:[josh], age:[32]]
==>[gremlin.peerPressureVertexProgram.cluster:[1], name:[marko], age:[29]]
==>[gremlin.peerPressureVertexProgram.cluster:[6], name:[peter], age:[35]]
==>[gremlin.peerPressureVertexProgram.cluster:[1], name:[lop], lang:[java]]
==>[gremlin.peerPressureVertexProgram.cluster:[1], name:[ripple], lang:[java]]
==>[gremlin.peerPressureVertexProgram.cluster:[1], name:[vadas], age:[27]]


Hope this will help you~


Thanks!
Terry

robk...@gmail.com

unread,
Mar 24, 2017, 5:10:23 PM3/24/17
to JanusGraph users list

Thanks for the responses. Marc, yes i'd got that property set.

Terry, I tried running the peer pressure program as suggested, and saw outputRDDs, so this was a great test that i'd got my configuration properties set correctly, thanks.

However I really needed to run Gremlin queries, not vertex programs.  In the end I found a way to get this working by making my queries into programs at runtime, using the following method:

val computerResult = SparkGraphComputerFactory.computeResult("g.V().hasLabel('startingVertex').as('start').out('compares').as('end').select('end')", "gremlin-groovy", "cassandraSpark", "TestGraph")

def
computeResult(gremlinQuery: String, gremlinLanguage: String, computerConnectionName: String, outputName: String): RDD[(Long, VertexWritable)] = {

 
val computerConnection = JanusGraphConnectionFactory.getConnection(computerConnectionName)
 
val graphComputer = getComputer(computerConnection, outputName)

  graphComputer
.program(TraversalVertexProgram.build()
   
.traversal(
      computerConnection
.traversal().withComputer(graphComputer.getClass),
      gremlinLanguage
,
      gremlinQuery
)
   
.create(computerConnection))
   
.submit()
   
.get()

 
Spark.getRDD(outputName + "/~g").asInstanceOf[RDD[(Long, VertexWritable)]]
}

def getComputer(connection: Graph, outputName: String): GraphComputer = {
  connection
.configuration().addProperty("gremlin.hadoop.outputLocation", outputName)
  connection
.compute(classOf[SparkGraphComputer])
   
.result(GraphComputer.ResultGraph.NEW)
   
.persist(GraphComputer.Persist.VERTEX_PROPERTIES)
}

computerResult gives me a nice RDD of the result of this Gremlin query (Wahoooo).

Next bit of strangeness to work out is that computerResult contains the "start"s in my query, as well as the "end"s.  Also, I really want it to group the "end"s by their "start", so that I can run comparisons on the groups of "end"s in the next stage of my program.

Samik Raychaudhuri

unread,
Mar 25, 2017, 2:29:45 AM3/25/17
to janusgra...@googlegroups.com
This is an interesting discussion.

@Rob: I am assuming that since you are using tinkerpop version above v3.2.x, you did not have to install/use hadoop in the mix to get to this point. Is that correct?

Thanks.
-Samik
--
You received this message because you are subscribed to the Google Groups "JanusGraph users list" group.
To unsubscribe from this group and stop receiving emails from it, send an email to janusgraph-use...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

robk...@gmail.com

unread,
Mar 25, 2017, 4:45:24 AM3/25/17
to JanusGraph users list
Yes, I haven't used Hadoop at all in this project, Cassandra is the only storage currently.

ddev...@gmail.com

unread,
Nov 21, 2017, 4:05:11 PM11/21/17
to JanusGraph users
Hey Rob,

I am also trying to do similar thing... can you share your final sample code for reference?

Thanks,
Devang.

prasad.d...@gmail.com

unread,
Jan 12, 2018, 2:46:11 PM1/12/18
to JanusGraph users
Hi Rob,

Cheers for battling with JanusGraph to make it Spark Friendly.

We are struggling with the same issue, Can you please share you complete code to understand it better.

Thanks in advance.

Regards,
Prasad

Ekta Sharma

unread,
Jul 17, 2018, 1:07:39 PM7/17/18
to JanusGraph users
Hello, 

Please let me know if a full code example was posted by Rob. 

Thanks!
Ekta

akhilesh singh

unread,
Apr 19, 2019, 10:20:04 AM4/19/19
to JanusGraph users
Hi Rob,
i followed the below steps provided by you .
getting below exception while Spark.getRDD(outputName + "/~g")
The named RDD does not exist in the Spark context

can you please share read-cassandra-3.propteries file
Reply all
Reply to author
Forward
0 new messages