How to scan all the nodes using Spark (with the HBase backend)


dvdg...@gmail.com

Jul 3, 2018, 10:10:31 AM
to JanusGraph users
I'm trying to understand how to use Spark and the new Hadoop API (newAPIHadoopRDD) with the JanusGraph HBaseInputFormat, as a starting point for scanning, and/or applying a function to, all the vertices of a graph.
I'd like to bypass the Spark OLAP support and access the vertices and edges directly from Spark.
I think this could be a good starting point for implementing a direct mapping between Spark GraphX and JanusGraph, as an additional parallel computing platform besides TinkerPop's OLAP SparkGraphComputer.
What do you think? Could anyone give me a clue on how to use newAPIHadoopRDD in combination with HBaseInputFormat to build an RDD[Vertex]?
Any suggestion would be greatly welcome.
David

HadoopMarc

Jul 3, 2018, 2:25:50 PM
to JanusGraph users
Hi David,

I do not know, but be sure to check:
  1. http://tinkerpop.apache.org/docs/current/reference/#interacting-with-spark which shows how to get the graph as an RDD (unless you really want to get rid of the TinkerPop deps); a rough sketch of this route follows below the list
  2. the upcoming HBaseTableSnapshotInputFormat in JanusGraph 0.3.0
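
For item 1, the route looks roughly like this (a minimal sketch in Java, untested; the properties file name is illustrative and is assumed to wire HadoopGraph to the HBaseInputFormat):

import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;

// read-hbase.properties (illustrative name) sets gremlin.graph=HadoopGraph and
// gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.hbase.HBaseInputFormat
Graph graph = GraphFactory.open("conf/hadoop-graph/read-hbase.properties");
GraphTraversalSource g = graph.traversal().withComputer(SparkGraphComputer.class);
long vertexCount = g.V().count().next();  // runs as a Spark OLAP job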
HTH,    Marc

On Tuesday, July 3, 2018 at 16:10:31 UTC+2, dvdg...@gmail.com wrote:

dvdg...@gmail.com

Jul 4, 2018, 4:44:02 AM
to JanusGraph users
Hi Marc,
I had a look, and that was already on my radar. I wonder if there is a way to bypass the TinkerPop layer completely and get an RDD[Vertex] just by combining the new Hadoop API with the proper input format.

HadoopMarc

Jul 5, 2018, 9:05:22 AM
to JanusGraph users
Hi David,

It seems no one has done this before, but HBaseInputFormat really does implement the org.apache.hadoop.mapreduce.InputFormat that is required by org.apache.spark.rdd.NewHadoopRDD.
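
In Spark API terms, the call shape would be roughly this (a sketch, untested; hadoopConf is assumed to be an org.apache.hadoop.conf.Configuration carrying the usual janusgraphmr.ioformat.* and gremlin.hadoop.* keys, and sparkConf an ordinary SparkConf):

import org.apache.hadoop.io.NullWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.janusgraph.hadoop.formats.hbase.HBaseInputFormat;

JavaSparkContext context = new JavaSparkContext(sparkConf);
// HBaseInputFormat implements InputFormat<NullWritable, VertexWritable>
JavaPairRDD<NullWritable, VertexWritable> vertices = context.newAPIHadoopRDD(
        hadoopConf,
        HBaseInputFormat.class,
        NullWritable.class,
        VertexWritable.class);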

So, I would say, give it a go and come back here if you get stuck.

The easier way, using more of TinkerPop, is also discussed in:


Cheers,   Marc

On Wednesday, July 4, 2018 at 10:44:02 UTC+2, dvdg...@gmail.com wrote:

Jeff Callahan

Jul 5, 2018, 10:12:17 PM
to JanusGraph users
Hi -

I recently did this same thing.  Here is the code I used to get it working.  I ran into some IO problems related to the TinkerPop graph types (StarVertex etc.).  I needed to move on to other things, so I never investigated deeply enough to understand the root cause; instead I wrote my own POJO-style VertexDescriptor class to work around the problem.  I'm fairly new to Spark, so it's possible I'm not using the best patterns and practices below, but it does work.

// Imports for the snippet below (VertexDescriptor, PathDetails and
// NetworkAnalysisComputer are my own classes):
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputFormatRDD;

// Spark configuration options
SparkConf sparkConfig = new SparkConf();
sparkConfig.set("spark.master", "SPARK_MASTER_HOSTNAME_HERE");
sparkConfig.set("spark.driver.maxResultSize", "4g");
sparkConfig.set("spark.driver.memory", "4g");
sparkConfig.set("spark.executor.memory", "4g");
String thisJar = NetworkAnalysisComputer.class.getProtectionDomain().getCodeSource().getLocation().toString();
sparkConfig.set("spark.jars", thisJar);
sparkConfig.set("spark.driver.userClassPathFirst", "false");
sparkConfig.set("spark.executor.userClassPathFirst", "false");
sparkConfig.set("spark.cores.max", "16");

// Hadoop I/O input options (Cassandra backend in my case)
Configuration hadoopConfig = new PropertiesConfiguration();
hadoopConfig.setProperty("janusgraphmr.ioformat.conf.storage.hostname", "CASSANDRA_HOSTNAME_HERE");
hadoopConfig.setProperty("janusgraphmr.ioformat.conf.storage.backend", "cassandrathrift");
hadoopConfig.setProperty("cassandra.input.partitioner.class", "org.apache.cassandra.dht.Murmur3Partitioner");
hadoopConfig.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph");
hadoopConfig.setProperty("gremlin.hadoop.graphReader", "org.janusgraph.hadoop.formats.cassandra.Cassandra3InputFormat");
hadoopConfig.setProperty("gremlin.hadoop.graphWriter", "org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat");

SparkSession spark = SparkSession.builder()
        .appName("NetworkAnalysisComputer")
        .config(sparkConfig)
        .getOrCreate();

JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
Encoder<VertexDescriptor> vertexDescriptorEncoder = Encoders.bean(VertexDescriptor.class);
Encoder<PathDetails> pathDetailsEncoder = Encoders.bean(PathDetails.class);  // used further on, not shown here

// Read the graph as pairs via the TinkerPop InputFormatRDD and map each
// wrapped StarVertex into the serializable VertexDescriptor POJO.
JavaRDD<VertexDescriptor> rdd = new InputFormatRDD()
        .readGraphRDD(hadoopConfig, context)
        .map(v -> new VertexDescriptor(v._2().get()));

Dataset<VertexDescriptor> typedDataSet = spark.createDataset(JavaRDD.toRDD(rdd), vertexDescriptorEncoder);
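
From there it behaves like any other typed Dataset, e.g. (illustrative):

typedDataSet.printSchema();
System.out.println("vertex count: " + typedDataSet.count());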

Thanks for the pointer to the upcoming Input Format in 0.3, Marc.

Thanks,
jeff.

Jeff Callahan

Jul 5, 2018, 10:27:54 PM
to JanusGraph users
I probably should have mentioned that I did this with Cassandra rather than HBase; hopefully it's still helpful.

dvdg...@gmail.com

Jul 6, 2018, 12:57:06 PM
to JanusGraph users
Thanks, Jeff,
I'll give it a try. Could you tell me a bit more about this VertexDescriptor class?

Jeff Callahan

Jul 6, 2018, 3:33:43 PM
to JanusGraph users
It's a thin POJO representation of a Vertex that uses only field types that Spark directly understands - there is a corresponding EdgeDescriptor as well.  If I'd had more time, I would have figured out how to make the StarVertex class work in my scenario.  My understanding is that StarVertex (and the other Star* classes) largely exists for this purpose, but I couldn't quite get it to work in my setup.

So the VertexDescriptor just has 5 fields:

String id;
String label;
List<EdgeDescriptor> inEdges;
List<EdgeDescriptor> outEdges;
Map<String, String> properties;

Obviously this doesn't support the full range of semantics the TinkerPop StarVertex does, but it got me unblocked.  EdgeDescriptor is exactly what you'd expect; instead of inEdges and outEdges, it has VertexDescriptor fields for its attached vertices.
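
In outline, a hypothetical reconstruction of the class looks like this (Encoders.bean needs a public no-arg constructor plus getters and setters; most accessor bodies elided):

import java.io.Serializable;
import java.util.List;
import java.util.Map;

public class VertexDescriptor implements Serializable {
    private String id;
    private String label;
    private List<EdgeDescriptor> inEdges;
    private List<EdgeDescriptor> outEdges;
    private Map<String, String> properties;

    public VertexDescriptor() { }  // required by Encoders.bean

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    // ...getters and setters for label, inEdges, outEdges and properties
    // follow the same pattern
}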

jeff.

dvdg...@gmail.com

Jul 9, 2018, 4:23:07 PM
to JanusGraph users
Hi Jeff,
it worked! And it was very simple. Below is what I did, translated into Scala (sorry);
thanks a lot again.
David

import org.apache.commons.configuration.{BaseConfiguration, Configuration}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.InputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.tinkerpop.gremlin.hadoop.Constants
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil

// TinkerPop/JanusGraph graph configuration; converted to a Hadoop Configuration below
private val conf: Configuration = new BaseConfiguration()
conf.setProperty("gremlin.graph", "org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph")
conf.setProperty("gremlin.hadoop.graphReader", "org.janusgraph.hadoop.formats.hbase.HBaseInputFormat")
conf.setProperty("gremlin.hadoop.graphWriter", "org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat")
conf.setProperty("janusgraphmr.ioformat.conf.storage.backend", "hbase")
conf.setProperty("janusgraphmr.ioformat.conf.storage.hostname", "snowwhite.fairytales")
conf.setProperty("janusgraphmr.ioformat.conf.storage.hbase.table", "janusgraph")
conf.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// sparkConf is a separate SparkConf; its definition is not shown here
private val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

private val hadoopConfiguration = ConfUtil.makeHadoopConfiguration(conf)

// newAPIHadoopRDD(conf, inputFormatClass, keyClass, valueClass): the input format
// class is read back from the Hadoop configuration (gremlin.hadoop.graphReader,
// i.e. HBaseInputFormat), which emits (NullWritable, VertexWritable) pairs; each
// VertexWritable wraps a StarVertex with its incident edges and properties.
private val rdd: RDD[(NullWritable, VertexWritable)] =
  sparkSession.sparkContext.newAPIHadoopRDD(
    hadoopConfiguration,
    hadoopConfiguration.getClass(
      Constants.GREMLIN_HADOOP_GRAPH_READER,
      classOf[InputFormat[NullWritable, VertexWritable]]
    ).asInstanceOf[Class[InputFormat[NullWritable, VertexWritable]]],
    classOf[NullWritable],
    classOf[VertexWritable])

rdd.collect().foreach(println(_))

marc.de...@gmail.com

Jul 10, 2018, 3:34:25 AM
to JanusGraph users
Great work, thanks for posting back. It is good to have this on the user list for future reference!

Marc

On Monday, July 9, 2018 at 22:23:07 UTC+2, dvdg...@gmail.com wrote:

dingsi...@163.com

Oct 18, 2018, 9:59:33 AM
to JanusGraph users
Hi David,
I tried your code (Scala), but I ran into some problems, shown in the attached screenshot:
[screenshot attachment 捕获.PNG; the error details are not legible here]
I couldn't find a solution; if possible, could you give me some tips? Thanks.
Emily

On Tuesday, July 10, 2018 at 4:23:07 AM UTC+8, dvdg...@gmail.com wrote:

rafi ansari

Jun 19, 2020, 2:52:25 AM
to JanusGraph users
Hi David, can you explain the use of newAPIHadoopRDD in the code, and how NullWritable and VertexWritable are being used as parameters?

Regards
Rafi