What is happening is that Spark is not consolidating the grouped data from all of the tasks before writing it back to MongoDB.
My questions are:
1. Is there a way to resolve this?
2. Is there any other way to do this aggregation without using the aggregation pipeline (like SQL Context etc.)?
I can provide more details, like code and a sample data set, if needed.
BTW, this query runs perfectly fine in the mongo shell, so the problem is not with MongoDB; the issue is with Spark. I'm posting in this group in the hope that someone here has run into such an issue and resolved it :-) .
Thanks.
_V
Hi Vamsikrishna,

> Is there a way to resolve this?

That depends on what you're trying to achieve. For example, you could call save() with overwrite mode to first drop the collection and then save the new data. Alternatively, you could use append mode to update documents with matching _id values. A Scala write example with overwrite mode:
MongoSpark.save(sqlResult.write.option("collection", "collectionName").mode("overwrite"))
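In the same vein, a minimal sketch of the append-mode variant mentioned above, assuming sqlResult is the DataFrame holding your aggregation result and "collectionName" is a placeholder for your target collection:

```scala
// Sketch: append mode inserts the new documents; documents whose _id
// matches an existing one are updated rather than duplicated.
// "collectionName" is a placeholder for your target collection.
MongoSpark.save(
  sqlResult.write
    .option("collection", "collectionName")
    .mode("append")
)
```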
> Is there any other way to do this aggregation without using the aggregation pipeline (like SQL Context etc.)?
The answer to this question depends on your use case. Performing the aggregation via withPipeline() means the aggregation runs on the MongoDB server(s), which reduces network data transfer.
Depending on your use case, you could also combine both methods, e.g. filtering out the _id field in Spark once the grouped aggregation result has been returned.
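To illustrate that combined approach, here is a hedged end-to-end sketch using the connector's RDD API; the $group stage, the field names, and the collection name are placeholders to be replaced with your own pipeline and schema:

```scala
import com.mongodb.spark.MongoSpark
import org.bson.Document

// Sketch: run the $group aggregation on the MongoDB server(s) via
// withPipeline(), so only the grouped result is shipped to Spark ...
val grouped = MongoSpark.load(sc)            // sc: your existing SparkContext
  .withPipeline(Seq(Document.parse(
    """{ $group: { _id: "$category", total: { $sum: "$amount" } } }""")))

// ... then post-process in Spark, e.g. drop the _id field, before saving.
val result = grouped.toDF().drop("_id")
MongoSpark.save(
  result.write
    .option("collection", "collectionName")  // placeholder collection name
    .mode("overwrite")
)
```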
If you still have further questions, could you provide the relevant information, such as the code you are using for withPipeline() and save()?

Regards,
Wan.