Spark Mongo Connector - GraphX.


Manoj Waikar

Jan 11, 2017, 12:40:35 AM
to mongodb-user
Hi,

We are specifically looking to use the GraphX component of Spark, hence my questions relate to this aspect of the Spark Mongo connector -

The GraphX APIs work with user-defined types for vertices and edges, whereas MongoSpark.load() returns an RDD of Documents. Similarly, even if I have case classes for my domain, sqlContext.loadFromMongoDB[T] gives me a Dataset[Row] with a schema conforming to T.

1) What's the correct (and most performant) way to transform either the MongoRDD[Document] or Dataset[Row] to Seq[T] (where T is my custom domain class)?
2) I also noticed that if my domain class has one of its fields as ObjectId, and I try to use the loadFromMongoDB method, I get an exception - "java.lang.UnsupportedOperationException: Schema for type org.bson.types.ObjectId is not supported". Why is that so?
3) The documentation for the MongoDB Spark Connector is for version 1.1. Is there documentation for the latest 2.0 version of the connector?
4) Are there any examples of the connector using GraphX APIs of Spark?

Thanks,
Manoj.

Wan Bachtiar

Jan 17, 2017, 2:10:06 AM
to mongodb-user

1) What’s the correct (and most performant) way to transform either the MongoRDD[Document] or Dataset[Row] to Seq[T] (where T is my custom domain class)?

Hi Manoj,

This depends on your application requirements, and is more of a general Scala programming question than a connector question. I would suggest posting it on StackOverflow to reach a wider audience and find the most performant conversion in Scala.
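
That said, here is a minimal illustrative sketch of two common starting points, rather than a definitive answer. It assumes a hypothetical case class Person whose fields (name, age) mirror the shape of the documents in your collection, and that spark.mongodb.input.uri is already configured:

import com.mongodb.spark._
import org.apache.spark.sql.SparkSession

// Hypothetical domain class matching the documents' fields
case class Person(name: String, age: Int)

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Option 1: load a DataFrame, convert it to a typed Dataset, then collect.
// Note that collect() pulls everything to the driver, so it only makes sense for small results.
val people: Seq[Person] = MongoSpark.load(spark).as[Person].collect().toSeq

// Option 2: stay on the MongoRDD[Document] and map each Document into the case class yourself.
val peopleRdd = MongoSpark.load(spark.sparkContext)
  .map(doc => Person(doc.getString("name"), doc.getInteger("age").intValue))

Option 1 lets Spark SQL handle the schema mapping; Option 2 keeps you on the MongoRDD[Document] and gives you full control over how each Document is converted.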

2) I also noticed that if my domain class has one of its fields as ObjectId, and I try to use the loadFromMongoDB method, I get an exception - “java.lang.UnsupportedOperationException: Schema for type org.bson.types.ObjectId is not supported”. Why is that so?

It is likely that you haven’t loaded the mongo-spark-connector correctly. Check whether you have sourced the connector build that matches the Scala version of your Spark environment (a small sketch of the failure mode and a possible workaround follows the questions below).

If you are still having this issue, could you provide answers to the following:

  • How did you load the mongo-spark-connector?
  • Which Apache Spark version are you using?
  • Could you provide a code snippet that reproduces this error?
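
As a side note, the sketch below illustrates the failure mode when a raw org.bson.types.ObjectId appears in a case class, together with one possible workaround; the fieldTypes wrapper shown is an assumption based on the connector's Dataset helpers and may depend on your connector version:

import org.bson.types.ObjectId

// Spark SQL reflection cannot derive a schema for org.bson.types.ObjectId,
// so a case class like this one reproduces the UnsupportedOperationException above:
case class UserWithRawId(_id: ObjectId, name: String)

// Possible workaround (assumption): use the connector's BSON wrapper types
// from com.mongodb.spark.sql.fieldTypes instead of the raw BSON Java types.
import com.mongodb.spark.sql.fieldTypes
case class User(_id: fieldTypes.ObjectId, name: String)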

3) The documentation for the MongoDB Spark Connector is for version 1.1. Is there documentation for the latest 2.0 version of the connector?

There is currently an open ticket, DOCS-8750, tracking the MongoDB Spark Connector v2.0 documentation work. Please feel free to upvote it or add yourself as a watcher to receive updates on the ticket.

You can also review the mongo-spark docs for an introduction and examples that include the changes from v2.0.

4) Are there any examples of the connector using GraphX APIs of Spark?

The example below follows the property graph example in the Spark GraphX Programming Guide. Suppose you have a collection called users in MongoDB, which would serve as our vertex table:

{ "_id" : 3, "name" : "rxin", "role" : "student" }
{ "_id" : 7, "name" : "igonzal", "role" : "postdoc" }
{ "_id" : 5, "name" : "franklin", "role" : "professor" }
{ "_id" : 2, "name" : "istoica", "role" : "professor" }

Also suppose there is another collection called relationships in the same database, which would serve as our edge table:

{ "_id" : 1, "sourceId" : 3, "destId" : 7, "relationship" : "Collabolator" }
{ "_id" : 2, "sourceId" : 5, "destId" : 3, "relationship" : "Advisor" }
{ "_id" : 3, "sourceId" : 2, "destId" : 5, "relationship" : "Colleague" }
{ "_id" : 4, "sourceId" : 5, "destId" : 7, "relationship" : "PI" }

As an example, you can then use the MongoDB Connector for Spark with GraphX as below:

import org.apache.spark._
import org.apache.spark.graphx._
import com.mongodb.spark._
import com.mongodb.spark.config._
import org.apache.spark.sql.SparkSession
import org.apache.spark.rdd.RDD
val sparkSession = SparkSession.builder().getOrCreate()

// Load users collection data 
val users =  MongoSpark.load(sparkSession, ReadConfig(Map("collection" -> "users"), Some(ReadConfig(sparkSession))))

// Load relationships collection data
val relationships =  MongoSpark.load(sparkSession, ReadConfig(Map("collection" -> "relationships"), Some(ReadConfig(sparkSession))))  

// Prepare users as vertices for Graph()
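// Note: the positional x.get(n) calls in this and the next transformation rely on
// the column order of the DataFrame schema inferred by the connector for each collection.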
val vertices: RDD[(VertexId, (String, String))] = users.rdd.map(x => (x.get(0).asInstanceOf[Number].longValue, (x.get(1).toString, x.get(2).toString)))

// Prepare relationships as edges for Graph()
val edges = relationships.rdd.map(x => Edge(x.get(3).asInstanceOf[Number].longValue, x.get(1).asInstanceOf[Number].longValue, x.get(2).toString))

// Instantiate Graph() with vertices and edges above
val graph = Graph(vertices, edges)

// Use the triplets view to create an RDD of facts
val facts: RDD[String] = graph.triplets.map(triplet =>
                         triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
/* Output: 
istoica is the Colleague of franklin
rxin is the Collaborator of igonzal
franklin is the Advisor of rxin
franklin is the PI of igonzal
*/
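
Once the graph is constructed, any other GraphX operation can be applied in the same way. For example, here is an illustrative sketch (not part of the tested snippets above) that runs PageRank and joins the scores back to the user names:

// Run PageRank on the property graph and map vertex ids back to names
val ranks = graph.pageRank(0.001).vertices
val ranksByName = vertices.join(ranks).map {
  case (_, ((name, _), rank)) => (name, rank)
}
ranksByName.collect.foreach(println(_))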

Alternatively, depending on your requirements, you could also utilise the MongoDB Aggregation Pipeline. For example, using the $lookup operator you could perform the join on the MongoDB side as below:

val lookup = "{$lookup:{from:'users', localField:'sourceId', foreignField:'_id', as:'source'}}"
val lookup2 = "{$lookup:{from:'users', localField:'destId', foreignField:'_id', as:'dest'}}"
val project = "{$project:{_id:0, source_attr:'$source.name', dest_attr:'$dest.name', relationship: '$relationship'} }"
val pipelineDF = sparkSession.read.format("com.mongodb.spark.sql")
                              .option("collection", "relationships")
                              .option("pipeline", "["+ lookup +","+ lookup2 +","+ project +"]").load()
pipelineDF.show()
/* Output: 
+----------+------------+-----------+
| dest_attr|relationship|source_attr|
+----------+------------+-----------+
| [igonzal]|Collaborator|     [rxin]|
|    [rxin]|     Advisor| [franklin]|
|[franklin]|   Colleague|  [istoica]|
| [igonzal]|          PI| [franklin]|
+----------+------------+-----------+
*/

The GraphX and aggregation pipeline snippets above were tested with Apache Spark 2.1.0, mongo-spark v2.0.0, and MongoDB v3.4.x.


Kind regards,

Wan.
