Edge Filter in JanusGraph when working with Spark


rafi ansari

Jun 8, 2020, 9:08:54 AM
to JanusGraph developers
Hi All,

I am currently working with JanusGraph in batch mode using Spark.

I am facing a problem filtering edges by label.

Below are the specifications:
Spark = 2.4.5
JanusGraph = 0.5.0

Below is the configuration used for Spark:

conf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph")
conf.setProperty("gremlin.hadoop.graphReader", "org.janusgraph.hadoop.formats.cql.CqlInputFormat")
conf.setProperty("gremlin.hadoop.graphWriter", "org.apache.hadoop.mapreduce.lib.output.NullOutputFormat")
conf.setProperty("spark.cassandra.connection.host", "127.0.0.1")
conf.setProperty("janusgraphmr.ioformat.conf.storage.backend", "cql")
conf.setProperty("janusgraphmr.ioformat.conf.storage.hostname", "127.0.0.1")
conf.setProperty("janusgraphmr.ioformat.conf.storage.port", 9042)
conf.setProperty("janusgraphmr.ioformat.conf.storage.cql.keyspace", "graph_db_1")
conf.setProperty("janusgraphmr.ioformat.conf.index.search.backend", "elasticsearch")
conf.setProperty("janusgraphmr.ioformat.conf.index.search.hostname", "127.0.0.1")
conf.setProperty("janusgraphmr.ioformat.conf.index.search.port", 9200)
conf.setProperty("janusgraphmr.ioformat.conf.index.search.index-name", "graph_1")
conf.setProperty("cassandra.input.partitioner.class","org.apache.cassandra.dht.Murmur3Partitioner")
conf.setProperty("cassandra.input.widerows",true)
conf.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.setProperty("spark.kryo.registrator", "org.janusgraph.hadoop.serialize.JanusGraphKryoRegistrator")

Below is the Spark code using newAPIHadoopRDD:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.InputFormat
import org.apache.spark.rdd.RDD
import org.apache.tinkerpop.gremlin.hadoop.Constants
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil

val hadoopConfiguration = ConfUtil.makeHadoopConfiguration(conf)

// Resolve the reader class configured under gremlin.hadoop.graphReader
val inputFormatClass = hadoopConfiguration
  .getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, classOf[InputFormat[NullWritable, VertexWritable]])
  .asInstanceOf[Class[InputFormat[NullWritable, VertexWritable]]]

val rdd: RDD[(NullWritable, VertexWritable)] =
  spark.sparkContext.newAPIHadoopRDD(hadoopConfiguration, inputFormatClass,
    classOf[NullWritable], classOf[VertexWritable])

The above returns an RDD of the following type:

rdd: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.NullWritable, org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable)]

rdd.map { case (_, v) => v.get().toString }.collect()

res17: Array[String] = Array(v[8344], v[12440], v[4336], v[4320], v[4136], v[8416], v[8192], v[4248], v[4344], v[8432], v[12528], v[4096])

Given res17 above, I am not sure how to filter the edges by label.
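
Is something like the sketch below the right approach? It assumes each VertexWritable wraps a TinkerPop StarVertex exposing the standard edges() iterator, and "created" is only a placeholder label, not one from my schema:

import org.apache.tinkerpop.gremlin.structure.{Direction, Edge}
import scala.collection.JavaConverters._

// Sketch: unwrap the vertex from each VertexWritable and keep only the
// outgoing edges carrying the placeholder label "created".
val createdEdges: RDD[Edge] = rdd.flatMap { case (_, vw) =>
  vw.get().edges(Direction.OUT, "created").asScala
}

createdEdges.count() // materialize, e.g. to sanity-check the edge count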


TIA

Regards

Rafi