mongo-hadoop with Spark is slow for me, and adding nodes doesn't seem to make any noticeable difference


Carlo Scarioni

Sep 16, 2015, 7:34:36 PM
to mongodb-user
Hi, I appreciate any help or pointers in the right direction.

My current test scenario is the following.

I want to process a MongoDB collection, anonymise some of its fields, and store the result in another collection.

The collection is around 900 GB and contains about 2.5 million documents.

Following is the code.



import scala.collection.JavaConversions._ // lets foreach work on the java.util.ArrayList below

object Anonymizer extends SparkRunner {

  val sqlContext = new SQLContext(sc)

  MongoDBLoader(conf, sc, "output").load(MongoHadoopImplementationReader(conf, sc, "input").rdd,
    (dbObject: BSONObject) => {
      // Overwrite a top-level field
      dbObject.put("add_field", "John Macclane")

      // Anonymise a field inside an embedded document and put it back
      val embedded = dbObject.get("embedded").asInstanceOf[BasicDBObject]
      embedded.put("business_name", Name.first_name)
      dbObject.put("embedded", embedded)

      // Anonymise each element of an embedded list, if present
      val notesWrapper = Option(dbObject.get("embedded_list").asInstanceOf[java.util.ArrayList[BasicDBObject]])
      notesWrapper match {
        case Some(notes) =>
          notes.foreach((note: BasicDBObject) => {
            note.put("text", Name.name)
          })
        case None =>
      }
      dbObject
    }
  )
}...

And




case class MongoHadoopImplementationReader(conf: com.typesafe.config.Config, sc: SparkContext, collection: String) {

  val mongoConfig = new Configuration()

  mongoConfig.set("mongo.input.uri",
    s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_database")}.${collection}")
  mongoConfig.set("mongo.input.split_size", "50")
  mongoConfig.set("mongo.input.limit", "70")

  def rdd: RDD[(Object, BSONObject)] = {
    val rdd = sc.newAPIHadoopRDD(
      mongoConfig,
      classOf[MongoInputFormat],
      classOf[Object],
      classOf[BSONObject])
    println("RDD partitions: " + rdd.partitions.length)
    rdd
  }
}
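
For reference, my understanding from the mongo-hadoop documentation is that mongo.input.split_size is specified in MB, so a larger value should mean fewer, bigger splits and therefore fewer Spark partitions. This is the kind of variant I have been experimenting with (128 is only an example value, and conf, sc and collection are the same as in the reader above):

import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat

// Hypothetical variant of the reader above; 128 MB is just an example value.
val mongoConfig = new Configuration()
mongoConfig.set("mongo.input.uri",
  s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_database")}.${collection}")
mongoConfig.set("mongo.input.split_size", "128")

val rdd = sc.newAPIHadoopRDD(
  mongoConfig,
  classOf[MongoInputFormat],
  classOf[Object],
  classOf[BSONObject])
println("RDD partitions: " + rdd.partitions.length) // I expect this to drop as split_size grows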



And 




case class MongoDBLoader(conf: com.typesafe.config.Config, sc: SparkContext, collection: String) {

  val mongoConfig = new Configuration()

  mongoConfig.set("mongo.output.uri",
    s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_output_database")}.${collection}")

  def load(rdd: => RDD[(Object, BSONObject)], transformer: (BSONObject) => BSONObject) = {

    val mongoRDD = rdd.map[(Object, BSONObject)]((tuple: (Object, BSONObject)) => {
      (null, transformer(tuple._2))
    })

    mongoRDD.saveAsNewAPIHadoopFile(
      "file:///this-is-completely-unused",
      classOf[Object],
      classOf[BSONObject],
      classOf[MongoOutputFormat[Object, BSONObject]],
      mongoConfig)
  }
}



This code runs slowly: it takes 9.5 hours on a 3-machine cluster to process everything, and on a 30-machine cluster I stopped it after 6 hours when it was only about half done.

The machines are EC2 m3.large instances. MongoDB lives on another EC2 instance in the same VPC and subnet.

I looked into the configuration options, but in most cases the defaults seem to be the recommended values (number of cores, memory, etc.).

It looks like I have a bottleneck somewhere, but I am not sure where. Could it be that MongoDB cannot handle this level of parallelism?

How are the RDDs stored in memory? When I run the job I get around 32,000 partitions and tasks. Processing also seems to slow down as the job advances (this could be because the Mongo documents are bigger in the second half of our database).
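
To make the task-count point concrete, this is the sort of thing I was considering to reduce the number of tasks (just a sketch, not something I have tried on the full dataset; 300 is an arbitrary number and anonymise stands for the transformer function shown above):

// Sketch only: collapse the ~32,000 input partitions into fewer, larger ones
// before transforming and saving, to reduce per-task overhead.
val input = MongoHadoopImplementationReader(conf, sc, "input").rdd.coalesce(300)
MongoDBLoader(conf, sc, "output").load(input, anonymise)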

I also see that each split is stored in HDFS by Spark and then read back and bulk-inserted into Mongo. There is plenty of HDFS space (around 30 GB per machine), but only a tiny fraction of it is used. Wouldn't it be better to fill it further and only insert into Mongo once more data has accumulated?
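
To illustrate what I mean by inserting bigger batches, something along these lines is what I had in mind, writing each partition directly with the plain MongoDB Java driver instead of MongoOutputFormat (purely a sketch: the host, database and collection names and the batch size of 1000 are made-up examples, and I have not benchmarked it):

import scala.collection.JavaConverters._
import com.mongodb.MongoClient
import org.bson.Document

// Sketch only: mongoRDD is the mapped (null, transformedDoc) RDD built in MongoDBLoader.load above.
mongoRDD.foreachPartition { partition =>
  val client = new MongoClient("mongo-host")                          // example host
  val coll = client.getDatabase("output_db").getCollection("output")  // example names
  partition.grouped(1000).foreach { batch =>                          // example batch size
    val docs = batch.map { case (_, bson) =>
      new Document(bson.toMap.asInstanceOf[java.util.Map[String, AnyRef]])
    }
    coll.insertMany(docs.asJava)
  }
  client.close()
}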

I also tried to increase the split size, but then it complains about not having enough resources on the workers. However, I don't think the splits are big enough to fill the 6 GB of memory on each node, because what gets stored in HDFS is a lot less than that.
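
For reference, this is the kind of Spark-side setting I was tweaking when the workers complained about resources (the value is only a placeholder; I am not sure this is even the right knob):

import org.apache.spark.{SparkConf, SparkContext}

// Example only: give each executor more memory when using bigger splits.
// "4g" is a placeholder, not a recommendation.
val sparkConf = new SparkConf()
  .setAppName("Anonymizer")
  .set("spark.executor.memory", "4g")
val sc = new SparkContext(sparkConf)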

Is there anything obvious (or not :)) that I am not doing correctly? Is this the correct way to transform a collection from Mongo to Mongo? Is there another way?


