Upsert using Mongo-Spark connector in pyspark/python

Sriram V

Dec 31, 2016, 10:58:07 PM
to mongodb-user
My Spark job is written in PySpark. I process a bunch of log files, generate the output RDDs, and write them to my MongoDB collection through the mongo-spark connector. I don't want to insert a new document when the document does not already exist; I want to set the "upsert" option to False / 0.

Currently my code looks as follows:

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

sc_conf = SparkConf()
sc_conf.set("spark.mongodb.output.uri","mongodb://<mongo_host_address>/db.coll")
sc = SparkContext(conf=sc_conf)

files = "<file_path>"
lines = sc.textFile(files)

rdds = lines.<bunch_of_transformation_functions>

print "writing to mongodb..."
sqlContext = SQLContext(sc)
mongo_df = sqlContext.createDataFrame(rdds, ["key1","key2"])
writer = mongo_df.write.format("com.mongodb.spark.sql")
writer.mode("append")
writer.save(path=None,source=None,mode="append")

I tried adding an "upsert" option to the save method:

# PySpark's DataFrameWriter.save accepts arbitrary **options
writer.save(path=None, source=None, mode="append", upsert=0)

It did not work. I read this ticket [https://jira.mongodb.org/browse/SPARK-66], but I'm not sure how to achieve this through PySpark.

I don't pass "_id" here; I presume the right way is to do the equivalent of find(<query>) and update(). Or is there a way to pass update query parameters?
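To make that concrete, here is a rough, hypothetical sketch of what I mean by find-and-update, going through PyMongo inside foreachPartition instead of the connector (the URI placeholder, the db.coll names, and using key1 as the match key are just carried over from my example above; I haven't run this):

from pymongo import MongoClient

def update_existing_only(rows):
    # One client per partition; a client created on the driver can't be reused on executors
    client = MongoClient("mongodb://<mongo_host_address>")
    coll = client["db"]["coll"]
    for row in rows:
        # update_one with upsert=False (the default) only modifies a document
        # that already matches the filter; otherwise nothing is written
        coll.update_one({"key1": row["key1"]},
                        {"$set": {"key2": row["key2"]}},
                        upsert=False)
    client.close()

mongo_df.foreachPartition(update_existing_only)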

Thanks!


Wan Bachtiar

Jan 4, 2017, 2:47:19 AM
to mongodb-user

I don’t want to insert a new document when the document does not already exist. I want to set the “upsert” option to False / 0.

Hi Sriram,

Based on your description, what you’re after is the default collection update behaviour. The work described in SPARK-66 is that, if a DataFrame contains an _id field, the data will be upserted, replacing any existing documents that have a matching _id.

I presume the right way is to do the equivalent of find() and update().

One workaround to update only documents that already exist is to filter your DataFrame down to rows that match a document in the existing collection, so that each surviving row picks up its _id. Depending on your use case, you could do something like the example below:

rdds = lines.<bunch_of_transformation_functions>
sqlContext = SQLContext(sc)

# DataFrame built from the log files
df_from_files = sqlContext.createDataFrame(rdds, ["key1", "key2"])

# DataFrame read from the existing MongoDB collection
df_from_collection = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")\
                                    .option("spark.mongodb.input.uri", "mongodb://host:port/db.coll")\
                                    .load()

# Inner join on key1: rows with no matching document are dropped,
# and the surviving rows pick up the collection's _id
df_to_save = df_from_files.join(df_from_collection.select("_id", "key1"), "key1")

# With _id present, an append-mode save replaces the matching documents
df_to_save.write.format("com.mongodb.spark.sql")\
            .option("spark.mongodb.output.uri", "mongodb://host:port/db.coll")\
            .mode("append")\
            .save()

Where:

  • df_from_collection is a DataFrame read from the MongoDB collection you would like to update.
  • df_from_files is your source DataFrame built from the log files.
  • df_to_save is the joined result. Any rows from df_from_files that have no matching document in df_from_collection have been filtered out, and the remaining rows carry the collection's _id; a quick count-based sanity check is sketched below.
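If you want to confirm that the save only replaced existing documents and inserted nothing new, a count-based check along these lines might help (just a sketch reusing the same SQLContext, DataFrames and URI placeholders as above):

# Force the pre-save count first (DataFrames are lazy)
count_before = df_from_collection.count()

# ... perform the join and save as shown above ...

# Re-read the collection; the count should be unchanged because matching
# documents were replaced in place and no new documents were inserted
count_after = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")\
                        .option("spark.mongodb.input.uri", "mongodb://host:port/db.coll")\
                        .load()\
                        .count()

assert count_before == count_after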

Regards,

Wan.
