Could you help me with my code? I have a big graph in Spark. For this example I prepared a mock of my graph with the same structure:
- vertices are RDD[Int]
- edges are RDD[(Int, Int, Data)]
Could you help me save this graph to Titan without using RDD.collect?
import com.kountable.analytics.tools.SparkSupport
import com.thinkaurelius.titan.core.{TitanFactory, TitanGraph, TitanVertex}
import com.thinkaurelius.titan.core.TitanFactory.Builder
import org.apache.spark.rdd.RDD
import scala.language.postfixOps
import scala.util.Random
/**
 * Builds a random mock graph as Spark RDDs and stores it in a Titan graph.
 *
 * Created by ivan on 9/26/16.
 */
object Titan extends SparkSupport {

  /** Number of vertices in the mock graph; vertex ids are `1 to nodesCount`. */
  val nodesCount = 1000

  /** Each vertex gets between 1 and `edgesAvr * 2` outgoing edges before de-duplication. */
  val edgesAvr = 10

  /** Pool of edge types; a generated edge carries one of these values. */
  val types = 0 to (edgesAvr * 3)

  /** All vertex ids of the mock graph. */
  val ids: List[Int] = (1 to nodesCount).toList

  /**
   * Generates the mock vertices and edges, parallelizes them through `spark`
   * and passes both RDDs to [[storeToTitan]].
   */
  def run(): Unit = {
    val vertices = spark.parallelize(ids)
    val edges = spark.parallelize(ids.flatMap { i =>
      (0 to Random.nextInt(edgesAvr * 2)).map { _ =>
        // The index falls in [i - 1, nodesCount - 1], so the target id is drawn from i..nodesCount.
        val target = ids(nodesCount - 1 - Random.nextInt(nodesCount + 1 - i))
        val edgeType = types(Random.nextInt(types.size))
        (i, target, edgeType)
      }.distinct
    })
    //println(edges.take(100).map{case (a, b, t) => a + "\t" + b + "\t" + t}.mkString("\n"))
    /*
    1 640 13
    1 365 26
    1 426 20
    1 678 15
    1 249 11
    1 489 12
    1 679 21
    1 914 1
    1 54 6
    1 94 26
    2 790 2
    2 553 23
    2 30 27
    2 537 18
    ...
    */
    storeToTitan(vertices, edges)
  }

  /** Titan configuration: Cassandra storage backend at 127.0.0.1:9160. */
  val titanGraphBuilder: Builder =
    TitanFactory.build()
      .set("storage.backend", "cassandra")
      .set("storage.hostname", "127.0.0.1") // was misspelled "storage.hostnme"
      .set("storage.port", "9160")

  /**
   * Writes the given vertices and edges to Titan inside a single transaction.
   *
   * Every vertex id becomes a Titan vertex with an "id" property. Every edge
   * `(a, b, t)` becomes two Titan edges labelled `t.toString`, one from `a` to `b`
   * and one from `b` to `a`. Edges that reference an id missing from `vertices`
   * are skipped.
   *
   * NOTE: both RDDs are materialised with `collect()`, so the whole graph has to
   * fit in the memory of the calling JVM.
   *
   * The transaction is rolled back if anything fails, and the graph is always closed.
   *
   * @param vertices vertex ids
   * @param edges    triples of (source id, target id, edge type)
   */
  def storeToTitan(vertices: RDD[Int], edges: RDD[(Int, Int, Int)]): Unit = {
    val titanGraph: TitanGraph = titanGraphBuilder.open()
    try {
      val tx = titanGraph.newTransaction()
      try {
        val idToVertex: Map[Int, TitanVertex] = vertices.collect().map { id =>
          val vertex: TitanVertex = tx.addVertex()
          vertex.property("id", id)
          id -> vertex
        }.toMap

        edges.collect().foreach { case (sourceId, targetId, edgeType) =>
          // Only link endpoints that both exist; other edges are silently dropped.
          for {
            source <- idToVertex.get(sourceId)
            target <- idToVertex.get(targetId)
          } {
            val label = edgeType.toString
            source.addEdge(label, target)
            target.addEdge(label, source)
          }
        }

        tx.commit()
      } catch {
        case scala.util.control.NonFatal(e) =>
          // Undo the partial write; keep the original failure as the primary error.
          try tx.rollback()
          catch { case scala.util.control.NonFatal(rb) => e.addSuppressed(rb) }
          throw e
      }
    } finally {
      titanGraph.close()
    }
  }
}
Thank you.