I don’t want to insert a new document when the document does not already exist. I want to set the “upsert” option to False / 0.
Hi Sriram,
Based on your description, what you’re after is not the default collection update behaviour. As described in SPARK-66, if a DataFrame contains an _id field, the data will be upserted and any existing documents in the collection will be replaced.
I presume the right way is to do the equivalent of find() and update().
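For context, this is roughly the difference between the two behaviours expressed as plain MongoDB operations. This is a minimal pymongo sketch for illustration only (pymongo is not part of the connector setup below, and the document shown is made up):

from pymongo import MongoClient

coll = MongoClient("mongodb://host:port")["db"]["coll"]
doc = {"_id": 1, "key1": "a", "key2": "b"}  # illustrative document

# What the connector does by default (SPARK-66): upsert on _id,
# replacing the existing document or inserting a new one.
coll.replace_one({"_id": doc["_id"]}, doc, upsert=True)

# The update-only behaviour you are after: with upsert=False
# (pymongo's default), nothing is written when no document with
# this _id already exists.
coll.replace_one({"_id": doc["_id"]}, doc, upsert=False)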
One workaround to update only the documents that already exist is to filter out of your source DataFrame any rows whose key does not exist in the collection. Depending on your use case, you could do something like the example below:
from pyspark.sql import SQLContext

rdds = lines.<bunch_of_transformation_functions>
sqlContext = SQLContext(sc)

# Source DataFrame built from your files.
df_from_files = sqlContext.createDataFrame(rdds, ["key1", "key2"])

# Existing documents in the target collection.
df_from_collection = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("spark.mongodb.input.uri", "mongodb://host:port/db.coll")\
    .load()

# Inner join on key1 keeps only rows that already exist in the
# collection, and attaches their _id so the save replaces them.
df_to_save = df_from_files.join(df_from_collection.select("_id", "key1"), "key1")

df_to_save.write.format("com.mongodb.spark.sql")\
    .option("spark.mongodb.output.uri", "mongodb://host:port/db.coll")\
    .save()
Where:
- df_from_collection is a DataFrame read from the MongoDB collection you would like to update.
- df_from_files is your source DataFrame.
- df_to_save is the DataFrame of the joined result. Any documents from df_from_files that don’t exist in df_from_collection have been filtered out.
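As a quick sanity check (illustrative only, and assuming key1 is unique on both sides of the join), you can compare row counts to see how many source rows were skipped:

# Rows dropped by the inner join are documents that do not yet
# exist in the collection and therefore will not be written.
skipped = df_from_files.count() - df_to_save.count()
print("rows skipped:", skipped)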
Regards,
Wan.