Store RDD into Titan/Cassandra

Иван Брагин

Sep 27, 2016, 10:19:11 AM
to Aurelius
Could you help me with my code? I have a big graph in Spark. For this example I prepared a mock of my graph with the same structure:
- vertices are an RDD[Int]
- edges are an RDD[(Int, Int, Data)]
How can I save this graph to Titan without using RDD.collect?


import com.kountable.analytics.tools.SparkSupport
import com.thinkaurelius.titan.core.{TitanFactory, TitanGraph, TitanVertex}
import com.thinkaurelius.titan.core.TitanFactory.Builder
import org.apache.spark.rdd.RDD
import scala.language.postfixOps
import scala.util.Random

/**
 * Created by ivan on 9/26/16.
 */
object Titan extends SparkSupport {

  val nodesCount = 1000
  val edgesAvr = 10
  val types = 0 to (edgesAvr * 3)
  val ids = 1 to nodesCount toList

  def run(): Unit = {
    val vertices = spark.parallelize(ids)
    // Mock edges: (source id, target id, edge type)
    val edges = spark.parallelize(ids.flatMap(i =>
      (0 to Random.nextInt(edgesAvr * 2)).map { _ =>
        (i, ids.apply(nodesCount - 1 - Random.nextInt(nodesCount + 1 - i)), types.apply(Random.nextInt(types.size)))
      }.distinct
    ))
    //println(edges.take(100).map{case (a, b, t) => a + "\t" + b + "\t" + t}.mkString("\n"))
    /*
     1 640 13
     1 365 26
     1 426 20
     1 678 15
     1 249 11
     1 489 12
     1 679 21
     1 914 1
     1 54 6
     1 94 26
     2 790 2
     2 553 23
     2 30 27
     2 537 18
     ...
     */

    storeToTitan(vertices, edges)
  }

  val titanGraphBuilder: Builder =
    TitanFactory.build().set("storage.backend", "cassandra")
      .set("storage.hostname", "127.0.0.1")
      .set("storage.port", "9160")

  def storeToTitan(vertices: RDD[Int], edges: RDD[(Int, Int, Int)]) = {
    val titanGraph: TitanGraph = titanGraphBuilder.open()
    val tx = titanGraph.newTransaction()

    // Collects every vertex id to the driver and keeps id -> vertex in memory
    val idToVertex = vertices.collect().map { id =>
      val vertex: TitanVertex = tx.addVertex()
      vertex.property("id", id)
      id -> vertex
    }.toMap

    // Collects every edge to the driver and adds it in both directions
    edges.collect.foreach { pair =>
      val vertex1 = idToVertex.get(pair._1)
      val vertex2 = idToVertex.get(pair._2)
      (vertex1, vertex2) match {
        case (Some(vertex1_ : TitanVertex), Some(vertex2_ : TitanVertex)) =>
          vertex1_.addEdge(pair._3.toString, vertex2_)
          vertex2_.addEdge(pair._3.toString, vertex1_)
        case _ =>
      }
    }
    tx.commit()
    titanGraph.close()
  }
}

Thank you.


HadoopMarc

Sep 27, 2016, 10:56:35 AM
to Aurelius
Hi 

You can use rdd.foreachPartition() to have each executor connect to Cassandra through Titan, then use graph.addVertex and Vertex.addEdge.
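A minimal sketch of that per-partition pattern follows. To keep it runnable without a Spark cluster or Cassandra, the Titan transaction is replaced by a hypothetical `GraphTx` trait and an in-memory `RecordingTx`; neither is a Titan type. In a real job, `writePartition` would be the body passed to `vertices.foreachPartition`, and `openTx` would presumably wrap the `titanGraphBuilder.open()` and `newTransaction()` calls from the question.

```scala
// Hypothetical stand-in for a Titan transaction (not a Titan type).
trait GraphTx {
  def addVertex(id: Int): Unit
  def commit(): Unit
}

// In-memory fake used only to exercise the sketch.
final class RecordingTx extends GraphTx {
  private var pending = Vector.empty[Int]
  var committed: Vector[Int] = Vector.empty
  var commits = 0
  def addVertex(id: Int): Unit = pending = pending :+ id
  def commit(): Unit = {
    committed = committed ++ pending
    pending = Vector.empty
    commits += 1
  }
}

object PartitionWriteSketch {
  // Body for rdd.foreachPartition: one transaction per partition,
  // committed once after all ids of that partition are written.
  def writePartition(ids: Iterator[Int], openTx: () => GraphTx): Int = {
    val tx = openTx()
    var written = 0
    ids.foreach { id =>
      tx.addVertex(id)
      written += 1
    }
    tx.commit()
    written
  }

  def main(args: Array[String]): Unit = {
    // Two plain sequences play the role of two Spark partitions.
    val partitions = Seq(Seq(1, 2, 3), Seq(4, 5))
    val txs = partitions.map(_ => new RecordingTx)
    val counts = partitions.zip(txs).map { case (p, tx) =>
      writePartition(p.iterator, () => tx)
    }
    println(s"written per partition: $counts")
    println(s"commits per partition: ${txs.map(_.commits)}")
  }
}
```

Each partition gets its own transaction and nothing is pulled back to the driver, which is the point of avoiding RDD.collect.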

TinkerPop's BulkLoaderVertexProgram actually does this, but it does not work well with the current Titan 1.x because its vertex lookups do not hit the Titan indices (search for this issue on this forum).

Hope this helps,

Marc

On Tuesday, 27 September 2016 16:19:11 UTC+2, Иван Брагин wrote:

Иван Брагин

Sep 27, 2016, 11:44:15 AM
to Aurelius
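Thank you for the answer, HadoopMarc.

If I use something like:
vertices.foreachPartition { part =>
  val graph = titanGraphBuilder.open()
  val tx = graph.newTransaction()                        // start transaction
  part.foreach(id => tx.addVertex().property("id", id))  // add vertex
  tx.commit()                                            // store transaction
  graph.close()
}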
then I can add the vertices, but I lose the references to them and cannot easily create the edges: for each edge I would need to go to the database and look up id -> vertex. I think that would work, but more slowly than it could.
Could you also clarify one thing: if the RDD has more than one partition, rdd.foreachPartition() will run in parallel and each partition will create its own transaction. Is it correct to have more than one transaction open at the same time, or do I need to do this on a single thread?

If I use TinkerPop's BulkLoaderVertexProgram on Titan 0.9, will it work better than the rdd.foreachPartition() approach?

On Tuesday, 27 September 2016, 17:56:35 UTC+3, HadoopMarc wrote:

HadoopMarc

Sep 27, 2016, 3:35:51 PM
to Aurelius
Hi Ivan,

Did some work on the Cyrillic alphabet :-)

Yes, the parallel Spark executors can run parallel transactions. There is also a bulk-loading setting for Titan, see the docs. Vertex lookups are very fast in Titan as long as your vertices have some global key for which you define a compositeIndex, see the docs.
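A sketch of what those two doc pointers could look like in code, assuming Titan 1.0 and its management API. The bulk-loading setting is presumably `storage.batch-loading`; the property key name `vertexKey` and the index name `byVertexKey` are made up for illustration. The schema would be created once, before the Spark job starts, and this fragment needs a reachable Cassandra to run.

```scala
import com.thinkaurelius.titan.core.{Cardinality, TitanFactory, TitanGraph}
import com.thinkaurelius.titan.core.TitanFactory.Builder
import org.apache.tinkerpop.gremlin.structure.Vertex

object TitanBulkSetupSketch {
  // Same connection settings as in the question (hostname spelled out correctly).
  private def builder: Builder =
    TitanFactory.build()
      .set("storage.backend", "cassandra")
      .set("storage.hostname", "127.0.0.1")

  // Run once on the driver: a global vertex key plus a composite index on it.
  def defineSchema(): Unit = {
    val graph = builder.open()
    val mgmt = graph.openManagement()
    val vertexKey = mgmt.makePropertyKey("vertexKey") // hypothetical key name
      .dataType(classOf[java.lang.Integer])
      .cardinality(Cardinality.SINGLE)
      .make()
    mgmt.buildIndex("byVertexKey", classOf[Vertex]) // hypothetical index name
      .addKey(vertexKey)
      .buildCompositeIndex()
    mgmt.commit()
    graph.close()
  }

  // Used inside each partition: same settings, with batch loading switched on.
  def openForLoading(): TitanGraph =
    builder.set("storage.batch-loading", "true").open()
}
```

Batch loading relaxes some of Titan's consistency checks, so it is worth reading the bulk-loading chapter of the docs before enabling it.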

Alternatively, if you can logically partition your graph (such that the logical partitions coincide with the Spark partitions for a significant part), it would make sense to maintain a vertex cache per partition, so:

 - add all vertices in all partitions and create a map per partition (vertexkey -> TitanVertex) [the vertexkey needs to be globally unique and to be present in your edge RDD]
 - add all edges, first testing the vertex cache and otherwise doing a vertex lookup through the Titan index.
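The cache-then-index step from the list above can be sketched as follows. This is plain Scala with no Titan dependency: `indexLookup` is a hypothetical function standing in for a query against the Titan composite index, and `V` would be `TitanVertex` in a real loader. As far as I know a Titan vertex belongs to the transaction that produced it, so such a cache should only be reused within one transaction.

```scala
import scala.collection.mutable

object VertexCacheSketch {
  // Resolve a vertex by its global key: partition-local cache first,
  // index lookup only on a cache miss; hits from the index are cached.
  def resolve[V](key: Int,
                 cache: mutable.Map[Int, V],
                 indexLookup: Int => Option[V]): Option[V] =
    cache.get(key).orElse {
      val found = indexLookup(key)
      found.foreach(v => cache.update(key, v)) // remember for later edges
      found
    }

  def main(args: Array[String]): Unit = {
    // Strings stand in for vertices; vertex 1 was created in this partition.
    val cache = mutable.Map(1 -> "v1")
    val index = Map(1 -> "v1", 2 -> "v2") // pretend global index
    var lookups = 0
    val lookup: Int => Option[String] = k => { lookups += 1; index.get(k) }

    // Edges (source key, target key) to resolve in this partition.
    val edges = Seq((1, 2), (2, 1), (1, 3))
    edges.foreach { case (a, b) =>
      (resolve(a, cache, lookup), resolve(b, cache, lookup)) match {
        case (Some(va), Some(vb)) => println(s"add edge $va -> $vb")
        case _                    => println(s"skip edge ($a, $b): endpoint missing")
      }
    }
    println(s"index lookups: $lookups")
  }
}
```

The better the Spark partitions match the logical partitions of the graph, the fewer lookups fall through to the index.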

Regarding TinkerPop, they did a great job in extending the Gremlin codebase and making it as consistent as possible. Unfortunately, this means for now that Titan 1.x does not work well with the TinkerPop BulkLoaderVertexProgram. I posted a workaround for this earlier, see:
https://groups.google.com/forum/#!searchin/aureliusgraphs/bulkloadervertexprogram$20index$20warnings%7Csort:relevance/aureliusgraphs/EAAVX0th2OU/5uHmw4WEAQAJ

Hope this helps,

Marc

On Tuesday, 27 September 2016 17:44:15 UTC+2, Иван Брагин wrote: