Duplicate key error when trying to write a $group aggregation to MongoDB from Spark using Scala


Vamsikrishna Nadella

Jan 15, 2017, 3:14:43 PM
to mongodb-user
A MongoDB aggregation (specifically a $group stage) run on Spark (Scala) using the MongoDB Spark Connector is creating duplicate _id records when writing back to the collection. As a result, MongoDB throws a duplicate key error. Here are the details:
  • I have a large collection in MongoDB that I want to run an aggregation on.
  • I used Spark with Scala (for various reasons) to do this.
  • I'm using the MongoDB Spark Connector.
  • The code loads the data into the Spark context perfectly.
  • It aggregates fine using the connector's withPipeline MongoDB aggregation pipeline. It even prints the results to the console with no issues.
  • When trying to write back to MongoDB (using a write config), it throws a 'duplicate key on _id' error.
    • This is not wrong on the MongoDB side. There are in fact duplicate '_id' records: when I print the Spark results to the console, I can see duplicate records on the '_id' field.

What is happening is that Spark is not consolidating the grouped data from all the tasks before writing it back to MongoDB.
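To make the failure mode concrete, here is a rough sketch of what I believe is going on and one possible workaround. This assumes the connector's RDD API with a configured spark.mongodb.input.uri; the field names (customerId, amount) are illustrative, not from my actual data. Since withPipeline applies the pipeline to each partition's cursor, a $group can emit the same _id once per partition, and one way around that is to re-aggregate across partitions in Spark before saving:

```scala
// Sketch only: assumes spark-core and mongo-spark-connector on the classpath
// and spark.mongodb.input/output.uri configured. Field names are illustrative.
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession
import org.bson.Document

val spark = SparkSession.builder().getOrCreate()

// Each Spark partition runs the pipeline against its own slice of the
// collection, so this $group can yield the same _id in several partitions.
val grouped = MongoSpark.load(spark.sparkContext).withPipeline(Seq(
  Document.parse("""{ "$group": { "_id": "$customerId", "total": { "$sum": "$amount" } } }""")
))

// Consolidate the per-partition partial groups in Spark before writing back.
val consolidated = grouped
  .map(doc => (doc.get("_id"), doc.getDouble("total").doubleValue))
  .reduceByKey(_ + _)
  .map { case (id, total) => new Document("_id", id).append("total", total) }

// Now each _id appears exactly once across the whole RDD.
MongoSpark.save(consolidated)
```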
My questions are:

  1. Is there a way to resolve this?
  2. Or is there another way to do this aggregation without using the aggregation pipeline (like SQLContext, etc.)?
  3. If this question does not belong in this group, can you please direct me to the right group?

I can provide more details, like code, a sample data set, etc.


BTW, this query runs perfectly fine in the mongo shell. The problem is not with MongoDB; the issue is with Spark. I'm posting in this group in the hope that someone here might have experienced such an issue and resolved it :-) .


Thanks.


_V



Wan Bachtiar

Jan 23, 2017, 10:33:30 PM
to mongodb-user

Is there a way to resolve this?

Hi Vamsikrishna,

It depends on what you’re trying to achieve in this situation. For example, you could call save() with overwrite mode to first drop the collection and then save the new data. Alternatively, you could choose append mode to update matching _id documents. A Scala write example with overwrite mode:

MongoSpark.save(sqlResult.write.option("collection", "collectionName").mode("overwrite"))
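And for comparison, an append-mode sketch of the same write (the collection name is illustrative, and this assumes sqlResult is the DataFrame holding your aggregation result):

```scala
import com.mongodb.spark.MongoSpark

// Append mode: the collection is not dropped first; documents whose _id
// matches an existing document are updated instead.
MongoSpark.save(sqlResult.write
  .option("collection", "collectionName")
  .mode("append"))
```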

is there any other way to do this aggregation without using the aggregation pipeline (like SQL Context etc..)

The answer to this question depends on your use case. Performing the aggregation via withPipeline() means the aggregation is executed on the MongoDB server(s), which reduces network data transfer.

Depending on your use case, you could also combine methods, e.g. filtering out the _id field in Spark once the group aggregation result has been returned.
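As a sketch of that combined approach (again assuming sqlResult is the aggregation result as a DataFrame and the collection name is illustrative), you could drop the conflicting _id column in Spark and let MongoDB generate fresh ObjectIds on insert — keeping in mind this stores the per-partition partial groups as separate documents rather than merging them:

```scala
import com.mongodb.spark.MongoSpark

// Drop the conflicting _id column; MongoDB assigns a new ObjectId to each
// inserted document, so no duplicate key errors occur on write.
val withoutId = sqlResult.drop("_id")

MongoSpark.save(withoutId.write
  .option("collection", "collectionName")
  .mode("append"))
```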

If you still have further questions, could you provide the following information:

  • MongoDB Connector for Spark version that you’re using.
  • Example documents, and aggregation pipeline.
  • Snippet of Scala code containing withPipeline() and save()

Regards,

Wan.
