Failed to find all paths between 2 vertices on a graph with 100 million vertices and 100 million edges using SparkGraphComputer


Roy Yu

Jan 2, 2021, 12:53:08 AM
to JanusGraph users
The graph has 100 million vertices and 100 edges
Graph data is saved at HBase Table: MyHBaseTable.

The size of MyHBaseTable is 16.2GB:
root@~$ hdfs dfs -du -h /apps/hbase/data/data/default/
16.2 G   32.4 G   /apps/hbase/data/data/default/MyHBaseTable

MyHBaseTable has 190 regions. One Spark task processes one region, so to avoid Spark OOM while loading region data I used HBaseAdmin to split the HBase regions until the edge data (HBase column family e) of every region was less than 100MB. In the listing below, some regions are larger than 100MB because they also hold index data.
root@~$ hdfs dfs -du -h /apps/hbase/data/data/default/MyHBaseTable
3.8 K    7.6 K    /apps/hbase/data/data/default/MyHBaseTable/.tabledesc
0        0        /apps/hbase/data/data/default/MyHBaseTable/.tmp
78.3 M   156.7 M  /apps/hbase/data/data/default/MyHBaseTable/007e9dbf74f5d35862b68d6434f1d6f2
92.2 M   184.3 M  /apps/hbase/data/data/default/MyHBaseTable/077288f4be4c439443bb45b0c2369d5b
102.4 M  204.8 M  /apps/hbase/data/data/default/MyHBaseTable/0782782071e4a7f2d17800d4a0989a7f
50.6 M   101.3 M  /apps/hbase/data/data/default/MyHBaseTable/07e795022e56a969ede48c9c23fbbc7c
50.6 M   101.3 M  /apps/hbase/data/data/default/MyHBaseTable/084e54e61bbcfc2decd14dcbac55bc50
99.7 M   199.4 M  /apps/hbase/data/data/default/MyHBaseTable/0a85ae356b19c605d9a32b9bf513bcbb
431.3 M  862.6 M  /apps/hbase/data/data/default/MyHBaseTable/0b024c812acfa6efaa40e1cca232e192
5.0 K    10.1 K   /apps/hbase/data/data/default/MyHBaseTable/0c2d8e3a6daaa8ab30c399783e343890
...


the properties of the graph:
gremlin.graph=org.janusgraph.core.JanusGraphFactory
cluster.max-partitions = 16
storage.backend=hbase
storage.hbase.table=MyHBaseTable
storage.hbase.ext.zookeeper.znode.parent=/hbase-unsecure
schema.default=none

storage.hostname=master001,master002,master003
storage.port=2181
storage.hbase.region-count=64
storage.write-time=1000000
storage.read-time=100000

ids.block-size=200000
ids.renew-timeout=600000
ids.renew-percentage=0.4
ids.authority.conflict-avoidance-mode=GLOBAL_AUTO

index.search.backend=elasticsearch
index.search.hostname=es001,es002,es003
index.search.elasticsearch.create.ext.index.number_of_shards=15
index.search.elasticsearch.create.ext.index.refresh_interval=-1
index.search.elasticsearch.create.ext.index.translog.sync_interval=5000s
index.search.elasticsearch.create.ext.index.translog.durability=async
index.search.elasticsearch.create.ext.index.number_of_replicas=0
index.search.elasticsearch.create.ext.index.shard.check_on_startup=false


the schema of the graph:
def defineSchema(graph) {
    m = graph.openManagement()

        node = m.makeVertexLabel("node").make()

        relation = m.makeEdgeLabel("relation").make()
        obj_type_value = m.makePropertyKey("obj_type_value").dataType(String.class).make()

    // edge props
        start_time = m.makePropertyKey("start_time").dataType(Date.class).make()
        end_time = m.makePropertyKey("end_time").dataType(Date.class).make()
        count = m.makePropertyKey("count").dataType(Integer.class).make()
        rel_type = m.makePropertyKey("rel_type").dataType(String.class).make()
    //index
        m.buildIndex("MyHBaseTable_obj_type_value_Index", Vertex.class).addKey(obj_type_value).unique().buildCompositeIndex()
        m.buildIndex("MyHBaseTable_rel_type_index", Edge.class).addKey(rel_type).buildCompositeIndex()
        m.buildIndex("MyHBaseTable_count_index", Edge.class).addKey(count).buildMixedIndex("search")
        m.buildIndex("MyHBaseTable_start_time_index", Edge.class).addKey(start_time).buildMixedIndex("search")
        m.buildIndex("MyHBaseTable_end_time_index", Edge.class).addKey(end_time).buildMixedIndex("search")
    m.commit()
}

the Gremlin I use to find all paths between 2 vertices:

import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.P;
def executeScript(graph){
    traversal = graph.traversal().withComputer(SparkGraphComputer.class);
    return traversal.V(624453904).repeat(__.both().simplePath()).until(__.hasId(192204064).or().loops().is(200)).hasId(192204064).path().dedup().limit(1000).toList()
    //return traversal.V().where(__.outE().count().is(P.gte(50000))).id().toList()
};
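As I understand it, the OLAP engine keeps one Traverser object per partial path, so memory grows with the number of simple paths, not the number of vertices. A rough single-machine sketch (plain Java with an in-memory adjacency map and hypothetical helper names, not JanusGraph/TinkerPop API) of what the repeat/until/limit above computes:

```java
import java.util.*;

public class SimplePathDemo {
    // Hypothetical helper mirroring repeat(both().simplePath())
    //   .until(hasId(dst).or().loops().is(maxDepth)).limit(maxResults).
    // Every partial path explored here corresponds to one Traverser the OLAP
    // engine holds in memory, which is why the traverser set can outgrow the heap.
    static List<List<Integer>> allSimplePaths(Map<Integer, List<Integer>> adj,
                                              int src, int dst, int maxDepth, int maxResults) {
        List<List<Integer>> results = new ArrayList<>();
        List<Integer> path = new ArrayList<>(List.of(src));
        dfs(adj, dst, maxDepth, maxResults, path, results);
        return results;
    }

    private static void dfs(Map<Integer, List<Integer>> adj, int dst, int maxDepth,
                            int maxResults, List<Integer> path, List<List<Integer>> results) {
        if (results.size() >= maxResults) return;              // limit(maxResults)
        int cur = path.get(path.size() - 1);
        if (cur == dst && path.size() > 1) {                   // until(hasId(dst))
            results.add(new ArrayList<>(path));
            return;
        }
        if (path.size() - 1 >= maxDepth) return;               // until(loops().is(maxDepth))
        for (int next : adj.getOrDefault(cur, List.of())) {
            if (path.contains(next)) continue;                 // simplePath()
            path.add(next);
            dfs(adj, dst, maxDepth, maxResults, path, results);
            path.remove(path.size() - 1);
        }
    }
}
```

With average degree d this explores on the order of d^maxDepth partial paths, so a depth cap of 200 can blow up long before all paths are found.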

The OLAP spark graph conf:
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.janusgraph.hadoop.formats.hbase.HBaseInputFormat
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat

gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
gremlin.spark.graphStorageLevel=DISK_ONLY
gremlin.spark.persistStorageLevel=DISK_ONLY
####################################
# JanusGraph HBase InputFormat configuration
####################################
janusgraphmr.ioformat.conf.storage.backend=hbase
janusgraphmr.ioformat.conf.storage.hostname=master002,master003,master001
janusgraphmr.ioformat.conf.storage.hbase.table=MyHBaseTable
janusgraphmr.ioformat.conf.storage.hbase.ext.zookeeper.znode.parent=/hbase-unsecure

####################################
# SparkGraphComputer Configuration #
####################################
spark.master=yarn
spark.submit.deployMode=client
spark.yarn.jars=hdfs://GRAPHOLAP/user/spark/jars/*.jar

# the Spark YARN ApplicationMaster needs this to resolve the classpath it sends to the executors
spark.yarn.appMasterEnv.JAVA_HOME=/usr/local/jdk1.8.0_191/
spark.yarn.appMasterEnv.HADOOP_CONF_DIR=/usr/hdp/3.1.4.0-315/hadoop/conf
spark.yarn.am.extraJavaOptions=-Diop.version=3.1.4.0-315 -Djava.library.path=/usr/hdp/current/hadoop-client/lib/native
spark.executor.memoryOverhead=5G
spark.driver.extraJavaOptions=-Diop.version=3.1.4.0-315 -Djava.library.path=/usr/hdp/current/hadoop-client/lib/native

# the Spark executors (on the worker nodes) need this to resolve the classpath to run Spark tasks
spark.executorEnv.JAVA_HOME=/usr/local/jdk1.8.0_191/
#spark.executorEnv.HADOOP_CONF_DIR=/usr/hdp/3.1.4.0-315/hadoop/conf
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:MaxGCPauseMillis=500 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/mnt/data_1/log/spark2/gc-spark%p.log
spark.executor.cores=1
spark.executor.memory=80G
spark.executor.instances=3
spark.executor.extraClassPath=/etc/hadoop/conf:/usr/spark/jars:/usr/hdp/current/hbase-client/lib:/usr/janusgraph/0.4.0/lib

spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.network.timeout=1000000
spark.rpc.askTimeout=1000000
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7447
spark.maxRemoteBlockSizeFetchToMem=10485760
spark.memory.useLegacyMode=true
spark.shuffle.memoryFraction=0.1
spark.storage.memoryFraction=0.1
spark.memory.fraction=0.1
spark.memory.storageFraction=0.1
spark.shuffle.accurateBlockThreshold=1048576
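(A side note on my memory settings: as far as I understand Spark, with spark.memory.useLegacyMode=true only the legacy fractions take effect and the unified-memory settings are ignored, i.e. effectively:)

```properties
# my understanding: under spark.memory.useLegacyMode=true Spark honors
spark.shuffle.memoryFraction=0.1
spark.storage.memoryFraction=0.1
# and ignores spark.memory.fraction / spark.memory.storageFraction
```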


The spark job failed at stage 50 :
20/12/30 01:53:00 ERROR executor.Executor: Exception in task 40.0 in stage 50.0 (TID 192084)
java.lang.OutOfMemoryError: Java heap space
        at sun.reflect.generics.repository.ClassRepository.getSuperInterfaces(ClassRepository.java:114)
        at java.lang.Class.getGenericInterfaces(Class.java:913)
        at java.util.HashMap.comparableClassFor(HashMap.java:351)
        at java.util.HashMap$TreeNode.treeify(HashMap.java:1932)
        at java.util.HashMap.treeifyBin(HashMap.java:772)
        at java.util.HashMap.putVal(HashMap.java:644)
        at java.util.HashMap.put(HashMap.java:612)
        at java.util.Collections$SynchronizedMap.put(Collections.java:2588)
        at org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet.add(TraverserSet.java:90)
        at org.apache.tinkerpop.gremlin.process.computer.traversal.WorkerExecutor.lambda$drainStep$4(WorkerExecutor.java:232)
        at org.apache.tinkerpop.gremlin.process.computer.traversal.WorkerExecutor$$Lambda$86/877696627.accept(Unknown Source)
        at java.util.Iterator.forEachRemaining(Iterator.java:116)
        at org.apache.tinkerpop.gremlin.process.computer.traversal.WorkerExecutor.drainStep(WorkerExecutor.java:221)
        at org.apache.tinkerpop.gremlin.process.computer.traversal.WorkerExecutor.execute(WorkerExecutor.java:151)
        at org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram.execute(TraversalVertexProgram.java:307)
        at org.apache.tinkerpop.gremlin.spark.process.computer.SparkExecutor.lambda$null$4(SparkExecutor.java:118)
        at org.apache.tinkerpop.gremlin.spark.process.computer.SparkExecutor$$Lambda$72/1209554928.apply(Unknown Source)
        at org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils$3.next(IteratorUtils.java:247)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)


From the log it seems there is so much data that even the 80G executor heap is not enough.
Can anybody help me? Does anybody have an idea how to find all the paths between 2 vertices on a large graph?

HadoopMarc

Jan 2, 2021, 5:21:06 AM
to JanusGraph users
Hi Roy,

Nice to see you back here, still going strong!

I guess the TraversalVertexProgram used for OLAP traversals is not well suited to your use case. You must realize that 200 stages in an OLAP traversal is fairly extreme. I assume your edge count is 100 million and not 100. So, the number of paths between two vertices could easily explode, and the storage of the associated java objects (Traversers in the stacktrace) could grow beyond 80 Gb.

It would be relatively easy to write your own VertexProgram for this simple traversal (you can take the ConnectedComponentVertexProgram as an example). See also the explanation in the corresponding recipe. This will give you far more control over data structures and their memory usage.
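To illustrate the kind of memory control I mean: a custom program can do a bidirectional search and keep only one distance label per visited vertex, instead of one object per partial path. A minimal single-machine sketch of the idea (plain Java with an in-memory adjacency map; a real VertexProgram would distribute this per vertex, and this sketch ignores the well-known off-by-one tie case at the meeting level):

```java
import java.util.*;

public class MeetInMiddle {
    // Bidirectional BFS: expand from both endpoints, stop when the frontiers meet.
    // Memory is O(visited vertices), not O(number of paths).
    static int pathLength(Map<Integer, List<Integer>> adj, int src, int dst) {
        if (src == dst) return 0;
        Map<Integer, Integer> distA = new HashMap<>(Map.of(src, 0));
        Map<Integer, Integer> distB = new HashMap<>(Map.of(dst, 0));
        Deque<Integer> frontierA = new ArrayDeque<>(List.of(src));
        Deque<Integer> frontierB = new ArrayDeque<>(List.of(dst));
        while (!frontierA.isEmpty() && !frontierB.isEmpty()) {
            // always expand the smaller frontier to keep state small
            Integer hit = (frontierA.size() <= frontierB.size())
                    ? expand(adj, frontierA, distA, distB)
                    : expand(adj, frontierB, distB, distA);
            if (hit != null) return distA.get(hit) + distB.get(hit);
        }
        return -1;  // no path
    }

    private static Integer expand(Map<Integer, List<Integer>> adj, Deque<Integer> frontier,
                                  Map<Integer, Integer> dist, Map<Integer, Integer> other) {
        Deque<Integer> next = new ArrayDeque<>();
        while (!frontier.isEmpty()) {
            int v = frontier.poll();
            for (int n : adj.getOrDefault(v, List.of())) {
                if (dist.containsKey(n)) continue;   // already labeled on this side
                dist.put(n, dist.get(v) + 1);
                if (other.containsKey(n)) return n;  // frontiers met at n
                next.add(n);
            }
        }
        frontier.addAll(next);
        return null;
    }
}
```

The same label-propagation shape maps naturally onto a VertexProgram's supersteps: each vertex stores its distances from the two seeds and messages its neighbors when a label changes.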

Best wishes,    Marc

On Saturday, January 2, 2021 at 06:53:08 UTC+1, Roy Yu wrote:

Roy Yu

Jan 3, 2021, 9:08:14 PM
to JanusGraph users
Hi Marc

My graph has 100 million edges, not 100 edges. Sorry for my miswriting. From your advice I think I need to do two things. Firstly, I need to dig into ConnectedComponentVertexProgram and learn how to write my own VertexProgram. Secondly, I need to implement the path-finding logic in that VertexProgram, about which I have no idea. As the number of paths between 2 vertices on a graph containing 100 million edges could easily explode, I have no memory or even disk to store all the results. Could you describe your solution in detail?