I have created and processed a DataFrame in Scala with Spark. After creating the DataFrame, I pass it to a function that performs a bulk write to MongoDB. This is my code:
val userdemographiclogpath = "test.json"
val userdemoDF = sparkSession.read.json(userdemographiclogpath).repartition(10)
val idfaDF = userdemoDF
  .select("idfa", "ag", "gn", "yob", "scrtp")
  .where(userdemoDF("idfa").isNotNull)
  .map(r => (
    r(0).asInstanceOf[String],
    r(1).asInstanceOf[String],
    r(2).asInstanceOf[String],
    r(3).asInstanceOf[String],
    r(4).asInstanceOf[scala.collection.mutable.WrappedArray[String]].toArray))
  .map { case (a, b, c, d, e) => (a, b, c, d, e, gethash(a)) }
  .toDF("_id", "ag", "gn", "yob", "scrtp", "hash")
writemongo(idfaDF)
The following is the code of the writemongo function:
def writemongo(frame: DataFrame): Unit = {
  for (i <- 1 to 5) {
    val commonDF13 = frame.select("_id", "ag", "gn", "yob", "scrtp").where(frame("hash") === i).as[admaximdemo]
    val uri = MongoClientURI("mongodb://xyz/")
    val mongoColl = MongoClient(uri)("userdemographic" + i)("UserDemographic")
    val build: BulkWriteOperation = mongoColl.initializeUnorderedBulkOperation
    def getbuild(mongo_db1: MongoDBObject, update2: MongoDBObject) = {
      build.find(mongo_db1).upsert().update(update2)
    }
    // The stack trace below points at this map call
    commonDF13.rdd.map(row => {
      val mongo_db1 = MongoDBObject("_id" -> row._id)
      val demographic = checkNull(row.ag, row.gn)
      if (demographic != null) {
        val update2 = $push("demographicdetails" -> demographic)
        getbuild(mongo_db1, update2)
      }
    })
  }
}
I am getting the following exception:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.map(RDD.scala:369)
    at com.admaxim.test.Mongosample$$anonfun$writemongo$1$1.apply(Mongosample.scala:42)
    at com.admaxim.test.Mongosample$$anonfun$writemongo$1$1.apply(Mongosample.scala:33)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at com.admaxim.test.Mongosample$.writemongo$1(Mongosample.scala:33)
    at com.admaxim.test.Mongosample$.main(Mongosample.scala:73)
    at com.admaxim.test.Mongosample.main(Mongosample.scala)
Caused by: java.io.NotSerializableException: com.mongodb.BulkWriteOperation
The getbuild() function is there to add each upsert to the MongoDB bulk operation. How can I resolve this exception?
Note: I also tried writing the body of getbuild directly inside commonDF13.rdd.map, but I get the same error.
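
A minimal sketch of what the "Caused by" line appears to describe, using plain Scala with no Spark or MongoDB involved. The names Sink and ClosureDemo are hypothetical: Sink stands in for a non-serializable handle such as BulkWriteOperation. The first function captures a Sink created outside of it (like build in writemongo) and should fail Java serialization; the second creates the Sink inside its own body and should serialize without problems.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a driver-side handle such as BulkWriteOperation:
// a plain class that does not extend Serializable.
class Sink {
  private var total = 0
  def add(x: Int): Unit = total += x
  def sum: Int = total
}

object ClosureDemo {
  // Attempts Java serialization and reports whether it succeeded.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Captures a Sink created outside the function, like `build` in writemongo.
  def capturingClosure(): Int => Unit = {
    val sink = new Sink
    (x: Int) => sink.add(x)
  }

  // Creates the Sink inside the function body, so the function itself
  // carries no reference to a non-serializable object.
  def selfContainedClosure(): Iterator[Int] => Int =
    (xs: Iterator[Int]) => {
      val sink = new Sink
      xs.foreach(sink.add)
      sink.sum
    }

  def main(args: Array[String]): Unit = {
    println(s"capturing closure serializable: ${canSerialize(capturingClosure())}")
    println(s"self-contained closure serializable: ${canSerialize(selfContainedClosure())}")
  }
}
```

In Spark terms, the second shape would presumably correspond to creating the MongoClient and the bulk operation inside the function passed to something like foreachPartition, so that they are built on the executor rather than shipped from the driver. That is an assumption about how the fix might look, not something tested against this code.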