SparkGraphComputer OLAP queries from JanusGraph. Alternatives to Kryo/GraphSON files.


Debasish Kanhar

Mar 7, 2018, 6:23:23 AM3/7/18
to JanusGraph users
Hi All,

We at work are using JanusGraph 0.2.0 for our projects, and we want to implement OLAP queries on the graph we have stored in JanusGraph/Cassandra.

As far as my understanding goes, SparkGraphComputer doesn't work on instances of TinkerGraph or JanusGraph; it only works on instances of HadoopGraph. [Correct me if I'm wrong.]
So if that is the scenario, we need to convert the TinkerGraph (from a subgraph step) into a HadoopGraph before running an OLAP query on it. From all the references I found, it is done as follows:
 
  1. Retrieve subgraph. Eg: subgraph = g.V().bothE().subgraph("sg").cap("sg").next()
  2. Save subgraph either to GraphSON or Kryo in file system.
  3. Depending on GraphSON or Kryo format, we define graphReader and graphWriter properties in .properties file. (I've added my properties file for reading from GraphSON).
  4. Create a Traversal object on it using SparkGraphComputer for doing OLAP queries on it.
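For concreteness, steps 1-4 look roughly like this in the Gremlin console (a sketch only, not tested: the output path and the name of the properties file are illustrative, and that properties file must point its graphReader/inputLocation at the written Gryo file):

```groovy
// 1. extract the subgraph with an OLTP traversal
subgraph = g.V().bothE().subgraph("sg").cap("sg").next()

// 2. persist the TinkerGraph as Gryo (Kryo-based) on the file system
subgraph.io(IoCore.gryo()).writeGraph("/tmp/subgraph.kryo")

// 3. open a HadoopGraph whose properties file uses GryoInputFormat
//    and gremlin.hadoop.inputLocation=/tmp/subgraph.kryo
hadoopGraph = GraphFactory.open("conf/hadoop-graph/read-gryo.properties")

// 4. run the OLAP query with SparkGraphComputer
g2 = hadoopGraph.traversal().withComputer(SparkGraphComputer)
g2.V().count()
```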
 
Now, my question is: is there an alternative to the above pipeline? Can I bypass the file I/O to reduce overhead, for example with an in-memory conversion of a TinkerGraph into a HadoopGraph?
 
Why do we want this?
Sometimes our graph grows to a huge size, around 100 million nodes. Saving such a huge GraphSON to the file system adds a lot of overhead, which is not desirable, so it would be better if that file I/O could be avoided completely.

Thanks for any help.

Debasish Kanhar

Mar 7, 2018, 6:28:03 AM3/7/18
to JanusGraph users
Forgot to attach Hadoop Spark configuration files.
hadoop config.txt

Debasish Kanhar

Mar 7, 2018, 2:51:03 PM3/7/18
to JanusGraph users
Came across a few more things while playing around. It looks like there is a CassandraInputFormat which reads from Cassandra for OLAP, but it doesn't seem to support Cassandra 3, which we use in production.

I went through various GitHub issues and it looks like a known issue, but you also released a fix for this in JanusGraph 0.2.0, right?

If that is the scenario, I tried the following values for gremlin.hadoop.graphReader, but all of them failed for various reasons. We can get into the stack traces if needed!

  1. org.janusgraph.hadoop.formats.cassandra.Cassandra3InputFormat
  2. org.janusgraph.hadoop.formats.cassandra.CassandraInputFormat
  3. org.janusgraph.hadoop.formats.cassandra.CqlBridgeRecordReader

I've been stuck at this point for a while. Any help is really appreciated.



HadoopMarc

Mar 8, 2018, 1:54:09 PM3/8/18
to JanusGraph users
Hi Debasish,

Indeed, Cassandra3InputFormat seems like the most promising way to go. Can you also attach the complete config you used while trying Cassandra? Did you also check the example configs in conf/hadoop-graph in the JanusGraph distribution?

Cheers,   Marc


Debasish Kanhar

Mar 10, 2018, 10:45:25 AM3/10/18
to JanusGraph users
Hi Marc,

Sorry, I was off my system for the last few days. It looks like I'm still facing the same error even with the pre-configured .properties files.

I have a Cassandra cluster set up on the following IPs: 9.30.xxx.222, 9.30.xx.29, 9.30.xxx.218.

I can connect to the same Cassandra cluster without using SparkGraphComputer as follows:

Graph graph = JanusGraphFactory.build().
    set("storage.backend", "cassandra").
    set("storage.hostname", "9.30.xxx.222, 9.30.xx.29, 9.30.xxx.218").set("storage.port", "9160").
    set("index.search.backend", "elasticsearch").set("index.search.hostname", "127.0.0.1").
    open();

GraphTraversalSource g = graph.traversal();
System.out.println("Before Pushing " + g.V().count().profile().next());



The above code works, and I'm able to profile my g.V().count() query [which takes around 110,000 ms anyway].

Now, I want to do the same using SparkGraphComputer as follows:

String read_cassandra_properties = "/opt/IGA/JanusGraph0.2.0/conf/hadoop-graph/read-cassandra3-cluster.properties";
Graph graph_computer = GraphFactory.open(read_cassandra_properties);

And, my read-cassandra3-cluster.properties is as follows:

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.janusgraph.hadoop.formats.cassandra.Cassandra3InputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat

gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output

janusgraphmr.ioformat.conf.storage.backend=cassandra
janusgraphmr.ioformat.conf.storage.hostname=9.30.xxx.222, 9.30.xx.29, 9.30.xxx.218
janusgraphmr.ioformat.conf.storage.port=9160
janusgraphmr.ioformat.conf.storage.cassandra.keyspace=janusgraph

cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

spark.master=local[4]
spark.serializer=org.apache.spark.serializer.KryoSerializer

gremlin.hadoop.defaultGraphComputer=org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer

Once I load the graph, I do:
g = graph_computer.traversal().withComputer();

I then try to use g to profile my count step, but the following code fails:
System.out.println(g.V().count().profile().next());

I get an error reporting the following:
java.lang.IllegalStateException: java.io.IOException: Could not get input splits

And

Caused by: java.io.IOException: failed connecting to all endpoints 9.30.253.222

Please have a look at this link for the complete stack trace:
Traceback.

I thought the error was because I can't connect to the Cassandra IPs, but as mentioned I was able to connect to the same cluster a few lines back, so this error seems strange.

Thanks in advance!

Cheers Deb!

HadoopMarc

Mar 10, 2018, 2:50:47 PM3/10/18
to JanusGraph users
Hi Debasish,

Your additional info does not provide any clues. I also saw your question on an older unsolved thread about the same connection problem. I have two suggestions:
  1. Try the suggestion from the JanusGraph reference docs (there may be underlying CLASSPATH issues that only appear with SparkGraphComputer):

    14.9. Cassandra Connection Problem

    By default, JanusGraph uses the Astyanax library to connect to Cassandra clusters. On EC2 and Rackspace, it has been reported that Astyanax was unable to establish a connection to the cluster. In those cases, changing the backend to storage.backend=cassandrathrift solved the problem.

  2. Configure the logging level to DEBUG to get more clues about the connection problem
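For reference, suggestion 1 amounts to a one-line change in the InputFormat section of the properties file posted above (everything else stays the same):

```properties
# Use the Thrift-based backend instead of the default Astyanax client
janusgraphmr.ioformat.conf.storage.backend=cassandrathrift
janusgraphmr.ioformat.conf.storage.port=9160
```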

HTH,    Marc



Debasish Kanhar

Mar 11, 2018, 12:19:43 PM3/11/18
to JanusGraph users
Hi Marc,

That was a nice catch on Thrift. It turned out I had to manually run nodetool enablethrift on every node; I don't know why my nodes weren't being reached by Astyanax.
I played around with OLAP more and was finally able to make it work.
Anyway, once done, I was able to load the Grateful Dead graph from the tutorials as well as a Twitter dataset from the internet, which are small graphs of around 800-1500 nodes, and run OLAP on them.

Queries I tried:
g.V().count();
result = graph.compute().program(PageRankVertexProgram.build().create()).submit().get();
g = result.graph().traversal();
g.V().valueMap();

But it takes at least 3-4 minutes to run, though I didn't record exact timings. I guess that is pretty bad performance?

I get an error: 
java.lang.NoClassDefFoundError: io/netty/channel/epoll/EpollDatagramChannel$DatagramSocketAddress

and while researching I found that it can be ignored, but that it leads to slower connections to Cassandra. I suspect that is the reason for the slow runtime.

It would be really great if you could guide me on this.

Now, JanusGraph 0.2.0 comes with the following libs (under JANUSGRAPH_HOME/lib/*):
1: netty-transport-4.0.50.Final.jar
2: netty-all-4.0.50.Final.jar, and so on.

Should I manually update just netty-transport-4.0.50.Final.jar to some higher version? Or all the netty jars? Or which jars exactly?

Cheers Deb.

HadoopMarc

Mar 11, 2018, 4:05:39 PM3/11/18
to JanusGraph users
Hi Debasish,

Nice to hear you made significant progress.

The io.netty site says: "Although we did our best to keep the backward compatibility from 4.0, 4.1 contains multiple additions which might not be fully backward-compatible with 4.0."

If you want to try, the easiest place to do it is the dependency section of:

https://github.com/JanusGraph/janusgraph/blob/master/janusgraph-dist/pom.xml

Then you will not see all the induced version conflicts in the modules, but at least you will get the right transitive deps for io.netty (if any).
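As a sketch, pinning a newer Netty there could look like the following; the 4.1.x version number is an illustrative assumption, and as quoted above, 4.1 is not guaranteed fully backward-compatible with 4.0:

```xml
<!-- illustrative: a newer Netty, added to the dependency section of the pom -->
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.21.Final</version>
</dependency>
```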

Cheers,    Marc



Debasish Kanhar

Mar 12, 2018, 3:09:21 PM3/12/18
to JanusGraph users
Hi Marc,

It looks like this error with Netty is kind of a showstopper.

It seems the Netty error is the reason for the slow OLAP runs. I wanted to know how your dev environment is set up for OLAP?

I have a Cassandra 3.x cluster with Thrift enabled, JanusGraph 0.2.0, and Spark installed locally at /usr/bin/spark.

Now, I simply created a Maven project to do OLAP using JanusGraph/HadoopGraph.

As mentioned before, I specify spark.master=local[4] (or local[8]) in the hadoop properties. My question is: have you tried this setup, and were you able to run any OLAP queries yourself?

My PageRank query on a graph of 800 nodes & 40k edges takes > 2 hours from the Maven project, just because it has some 769 stages to execute. If needed, I can add my pom.xml & Java class code.

It would be really helpful if you could guide me in any way. Also, we are open to downgrading to Cassandra 2.x.

Thanks

HadoopMarc

Mar 12, 2018, 3:56:17 PM3/12/18
to JanusGraph users
Hi Debasish,

OK, let's see what happens here.

1. the io.netty part
What I did not ask yesterday: you mentioned investigations which showed  the io.netty behavior to significantly slow down the total runtime. Do these investigations provide sufficient documentation to create an issue at https://github.com/JanusGraph/janusgraph/issues ?

2. OLAP runtime
The PageRankVertexProgram is possibly not the most thoroughly tested part of TinkerPop. Apparently the algo does not converge sufficiently fast on your graph to result in reasonable runtimes, but this does not disqualify OLAP as a whole, in particular OLAP traversals. Maybe you can try some non-trivial OLAP traversal instead to judge what OLAP could do for your use cases.

3. My own experiences
My own experience with OLAP on JanusGraph/HBase is still disappointing, which I think is due to the HBaseInputFormat input split size. Cassandra is still on my wish list. What I do myself is a kind of poor man's OLAP: store all vertex ids on HDFS, and for the actual OLAP query do a mapPartitions on an RDD of vertex ids, having each Spark executor connect to JanusGraph and run an OLTP query on the range of vertex ids belonging to the Spark tasks it handles.
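To illustrate the idea (a rough, untested sketch in Gremlin-console Groovy; the HDFS path, the properties file, the per-vertex query, and the availability of a SparkContext `sc` are all assumptions):

```groovy
// vertex ids were stored beforehand on HDFS, one id per line
idLines = sc.textFile("hdfs:///data/vertex-ids.txt")

// each partition opens its own JanusGraph connection and runs plain
// OLTP queries over just the ids it was handed
results = idLines.mapPartitions { ids ->
    graph = JanusGraphFactory.open("conf/janusgraph-cassandra.properties")
    g = graph.traversal()
    out = ids.collect { id -> g.V(Long.parseLong(id)).out().count().next() }
    graph.close()
    out
}

// combine the per-partition results, e.g. a total count
results.reduce { a, b -> a + b }
```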

HTH,     Marc



Debasish Kanhar

Mar 22, 2018, 3:01:34 PM3/22/18
to JanusGraph users
Hi Marc,

Sorry, I was tied up in different aspects of Janus. Anyway, I got back to doing the whole thing from scratch. I moved away from Cassandra 3 and am now planning to set up Gremlin to connect with a Yarn cluster. The following things have been tried:

  1. Setup Janus to do OLAP on a single-node VM using Spark. I tried cycle detection instead of PageRank, and following are my stats:
    1. Graph size: 1M nodes & 1.5M edges. Cycle detection for 2 hops on 1 executor with 5GB RAM takes 1 hour.
    2. Able to do degree centrality and normal querying.
    3. Used Cassandra 2.2.8 as backend in standalone (non-clustered) mode, present in the same VM.
    4. Elasticsearch 5.6.2 present & running in the same VM.
  2. Now I want to scale it using a Spark cluster. I seem to have the following options:
    1. Setup a Spark cluster in standalone mode, i.e. starting the Spark master and workers manually, and connect to the master using the property spark.master=spark://<IP of MASTER>:7077.
      1. Plan to try this tomorrow.
    2. Setup a Yarn cluster and do it.
      1. Followed this tutorial to set up the Hadoop Yarn cluster: https://linode.com/docs/databases/hadoop/how-to-install-and-set-up-hadoop-cluster/
      2. Followed this to set up Spark on top of it: https://linode.com/docs/databases/hadoop/install-configure-run-spark-on-top-of-hadoop-yarn-cluster/
      3. I started the yarn server and connected to the UI, which effectively means YARN is running. But how do I configure Gremlin to connect to the same Yarn?
The following is the properties file I use to connect to Cassandra for OLAP:

#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.janusgraph.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat

gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output

#
# JanusGraph Cassandra InputFormat configuration
#
janusgraphmr.ioformat.conf.storage.backend=cassandrathrift
# As mentioned, I've local Cassandra in same VM as Spark Master and JG.
janusgraphmr.ioformat.conf.storage.hostname=127.0.0.1
janusgraphmr.ioformat.conf.storage.port=9160
janusgraphmr.ioformat.conf.storage.cassandra.keyspace=panamaDev

#
# Apache Cassandra InputFormat configuration
#
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

#
# SparkGraphComputer Configuration
#
spark.master=yarn-client
spark.driver.host=9.30.100.218
#spark.executor.memory=1536m
spark.executor.memory=6g
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.yarn.dist.archives=/opt/JanusGraph/0.2.0/lib.zip
spark.yarn.dist.files=/opt/JanusGraph/0.2.0/lib/janusgraph-hbase-0.2.0.jar
spark.driver.extraLibraryPath=/home/hadoop/hadoop/lib/native
spark.executor.extraLibraryPath=/home/hadoop/hadoop/lib/native
gremlin.spark.persistContext=true

# Default Graph Computer
gremlin.hadoop.defaultGraphComputer=org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer


I actually followed your tutorial, http://yaaics.blogspot.in/2017/07/configuring-janusgraph-for-spark-yarn.html, as the base for creating the properties file above, but I'm unable to run even g.V().count(). The following is the stack trace:
gremlin> graph = GraphFactory.open("/opt/resources/janusgraph-connections/testGraph-OLAP-yarn-cassandra-local.properties")
==>hadoopgraph[cassandrainputformat->gryooutputformat]
gremlin> g = graph.traversal().withComputer()
==>graphtraversalsource[hadoopgraph[cassandrainputformat->gryooutputformat], graphcomputer]
gremlin> g.V().count()
11:58:17 ERROR org.apache.spark.SparkContext  - Error initializing SparkContext.
org.apache.spark.SparkException: Unable to load YARN support
        at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:399)
        at org.apache.spark.deploy.SparkHadoopUtil$.yarn$lzycompute(SparkHadoopUtil.scala:394)
        at org.apache.spark.deploy.SparkHadoopUtil$.yarn(SparkHadoopUtil.scala:394)
        at org.apache.spark.deploy.SparkHadoopUtil$.get(SparkHadoopUtil.scala:411)
        at org.apache.spark.util.Utils$.getSparkOrYarnConfig(Utils.scala:2118)
        at org.apache.spark.storage.BlockManager.<init>(BlockManager.scala:105)
        at org.apache.spark.SparkEnv$.create(SparkEnv.scala:365)
        at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:193)
        at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:288)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:457)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2281)
        at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
        at org.apache.tinkerpop.gremlin.spark.structure.Spark.create(Spark.java:52)
        at org.apache.tinkerpop.gremlin.spark.structure.Spark.create(Spark.java:60)
        at org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer.lambda$submitWithExecutor$0(SparkGraphComputer.java:193)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
        at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:395)
        ... 18 more
org.apache.spark.SparkException: Unable to load YARN support
Type ':help' or ':h' for help.
Display stack trace? [yN]

The full stack trace is at https://paste2.org/x2ajHCgZ.

I feel like I'm missing some jars somewhere, or pointing my paths the wrong way.

Any help would be great! If needed, I can also start a new thread.

Thanks :-)

HadoopMarc

Mar 23, 2018, 3:25:41 PM3/23/18
to JanusGraph users
Hi Debasish,

The JanusGraph-on-Spark/Yarn blog refers to the TinkerPop/Yarn blog; there you can see that you need some additional jars, among them the spark-yarn jar that the stack trace points at.
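Concretely, that could look something like the sketch below; the jar name and version are assumptions and must match the Spark jars actually bundled in the JanusGraph lib folder, so check the spark-core jar version there first:

```
# illustrative: put the matching spark-yarn jar on the classpath
cp spark-yarn_<scala>-<spark-version>.jar /opt/JanusGraph/0.2.0/lib/
# rebuild the lib.zip referenced by spark.yarn.dist.archives so the
# Yarn executors receive the jar as well
cd /opt/JanusGraph/0.2.0 && zip -r lib.zip lib/
```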


HTH,    Marc
