Mongo Bulk Write Using Scala (Casbah Driver)


ronak kabra

Oct 30, 2017, 8:47:48 AM
to mongodb-casbah-users
I have created and processed a DataFrame in Scala with Spark. After creating the DataFrame, I pass it to a function that does a bulk write to MongoDB.
This is my code: 

val userdemographiclogpath = "test.json"
val userdemoDF = sparkSession.read.json(userdemographiclogpath).repartition(10)

val idfaDF = userdemoDF
  .select("idfa", "ag", "gn", "yob", "scrtp")
  .where(userdemoDF("idfa").isNotNull)
  .map(r => (r(0).asInstanceOf[String], r(1).asInstanceOf[String], r(2).asInstanceOf[String],
             r(3).asInstanceOf[String], r(4).asInstanceOf[scala.collection.mutable.WrappedArray[String]].toArray))
  .map { case (a, b, c, d, e) => (a, b, c, d, e, gethash(a)) }
  .toDF("_id", "ag", "gn", "yob", "scrtp", "hash")

writemongo(idfaDF)
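For reference, gethash is just a small helper of mine that buckets each _id into one of the five shards; roughly it looks like this (a simplified sketch, the exact hashing in my real code may differ):

// Sketch only: buckets an id string into one of 5 collections (1 to 5).
// Masking with Int.MaxValue keeps the hash non-negative.
def gethash(id: String): Int = (id.hashCode & Int.MaxValue) % 5 + 1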

Following is the writemongo function code: 

def writemongo(frame: DataFrame): Unit = {
  for (i <- 1 to 5) {
    val commonDF13 = frame.select("_id", "ag", "gn", "yob", "scrtp")
      .where(frame("hash") === i).as[admaximdemo]

    val uri = MongoClientURI("mongodb://xyz/")
    val mongoColl = MongoClient(uri)("userdemographic" + i)("UserDemographic")
    val build: BulkWriteOperation = mongoColl.initializeUnorderedBulkOperation

    def getbuild(mongo_db1: MongoDBObject, update2: MongoDBObject) =
      build.find(mongo_db1).upsert().update(update2)

    commonDF13.rdd.map(row => {
      val mongo_db1 = MongoDBObject("_id" -> row._id)
      val demographic = checkNull(row.ag, row.gn)
      if (demographic != null) {
        val update2 = $push("demographicdetails" -> demographic)
        getbuild(mongo_db1, update2)
      }
    })
  }
}
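For context, admaximdemo and checkNull come from my own code; they look roughly like this (a simplified sketch, assuming import com.mongodb.casbah.Imports._ is in scope — the real versions may differ):

// Sketch only: case class matching the selected columns, used with .as[admaximdemo]
case class admaximdemo(_id: String, ag: String, gn: String, yob: String, scrtp: Array[String])

// Sketch only: builds the sub-document to push, or returns null when there is nothing to write
def checkNull(ag: String, gn: String) =
  if (ag == null && gn == null) null
  else MongoDBObject("ag" -> ag, "gn" -> gn)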

I am getting the following exception:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.map(RDD.scala:369)
    at com.admaxim.test.Mongosample$$anonfun$writemongo$1$1.apply(Mongosample.scala:42)
    at com.admaxim.test.Mongosample$$anonfun$writemongo$1$1.apply(Mongosample.scala:33)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at com.admaxim.test.Mongosample$.writemongo$1(Mongosample.scala:33)
    at com.admaxim.test.Mongosample$.main(Mongosample.scala:73)
    at com.admaxim.test.Mongosample.main(Mongosample.scala)

 Caused by: java.io.NotSerializableException: com.mongodb.BulkWriteOperation 

The getbuild() function was written to queue the bulk operations for MongoDB. How can I resolve this exception? Please help me with this.

Note: I have also tried putting the code from getbuild directly inside commonDF13.rdd.map, but I get the same error.
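Would the right approach be to create the client and the bulk builder on the executors instead of on the driver? This is the kind of change I am considering inside the for loop, in place of the rdd.map (just a sketch based on my code above, assuming the Casbah imports are in scope):

// Sketch: everything non-serializable (MongoClient, BulkWriteOperation) is created
// inside foreachPartition, so it never has to be shipped with the closure.
commonDF13.rdd.foreachPartition { rows =>
  val client = MongoClient(MongoClientURI("mongodb://xyz/"))
  val bulk = client("userdemographic" + i)("UserDemographic").initializeUnorderedBulkOperation
  var queued = 0
  rows.foreach { row =>
    val demographic = checkNull(row.ag, row.gn)
    if (demographic != null) {
      bulk.find(MongoDBObject("_id" -> row._id))
        .upsert()
        .update($push("demographicdetails" -> demographic))
      queued += 1
    }
  }
  if (queued > 0) bulk.execute()  // actually send the queued upserts
  client.close()
}

Is this the right direction, or is there a better way to do the bulk write from Spark with Casbah?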